Coverage for /usr/local/lib/python3.12/site-packages/prefect/infrastructure/provisioners/modal.py: 0%

89 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1import importlib 

2import shlex 

3import sys 

4from copy import deepcopy 

5from types import ModuleType 

6from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple 

7 

8from anyio import run_process 

9from rich.console import Console 

10from rich.progress import Progress, SpinnerColumn, TextColumn 

11from rich.prompt import Confirm 

12 

13from prefect._internal.installation import ainstall_packages 

14from prefect.client.schemas.actions import BlockDocumentCreate 

15from prefect.client.schemas.objects import BlockDocument 

16from prefect.client.utilities import inject_client 

17from prefect.exceptions import ObjectNotFound 

18from prefect.utilities.importtools import lazy_import 

19 

20if TYPE_CHECKING: 

21 from prefect.client.orchestration import PrefectClient 

22 

23 

24modal: ModuleType = lazy_import("modal") 

25 

26 

class ModalPushProvisioner:
    """
    An infrastructure provisioner for Modal push work pools.

    Provisioning stores a Modal API token in a `modal-credentials` block and
    sets that block as the default for the `modal_credentials` variable of the
    work pool's base job template.
    """

    def __init__(self, client: Optional["PrefectClient"] = None):
        # `client` is accepted but not stored; `provision` takes the client it
        # uses as its own argument.
        self._console: Console = Console()

    @property
    def console(self) -> Console:
        """The rich console used for all prompts and status output."""
        return self._console

    @console.setter
    def console(self, value: Console) -> None:
        self._console = value

    @staticmethod
    def _is_modal_installed() -> bool:
        """
        Checks if the modal package is installed.

        Returns:
            True if the modal package is installed, False otherwise
        """
        try:
            importlib.import_module("modal")
            return True
        except ModuleNotFoundError:
            return False

    async def _install_modal(self):
        """
        Installs the modal package and rebinds the module-level `modal` name
        to the freshly imported module.
        """
        with Progress(
            SpinnerColumn(),
            TextColumn("[bold blue]Installing modal..."),
            transient=True,
            console=self.console,
        ) as progress:
            task = progress.add_task("modal install")
            progress.start()
            global modal
            await ainstall_packages(["modal"])
            # Replace the module-level placeholder with the real module now
            # that the package is available.
            modal = importlib.import_module("modal")
            progress.advance(task)

    async def _get_modal_token_id_and_secret(self) -> Tuple[str, str]:
        """
        Gets a Modal API token ID and secret from the current Modal configuration.

        Returns:
            A `(token_id, token_secret)` tuple. `provision` treats a falsy
            entry as "no credentials configured".
        """
        modal_config = modal.config.Config()
        modal_token_id = modal_config.get("token_id")
        modal_token_secret = modal_config.get("token_secret")

        return modal_token_id, modal_token_secret

    async def _create_new_modal_token(self):
        """
        Triggers a Modal login via the browser. Will create a new token in the default Modal profile.
        """
        # The command is passed as an argument list, so no shell parses it and
        # the interpreter path must NOT be shell-quoted: quoting would embed
        # literal quote characters in any path that contains spaces or other
        # special characters, and the executable would not be found.
        await run_process([sys.executable, "-m", "modal", "token", "new"])
        # Reload the modal.config module to pick up the new token
        importlib.reload(modal.config)

    async def _create_modal_credentials_block(
        self,
        block_document_name: str,
        modal_token_id: str,
        modal_token_secret: str,
        client: "PrefectClient",
    ) -> BlockDocument:
        """
        Creates a ModalCredentials block containing the provided token ID and secret.

        Args:
            block_document_name: The name of the block document to create
            modal_token_id: The Modal token ID
            modal_token_secret: The Modal token secret
            client: The Prefect client used to look up the block type and
                schema and to create the block document

        Returns:
            The created block document

        Raises:
            RuntimeError: If the `modal-credentials` block type cannot be found
        """
        assert client is not None, "client injection failed"
        try:
            credentials_block_type = await client.read_block_type_by_slug(
                "modal-credentials"
            )
        except ObjectNotFound:
            # Shouldn't happen, but just in case
            raise RuntimeError(
                "Unable to find ModalCredentials block type. Please ensure you are"
                " using Prefect Cloud."
            )
        credentials_block_schema = (
            await client.get_most_recent_block_schema_for_block_type(
                block_type_id=credentials_block_type.id
            )
        )
        assert credentials_block_schema is not None, (
            f"Unable to find schema for block type {credentials_block_type.slug}"
        )

        block_doc = await client.create_block_document(
            block_document=BlockDocumentCreate(
                name=block_document_name,
                data={
                    "token_id": modal_token_id,
                    "token_secret": modal_token_secret,
                },
                block_type_id=credentials_block_type.id,
                block_schema_id=credentials_block_schema.id,
            )
        )
        return block_doc

    @inject_client
    async def provision(
        self,
        work_pool_name: str,
        base_job_template: Dict[str, Any],
        client: Optional["PrefectClient"] = None,
    ) -> Dict[str, Any]:
        """
        Provisions resources necessary for a Modal push work pool.

        Provisioned resources:
            - A ModalCredentials block containing a Modal API token

        If a block named `<work_pool_name>-modal-credentials` already exists it
        is reused. Otherwise the token is read from the local Modal
        configuration (optionally installing `modal` and creating a new token
        when the console is interactive and the user agrees) and saved as a
        new block.

        Args:
            work_pool_name: The name of the work pool to provision resources for
            base_job_template: The base job template to update
            client: The Prefect client to use; expected to be supplied by
                `inject_client` when omitted

        Returns:
            A copy of the provided base job template with the provisioned
            resources. If the user declines to continue, an unmodified copy is
            returned.

        Raises:
            RuntimeError: If no Modal credentials are found and a new token is
                not created (user declined or the console is not interactive)
        """
        credentials_block_name = f"{work_pool_name}-modal-credentials"
        base_job_template_copy = deepcopy(base_job_template)
        assert client is not None, "client injection failed"
        try:
            block_doc = await client.read_block_document_by_name(
                credentials_block_name, "modal-credentials"
            )
            self.console.print(
                f"Work pool [blue]{work_pool_name!r}[/] will reuse the existing Modal"
                f" credentials block [blue]{credentials_block_name!r}[/blue]"
            )
        except ObjectNotFound:
            if self.console.is_interactive and not Confirm.ask(
                (
                    "To configure your Modal push work pool we'll need to store a Modal"
                    " token with Prefect Cloud as a block. We'll pull the token from"
                    " your local Modal configuration or create a new token if we"
                    " can't find one. Would you like to continue?"
                ),
                console=self.console,
            ):
                self.console.print(
                    "No problem! You can always configure your Modal push work pool"
                    " later via the Prefect UI."
                )
                # Return the copy (not the caller's object) so every exit path
                # honours the documented "returns a copy" contract.
                return base_job_template_copy

            if not self._is_modal_installed():
                if self.console.is_interactive and Confirm.ask(
                    (
                        "The [blue]modal[/] package is required to configure"
                        " authentication for your work pool. Would you like to install"
                        " it now?"
                    ),
                    console=self.console,
                ):
                    await self._install_modal()
                # NOTE(review): when the install is declined (or the console is
                # not interactive) execution still continues to the config
                # lookup below; the resulting error depends on how the lazily
                # imported `modal` module behaves - confirm this is intended.

            # Get the current Modal token ID and secret
            (
                modal_token_id,
                modal_token_secret,
            ) = await self._get_modal_token_id_and_secret()
            if not modal_token_id or not modal_token_secret:
                # Create a new token if one wasn't found
                if self.console.is_interactive and Confirm.ask(
                    (
                        "Modal credentials not found. Would you like to create a new"
                        " token?"
                    ),
                    console=self.console,
                ):
                    await self._create_new_modal_token()
                    (
                        modal_token_id,
                        modal_token_secret,
                    ) = await self._get_modal_token_id_and_secret()
                else:
                    raise RuntimeError(
                        "Modal credentials not found. Please create a new token by"
                        " running [blue]modal token new[/] and try again."
                    )

            # Create the credentials block
            with Progress(
                SpinnerColumn(),
                TextColumn("[bold blue]Saving Modal credentials..."),
                transient=True,
                console=self.console,
            ) as progress:
                task = progress.add_task("create modal credentials block")
                progress.start()
                block_doc = await self._create_modal_credentials_block(
                    credentials_block_name,
                    modal_token_id,
                    modal_token_secret,
                    client=client,
                )
                progress.advance(task)

        # Point the template's `modal_credentials` variable at the block
        # (reused or newly created) by document ID.
        base_job_template_copy["variables"]["properties"]["modal_credentials"][
            "default"
        ] = {"$ref": {"block_document_id": str(block_doc.id)}}
        self.console.print(
            f"Successfully configured Modal push work pool {work_pool_name!r}!",
            style="green",
        )
        return base_job_template_copy