Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/worker.py: 24%
122 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1import asyncio 1a
2import json 1a
3import os 1a
4from enum import Enum 1a
5from typing import List, Optional, Type 1a
7import typer 1a
9from prefect._internal.installation import ainstall_packages 1a
10from prefect._internal.integrations import KNOWN_EXTRAS_FOR_PACKAGES 1a
11from prefect.cli._prompts import confirm 1a
12from prefect.cli._types import PrefectTyper, SettingsOption 1a
13from prefect.cli._utilities import exit_with_error 1a
14from prefect.cli.root import app, is_interactive 1a
15from prefect.client.collections import get_collections_metadata_client 1a
16from prefect.client.orchestration import get_client 1a
17from prefect.client.schemas.filters import WorkQueueFilter, WorkQueueFilterName 1a
18from prefect.exceptions import ObjectNotFound 1a
19from prefect.plugins import load_prefect_collections 1a
20from prefect.settings import ( 1a
21 PREFECT_WORKER_HEARTBEAT_SECONDS,
22 PREFECT_WORKER_PREFETCH_SECONDS,
23)
24from prefect.utilities.dispatch import lookup_type 1a
25from prefect.utilities.processutils import ( 1a
26 setup_signal_handlers_worker,
27)
28from prefect.workers.base import BaseWorker 1a
# Typer sub-application grouping every worker-related CLI command.
worker_app: PrefectTyper = PrefectTyper(
    help="Start and interact with workers.",
    name="worker",
)
# Mount the group on the root CLI application as the `worker` command group.
app.add_typer(worker_app)
class InstallPolicy(str, Enum):
    """Policy controlling installation of the package that provides a worker type.

    Members are plain strings so they can be compared with, and parsed from,
    the raw value given to ``--install-policy``.
    """

    # Install (with upgrade) the matching package before loading the worker.
    ALWAYS = "always"
    # Install the matching package only if the worker class cannot be loaded.
    IF_NOT_PRESENT = "if-not-present"
    # Never install anything.
    NEVER = "never"
    # Ask for confirmation before installing (interactive sessions only).
    PROMPT = "prompt"
@worker_app.command()
async def start(
    worker_name: str = typer.Option(
        None,
        "-n",
        "--name",
        help=(
            "The name to give to the started worker. If not provided, a unique name"
            " will be generated."
        ),
    ),
    work_pool_name: str = typer.Option(
        ...,
        "-p",
        "--pool",
        help="The work pool the started worker should poll.",
        prompt=True,
    ),
    work_queues: List[str] = typer.Option(
        None,
        "-q",
        "--work-queue",
        help=(
            "One or more work queue names for the worker to pull from. If not provided,"
            " the worker will pull from all work queues in the work pool."
        ),
    ),
    worker_type: Optional[str] = typer.Option(
        None,
        "-t",
        "--type",
        help=(
            "The type of worker to start. If not provided, the worker type will be"
            " inferred from the work pool."
        ),
    ),
    prefetch_seconds: int = SettingsOption(
        PREFECT_WORKER_PREFETCH_SECONDS,
        help="Number of seconds to look into the future for scheduled flow runs.",
    ),
    run_once: bool = typer.Option(
        False, help="Only run worker polling once. By default, the worker runs forever."
    ),
    limit: int = typer.Option(
        None,
        "-l",
        "--limit",
        help="Maximum number of flow runs to execute concurrently.",
    ),
    with_healthcheck: bool = typer.Option(
        False, help="Start a healthcheck server for the worker."
    ),
    install_policy: InstallPolicy = typer.Option(
        InstallPolicy.PROMPT.value,
        "--install-policy",
        help="Install policy to use workers from Prefect integration packages.",
        case_sensitive=False,
    ),
    base_job_template: typer.FileText = typer.Option(
        None,
        "--base-job-template",
        help=(
            "The path to a JSON file containing the base job template to use. If"
            " unspecified, Prefect will use the default base job template for the given"
            " worker type. If the work pool already exists, this will be ignored."
        ),
    ),
):
    """
    Start a worker process to poll a work pool for flow runs.
    """
    # NOTE: the docstring above doubles as the CLI help text for this command,
    # so implementation notes live in comments instead.

    # A paused pool or paused queues only produce a warning: the worker is
    # still started so it can pick up work once they are unpaused.
    is_paused = await _check_work_pool_paused(work_pool_name)
    if is_paused:
        app.console.print(
            (
                f"The work pool {work_pool_name!r} is currently paused. This worker"
                " will not execute any flow runs until the work pool is unpaused."
            ),
            style="yellow",
        )

    is_queues_paused = await _check_work_queues_paused(
        work_pool_name,
        work_queues,
    )
    if is_queues_paused:
        queue_scope = (
            "All work queues" if not work_queues else "Specified work queue(s)"
        )
        app.console.print(
            (
                f"{queue_scope} in the work pool {work_pool_name!r} are currently"
                " paused. This worker will not execute any flow runs until the work"
                " queues are unpaused."
            ),
            style="yellow",
        )

    # Resolve the worker implementation; depending on the install policy this
    # may install the integration package that provides it.
    worker_cls = await _get_worker_class(worker_type, work_pool_name, install_policy)

    if worker_cls is None:
        exit_with_error(
            "Unable to start worker. Please ensure you have the necessary dependencies"
            " installed to run your desired worker type."
        )

    # `worker_type` is None when --type was omitted and the type was inferred
    # from the work pool, which previously produced the label "the None worker".
    # Fall back to the type key of the resolved class (presumably exposed as
    # the `type` class attribute on BaseWorker subclasses; `getattr` keeps the
    # previous label if it is absent).
    resolved_worker_type = worker_type or getattr(worker_cls, "type", None)

    worker_process_id = os.getpid()
    setup_signal_handlers_worker(
        worker_process_id, f"the {resolved_worker_type} worker", app.console.print
    )

    # Only parse a base job template when a file was actually supplied.
    template_contents = None
    if base_job_template is not None:
        template_contents = json.load(fp=base_job_template)

    worker = worker_cls(
        name=worker_name,
        work_pool_name=work_pool_name,
        work_queues=work_queues,
        limit=limit,
        prefetch_seconds=prefetch_seconds,
        heartbeat_interval_seconds=int(PREFECT_WORKER_HEARTBEAT_SECONDS.value()),
        base_job_template=template_contents,
    )
    try:
        await worker.start(
            run_once=run_once,
            with_healthcheck=with_healthcheck,
            printer=app.console.print,
        )
    except asyncio.CancelledError:
        # Cancellation is the expected way to stop the worker; report it
        # instead of surfacing a traceback.
        app.console.print(f"Worker {worker.name!r} stopped!", style="yellow")
async def _check_work_pool_paused(work_pool_name: str) -> bool:
    """
    Report whether the named work pool is currently paused.

    Args:
        - work_pool_name (str): the name of the work pool to check

    Returns:
        - bool: the pool's `is_paused` flag, or False if the pool does not exist
    """
    try:
        async with get_client() as api:
            pool = await api.read_work_pool(work_pool_name=work_pool_name)
    except ObjectNotFound:
        # A pool that does not exist cannot be paused.
        return False
    else:
        return pool.is_paused
async def _check_work_queues_paused(
    work_pool_name: str, work_queues: Optional[List[str]]
) -> bool:
    """
    Determine whether every relevant work queue of a work pool is paused.

    When `work_queues` is non-empty only the queues with those names are
    considered; otherwise all queues of the pool are considered.

    Args:
        - work_pool_name (str): the name of the work pool to check
        - work_queues (Optional[List[str]]): the names of the work queues to check

    Returns:
        - bool: True if at least one queue was found and all found queues are
            paused; False otherwise (including when nothing was found)
    """
    try:
        # Restrict the query by name only when specific queues were requested.
        name_filter = None
        if work_queues:
            name_filter = WorkQueueFilter(name=WorkQueueFilterName(any_=work_queues))

        async with get_client() as api:
            matching_queues = await api.read_work_queues(
                work_pool_name=work_pool_name, work_queue_filter=name_filter
            )
            # No queues at all must not count as "all paused".
            if not matching_queues:
                return False
            return all(queue.is_paused for queue in matching_queues)
    except ObjectNotFound:
        return False
async def _retrieve_worker_type_from_pool(work_pool_name: Optional[str] = None) -> str:
    """
    Infer the worker type from an existing work pool.

    Prints the discovered type. Exits with an error when the pool is a push or
    managed pool. When the pool does not exist, a warning is printed and the
    "process" worker type is used instead.

    Args:
        - work_pool_name (Optional[str]): the name of the work pool to inspect

    Returns:
        - str: the pool's type, or "process" if the pool does not exist
    """
    try:
        async with get_client() as api:
            pool = await api.read_work_pool(work_pool_name=work_pool_name)

        discovered_type = pool.type
        app.console.print(
            f"Discovered type {discovered_type!r} for work pool {pool.name!r}."
        )

        needs_no_worker = pool.is_push_pool or pool.is_managed_pool
        if needs_no_worker:
            exit_with_error(
                "Workers are not required for push work pools. "
                "See https://docs.prefect.io/latest/deploy/infrastructure-examples/serverless "
                "for more details."
            )
        return discovered_type
    except ObjectNotFound:
        app.console.print(
            (
                f"Work pool {work_pool_name!r} does not exist and no worker type was"
                " provided. Starting a process worker..."
            ),
            style="yellow",
        )
        return "process"
def _load_worker_class(worker_type: str) -> Optional[Type[BaseWorker]]:
    """
    Look up the worker class registered for `worker_type`.

    Prefect collections are loaded first, then the type is looked up among
    the `BaseWorker` types.

    Args:
        - worker_type (str): the worker type key to look up

    Returns:
        - Optional[Type[BaseWorker]]: the matching class, or None when a
            KeyError is raised during loading or lookup
    """
    resolved: Optional[Type[BaseWorker]] = None
    try:
        load_prefect_collections()
        resolved = lookup_type(BaseWorker, worker_type)
    except KeyError:
        # Unknown worker type: report "not available" instead of raising.
        resolved = None
    return resolved
async def _install_package(package: str, upgrade: bool = False) -> None:
    """
    Install a package into the current environment, streaming installer output.

    The requirement actually installed is taken from
    `KNOWN_EXTRAS_FOR_PACKAGES` when `package` has an entry there; otherwise
    the bare package name is used.

    Args:
        - package (str): the name of the package to install
        - upgrade (bool): forwarded to the installer as its `upgrade` option

    Returns:
        - None: the caller is expected to (re)load the worker class afterwards
    """
    app.console.print(f"Installing {package}...")
    install_package = KNOWN_EXTRAS_FOR_PACKAGES.get(package, package)
    await ainstall_packages([install_package], stream_output=True, upgrade=upgrade)
async def _find_package_for_worker_type(worker_type: str) -> Optional[str]:
    """
    Find the package that provides the given worker type.

    Worker metadata is read through the collections metadata client and
    inverted into a worker-type -> package-name mapping. Entries for the
    "prefect-agent" type are ignored.

    Args:
        - worker_type (str): the worker type to find a package for

    Returns:
        - Optional[str]: the providing package name, or None (after printing a
            warning) when no package lists the worker type
    """
    async with get_collections_metadata_client() as registry:
        metadata_by_package = await registry.read_worker_metadata()

        # Later packages win if two packages declare the same worker type.
        package_for_type = {}
        for package_name, workers in metadata_by_package.items():
            for type_name in workers:
                if type_name != "prefect-agent":
                    package_for_type[type_name] = package_name

        if worker_type in package_for_type:
            return package_for_type[worker_type]

        app.console.print(
            f"Could not find a package for worker type {worker_type!r}.",
            style="yellow",
        )
        return None
async def _get_worker_class(
    worker_type: Optional[str] = None,
    work_pool_name: Optional[str] = None,
    install_policy: InstallPolicy = InstallPolicy.PROMPT,
) -> Optional[Type[BaseWorker]]:
    """
    Resolve the worker class to start, installing its package if the policy allows.

    Args:
        - worker_type (Optional[str]): the worker type; when None it is
            inferred from the work pool
        - work_pool_name (Optional[str]): the work pool used to infer the
            worker type when `worker_type` is None
        - install_policy (InstallPolicy): controls whether the package that
            provides the worker may be installed:
            ALWAYS installs (with upgrade) before loading,
            IF_NOT_PRESENT installs only if the class cannot be loaded,
            PROMPT asks for confirmation in interactive sessions,
            NEVER does not install

    Returns:
        - Optional[Type[BaseWorker]]: the worker class, or None if it could
            not be loaded

    Raises:
        - ValueError: if neither `worker_type` nor `work_pool_name` is given
    """
    if worker_type is None and work_pool_name is None:
        raise ValueError("Must provide either worker_type or work_pool_name.")

    if worker_type is None:
        worker_type = await _retrieve_worker_type_from_pool(work_pool_name)

    if worker_type == "prefect-agent":
        exit_with_error(
            "'prefect-agent' typed work pools work with Prefect Agents instead of"
            " Workers. Please use the 'prefect agent start' to start a Prefect Agent."
        )

    if install_policy == InstallPolicy.ALWAYS:
        package = await _find_package_for_worker_type(worker_type)
        if package:
            await _install_package(package, upgrade=True)

    # Single load: this happens after the ALWAYS install (if any), so a second
    # load right behind it is unnecessary.
    worker_cls = _load_worker_class(worker_type)

    # Done if the worker is available. Under ALWAYS the package lookup and
    # install were already attempted above, so repeating the lookup would only
    # cost another metadata request and possibly print the warning twice.
    if worker_cls is not None or install_policy == InstallPolicy.ALWAYS:
        return worker_cls

    package = await _find_package_for_worker_type(worker_type)
    if not package:
        # No known package provides this worker type; nothing to install.
        return None

    if install_policy == InstallPolicy.IF_NOT_PRESENT:
        # Install without asking, since the worker is not present.
        should_install = True
    elif install_policy == InstallPolicy.PROMPT and is_interactive():
        # Confirm with the user for installation in an interactive session.
        message = (
            "Could not find the Prefect integration library for the"
            f" {worker_type} worker in the current environment."
            " Install the library now?"
        )
        should_install = confirm(message, default=True)
    else:
        # NEVER, or PROMPT in a non-interactive session: do not install.
        should_install = False

    if should_install:
        await _install_package(package)
        worker_cls = _load_worker_class(worker_type)

    return worker_cls