Coverage for /usr/local/lib/python3.12/site-packages/prefect/infrastructure/provisioners/coiled.py: 0%
88 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1import importlib
2from copy import deepcopy
3from types import ModuleType
4from typing import TYPE_CHECKING, Any, Dict, Optional
6from anyio import run_process
7from rich.console import Console
8from rich.progress import Progress, SpinnerColumn, TextColumn
9from rich.prompt import Confirm
11from prefect._internal.installation import ainstall_packages
12from prefect.client.schemas.actions import BlockDocumentCreate
13from prefect.client.schemas.objects import BlockDocument
14from prefect.client.utilities import inject_client
15from prefect.exceptions import ObjectNotFound
16from prefect.utilities.importtools import lazy_import
18if TYPE_CHECKING:
19 from prefect.client.orchestration import PrefectClient
# Module-level handle for the optional `coiled` dependency, obtained through
# `lazy_import` (presumably this defers the real import until the module is
# first used -- TODO confirm against prefect.utilities.importtools).
# `CoiledPushProvisioner._install_coiled` rebinds this global to the real
# module once the package has been installed.
coiled: ModuleType = lazy_import("coiled")
class CoiledPushProvisioner:
    """
    An infrastructure provisioner for Coiled push work pools.
    """

    def __init__(self, client: Optional["PrefectClient"] = None):
        # NOTE: `client` is accepted for signature compatibility but is not
        # stored; `provision` receives its client via `@inject_client`.
        self._console = Console()

    @property
    def console(self) -> Console:
        """The rich console used for all prompts and status output."""
        return self._console

    @console.setter
    def console(self, value: Console) -> None:
        self._console = value

    @staticmethod
    def _is_coiled_installed() -> bool:
        """
        Checks if the coiled package is installed.

        Returns:
            True if the coiled package can be imported, False otherwise
        """
        try:
            importlib.import_module("coiled")
        except ModuleNotFoundError:
            return False
        return True

    async def _install_coiled(self) -> None:
        """
        Installs the coiled package and rebinds the module-level `coiled`
        name to the freshly imported module.
        """
        global coiled
        with Progress(
            SpinnerColumn(),
            TextColumn("[bold blue]Installing coiled..."),
            transient=True,
            console=self.console,
        ) as progress:
            task = progress.add_task("coiled install")
            progress.start()
            await ainstall_packages(["coiled"])
            # The package was installed after the interpreter started, so the
            # import system's finder caches may not know about it yet.
            importlib.invalidate_caches()
            coiled = importlib.import_module("coiled")
            progress.advance(task)

    async def _get_coiled_token(self) -> str:
        """
        Gets a Coiled API token from the current Coiled configuration.

        Returns:
            The value of the `coiled.token` dask config key, or an empty
            string if the key is not set
        """
        import dask.config

        return dask.config.get("coiled.token", "")

    async def _create_new_coiled_token(self) -> None:
        """
        Triggers a Coiled login via the browser if no current token. Will create a new token.
        """
        await run_process(["coiled", "login"])

        # The login ran in a subprocess, so it cannot have updated the dask
        # configuration already loaded in this process. Re-read the config
        # files so a following `_get_coiled_token` call sees the new token.
        import dask.config

        dask.config.refresh()

    async def _create_coiled_credentials_block(
        self,
        block_document_name: str,
        coiled_token: str,
        client: "PrefectClient",
    ) -> BlockDocument:
        """
        Creates a CoiledCredentials block containing the provided token.

        Args:
            block_document_name: The name of the block document to create
            coiled_token: The Coiled API token
            client: The Prefect client used to look up the block type and
                schema and to create the block document

        Returns:
            The created block document

        Raises:
            RuntimeError: If the `coiled-credentials` block type cannot be found
        """
        assert client is not None, "client injection failed"
        try:
            credentials_block_type = await client.read_block_type_by_slug(
                "coiled-credentials"
            )
        except ObjectNotFound as exc:
            # Shouldn't happen, but just in case
            raise RuntimeError(
                "Unable to find CoiledCredentials block type. Please ensure you are"
                " using Prefect Cloud."
            ) from exc

        credentials_block_schema = (
            await client.get_most_recent_block_schema_for_block_type(
                block_type_id=credentials_block_type.id
            )
        )
        assert credentials_block_schema is not None, (
            f"Unable to find schema for block type {credentials_block_type.slug}"
        )

        return await client.create_block_document(
            block_document=BlockDocumentCreate(
                name=block_document_name,
                data={
                    "api_token": coiled_token,
                },
                block_type_id=credentials_block_type.id,
                block_schema_id=credentials_block_schema.id,
            )
        )

    @inject_client
    async def provision(
        self,
        work_pool_name: str,
        base_job_template: Dict[str, Any],
        client: Optional["PrefectClient"] = None,
    ) -> Dict[str, Any]:
        """
        Provisions resources necessary for a Coiled push work pool.

        Provisioned resources:
            - A CoiledCredentials block containing a Coiled API token

        Args:
            work_pool_name: The name of the work pool to provision resources for
            base_job_template: The base job template to update
            client: The Prefect client; supplied by `@inject_client` when omitted

        Returns:
            A copy of the provided base job template with the provisioned resources.
            If the user declines provisioning, the copy is returned unmodified.

        Raises:
            RuntimeError: If the coiled package is not installed and is not
                installed during provisioning, or if no Coiled API token is
                available
        """
        credentials_block_name = f"{work_pool_name}-coiled-credentials"
        base_job_template_copy = deepcopy(base_job_template)
        assert client is not None, "client injection failed"

        try:
            block_doc = await client.read_block_document_by_name(
                credentials_block_name, "coiled-credentials"
            )
            self.console.print(
                f"Work pool [blue]{work_pool_name!r}[/] will reuse the existing Coiled"
                f" credentials block [blue]{credentials_block_name!r}[/blue]"
            )
        except ObjectNotFound:
            # Only prompt on an interactive console; otherwise proceed directly.
            if self.console.is_interactive and not Confirm.ask(
                (
                    "\n"
                    "To configure your Coiled push work pool we'll need to store a Coiled"
                    " API token with Prefect Cloud as a block. We'll pull the token from"
                    " your local Coiled configuration or create a new token if we"
                    " can't find one.\n"
                    "\n"
                    "Would you like to continue?"
                ),
                console=self.console,
                default=True,
            ):
                self.console.print(
                    "No problem! You can always configure your Coiled push work pool"
                    " later via the Prefect UI."
                )
                # Return the (unmodified) copy so the caller never receives an
                # alias of its own input, matching the documented contract.
                return base_job_template_copy

            if not self._is_coiled_installed():
                if self.console.is_interactive and Confirm.ask(
                    (
                        "The [blue]coiled[/] package is required to configure"
                        " authentication for your work pool.\n"
                        "\n"
                        "Would you like to install it now?"
                    ),
                    console=self.console,
                    default=True,
                ):
                    await self._install_coiled()

            # Re-check: the install may have been declined, skipped, or failed.
            if not self._is_coiled_installed():
                raise RuntimeError(
                    "The coiled package is not installed.\n\nPlease try installing coiled,"
                    " or you can use the Prefect UI to create your Coiled push work pool."
                )

            # Get the current Coiled API token
            coiled_api_token = await self._get_coiled_token()
            if not coiled_api_token:
                # Create a new token since one wasn't found
                if self.console.is_interactive and Confirm.ask(
                    "Coiled credentials not found. Would you like to create a new token?",
                    console=self.console,
                    default=True,
                ):
                    await self._create_new_coiled_token()
                    coiled_api_token = await self._get_coiled_token()
                else:
                    raise RuntimeError(
                        "Coiled credentials not found. Please create a new token by"
                        " running [blue]coiled login[/] and try again."
                    )

            # Never store an empty token: the login may not have produced one.
            if not coiled_api_token:
                raise RuntimeError(
                    "Coiled credentials were still not found after running coiled"
                    " login. Please run coiled login manually and try again."
                )

            # Create the credentials block
            with Progress(
                SpinnerColumn(),
                TextColumn("[bold blue]Saving Coiled credentials..."),
                transient=True,
                console=self.console,
            ) as progress:
                task = progress.add_task("create coiled credentials block")
                progress.start()
                block_doc = await self._create_coiled_credentials_block(
                    credentials_block_name,
                    coiled_api_token,
                    client=client,
                )
                progress.advance(task)

        properties = base_job_template_copy["variables"]["properties"]
        properties["credentials"]["default"] = {
            "$ref": {"block_document_id": str(block_doc.id)}
        }
        if "image" in properties:
            properties["image"]["default"] = ""
        self.console.print(
            f"Successfully configured Coiled push work pool {work_pool_name!r}!",
            style="green",
        )
        return base_job_template_copy