Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/worker.py: 24%

122 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1import asyncio 1a

2import json 1a

3import os 1a

4from enum import Enum 1a

5from typing import List, Optional, Type 1a

6 

7import typer 1a

8 

9from prefect._internal.installation import ainstall_packages 1a

10from prefect._internal.integrations import KNOWN_EXTRAS_FOR_PACKAGES 1a

11from prefect.cli._prompts import confirm 1a

12from prefect.cli._types import PrefectTyper, SettingsOption 1a

13from prefect.cli._utilities import exit_with_error 1a

14from prefect.cli.root import app, is_interactive 1a

15from prefect.client.collections import get_collections_metadata_client 1a

16from prefect.client.orchestration import get_client 1a

17from prefect.client.schemas.filters import WorkQueueFilter, WorkQueueFilterName 1a

18from prefect.exceptions import ObjectNotFound 1a

19from prefect.plugins import load_prefect_collections 1a

20from prefect.settings import ( 1a

21 PREFECT_WORKER_HEARTBEAT_SECONDS, 

22 PREFECT_WORKER_PREFETCH_SECONDS, 

23) 

24from prefect.utilities.dispatch import lookup_type 1a

25from prefect.utilities.processutils import ( 1a

26 setup_signal_handlers_worker, 

27) 

28from prefect.workers.base import BaseWorker 1a

29 

# Typer sub-application holding the "worker" command group.
worker_app: PrefectTyper = PrefectTyper(
    name="worker",
    help="Start and interact with workers.",
)

# Attach the group to the root CLI application.
app.add_typer(worker_app)

34 

35 

class InstallPolicy(str, Enum):
    """Accepted values for the `--install-policy` option of the start command."""

    # Look up and install (with upgrade) the integration package before loading.
    ALWAYS = "always"
    # Install the integration package only when the worker class cannot be loaded.
    IF_NOT_PRESENT = "if-not-present"
    # Never install anything.
    NEVER = "never"
    # Ask for confirmation before installing (interactive sessions only).
    PROMPT = "prompt"

41 

42 

@worker_app.command()
async def start(
    worker_name: Optional[str] = typer.Option(
        None,
        "-n",
        "--name",
        help=(
            "The name to give to the started worker. If not provided, a unique name"
            " will be generated."
        ),
    ),
    work_pool_name: str = typer.Option(
        ...,
        "-p",
        "--pool",
        help="The work pool the started worker should poll.",
        prompt=True,
    ),
    work_queues: Optional[List[str]] = typer.Option(
        None,
        "-q",
        "--work-queue",
        help=(
            "One or more work queue names for the worker to pull from. If not provided,"
            " the worker will pull from all work queues in the work pool."
        ),
    ),
    worker_type: Optional[str] = typer.Option(
        None,
        "-t",
        "--type",
        help=(
            "The type of worker to start. If not provided, the worker type will be"
            " inferred from the work pool."
        ),
    ),
    prefetch_seconds: int = SettingsOption(
        PREFECT_WORKER_PREFETCH_SECONDS,
        help="Number of seconds to look into the future for scheduled flow runs.",
    ),
    run_once: bool = typer.Option(
        False, help="Only run worker polling once. By default, the worker runs forever."
    ),
    limit: Optional[int] = typer.Option(
        None,
        "-l",
        "--limit",
        help="Maximum number of flow runs to execute concurrently.",
    ),
    with_healthcheck: bool = typer.Option(
        False, help="Start a healthcheck server for the worker."
    ),
    install_policy: InstallPolicy = typer.Option(
        InstallPolicy.PROMPT.value,
        "--install-policy",
        help="Install policy to use workers from Prefect integration packages.",
        case_sensitive=False,
    ),
    base_job_template: Optional[typer.FileText] = typer.Option(
        None,
        "--base-job-template",
        help=(
            "The path to a JSON file containing the base job template to use. If"
            " unspecified, Prefect will use the default base job template for the given"
            " worker type. If the work pool already exists, this will be ignored."
        ),
    ),
):
    """
    Start a worker process to poll a work pool for flow runs.
    """
    # NOTE: the docstring above is kept verbatim; it may be surfaced as the
    # command's CLI help text, so implementation notes live in comments.
    #
    # Flow: warn about a paused pool / paused queues, resolve the worker class
    # (installing its integration package if the install policy allows), set up
    # signal handlers, optionally load the base job template, then run the
    # worker until it finishes or is cancelled.

    # A paused pool is only a warning; the worker is still started.
    is_paused = await _check_work_pool_paused(work_pool_name)
    if is_paused:
        app.console.print(
            (
                f"The work pool {work_pool_name!r} is currently paused. This worker"
                " will not execute any flow runs until the work pool is unpaused."
            ),
            style="yellow",
        )

    # Same for queues: warn when every relevant queue is paused.
    is_queues_paused = await _check_work_queues_paused(
        work_pool_name,
        work_queues,
    )
    if is_queues_paused:
        queue_scope = (
            "All work queues" if not work_queues else "Specified work queue(s)"
        )
        app.console.print(
            (
                f"{queue_scope} in the work pool {work_pool_name!r} are currently"
                " paused. This worker will not execute any flow runs until the work"
                " queues are unpaused."
            ),
            style="yellow",
        )

    worker_cls = await _get_worker_class(worker_type, work_pool_name, install_policy)

    if worker_cls is None:
        exit_with_error(
            "Unable to start worker. Please ensure you have the necessary dependencies"
            " installed to run your desired worker type."
        )

    # `worker_type` is None when `--type` was omitted and the type was inferred
    # from the work pool, which made the label below read "the None worker".
    # Fall back to the type advertised by the resolved worker class.
    # NOTE(review): assumes worker classes expose a `type` attribute; if they
    # do not, this degrades to the previous label.
    resolved_worker_type = worker_type or getattr(worker_cls, "type", None)

    worker_process_id = os.getpid()
    setup_signal_handlers_worker(
        worker_process_id, f"the {resolved_worker_type} worker", app.console.print
    )

    template_contents = None
    if base_job_template is not None:
        try:
            template_contents = json.load(fp=base_job_template)
        except json.JSONDecodeError as exc:
            # Report malformed JSON as a CLI error instead of a raw traceback.
            exit_with_error(f"Unable to parse the base job template as JSON: {exc}")

    worker = worker_cls(
        name=worker_name,
        work_pool_name=work_pool_name,
        work_queues=work_queues,
        limit=limit,
        prefetch_seconds=prefetch_seconds,
        heartbeat_interval_seconds=int(PREFECT_WORKER_HEARTBEAT_SECONDS.value()),
        base_job_template=template_contents,
    )
    try:
        await worker.start(
            run_once=run_once,
            with_healthcheck=with_healthcheck,
            printer=app.console.print,
        )
    except asyncio.CancelledError:
        # Cancellation is reported as a normal stop rather than an error.
        app.console.print(f"Worker {worker.name!r} stopped!", style="yellow")

175 

176 

async def _check_work_pool_paused(work_pool_name: str) -> bool:
    """
    Report whether the named work pool is paused.

    Args:
        - work_pool_name (str): the name of the work pool to look up

    Returns:
        - bool: the pool's `is_paused` value, or False when the pool does not exist
    """
    try:
        async with get_client() as api_client:
            pool = await api_client.read_work_pool(work_pool_name=work_pool_name)
    except ObjectNotFound:
        # A pool that does not exist cannot be paused.
        return False
    else:
        return pool.is_paused

184 

185 

async def _check_work_queues_paused(
    work_pool_name: str, work_queues: Optional[List[str]]
) -> bool:
    """
    Tell whether every relevant work queue of a work pool is paused.

    When `work_queues` is non-empty only the queues with those names are
    considered; otherwise all queues returned for the pool are considered.

    Args:
        - work_pool_name (str): the name of the work pool to check
        - work_queues (Optional[List[str]]): the names of the work queues to check

    Returns:
        - bool: True if at least one queue was found and all found queues are
            paused; False otherwise, including when the lookup raises
            ObjectNotFound
    """
    try:
        # Only restrict the lookup by name when specific queues were requested.
        name_filter = None
        if work_queues:
            name_filter = WorkQueueFilter(name=WorkQueueFilterName(any_=work_queues))

        async with get_client() as api_client:
            found_queues = await api_client.read_work_queues(
                work_pool_name=work_pool_name, work_queue_filter=name_filter
            )

        # No queues found counts as "not paused".
        if not found_queues:
            return False
        return all(wq.is_paused for wq in found_queues)
    except ObjectNotFound:
        return False

213 

214 

async def _retrieve_worker_type_from_pool(work_pool_name: Optional[str] = None) -> str:
    """
    Determine the worker type to use from the work pool's own type.

    Prints the discovered type, and exits with an error when the pool is a
    push or managed pool. When the pool does not exist, prints a notice and
    falls back to the "process" worker type.

    Args:
        - work_pool_name (Optional[str]): the name of the work pool to inspect

    Returns:
        - str: the pool's type, or "process" if the pool was not found
    """
    try:
        async with get_client() as api_client:
            pool = await api_client.read_work_pool(work_pool_name=work_pool_name)

        discovered_type = pool.type
        app.console.print(
            f"Discovered type {discovered_type!r} for work pool {pool.name!r}."
        )

        if pool.is_push_pool or pool.is_managed_pool:
            exit_with_error(
                "Workers are not required for push work pools. "
                "See https://docs.prefect.io/latest/deploy/infrastructure-examples/serverless "
                "for more details."
            )

        return discovered_type
    except ObjectNotFound:
        fallback_notice = (
            f"Work pool {work_pool_name!r} does not exist and no worker type was"
            " provided. Starting a process worker..."
        )
        app.console.print(fallback_notice, style="yellow")
        return "process"

242 

243 

def _load_worker_class(worker_type: str) -> Optional[Type[BaseWorker]]:
    """
    Look up the worker class registered for `worker_type`.

    Prefect collections are loaded first, then the type is looked up against
    `BaseWorker`.

    Returns:
        - Optional[Type[BaseWorker]]: the matching class, or None when a
            KeyError is raised during loading or lookup
    """
    try:
        load_prefect_collections()
        resolved_cls = lookup_type(BaseWorker, worker_type)
    except KeyError:
        resolved_cls = None
    return resolved_cls

250 

251 

async def _install_package(package: str, upgrade: bool = False) -> None:
    """
    Install an integration package into the current environment.

    The package name is first mapped through `KNOWN_EXTRAS_FOR_PACKAGES`; when
    no entry exists the name is used as given. Installer output is streamed.

    Args:
        - package (str): the name of the package to install
        - upgrade (bool): forwarded to the installer as its `upgrade` flag

    Returns:
        - None: this function only performs the installation. The previous
            `Optional[Type[BaseWorker]]` annotation was inaccurate.
    """
    app.console.print(f"Installing {package}...")
    install_package = KNOWN_EXTRAS_FOR_PACKAGES.get(package, package)
    await ainstall_packages([install_package], stream_output=True, upgrade=upgrade)

258 

259 

async def _find_package_for_worker_type(worker_type: str) -> Optional[str]:
    """
    Find the name of the package that provides `worker_type`.

    Worker metadata is read from the collections metadata client and inverted
    into a worker-type -> package-name mapping, ignoring "prefect-agent".

    Args:
        - worker_type (str): the worker type to look up

    Returns:
        - Optional[str]: the package name, or None (after printing a warning)
            when no package lists the worker type
    """
    async with get_collections_metadata_client() as metadata_client:
        metadata_by_package = await metadata_client.read_worker_metadata()

        # Invert the metadata; later packages overwrite earlier ones for the
        # same worker type, matching plain dict assignment order.
        package_by_type = {}
        for package_name, workers in metadata_by_package.items():
            for listed_type in workers:
                if listed_type != "prefect-agent":
                    package_by_type[listed_type] = package_name

        if worker_type in package_by_type:
            return package_by_type[worker_type]

        app.console.print(
            f"Could not find a package for worker type {worker_type!r}.",
            style="yellow",
        )
        return None

278 

279 

async def _get_worker_class(
    worker_type: Optional[str] = None,
    work_pool_name: Optional[str] = None,
    install_policy: InstallPolicy = InstallPolicy.PROMPT,
) -> Optional[Type[BaseWorker]]:
    """
    Resolve the worker class to run, installing its package if permitted.

    Args:
        - worker_type (Optional[str]): the worker type; when None it is
            retrieved from the work pool
        - work_pool_name (Optional[str]): the work pool used to infer the type
        - install_policy (InstallPolicy): controls whether a missing (or, for
            ALWAYS, any) integration package is installed

    Returns:
        - Optional[Type[BaseWorker]]: the worker class, or None if it could not
            be loaded

    Raises:
        - ValueError: if both `worker_type` and `work_pool_name` are None
    """
    if worker_type is None and work_pool_name is None:
        raise ValueError("Must provide either worker_type or work_pool_name.")

    if worker_type is None:
        worker_type = await _retrieve_worker_type_from_pool(work_pool_name)

    if worker_type == "prefect-agent":
        exit_with_error(
            "'prefect-agent' typed work pools work with Prefect Agents instead of"
            " Workers. Please use the 'prefect agent start' to start a Prefect Agent."
        )

    # ALWAYS: install/upgrade up front. The class is loaded once below, so no
    # separate load is needed inside this branch.
    if install_policy == InstallPolicy.ALWAYS:
        package = await _find_package_for_worker_type(worker_type)
        if package:
            await _install_package(package, upgrade=True)

    worker_cls = _load_worker_class(worker_type)

    if worker_cls is None:
        package = await _find_package_for_worker_type(worker_type)
        # Only consider installing when a package for this worker type is known.
        if package:
            # IF_NOT_PRESENT installs without asking.
            if install_policy == InstallPolicy.IF_NOT_PRESENT:
                should_install = True

            # PROMPT asks the user, but only in an interactive session.
            elif install_policy == InstallPolicy.PROMPT and is_interactive():
                message = (
                    "Could not find the Prefect integration library for the"
                    f" {worker_type} worker in the current environment."
                    " Install the library now?"
                )
                should_install = confirm(message, default=True)

            # Any other case (NEVER, ALWAYS, non-interactive PROMPT): skip.
            else:
                should_install = False

            if should_install:
                await _install_package(package)
                # Retry the lookup now that the package is installed.
                worker_cls = _load_worker_class(worker_type)

    return worker_cls