Coverage for /usr/local/lib/python3.12/site-packages/prefect/infrastructure/provisioners/coiled.py: 0%

88 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1import importlib 

2from copy import deepcopy 

3from types import ModuleType 

4from typing import TYPE_CHECKING, Any, Dict, Optional 

5 

6from anyio import run_process 

7from rich.console import Console 

8from rich.progress import Progress, SpinnerColumn, TextColumn 

9from rich.prompt import Confirm 

10 

11from prefect._internal.installation import ainstall_packages 

12from prefect.client.schemas.actions import BlockDocumentCreate 

13from prefect.client.schemas.objects import BlockDocument 

14from prefect.client.utilities import inject_client 

15from prefect.exceptions import ObjectNotFound 

16from prefect.utilities.importtools import lazy_import 

17 

18if TYPE_CHECKING: 

19 from prefect.client.orchestration import PrefectClient 

20 

21 

22coiled: ModuleType = lazy_import("coiled") 

23 

24 

class CoiledPushProvisioner:
    """
    An infrastructure provisioner for Coiled push work pools.

    Provisioning stores a Coiled API token in a `coiled-credentials` block and
    points the work pool's base job template at that block.
    """

    def __init__(self, client: Optional["PrefectClient"] = None):
        # NOTE: `client` is accepted but not stored here; `provision` takes its
        # own `client` argument (supplied through `inject_client`).
        self._console = Console()

    @property
    def console(self) -> Console:
        """The rich console used for all prompts and status output."""
        return self._console

    @console.setter
    def console(self, value: Console) -> None:
        self._console = value

    @staticmethod
    def _is_coiled_installed() -> bool:
        """
        Checks if the coiled package is installed.

        Returns:
            True if the coiled package can be imported, False otherwise
        """
        try:
            importlib.import_module("coiled")
        except ModuleNotFoundError:
            return False
        return True

    async def _install_coiled(self) -> None:
        """
        Installs the coiled package and rebinds the module-level `coiled` name
        to the freshly imported module.
        """
        global coiled
        # The `with` block starts and stops the progress display on its own, so
        # no explicit `progress.start()` is needed.
        with Progress(
            SpinnerColumn(),
            TextColumn("[bold blue]Installing coiled..."),
            transient=True,
            console=self.console,
        ) as progress:
            task = progress.add_task("coiled install")
            await ainstall_packages(["coiled"])
            # The package was installed while this interpreter was running, so
            # drop the import system's cached directory listings before
            # importing it.
            importlib.invalidate_caches()
            coiled = importlib.import_module("coiled")
            progress.advance(task)

    async def _get_coiled_token(self) -> str:
        """
        Gets a Coiled API token from the current Coiled configuration.

        Returns:
            The value of the `coiled.token` dask config key, or an empty string
            when the key is not set
        """
        # Imported lazily: dask is only needed once this code path is reached.
        import dask.config

        return dask.config.get("coiled.token", "")

    async def _create_new_coiled_token(self) -> None:
        """
        Triggers a Coiled login by running `coiled login`, then reloads the
        dask configuration so that a follow-up token lookup sees the result.
        """
        await run_process(["coiled", "login"])

        # `coiled login` ran in a separate process, so nothing it wrote to disk
        # is visible in this process's already-loaded dask config until the
        # config files are read again.
        # NOTE(review): assumes `coiled login` persists the token to dask's
        # YAML configuration - confirm against the Coiled CLI.
        import dask.config

        dask.config.refresh()

    async def _create_coiled_credentials_block(
        self,
        block_document_name: str,
        coiled_token: str,
        client: "PrefectClient",
    ) -> BlockDocument:
        """
        Creates a CoiledCredentials block containing the provided token.

        Args:
            block_document_name: The name of the block document to create
            coiled_token: The Coiled API token
            client: The Prefect client used to look up the block type and
                schema and to create the block document

        Returns:
            The created block document

        Raises:
            RuntimeError: If the `coiled-credentials` block type cannot be found
        """
        assert client is not None, "client injection failed"
        try:
            credentials_block_type = await client.read_block_type_by_slug(
                "coiled-credentials"
            )
        except ObjectNotFound as exc:
            # Shouldn't happen, but just in case
            raise RuntimeError(
                "Unable to find CoiledCredentials block type. Please ensure you are"
                " using Prefect Cloud."
            ) from exc

        credentials_block_schema = (
            await client.get_most_recent_block_schema_for_block_type(
                block_type_id=credentials_block_type.id
            )
        )
        assert credentials_block_schema is not None, (
            f"Unable to find schema for block type {credentials_block_type.slug}"
        )

        return await client.create_block_document(
            block_document=BlockDocumentCreate(
                name=block_document_name,
                data={
                    "api_token": coiled_token,
                },
                block_type_id=credentials_block_type.id,
                block_schema_id=credentials_block_schema.id,
            )
        )

    async def _ensure_coiled_installed(self) -> None:
        """
        Makes sure the coiled package is importable, offering to install it
        when the console is interactive.

        Raises:
            RuntimeError: If coiled is still not installed afterwards
        """
        if self._is_coiled_installed():
            return

        if self.console.is_interactive and Confirm.ask(
            (
                "The [blue]coiled[/] package is required to configure"
                " authentication for your work pool.\n"
                "\n"
                "Would you like to install it now?"
            ),
            console=self.console,
            default=True,
        ):
            await self._install_coiled()

        # Re-check: the user may have declined, or the console may not be
        # interactive.
        if not self._is_coiled_installed():
            raise RuntimeError(
                "The coiled package is not installed.\n\nPlease try installing coiled,"
                " or you can use the Prefect UI to create your Coiled push work pool."
            )

    async def _resolve_coiled_token(self) -> str:
        """
        Returns a non-empty Coiled API token, offering an interactive login
        when none is configured.

        Raises:
            RuntimeError: If no token is configured and a login is not possible
                or was declined, or if no token is available after the login
        """
        coiled_api_token = await self._get_coiled_token()
        if coiled_api_token:
            return coiled_api_token

        # Create a new token if one wasn't found
        if not (
            self.console.is_interactive
            and Confirm.ask(
                "Coiled credentials not found. Would you like to create a new token?",
                console=self.console,
                default=True,
            )
        ):
            raise RuntimeError(
                "Coiled credentials not found. Please create a new token by"
                " running [blue]coiled login[/] and try again."
            )

        await self._create_new_coiled_token()
        coiled_api_token = await self._get_coiled_token()
        if not coiled_api_token:
            # Never store an empty token in a credentials block.
            raise RuntimeError(
                "Coiled login completed but no API token was found in the Coiled"
                " configuration. Please run `coiled login` manually and try again."
            )
        return coiled_api_token

    @inject_client
    async def provision(
        self,
        work_pool_name: str,
        base_job_template: Dict[str, Any],
        client: Optional["PrefectClient"] = None,
    ) -> Dict[str, Any]:
        """
        Provisions resources necessary for a Coiled push work pool.

        Provisioned resources:
            - A CoiledCredentials block containing a Coiled API token

        An existing block named `<work_pool_name>-coiled-credentials` is reused
        when present; otherwise a new one is created from the local Coiled
        token (optionally after installing coiled and logging in).

        Args:
            work_pool_name: The name of the work pool to provision resources for
            base_job_template: The base job template to update
            client: The Prefect client to use

        Returns:
            A copy of the provided base job template with the provisioned
            resources; an unmodified copy if the user declines to continue

        Raises:
            RuntimeError: If coiled is not installed, no Coiled token can be
                obtained, or the CoiledCredentials block type cannot be found
        """
        credentials_block_name = f"{work_pool_name}-coiled-credentials"
        base_job_template_copy = deepcopy(base_job_template)
        assert client is not None, "client injection failed"

        # Keep the lookup separate from the provisioning work so that later
        # failures are not reported as happening "during handling of"
        # ObjectNotFound.
        try:
            block_doc = await client.read_block_document_by_name(
                credentials_block_name, "coiled-credentials"
            )
        except ObjectNotFound:
            block_doc = None

        if block_doc is not None:
            self.console.print(
                f"Work pool [blue]{work_pool_name!r}[/] will reuse the existing Coiled"
                f" credentials block [blue]{credentials_block_name!r}[/blue]"
            )
        else:
            if self.console.is_interactive and not Confirm.ask(
                (
                    "\n"
                    "To configure your Coiled push work pool we'll need to store a Coiled"
                    " API token with Prefect Cloud as a block. We'll pull the token from"
                    " your local Coiled configuration or create a new token if we"
                    " can't find one.\n"
                    "\n"
                    "Would you like to continue?"
                ),
                console=self.console,
                default=True,
            ):
                self.console.print(
                    "No problem! You can always configure your Coiled push work pool"
                    " later via the Prefect UI."
                )
                # Return the copy, as documented, rather than the caller's dict.
                return base_job_template_copy

            await self._ensure_coiled_installed()
            coiled_api_token = await self._resolve_coiled_token()

            # Create the credentials block
            with Progress(
                SpinnerColumn(),
                TextColumn("[bold blue]Saving Coiled credentials..."),
                transient=True,
                console=self.console,
            ) as progress:
                task = progress.add_task("create coiled credentials block")
                block_doc = await self._create_coiled_credentials_block(
                    credentials_block_name,
                    coiled_api_token,
                    client=client,
                )
                progress.advance(task)

        properties = base_job_template_copy["variables"]["properties"]
        properties["credentials"]["default"] = {
            "$ref": {"block_document_id": str(block_doc.id)}
        }
        if "image" in properties:
            properties["image"]["default"] = ""
        self.console.print(
            f"Successfully configured Coiled push work pool {work_pool_name!r}!",
            style="green",
        )
        return base_job_template_copy