1"""
2Module containing the Process worker used for executing flow runs as subprocesses.
4To start a Process worker, run the following command:
6```bash
7prefect worker start --pool 'my-work-pool' --type process
8```
10Replace `my-work-pool` with the name of the work pool you want the worker
11to poll for flow runs.
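
Once the worker is running, deploy a flow to the same work pool so the worker has
something to execute. A minimal sketch (the repository URL and entrypoint below are
placeholders for your own project):

```python
from prefect import flow

if __name__ == "__main__":
    flow.from_source(
        source="https://github.com/org/repo.git",  # placeholder repository
        entrypoint="flows.py:my_flow",  # placeholder path:function for the flow
    ).deploy(name="my-deployment", work_pool_name="my-work-pool")
```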

For more information about work pools and workers,
check out the [Prefect docs](https://docs.prefect.io/v3/concepts/work-pools/).
"""

from __future__ import annotations

import contextlib
import os
import tempfile
import threading
from functools import partial
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Optional, TypeVar

import anyio
import anyio.abc
from pydantic import Field, field_validator

from prefect._internal.schemas.validators import validate_working_dir
from prefect.client.schemas.objects import Flow as APIFlow
from prefect.runner.runner import Runner
from prefect.settings import PREFECT_WORKER_QUERY_SECONDS
from prefect.states import Pending
from prefect.utilities.processutils import get_sys_executable
from prefect.utilities.services import (
    critical_service_loop,
    start_client_metrics_server,
    stop_client_metrics_server,
)
from prefect.workers.base import (
    BaseJobConfiguration,
    BaseVariables,
    BaseWorker,
    BaseWorkerResult,
)

if TYPE_CHECKING:
    from prefect.client.schemas.objects import FlowRun, WorkPool
    from prefect.client.schemas.responses import DeploymentResponse
    from prefect.flows import Flow

FR = TypeVar("FR")  # used to capture the return type of a flow


class ProcessJobConfiguration(BaseJobConfiguration):
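    """
    Job configuration for flow runs executed as local subprocesses.

    `working_dir` controls where the flow run process is opened (a temporary
    directory is used when unset) and `stream_output` controls whether the
    process's output is streamed to the worker's standard output.
    """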
    stream_output: bool = Field(default=True)
    working_dir: Optional[Path] = Field(default=None)

    @field_validator("working_dir")
    @classmethod
    def validate_working_dir(cls, v: Path | str | None) -> Path | None:
        if isinstance(v, str):
            return validate_working_dir(v)
        return v

    def prepare_for_flow_run(
        self,
        flow_run: "FlowRun",
        deployment: "DeploymentResponse | None" = None,
        flow: "APIFlow | None" = None,
        work_pool: "WorkPool | None" = None,
        worker_name: str | None = None,
    ) -> None:
        super().prepare_for_flow_run(flow_run, deployment, flow, work_pool, worker_name)
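
        # Merge the worker's environment into the job's environment so the
        # subprocess inherits local variables; job-specific values win on conflict.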
        self.env: dict[str, str | None] = {**os.environ, **self.env}
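        # If the command is still the generic default, replace it with one that
        # uses this worker's own Python interpreter (via `get_sys_executable`).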
        self.command: str | None = (
            f"{get_sys_executable()} -m prefect.engine"
            if self.command == self._base_flow_run_command()
            else self.command
        )

    @staticmethod
    def _base_flow_run_command() -> str:
        """
        Override the base flow run command because enhanced cancellation doesn't
        work with the process worker.
        """
        return "python -m prefect.engine"


class ProcessVariables(BaseVariables):
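    """
    Default variables for the Process worker.

    The schema for this class is used to populate the `variables` section of
    the default base job template.
    """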
    stream_output: bool = Field(
        default=True,
        description=(
            "If enabled, workers will stream output from flow run processes to "
            "local standard output."
        ),
    )
    working_dir: Optional[Path] = Field(
        default=None,
        title="Working Directory",
        description=(
            "If provided, workers will open flow run processes within the "
            "specified path as the working directory. Otherwise, a temporary "
            "directory will be created."
        ),
    )


class ProcessWorkerResult(BaseWorkerResult):
    """Contains information about the final state of a completed process"""


class ProcessWorker(
    BaseWorker[ProcessJobConfiguration, ProcessVariables, ProcessWorkerResult]
):
    type = "process"
    job_configuration: type[ProcessJobConfiguration] = ProcessJobConfiguration
    job_configuration_variables: type[ProcessVariables] | None = ProcessVariables

    _description = (
        "Execute flow runs as subprocesses on a worker. Works well for local execution"
        " when first getting started."
    )
    _display_name = "Process"
    _documentation_url = "https://docs.prefect.io/latest/get-started/quickstart"
    _logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/356e6766a91baf20e1d08bbe16e8b5aaef4d8643-48x48.png"

    async def start(
        self,
        run_once: bool = False,
        with_healthcheck: bool = False,
        printer: Callable[..., None] = print,
    ) -> None:
        """
        Starts the worker and runs the main worker loops.

        By default, the worker will run loops to poll for scheduled/cancelled flow
        runs and sync with the Prefect API server.

        If `run_once` is set, the worker will only run each loop once and then return.

        If `with_healthcheck` is set, the worker will start a healthcheck server which
        can be used to determine if the worker is still polling for flow runs and restart
        the worker if necessary.

        Args:
            run_once: If set, the worker will only run each loop once then return.
            with_healthcheck: If set, the worker will start a healthcheck server.
            printer: A `print`-like function where logs will be reported.
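
        Examples:
            Run the worker loop from code rather than the CLI (a minimal sketch;
            `my-work-pool` is a placeholder for an existing process work pool):

            ```python
            import anyio

            from prefect.workers.process import ProcessWorker


            async def main() -> None:
                # Poll once for scheduled runs, execute them, then return.
                await ProcessWorker(work_pool_name="my-work-pool").start(run_once=True)


            anyio.run(main)
            ```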
154 """
        healthcheck_server = None
        healthcheck_thread = None
        try:
            async with self as worker:
                # wait for an initial heartbeat to configure the worker
                await worker.sync_with_backend()
                # schedule the scheduled flow run polling loop
                async with anyio.create_task_group() as loops_task_group:
                    loops_task_group.start_soon(
                        partial(
                            critical_service_loop,
                            workload=self.get_and_submit_flow_runs,
                            interval=PREFECT_WORKER_QUERY_SECONDS.value(),
                            run_once=run_once,
                            jitter_range=0.3,
                            backoff=4,  # Up to ~1 minute interval during backoff
                        )
                    )
                    # schedule the sync loop
                    loops_task_group.start_soon(
                        partial(
                            critical_service_loop,
                            workload=self.sync_with_backend,
                            interval=self.heartbeat_interval_seconds,
                            run_once=run_once,
                            jitter_range=0.3,
                            backoff=4,
                        )
                    )

                    self._started_event = await self._emit_worker_started_event()

                    start_client_metrics_server()

                    if with_healthcheck:
                        from prefect.workers.server import build_healthcheck_server

                        # we'll start the ASGI server in a separate thread so that
                        # uvicorn does not block the main thread
                        healthcheck_server = build_healthcheck_server(
                            worker=worker,
                            query_interval_seconds=PREFECT_WORKER_QUERY_SECONDS.value(),
                        )
                        healthcheck_thread = threading.Thread(
                            name="healthcheck-server-thread",
                            target=healthcheck_server.run,
                            daemon=True,
                        )
                        healthcheck_thread.start()
                    printer(f"Worker {worker.name!r} started!")

                    # If running once, wait for active runs to complete before exiting
                    if run_once and self._limiter:
                        while self._limiter.borrowed_tokens > 0:
                            self._logger.debug(
                                "Waiting for %s active run(s) to finish before shutdown...",
                                self._limiter.borrowed_tokens,
                            )
                            await anyio.sleep(0.1)
        finally:
            stop_client_metrics_server()

            if healthcheck_server and healthcheck_thread:
                self._logger.debug("Stopping healthcheck server...")
                healthcheck_server.should_exit = True
                healthcheck_thread.join()
                self._logger.debug("Healthcheck server stopped.")

        printer(f"Worker {worker.name!r} stopped!")

    async def run(
        self,
        flow_run: "FlowRun",
        configuration: ProcessJobConfiguration,
        task_status: Optional[anyio.abc.TaskStatus[int]] = None,
    ) -> ProcessWorkerResult:
        if task_status is None:
            task_status = anyio.TASK_STATUS_IGNORED
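
        # Run in the configured working directory if one is set; otherwise create
        # a temporary directory that is cleaned up when the flow run finishes.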
        working_dir_ctx = (
            tempfile.TemporaryDirectory(suffix="prefect")
            if not configuration.working_dir
            else contextlib.nullcontext(configuration.working_dir)
        )
        with working_dir_ctx as working_dir:
            process = await self._runner.execute_flow_run(
                flow_run_id=flow_run.id,
                command=configuration.command,
                cwd=working_dir,
                env=configuration.env,
                stream_output=configuration.stream_output,
                task_status=task_status,
            )
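
        # The runner may return different process types; prefer a subprocess-style
        # `returncode` and fall back to a multiprocessing-style `exitcode`.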
        status_code = (
            getattr(process, "returncode", None)
            if getattr(process, "returncode", None) is not None
            else getattr(process, "exitcode", None)
        )

        if process is None or status_code is None:
            raise RuntimeError("Failed to start flow run process.")

        return ProcessWorkerResult(status_code=status_code, identifier=str(process.pid))

    async def _submit_adhoc_run(
        self,
        flow: "Flow[..., FR]",
        parameters: dict[str, Any] | None = None,
        job_variables: dict[str, Any] | None = None,
        task_status: anyio.abc.TaskStatus["FlowRun"] | None = None,
    ):
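        """
        Create a flow run for `flow` in a `Pending` state, bundle the flow and
        its parameters, and execute the bundle in a subprocess on this worker.
        """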
        from prefect._experimental.bundles import (
            create_bundle_for_flow_run,
        )

        flow_run = await self.client.create_flow_run(
            flow,
            parameters=parameters,
            state=Pending(),
            job_variables=job_variables,
            work_pool_name=self.work_pool.name,
        )
        if task_status is not None:
            # Emit the flow run object to .submit to allow it to return a future as soon as possible
            task_status.started(flow_run)

        api_flow = APIFlow(id=flow_run.flow_id, name=flow.name, labels={})
        logger = self.get_flow_run_logger(flow_run)

        configuration = await self.job_configuration.from_template_and_values(
            base_job_template=self.work_pool.base_job_template,
            values=job_variables or {},
            client=self._client,
        )
        configuration.prepare_for_flow_run(
            flow_run=flow_run,
            flow=api_flow,
            work_pool=self.work_pool,
            worker_name=self.name,
        )

        bundle = create_bundle_for_flow_run(flow=flow, flow_run=flow_run)

        logger.debug("Executing flow run bundle in subprocess...")
        try:
            await self._runner.execute_bundle(
                bundle=bundle,
                cwd=configuration.working_dir,
                env=configuration.env,
            )
        except Exception:
            logger.exception("Error executing flow run bundle in subprocess")
            await self._propose_crashed_state(flow_run, "Flow run execution failed")
        finally:
            logger.debug("Flow run bundle execution complete")

    async def __aenter__(self) -> ProcessWorker:
        await super().__aenter__()
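        # The runner manages the subprocesses that execute flow runs; entering it
        # on the worker's exit stack ties its lifetime to the worker's context.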
        self._runner = await self._exit_stack.enter_async_context(
            Runner(pause_on_shutdown=False, limit=None)
        )
        return self

    async def __aexit__(self, *exc_info: Any) -> None:
        await super().__aexit__(*exc_info)