Coverage for /usr/local/lib/python3.12/site-packages/prefect/infrastructure/provisioners/modal.py: 0%
89 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1import importlib
2import shlex
3import sys
4from copy import deepcopy
5from types import ModuleType
6from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple
8from anyio import run_process
9from rich.console import Console
10from rich.progress import Progress, SpinnerColumn, TextColumn
11from rich.prompt import Confirm
13from prefect._internal.installation import ainstall_packages
14from prefect.client.schemas.actions import BlockDocumentCreate
15from prefect.client.schemas.objects import BlockDocument
16from prefect.client.utilities import inject_client
17from prefect.exceptions import ObjectNotFound
18from prefect.utilities.importtools import lazy_import
20if TYPE_CHECKING:
21 from prefect.client.orchestration import PrefectClient
24modal: ModuleType = lazy_import("modal")
27class ModalPushProvisioner:
28 """
29 A infrastructure provisioner for Modal push work pools.
30 """
32 def __init__(self, client: Optional["PrefectClient"] = None):
33 self._console: Console = Console()
35 @property
36 def console(self) -> Console:
37 return self._console
39 @console.setter
40 def console(self, value: Console) -> None:
41 self._console = value
43 @staticmethod
44 def _is_modal_installed() -> bool:
45 """
46 Checks if the modal package is installed.
48 Returns:
49 True if the modal package is installed, False otherwise
50 """
51 try:
52 importlib.import_module("modal")
53 return True
54 except ModuleNotFoundError:
55 return False
57 async def _install_modal(self):
58 """
59 Installs the modal package.
60 """
61 with Progress(
62 SpinnerColumn(),
63 TextColumn("[bold blue]Installing modal..."),
64 transient=True,
65 console=self.console,
66 ) as progress:
67 task = progress.add_task("modal install")
68 progress.start()
69 global modal
70 await ainstall_packages(["modal"])
71 modal = importlib.import_module("modal")
72 progress.advance(task)
74 async def _get_modal_token_id_and_secret(self) -> Tuple[str, str]:
75 """
76 Gets a Model API token ID and secret from the current Modal configuration.
77 """
78 modal_config = modal.config.Config()
79 modal_token_id = modal_config.get("token_id")
80 modal_token_secret = modal_config.get("token_secret")
82 return modal_token_id, modal_token_secret
84 async def _create_new_modal_token(self):
85 """
86 Triggers a Modal login via the browser. Will create a new token in the default Modal profile.
87 """
88 await run_process([shlex.quote(sys.executable), "-m", "modal", "token", "new"])
89 # Reload the modal.config module to pick up the new token
90 importlib.reload(modal.config)
92 async def _create_modal_credentials_block(
93 self,
94 block_document_name: str,
95 modal_token_id: str,
96 modal_token_secret: str,
97 client: "PrefectClient",
98 ) -> BlockDocument:
99 """
100 Creates a ModalCredentials block containing the provided token ID and secret.
102 Args:
103 block_document_name: The name of the block document to create
104 modal_token_id: The Modal token ID
105 modal_token_secret: The Modal token secret
107 Returns:
108 The ID of the created block
109 """
110 assert client is not None, "client injection failed"
111 try:
112 credentials_block_type = await client.read_block_type_by_slug(
113 "modal-credentials"
114 )
115 except ObjectNotFound:
116 # Shouldn't happen, but just in case
117 raise RuntimeError(
118 "Unable to find ModalCredentials block type. Please ensure you are"
119 " using Prefect Cloud."
120 )
121 credentials_block_schema = (
122 await client.get_most_recent_block_schema_for_block_type(
123 block_type_id=credentials_block_type.id
124 )
125 )
126 assert credentials_block_schema is not None, (
127 f"Unable to find schema for block type {credentials_block_type.slug}"
128 )
130 block_doc = await client.create_block_document(
131 block_document=BlockDocumentCreate(
132 name=block_document_name,
133 data={
134 "token_id": modal_token_id,
135 "token_secret": modal_token_secret,
136 },
137 block_type_id=credentials_block_type.id,
138 block_schema_id=credentials_block_schema.id,
139 )
140 )
141 return block_doc
143 @inject_client
144 async def provision(
145 self,
146 work_pool_name: str,
147 base_job_template: Dict[str, Any],
148 client: Optional["PrefectClient"] = None,
149 ) -> Dict[str, Any]:
150 """
151 Provisions resources necessary for a Modal push work pool.
153 Provisioned resources:
154 - A ModalCredentials block containing a Modal API token
156 Args:
157 work_pool_name: The name of the work pool to provision resources for
158 base_job_template: The base job template to update
160 Returns:
161 A copy of the provided base job template with the provisioned resources
162 """
163 credentials_block_name = f"{work_pool_name}-modal-credentials"
164 base_job_template_copy = deepcopy(base_job_template)
165 assert client is not None, "client injection failed"
166 try:
167 block_doc = await client.read_block_document_by_name(
168 credentials_block_name, "modal-credentials"
169 )
170 self.console.print(
171 f"Work pool [blue]{work_pool_name!r}[/] will reuse the existing Modal"
172 f" credentials block [blue]{credentials_block_name!r}[/blue]"
173 )
174 except ObjectNotFound:
175 if self._console.is_interactive and not Confirm.ask(
176 (
177 "To configure your Modal push work pool we'll need to store a Modal"
178 " token with Prefect Cloud as a block. We'll pull the token from"
179 " your local Modal configuration or create a new token if we"
180 " can't find one. Would you like to continue?"
181 ),
182 console=self.console,
183 ):
184 self.console.print(
185 "No problem! You can always configure your Modal push work pool"
186 " later via the Prefect UI."
187 )
188 return base_job_template
190 if not self._is_modal_installed():
191 if self.console.is_interactive and Confirm.ask(
192 (
193 "The [blue]modal[/] package is required to configure"
194 " authentication for your work pool. Would you like to install"
195 " it now?"
196 ),
197 console=self.console,
198 ):
199 await self._install_modal()
201 # Get the current Modal token ID and secret
202 (
203 modal_token_id,
204 modal_token_secret,
205 ) = await self._get_modal_token_id_and_secret()
206 if not modal_token_id or not modal_token_secret:
207 # Create a new token one wasn't found
208 if self.console.is_interactive and Confirm.ask(
209 (
210 "Modal credentials not found. Would you like to create a new"
211 " token?"
212 ),
213 console=self.console,
214 ):
215 await self._create_new_modal_token()
216 (
217 modal_token_id,
218 modal_token_secret,
219 ) = await self._get_modal_token_id_and_secret()
220 else:
221 raise RuntimeError(
222 "Modal credentials not found. Please create a new token by"
223 " running [blue]modal token new[/] and try again."
224 )
226 # Create the credentials block
227 with Progress(
228 SpinnerColumn(),
229 TextColumn("[bold blue]Saving Modal credentials..."),
230 transient=True,
231 console=self.console,
232 ) as progress:
233 task = progress.add_task("create modal credentials block")
234 progress.start()
235 block_doc = await self._create_modal_credentials_block(
236 credentials_block_name,
237 modal_token_id,
238 modal_token_secret,
239 client=client,
240 )
241 progress.advance(task)
243 base_job_template_copy["variables"]["properties"]["modal_credentials"][
244 "default"
245 ] = {"$ref": {"block_document_id": str(block_doc.id)}}
246 self.console.print(
247 f"Successfully configured Modal push work pool {work_pool_name!r}!",
248 style="green",
249 )
250 return base_job_template_copy