Coverage for /usr/local/lib/python3.12/site-packages/prefect/infrastructure/provisioners/container_instance.py: 0%

302 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1""" 

2This module defines the ContainerInstancePushProvisioner class, which is responsible for provisioning 

3infrastructure using Azure Container Instances for Prefect work pools. 

4 

5The ContainerInstancePushProvisioner class provides methods for provisioning infrastructure and 

6interacting with Azure Container Instances. 

7 

8Classes: 

9 AzureCLI: A class to handle Azure CLI commands. 

10 ContainerInstancePushProvisioner: A class for provisioning infrastructure using Azure Container Instances. 

11 

12""" 

13 

14from __future__ import annotations 

15 

16import json 

17import random 

18import shlex 

19import string 

20import subprocess 

21import time 

22from copy import deepcopy 

23from textwrap import dedent 

24from typing import TYPE_CHECKING, Any, Dict, Optional 

25from uuid import UUID 

26 

27from anyio import run_process 

28from rich.console import Console 

29from rich.panel import Panel 

30from rich.progress import Progress, SpinnerColumn, TextColumn 

31from rich.prompt import Confirm 

32from rich.syntax import Syntax 

33 

34from prefect.client.schemas.actions import BlockDocumentCreate 

35from prefect.client.utilities import inject_client 

36from prefect.exceptions import ObjectAlreadyExists, ObjectNotFound 

37from prefect.settings import ( 

38 PREFECT_DEFAULT_DOCKER_BUILD_NAMESPACE, 

39 update_current_profile, 

40) 

41 

42if TYPE_CHECKING: 

43 from prefect.client.orchestration import PrefectClient 

44 

45 

46class AzureCLI: 

47 """ 

48 A class for executing Azure CLI commands and handling their output. 

49 

50 Args: 

51 console (Console): A Rich console object for displaying messages. 

52 

53 Methods: 

54 run_command(command, success_message=None, failure_message=None, ignore_if_exists=False, return_json=False) 

55 Execute an Azure CLI command. 

56 """ 

57 

58 def __init__(self, console: Console): 

59 self._console = console 

60 

61 async def run_command( 

62 self, 

63 command: str, 

64 success_message: Optional[str] = None, 

65 failure_message: Optional[str] = None, 

66 ignore_if_exists: bool = False, 

67 return_json: bool = False, 

68 ) -> str | dict[str, Any] | None: 

69 """ 

70 Runs an Azure CLI command and processes the output. 

71 

72 Args: 

73 command (str): The Azure CLI command to execute. 

74 success_message (str, optional): Message to print on success. 

75 failure_message (str, optional): Message to print on failure. 

76 ignore_if_exists (bool): Whether to ignore errors if a resource already exists. 

77 return_json (bool): Whether to return the output as JSON. 

78 

79 Returns: 

80 tuple: A tuple with two elements: 

81 - str: Status, either 'created', 'exists', or 'error'. 

82 - str or dict or None: The command output or None if an error occurs (depends on return_json). 

83 

84 Raises: 

85 subprocess.CalledProcessError: If the command execution fails. 

86 json.JSONDecodeError: If output cannot be decoded as JSON when return_json is True. 

87 """ 

88 try: 

89 result = await run_process(shlex.split(command), check=False) 

90 output = result.stdout.decode("utf-8").strip() 

91 

92 if result.returncode != 0: 

93 error_message = result.stderr.decode("utf-8") 

94 if ignore_if_exists and "already exists" in error_message: 

95 if success_message: 

96 self._console.print( 

97 f"{success_message} (already exists)", style="yellow" 

98 ) 

99 return ("exists", None) 

100 else: 

101 if failure_message: 

102 self._console.print( 

103 f"{failure_message}: {result.stderr.decode('utf-8')}", 

104 style="red", 

105 ) 

106 raise subprocess.CalledProcessError( 

107 result.returncode, 

108 command, 

109 output=result.stdout, 

110 stderr=result.stderr, 

111 ) 

112 if success_message: 

113 self._console.print(success_message, style="green") 

114 

115 if return_json: 

116 try: 

117 return json.loads(output) 

118 except json.JSONDecodeError as e: 

119 self._console.print(f"Failed to decode JSON: {e}", style="red") 

120 raise e 

121 

122 return output 

123 

124 except subprocess.CalledProcessError as e: 

125 self._console.print(f"Command execution failed: {e}", style="red") 

126 return 

127 

128 

129class ContainerInstancePushProvisioner: 

130 """ 

131 A class responsible for provisioning Azure resources and setting up a push work pool. 

132 

133 Attributes: 

134 _console (Console): A Rich console object for displaying messages and progress. 

135 _subscription_id (str): Azure subscription ID. 

136 _subscription_name (str): Azure subscription name. 

137 _resource_group (str): Azure resource group name. 

138 _location (str): Azure resource location. 

139 azure_cli (AzureCLI): An instance of AzureCLI for running Azure commands. 

140 

141 Methods: 

142 set_location: Sets the location for Azure resource deployment. 

143 _verify_az_ready: Verifies if Azure CLI is ready and available. 

144 _select_subscription: Selects an Azure subscription interactively or automatically. 

145 _create_resource_group: Creates a resource group in Azure. 

146 _create_app_registration: Creates an app registration in Azure AD. 

147 _generate_secret_for_app: Generates a secret for the app registration. 

148 _get_service_principal_object_id: Retrieves the object ID of the service principal associated with the app registration. 

149 _assign_contributor_role: Assigns the Contributor role to the service account. 

150 _create_aci_credentials_block: Creates an Azure Container Instance credentials block. 

151 provision: Orchestrates the provisioning of Azure resources and setup for the push work pool. 

152 """ 

153 

154 def __init__(self): 

155 self._console = Console() 

156 self._subscription_id = None 

157 self._subscription_name = None 

158 self._location = "eastus" 

159 self._identity_name = "prefect-acr-identity" 

160 self.azure_cli: AzureCLI = AzureCLI(self.console) 

161 self._credentials_block_name = None 

162 self._resource_group_name = "prefect-aci-push-pool-rg" 

163 self._app_registration_name = "prefect-aci-push-pool-app" 

164 self._registry_name_prefix = "prefect" 

165 

166 @property 

167 def console(self) -> Console: 

168 return self._console 

169 

170 @console.setter 

171 def console(self, value: Console) -> None: 

172 self._console = value 

173 

174 async def set_location(self) -> None: 

175 """ 

176 Set the Azure resource deployment location to the default or 'eastus' on failure. 

177 

178 Raises: 

179 RuntimeError: If unable to execute the Azure CLI command. 

180 """ 

181 try: 

182 command = ( 

183 'az account list-locations --query "[?isDefault].name" --output tsv' 

184 ) 

185 output = await self.azure_cli.run_command(command) 

186 if output: 

187 self._location = output 

188 except subprocess.CalledProcessError as e: 

189 raise RuntimeError("Failed to get default location.") from e 

190 

191 async def _verify_az_ready(self) -> None: 

192 """ 

193 Verifies if Azure CLI is installed and ready to use. 

194 

195 Raises: 

196 RuntimeError: If Azure CLI is not installed. 

197 subprocess.CalledProcessError: If the Azure CLI command execution fails. 

198 """ 

199 try: 

200 await self.azure_cli.run_command("az --version", ignore_if_exists=True) 

201 

202 except subprocess.CalledProcessError as e: 

203 raise RuntimeError( 

204 "Azure CLI is not installed. Please see" 

205 " https://docs.microsoft.com/en-us/cli/azure/install-azure-cli" 

206 ) from e 

207 

208 accounts = await self.azure_cli.run_command( 

209 "az account list --output json", 

210 return_json=True, 

211 ) 

212 if not accounts: 

213 raise RuntimeError( 

214 "No Azure accounts found. Please run `az login` to log in to Azure." 

215 ) 

216 

217 async def _select_subscription(self) -> str: 

218 """ 

219 Selects an Azure subscription for use. If running in interactive mode, 

220 the user will be prompted to select a subscription. Otherwise, the current subscription is used. 

221 

222 Returns: 

223 str: The ID of the selected Azure subscription. 

224 

225 Raises: 

226 RuntimeError: If no Azure subscriptions are found or the Azure CLI command execution fails. 

227 """ 

228 from prefect.cli._prompts import prompt_select_from_table 

229 

230 if self._console.is_interactive: 

231 with Progress( 

232 SpinnerColumn(), 

233 TextColumn("Fetching Azure subscriptions..."), 

234 transient=True, 

235 console=self._console, 

236 ) as progress: 

237 list_projects_task = progress.add_task( 

238 "Fetching subscriptions...", total=1 

239 ) 

240 subscriptions_list = await self.azure_cli.run_command( 

241 "az account list --output json", 

242 failure_message=( 

243 "No Azure subscriptions found. Please create an Azure subscription" 

244 " and try again." 

245 ), 

246 ignore_if_exists=True, 

247 return_json=True, 

248 ) 

249 progress.update(list_projects_task, completed=1) 

250 if subscriptions_list: 

251 selected_subscription = prompt_select_from_table( 

252 self._console, 

253 "Please select which Azure subscription to use:", 

254 [ 

255 {"header": "Name", "key": "name"}, 

256 {"header": "Subscription ID", "key": "id"}, 

257 ], 

258 subscriptions_list, 

259 ) 

260 self._subscription_id = selected_subscription["id"] 

261 self._subscription_name = selected_subscription["name"] 

262 

263 else: 

264 subscriptions_list = await self.azure_cli.run_command( 

265 "az account list --output json", 

266 failure_message=( 

267 "No Azure subscriptions found. Please create an Azure subscription" 

268 " and try again." 

269 ), 

270 ignore_if_exists=True, 

271 return_json=True, 

272 ) 

273 if subscriptions_list: 

274 self._subscription_id = subscriptions_list[0]["id"] 

275 self._subscription_name = subscriptions_list[0]["name"] 

276 else: 

277 raise RuntimeError( 

278 "No Azure subscriptions found. Please create an Azure subscription" 

279 " and try again." 

280 ) 

281 

282 async def _create_resource_group(self): 

283 """ 

284 Creates a resource group in Azure using predefined names and locations. 

285 

286 Raises: 

287 subprocess.CalledProcessError: If the Azure CLI command execution fails. 

288 """ 

289 check_exists_command = ( 

290 f"az group exists --name {self._resource_group_name} --subscription" 

291 f" {self._subscription_id}" 

292 ) 

293 exists_result = await self.azure_cli.run_command( 

294 check_exists_command, return_json=True 

295 ) 

296 if exists_result is True: 

297 self._console.print( 

298 ( 

299 f"Resource group '{self._resource_group_name}' already exists in" 

300 f" subscription {self._subscription_name}." 

301 ), 

302 style="yellow", 

303 ) 

304 return 

305 

306 resource_group_command = ( 

307 f"az group create --name '{self._resource_group_name}' --location" 

308 f" '{self._location}' --subscription '{self._subscription_id}'" 

309 ) 

310 await self.azure_cli.run_command( 

311 resource_group_command, 

312 success_message=( 

313 f"Resource group '{self._resource_group_name}' created successfully" 

314 ), 

315 failure_message=( 

316 f"Failed to create resource group '{self._resource_group_name}' in" 

317 f" subscription '{self._subscription_name}'" 

318 ), 

319 ignore_if_exists=True, 

320 ) 

321 

322 async def _create_app_registration(self) -> str: 

323 """ 

324 Creates an app registration in Azure Active Directory. 

325 

326 Returns: 

327 str: The client ID of the newly created app registration. 

328 

329 Raises: 

330 subprocess.CalledProcessError: If the Azure CLI command execution fails. 

331 """ 

332 # Check if the app registration already exists 

333 check_exists_command = ( 

334 f"az ad app list --display-name {self._app_registration_name} --output json" 

335 ) 

336 app_registrations = await self.azure_cli.run_command( 

337 check_exists_command, 

338 ) 

339 if app_registrations: 

340 if isinstance(app_registrations, str): 

341 app_registrations = json.loads(app_registrations) 

342 

343 existing_app_registration = next( 

344 ( 

345 app 

346 for app in app_registrations 

347 if app["displayName"] == self._app_registration_name 

348 ), 

349 None, 

350 ) 

351 if existing_app_registration: 

352 self._console.print( 

353 f"App registration '{self._app_registration_name}' already exists.", 

354 style="yellow", 

355 ) 

356 return existing_app_registration["appId"] 

357 

358 app_registration_command = ( 

359 f"az ad app create --display-name {self._app_registration_name} " 

360 "--output json" 

361 ) 

362 app_registration = await self.azure_cli.run_command( 

363 app_registration_command, 

364 success_message=( 

365 f"App registration '{self._app_registration_name}' created successfully" 

366 ), 

367 failure_message=( 

368 "Failed to create app registration with name" 

369 f" '{self._app_registration_name}'" 

370 ), 

371 ignore_if_exists=True, 

372 ) 

373 if app_registration: 

374 if isinstance(app_registration, str): 

375 app_registration = json.loads(app_registration) 

376 return app_registration["appId"] 

377 

378 else: 

379 raise RuntimeError("Failed to create app registration.") 

380 

381 async def _generate_secret_for_app(self, app_id: str) -> tuple: 

382 """ 

383 Generates a secret for the app registration. 

384 

385 Args: 

386 app_id (str): The client ID of the app registration for which to generate the secret. 

387 

388 Returns: 

389 tuple: A tuple containing the tenant ID and the generated secret. 

390 

391 Raises: 

392 subprocess.CalledProcessError: If the Azure CLI command execution fails. 

393 """ 

394 secret_command = ( 

395 f"az ad app credential reset --id {app_id} --append --output json" 

396 ) 

397 app_secret = await self.azure_cli.run_command( 

398 secret_command, 

399 success_message=( 

400 f"Secret generated for app registration with client ID '{app_id}'" 

401 ), 

402 failure_message=( 

403 "Failed to generate secret for app registration with client ID" 

404 f" '{app_id}'. If you have already generated 2 secrets for this app" 

405 " registration, please delete one from the" 

406 f" `{self._app_registration_name}` resource and try again." 

407 ), 

408 ignore_if_exists=True, 

409 return_json=True, 

410 ) 

411 

412 try: 

413 return app_secret["tenant"], app_secret["password"] 

414 except Exception as e: 

415 raise RuntimeError( 

416 "Failed to generate a new secret for the app registration." 

417 ) from e 

418 

419 async def _get_or_create_service_principal_object_id(self, app_id: str): 

420 """ 

421 Retrieves or creates a service principal for the given app registration client ID. 

422 

423 Args: 

424 app_id (str): The client ID of the app registration. 

425 

426 Returns: 

427 str: The object ID of the service principal. 

428 """ 

429 # Try to retrieve the existing service principal 

430 command_get_sp = ( 

431 f"az ad sp list --all --query \"[?appId=='{app_id}']\" --output json" 

432 ) 

433 service_principal = await self.azure_cli.run_command( 

434 command_get_sp, 

435 return_json=True, 

436 ) 

437 

438 if service_principal: 

439 return service_principal[0] 

440 

441 # Service principal does not exist, create it 

442 command_create_sp = f"az ad sp create --id {app_id}" 

443 await self.azure_cli.run_command( 

444 command_create_sp, 

445 success_message=f"Service principal created for app ID '{app_id}'", 

446 failure_message=f"Failed to create service principal for app ID '{app_id}'", 

447 ) 

448 

449 # Retrieve the object ID of the newly created service principal 

450 new_service_principal = await self.azure_cli.run_command( 

451 command_get_sp, 

452 failure_message=( 

453 f"Failed to retrieve new service principal for app ID {app_id}" 

454 ), 

455 return_json=True, 

456 ) 

457 

458 if new_service_principal: 

459 return new_service_principal[0] 

460 else: 

461 raise Exception( 

462 f"Failed to retrieve new service principal for app ID {app_id}" 

463 ) 

464 

465 async def _get_or_create_identity( 

466 self, identity_name: str, resource_group_name: str, subscription_id: str 

467 ): 

468 """ 

469 Retrieves or creates a managed identity for the given resource group. 

470 

471 Returns: 

472 dict: Object representing the identity. 

473 """ 

474 # Try to retrieve the existing identity 

475 command_get_identity = ( 

476 f"az identity list --query \"[?name=='{identity_name}']\" --resource-group" 

477 f" {resource_group_name} --subscription {subscription_id} --output json" 

478 ) 

479 identity = await self.azure_cli.run_command( 

480 command_get_identity, 

481 return_json=True, 

482 ) 

483 

484 if identity: 

485 self._console.print( 

486 ( 

487 f"Identity '{self._identity_name}' already exists in" 

488 f" subscription '{self._subscription_name}'." 

489 ), 

490 style="yellow", 

491 ) 

492 return identity[0] 

493 

494 # Identity does not exist, create it 

495 command_create_identity = ( 

496 f"az identity create --name {identity_name} --resource-group" 

497 f" {resource_group_name} --subscription {subscription_id} --output json" 

498 ) 

499 response = await self.azure_cli.run_command( 

500 command_create_identity, 

501 success_message=f"Identity {identity_name!r} created", 

502 failure_message=f"Failed to create identity {identity_name!r}", 

503 return_json=True, 

504 ) 

505 

506 if response: 

507 return response 

508 else: 

509 raise Exception( 

510 f"Failed to retrieve new identity for identity {self._identity_name}" 

511 ) 

512 

513 @staticmethod 

514 def _generate_acr_name(base_name: str): 

515 # Ensure the base name adheres to ACR naming conventions 

516 if not base_name.isalnum() or len(base_name) > 50: 

517 raise ValueError( 

518 "ACR registry name prefix should be alphanumeric and up to 50" 

519 " characters" 

520 ) 

521 

522 # Generate a unique string 

523 timestamp = int(time.time()) 

524 random_str = "".join( 

525 random.choices(string.ascii_lowercase + string.digits, k=4) 

526 ) 

527 

528 # Combine to form the ACR name 

529 acr_name = f"{base_name}{timestamp}{random_str}" 

530 return acr_name 

531 

532 async def _get_or_create_registry( 

533 self, 

534 registry_name: str, 

535 resource_group_name: str, 

536 location: str, 

537 subscription_id: str, 

538 ): 

539 """ 

540 Retrieves or creates an Azure Container Registry. 

541 

542 Args: 

543 registry_name: The name of the registry. 

544 resource_group_name: The name of the resource group to use for the registry. 

545 location: Where to create the registry. 

546 subscription_id: The ID of the subscription to use for the registry. 

547 

548 Returns: 

549 dict: Object representing the registry. 

550 """ 

551 # check to see if there are any registries starting with 'prefect' 

552 command_get_registries = ( 

553 'az acr list --query "[?starts_with(name,' 

554 f" '{self._registry_name_prefix}')]\" --subscription" 

555 f" {subscription_id} --output json" 

556 ) 

557 response = await self.azure_cli.run_command( 

558 command_get_registries, 

559 return_json=True, 

560 ) 

561 

562 # acr names must be globally unique, so if there are any matches, use the first one 

563 if response: 

564 self._console.print( 

565 ( 

566 f"Registry with prefix {self._registry_name_prefix!r} already" 

567 f" exists in subscription '{subscription_id}'." 

568 ), 

569 style="yellow", 

570 ) 

571 return response[0] 

572 

573 command_create_repository = ( 

574 f"az acr create --name {registry_name} --resource-group" 

575 f" {resource_group_name} --subscription {subscription_id} --location" 

576 f" {location} --sku Basic" 

577 ) 

578 response = await self.azure_cli.run_command( 

579 command_create_repository, 

580 success_message="Registry created", 

581 failure_message="Failed to create registry", 

582 return_json=True, 

583 ) 

584 

585 if response: 

586 return response 

587 else: 

588 raise Exception(f"Failed to create registry {registry_name}") 

589 

590 async def _log_into_registry(self, login_server: str, subscription_id: str): 

591 """ 

592 Logs into the given Azure Container Registry. 

593 

594 Args: 

595 registry_name: The name of the registry to log into. 

596 

597 Raises: 

598 subprocess.CalledProcessError: If the Azure CLI command execution fails. 

599 """ 

600 command_login = ( 

601 f"az acr login --name {login_server} --subscription {subscription_id}" 

602 ) 

603 await self.azure_cli.run_command( 

604 command_login, 

605 success_message=f"Logged into registry {login_server}", 

606 failure_message=f"Failed to log into registry {login_server}", 

607 ) 

608 

609 async def _assign_contributor_role(self, app_id: str, subscription_id: str) -> None: 

610 """ 

611 Assigns the 'Contributor' role to the service principal associated with a given app ID. 

612 

613 Args: 

614 app_id (str): The client ID of the app registration. 

615 """ 

616 service_principal = await self._get_or_create_service_principal_object_id( 

617 app_id 

618 ) 

619 

620 service_principal_id = service_principal["id"] 

621 

622 if service_principal_id: 

623 role = "Contributor" 

624 scope = f"/subscriptions/{self._subscription_id}/resourceGroups/{self._resource_group_name}" 

625 

626 # Check if the role is already assigned 

627 check_role_command = ( 

628 f"az role assignment list --assignee {service_principal_id} --role" 

629 f" {role} --scope {scope} --subscription {subscription_id} --output" 

630 " json" 

631 ) 

632 role_assignments = await self.azure_cli.run_command( 

633 check_role_command, return_json=True 

634 ) 

635 if role_assignments and any( 

636 ra 

637 for ra in role_assignments 

638 if ra["roleDefinitionName"] == role and ra["scope"] == scope 

639 ): 

640 self._console.print( 

641 ( 

642 f"Service principal with object ID '{service_principal_id}'" 

643 f" already has the '{role}' role assigned in '{scope}'." 

644 ), 

645 style="yellow", 

646 ) 

647 return 

648 

649 assign_command = ( 

650 f"az role assignment create --role {role} --assignee-object-id" 

651 f" {service_principal_id} --scope {scope} --subscription" 

652 f" {subscription_id}" 

653 ) 

654 await self.azure_cli.run_command( 

655 assign_command, 

656 success_message=( 

657 "Contributor role assigned to service principal with object ID" 

658 f" '{service_principal_id}'" 

659 ), 

660 failure_message=( 

661 "Failed to assign Contributor role to service principal with" 

662 f" object ID '{service_principal_id}'" 

663 ), 

664 ignore_if_exists=True, 

665 ) 

666 

667 async def _assign_acr_pull_role( 

668 self, identity: Dict[str, Any], registry: Dict[str, Any], subscription_id: str 

669 ) -> None: 

670 """ 

671 Assigns the AcrPull role to the specified identity for the given registry. 

672 

673 Args: 

674 identity: The identity to assign the role to. 

675 registry: The registry to grant access to. 

676 """ 

677 command = ( 

678 "az role assignment create --assignee-object-id" 

679 f" {identity['principalId']} --assignee-principal-type ServicePrincipal" 

680 f" --scope {registry['id']} --role AcrPull --subscription {subscription_id}" 

681 ) 

682 await self.azure_cli.run_command( 

683 command, 

684 ignore_if_exists=True, 

685 ) 

686 

687 async def _create_aci_credentials_block( 

688 self, 

689 work_pool_name: str, 

690 client_id: str, 

691 tenant_id: str, 

692 client_secret: str, 

693 client: "PrefectClient", 

694 ) -> UUID: 

695 """ 

696 Creates a credentials block for Azure Container Instance. 

697 

698 Args: 

699 work_pool_name (str): The name of the work pool. 

700 client_id (str): The client ID obtained from app registration. 

701 tenant_id (str): The tenant ID obtained from the secret generation. 

702 client_secret (str): The client secret obtained from the secret generation. 

703 client (PrefectClient): An instance of PrefectClient. 

704 

705 Returns: 

706 UUID: The ID of the created credentials block. 

707 

708 Raises: 

709 ObjectAlreadyExists: If a credentials block with the same name already exists. 

710 """ 

711 credentials_block_name = self._credentials_block_name 

712 credentials_block_type = await client.read_block_type_by_slug( 

713 "azure-container-instance-credentials" 

714 ) 

715 

716 credentials_block_schema = ( 

717 await client.get_most_recent_block_schema_for_block_type( 

718 block_type_id=credentials_block_type.id 

719 ) 

720 ) 

721 

722 try: 

723 block_doc = await client.create_block_document( 

724 block_document=BlockDocumentCreate( 

725 name=credentials_block_name, 

726 data={ 

727 "client_id": client_id, 

728 "tenant_id": tenant_id, 

729 "client_secret": client_secret, 

730 }, 

731 block_type_id=credentials_block_type.id, 

732 block_schema_id=credentials_block_schema.id, 

733 ) 

734 ) 

735 

736 self._console.print( 

737 ( 

738 f"ACI credentials block '{credentials_block_name}' created in" 

739 " Prefect Cloud" 

740 ), 

741 style="green", 

742 ) 

743 return block_doc.id 

744 

745 except ObjectAlreadyExists: 

746 self._console.print( 

747 f"ACI credentials block '{credentials_block_name}' already exists", 

748 style="yellow", 

749 ) 

750 block_doc = await client.read_block_document_by_name( 

751 name=credentials_block_name, 

752 block_type_slug="azure-container-instance-credentials", 

753 ) 

754 return block_doc.id 

755 

756 except Exception as e: 

757 self._console.print( 

758 f"Failed to create ACI credentials block: {e}", 

759 style="red", 

760 ) 

761 raise e 

762 

763 async def _aci_credentials_block_exists( 

764 self, block_name: str, client: "PrefectClient" 

765 ) -> bool: 

766 """ 

767 Checks if an ACI credentials block with the given name already exists. 

768 

769 Args: 

770 block_name (str): The name of the ACI credentials block. 

771 client (PrefectClient): An instance of PrefectClient. 

772 

773 Returns: 

774 bool: True if the credentials block exists, False otherwise. 

775 """ 

776 try: 

777 await client.read_block_document_by_name( 

778 name=block_name, 

779 block_type_slug="azure-container-instance-credentials", 

780 ) 

781 return True 

782 except ObjectNotFound: 

783 return False 

784 

785 def _validate_user_input(self, name): 

786 if 2 < len(name) < 40 and name.isalnum(): 

787 return True 

788 else: 

789 return False 

790 

791 async def _create_provision_table( 

792 self, work_pool_name: str, client: "PrefectClient" 

793 ): 

794 return Panel( 

795 dedent( 

796 f"""\ 

797 Provisioning infrastructure for your work pool [blue]{work_pool_name}[/] will require: 

798 

799 Updates in subscription: [blue]{self._subscription_name}[/] 

800 

801 - Create a resource group in location: [blue]{self._location}[/] 

802 - Create an app registration in Azure AD: [blue]{self._app_registration_name}[/] 

803 - Create/use a service principal for app registration 

804 - Generate a secret for app registration 

805 - Create an Azure Container Registry with prefix [blue]{self._registry_name_prefix}[/] 

806 - Create an identity [blue]{self._identity_name}[/] to allow access to the created registry 

807 - Assign Contributor role to service account 

808 - Create an ACR registry for image hosting 

809 - Create an identity for Azure Container Instance to allow access to the registry 

810 

811 Updates in Prefect workspace 

812 

813 - Create Azure Container Instance credentials block: [blue]{self._credentials_block_name}[/] 

814 """ 

815 ), 

816 expand=False, 

817 ) 

818 

819 async def _customize_resource_names( 

820 self, work_pool_name: str, client: "PrefectClient" 

821 ) -> bool: 

822 from prefect.cli._prompts import prompt 

823 

824 self._resource_group_name = prompt( 

825 "Please enter a name for the resource group", 

826 default=self._resource_group_name, 

827 ) 

828 self._app_registration_name = prompt( 

829 "Please enter a name for the app registration", 

830 default=self._app_registration_name, 

831 ) 

832 while True: 

833 self._registry_name_prefix = prompt( 

834 "Please enter a prefix for the Azure Container Registry", 

835 default=self._registry_name_prefix, 

836 ) 

837 if self._validate_user_input(self._registry_name_prefix): 

838 break 

839 else: 

840 self._console.print( 

841 "The prefix must be alphanumeric and between 3-50 characters.", 

842 style="red", 

843 ) 

844 self._identity_name = prompt( 

845 "Please enter a name for the identity (used for ACR access)", 

846 default=self._identity_name, 

847 ) 

848 self._credentials_block_name = prompt( 

849 "Please enter a name for the ACI credentials block", 

850 default=self._credentials_block_name, 

851 ) 

852 table = await self._create_provision_table(work_pool_name, client) 

853 self._console.print(table) 

854 

855 return Confirm.ask( 

856 "Proceed with infrastructure provisioning?", console=self._console 

857 ) 

858 

859 @inject_client 

860 async def provision( 

861 self, 

862 work_pool_name: str, 

863 base_job_template: Dict[str, Any], 

864 client: Optional["PrefectClient"] = None, 

865 ) -> Dict[str, Any]: 

866 """ 

867 Orchestrates the provisioning of Azure resources and setup for the push work pool. 

868 

869 Args: 

870 work_pool_name (str): The name of the work pool. 

871 base_job_template (Dict[str, Any]): The base template for job creation. 

872 client (Optional[PrefectClient]): An instance of PrefectClient. If None, it will be injected. 

873 

874 Returns: 

875 Dict[str, Any]: The updated job template with necessary references and configurations. 

876 

877 Raises: 

878 RuntimeError: If client injection fails or the Azure CLI command execution fails. 

879 """ 

880 from prefect.cli._prompts import prompt_select_from_table 

881 

882 if not client: 

883 self._console.print( 

884 "Client injection failed, cannot proceed with provisioning.", 

885 style="red", 

886 ) 

887 return base_job_template 

888 

889 await self._verify_az_ready() 

890 await self._select_subscription() 

891 await self.set_location() 

892 self._credentials_block_name = f"{work_pool_name}-push-pool-credentials" 

893 

894 table = await self._create_provision_table(work_pool_name, client) 

895 self._console.print(table) 

896 if self._console.is_interactive: 

897 chosen_option = prompt_select_from_table( 

898 self._console, 

899 "Proceed with infrastructure provisioning with default resource names?", 

900 [ 

901 {"header": "Options:", "key": "option"}, 

902 ], 

903 [ 

904 { 

905 "option": ( 

906 "Yes, proceed with infrastructure provisioning with default" 

907 " resource names" 

908 ) 

909 }, 

910 {"option": "Customize resource names"}, 

911 {"option": "Do not proceed with infrastructure provisioning"}, 

912 ], 

913 ) 

914 if chosen_option["option"] == "Customize resource names": 

915 if not await self._customize_resource_names(work_pool_name, client): 

916 return base_job_template 

917 

918 elif ( 

919 chosen_option["option"] 

920 == "Do not proceed with infrastructure provisioning" 

921 ): 

922 return base_job_template 

923 elif ( 

924 chosen_option["option"] 

925 != "Yes, proceed with infrastructure provisioning with default" 

926 " resource names" 

927 ): 

928 # basically, we should never hit this. i'm concerned that we might change 

929 # the options in the future and forget to update this check 

930 raise ValueError(f"Invalid option selected: {chosen_option['option']}") 

931 

932 credentials_block_exists = await self._aci_credentials_block_exists( 

933 block_name=self._credentials_block_name, client=client 

934 ) 

935 

936 if not credentials_block_exists: 

937 total_tasks = 7 

938 else: 

939 total_tasks = 6 

940 

941 with Progress(console=self._console) as progress: 

942 self.azure_cli._console = progress.console 

943 task = progress.add_task("Provisioning infrastructure.", total=total_tasks) 

944 progress.console.print("Creating resource group") 

945 await self._create_resource_group() 

946 progress.advance(task) 

947 

948 progress.console.print("Creating app registration") 

949 client_id = await self._create_app_registration() 

950 progress.advance(task) 

951 

952 credentials_block_exists = await self._aci_credentials_block_exists( 

953 block_name=self._credentials_block_name, client=client 

954 ) 

955 

956 if not credentials_block_exists: 

957 progress.console.print("Generating secret for app registration") 

958 tenant_id, client_secret = await self._generate_secret_for_app( 

959 app_id=client_id, 

960 ) 

961 progress.advance(task) 

962 

963 progress.console.print("Creating ACI credentials block") 

964 block_doc_id = await self._create_aci_credentials_block( 

965 work_pool_name, client_id, tenant_id, client_secret, client 

966 ) 

967 progress.advance(task) 

968 else: 

969 progress.console.print( 

970 "ACI credentials block already exists.", style="yellow" 

971 ) 

972 block_doc = await client.read_block_document_by_name( 

973 name=self._credentials_block_name, 

974 block_type_slug="azure-container-instance-credentials", 

975 ) 

976 block_doc_id = block_doc.id 

977 progress.advance(task) 

978 

979 progress.console.print("Assigning Contributor role to service account") 

980 await self._assign_contributor_role( 

981 app_id=client_id, subscription_id=self._subscription_id 

982 ) 

983 progress.advance(task) 

984 

985 progress.console.print( 

986 "Creating Azure Container Registry (this make take a few minutes)" 

987 ) 

988 

989 registry_name = self._generate_acr_name(self._registry_name_prefix) 

990 registry = await self._get_or_create_registry( 

991 registry_name=registry_name, 

992 resource_group_name=self._resource_group_name, 

993 location=self._location, 

994 subscription_id=self._subscription_id, 

995 ) 

996 await self._log_into_registry( 

997 login_server=registry["loginServer"], 

998 subscription_id=self._subscription_id, 

999 ) 

1000 update_current_profile( 

1001 {PREFECT_DEFAULT_DOCKER_BUILD_NAMESPACE: registry["loginServer"]} 

1002 ) 

1003 progress.advance(task) 

1004 

1005 progress.console.print("Creating identity") 

1006 identity = await self._get_or_create_identity( 

1007 identity_name=self._identity_name, 

1008 resource_group_name=self._resource_group_name, 

1009 subscription_id=self._subscription_id, 

1010 ) 

1011 await self._assign_acr_pull_role( 

1012 identity=identity, 

1013 registry=registry, 

1014 subscription_id=self._subscription_id, 

1015 ) 

1016 progress.advance(task) 

1017 

1018 base_job_template_copy = deepcopy(base_job_template) 

1019 base_job_template_copy["variables"]["properties"]["aci_credentials"][ 

1020 "default" 

1021 ] = {"$ref": {"block_document_id": str(block_doc_id)}} 

1022 

1023 base_job_template_copy["variables"]["properties"]["resource_group_name"][ 

1024 "default" 

1025 ] = self._resource_group_name 

1026 

1027 base_job_template_copy["variables"]["properties"]["subscription_id"][ 

1028 "default" 

1029 ] = self._subscription_id 

1030 base_job_template_copy["variables"]["properties"]["image_registry"][ 

1031 "default" 

1032 ] = { 

1033 "registry_url": registry["loginServer"], 

1034 "identity": identity["id"], 

1035 } 

1036 base_job_template_copy["variables"]["properties"]["identities"]["default"] = [ 

1037 identity["id"] 

1038 ] 

1039 

1040 self._console.print( 

1041 dedent( 

1042 f"""\ 

1043 Your default Docker build namespace has been set to [blue]{registry["loginServer"]!r}[/]. 

1044 Use any image name to build and push to this registry by default: 

1045 """ 

1046 ), 

1047 Panel( 

1048 Syntax( 

1049 dedent( 

1050 f"""\ 

1051 from prefect import flow 

1052 from prefect.docker import DockerImage 

1053 

1054 

1055 @flow(log_prints=True) 

1056 def my_flow(name: str = "world"): 

1057 print(f"Hello {{name}}! I'm a flow running on an Azure Container Instance!") 

1058 

1059 

1060 if __name__ == "__main__": 

1061 my_flow.deploy( 

1062 name="my-deployment", 

1063 work_pool_name="{work_pool_name}", 

1064 image=DockerImage( 

1065 name="my-image:latest", 

1066 platform="linux/amd64", 

1067 ) 

1068 )""" 

1069 ), 

1070 "python", 

1071 background_color="default", 

1072 ), 

1073 title="example_deploy_script.py", 

1074 expand=False, 

1075 ), 

1076 ) 

1077 

1078 self._console.print( 

1079 ( 

1080 f"Infrastructure successfully provisioned for '{work_pool_name}' work" 

1081 " pool!" 

1082 ), 

1083 style="green", 

1084 ) 

1085 return base_job_template_copy