Coverage for /usr/local/lib/python3.12/site-packages/prefect/runner/runner.py: 13%
652 statements
coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1"""
2Runners are responsible for managing the execution of all deployments.
4When a deployment is created using either `flow.serve` or the `serve` utility,
5the runner will also poll for scheduled runs.
7Example:
8 ```python
9 import time
10 from prefect import flow, serve
13 @flow
14 def slow_flow(sleep: int = 60):
15 "Sleepy flow - sleeps the provided amount of time (in seconds)."
16 time.sleep(sleep)
19 @flow
20 def fast_flow():
21 "Fastest flow this side of the Mississippi."
22 return
25 if __name__ == "__main__":
26 slow_deploy = slow_flow.to_deployment(name="sleeper", interval=45)
27 fast_deploy = fast_flow.to_deployment(name="fast")
29 # serve generates a Runner instance
30 serve(slow_deploy, fast_deploy)
31 ```
33"""
35from __future__ import annotations 1a
37import asyncio 1a
38import datetime 1a
39import inspect 1a
40import logging 1a
41import multiprocessing.context 1a
42import os 1a
43import shlex 1a
44import shutil 1a
45import signal 1a
46import subprocess 1a
47import sys 1a
48import tempfile 1a
49import threading 1a
50import uuid 1a
51from contextlib import AsyncExitStack 1a
52from copy import deepcopy 1a
53from functools import partial 1a
54from pathlib import Path 1a
55from typing import ( 1a
56 TYPE_CHECKING,
57 Any,
58 Callable,
59 Coroutine,
60 Dict,
61 Iterable,
62 List,
63 Optional,
64 TypedDict,
65 Union,
66)
67from uuid import UUID, uuid4 1a
69import anyio 1a
70import anyio.abc 1a
71import anyio.to_thread 1a
72from cachetools import LRUCache 1a
73from typing_extensions import Self 1a
75from prefect._experimental.bundles import ( 1a
76 SerializedBundle,
77 execute_bundle_in_subprocess,
78 extract_flow_from_bundle,
79)
80from prefect._internal.concurrency.api import ( 1a
81 create_call,
82 from_async,
83 from_sync,
84)
85from prefect.client.orchestration import PrefectClient, get_client 1a
86from prefect.client.schemas.objects import ( 1a
87 ConcurrencyLimitConfig,
88 State,
89 StateType,
90)
91from prefect.client.schemas.objects import Flow as APIFlow 1a
92from prefect.events import DeploymentTriggerTypes, TriggerTypes 1a
93from prefect.events.clients import EventsClient, get_events_client 1a
94from prefect.events.related import tags_as_related_resources 1a
95from prefect.events.schemas.events import Event, RelatedResource, Resource 1a
96from prefect.exceptions import Abort, ObjectNotFound 1a
97from prefect.flow_engine import run_flow_in_subprocess 1a
98from prefect.flows import Flow, FlowStateHook, load_flow_from_flow_run 1a
99from prefect.logging.loggers import PrefectLogAdapter, flow_run_logger, get_logger 1a
100from prefect.runner._observers import FlowRunCancellingObserver 1a
101from prefect.runner.storage import RunnerStorage 1a
102from prefect.schedules import Schedule 1a
103from prefect.settings import ( 1a
104 PREFECT_API_URL,
105 PREFECT_RUNNER_SERVER_ENABLE,
106 get_current_settings,
107)
108from prefect.states import ( 1a
109 AwaitingRetry,
110 Crashed,
111 Pending,
112 exception_to_failed_state,
113)
114from prefect.types._datetime import now 1a
115from prefect.types.entrypoint import EntrypointType 1a
116from prefect.utilities.annotations import NotSet 1a
117from prefect.utilities.asyncutils import ( 1a
118 asyncnullcontext,
119 is_async_fn,
120 sync_compatible,
121)
122from prefect.utilities.engine import propose_state, propose_state_sync 1a
123from prefect.utilities.processutils import ( 1a
124 get_sys_executable,
125 run_process,
126)
127from prefect.utilities.services import ( 1a
128 critical_service_loop,
129 start_client_metrics_server,
130)
131from prefect.utilities.slugify import slugify 1a
133if TYPE_CHECKING: 1a  # coverage annotation: 133 ↛ 134 (didn't jump to line 134 because the condition on line 133 was never true)
134 import concurrent.futures
136 from prefect.client.schemas.objects import FlowRun
137 from prefect.client.schemas.responses import DeploymentResponse
138 from prefect.client.types.flexible_schedule_list import FlexibleScheduleList
139 from prefect.deployments.runner import RunnerDeployment
141__all__ = ["Runner"] 1a
144class ProcessMapEntry(TypedDict): 1a
145 flow_run: "FlowRun" 1a
146 pid: int 1a
149class Runner: 1a
150 def __init__( 1a
151 self,
152 name: Optional[str] = None,
153 query_seconds: Optional[float] = None,
154 prefetch_seconds: float = 10,
155 heartbeat_seconds: Optional[float] = None,
156 limit: int | type[NotSet] | None = NotSet,
157 pause_on_shutdown: bool = True,
158 webserver: bool = False,
159 ):
160 """
161 Responsible for managing the execution of remotely initiated flow runs.
163 Args:
164 name: The name of the runner. If not provided, a random one
165 will be generated. If provided, it cannot contain '/' or '%'.
166 query_seconds: The number of seconds to wait between querying for
167 scheduled flow runs; defaults to `PREFECT_RUNNER_POLL_FREQUENCY`
168 prefetch_seconds: The number of seconds to prefetch flow runs for.
169 heartbeat_seconds: The number of seconds to wait between emitting
170 flow run heartbeats. The runner will not emit heartbeats if the value is None.
171 Defaults to `PREFECT_RUNNER_HEARTBEAT_FREQUENCY`.
172 limit: The maximum number of flow runs this runner should be running at once. Provide `None` for no limit.
173 If not provided, the runner will use the value of `PREFECT_RUNNER_PROCESS_LIMIT`.
174 pause_on_shutdown: A boolean for whether or not to automatically pause
175 deployment schedules on shutdown; defaults to `True`
176 webserver: a boolean flag for whether to start a webserver for this runner
178 Examples:
179 Set up a Runner to manage the execution of scheduled flow runs for two flows:
180 ```python
181 import asyncio
182 from prefect import flow, Runner
184 @flow
185 def hello_flow(name):
186 print(f"hello {name}")
188 @flow
189 def goodbye_flow(name):
190 print(f"goodbye {name}")
192 if __name__ == "__main__":
193 runner = Runner(name="my-runner")
195 # Will be runnable via the API
196 runner.add_flow(hello_flow)
198 # Run on a cron schedule
199 runner.add_flow(goodbye_flow, schedule={"cron": "0 * * * *"})
201 asyncio.run(runner.start())
202 ```
203 """
204 settings = get_current_settings()
206 if name and ("/" in name or "%" in name):
207 raise ValueError("Runner name cannot contain '/' or '%'")
208 self.name: str = Path(name).stem if name is not None else f"runner-{uuid4()}"
209 self._logger: "logging.Logger" = get_logger("runner")
211 self.started: bool = False
212 self.stopping: bool = False
213 self.pause_on_shutdown: bool = pause_on_shutdown
214 self.limit: int | None = (
215 settings.runner.process_limit
216 if limit is NotSet or isinstance(limit, type)
217 else limit
218 )
219 self.webserver: bool = webserver
221 self.query_seconds: float = query_seconds or settings.runner.poll_frequency
222 self._prefetch_seconds: float = prefetch_seconds
223 self.heartbeat_seconds: float | None = (
224 heartbeat_seconds or settings.runner.heartbeat_frequency
225 )
226 if self.heartbeat_seconds is not None and self.heartbeat_seconds < 30:
227 raise ValueError("Heartbeat must be 30 seconds or greater.")
228 self._heartbeat_task: asyncio.Task[None] | None = None
229 self._events_client: EventsClient = get_events_client(checkpoint_every=1)
231 self._exit_stack = AsyncExitStack()
232 self._limiter: anyio.CapacityLimiter | None = None
233 self._cancelling_observer: FlowRunCancellingObserver | None = None
234 self._client: PrefectClient = get_client()
235 self._submitting_flow_run_ids: set[UUID] = set()
236 self._cancelling_flow_run_ids: set[UUID] = set()
237 self._scheduled_task_scopes: set[anyio.abc.CancelScope] = set()
238 self._deployment_ids: set[UUID] = set()
239 self._flow_run_process_map: dict[UUID, ProcessMapEntry] = dict()
240 self.__flow_run_process_map_lock: asyncio.Lock | None = None
241 self._flow_run_bundle_map: dict[UUID, SerializedBundle] = dict()
242 # Flip to True when we are rescheduling flow runs to avoid marking flow runs as crashed
243 self._rescheduling: bool = False
245 self._tmp_dir: Path = (
246 Path(tempfile.gettempdir()) / "runner_storage" / str(uuid4())
247 )
248 self._storage_objs: list[RunnerStorage] = []
249 self._deployment_storage_map: dict[UUID, RunnerStorage] = {}
251 self._loop: Optional[asyncio.AbstractEventLoop] = None
253 # Caching
254 self._deployment_cache: LRUCache[UUID, "DeploymentResponse"] = LRUCache(
255 maxsize=100
256 )
257 self._flow_cache: LRUCache[UUID, "APIFlow"] = LRUCache(maxsize=100)
259 # Keep track of added flows so we can run them directly in a subprocess
260 self._deployment_flow_map: dict[UUID, "Flow[Any, Any]"] = dict()
262 @property 1a
263 def _flow_run_process_map_lock(self) -> asyncio.Lock: 1a
264 if self.__flow_run_process_map_lock is None:
265 self.__flow_run_process_map_lock = asyncio.Lock()
266 return self.__flow_run_process_map_lock
268 async def _add_flow_run_process_map_entry( 1a
269 self, flow_run_id: UUID, process_map_entry: ProcessMapEntry
270 ):
271 async with self._flow_run_process_map_lock:
272 self._flow_run_process_map[flow_run_id] = process_map_entry
274 if TYPE_CHECKING:
275 assert self._cancelling_observer is not None
276 self._cancelling_observer.add_in_flight_flow_run_id(flow_run_id)
278 async def _remove_flow_run_process_map_entry(self, flow_run_id: UUID): 1a
279 async with self._flow_run_process_map_lock:
280 self._flow_run_process_map.pop(flow_run_id, None)
282 if TYPE_CHECKING:
283 assert self._cancelling_observer is not None
284 self._cancelling_observer.remove_in_flight_flow_run_id(flow_run_id)
286 @sync_compatible 1a
287 async def add_deployment( 1a
288 self,
289 deployment: "RunnerDeployment",
290 ) -> UUID:
291 """
292 Registers the deployment with the Prefect API and will monitor for work once
293 the runner is started.
295 Args:
296 deployment: A deployment for the runner to register.
297 """
298 apply_coro = deployment.apply()
299 if TYPE_CHECKING:
300 assert inspect.isawaitable(apply_coro)
301 deployment_id = await apply_coro
302 storage = deployment.storage
303 if storage is not None:
304 add_storage_coro = self._add_storage(storage)
305 if TYPE_CHECKING:
306 assert inspect.isawaitable(add_storage_coro)
307 storage = await add_storage_coro
308 self._deployment_storage_map[deployment_id] = storage
309 self._deployment_ids.add(deployment_id)
311 return deployment_id
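    # Usage sketch for `add_deployment` (illustrative only; `my_flow` and the names
    # below are assumed for the example and do not appear in this module):
    #
    #     runner = Runner(name="example-runner")
    #     deployment = my_flow.to_deployment(name="example-deployment")
    #     deployment_id = runner.add_deployment(deployment)  # sync-compatible; await it in async code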
313 @sync_compatible 1a
314 async def add_flow( 1a
315 self,
316 flow: Flow[Any, Any],
317 name: Optional[str] = None,
318 interval: Optional[
319 Union[
320 Iterable[Union[int, float, datetime.timedelta]],
321 int,
322 float,
323 datetime.timedelta,
324 ]
325 ] = None,
326 cron: Optional[Union[Iterable[str], str]] = None,
327 rrule: Optional[Union[Iterable[str], str]] = None,
328 paused: Optional[bool] = None,
329 schedule: Optional[Schedule] = None,
330 schedules: Optional["FlexibleScheduleList"] = None,
331 concurrency_limit: Optional[Union[int, ConcurrencyLimitConfig, None]] = None,
332 parameters: Optional[dict[str, Any]] = None,
333 triggers: Optional[List[Union[DeploymentTriggerTypes, TriggerTypes]]] = None,
334 description: Optional[str] = None,
335 tags: Optional[List[str]] = None,
336 version: Optional[str] = None,
337 enforce_parameter_schema: bool = True,
338 entrypoint_type: EntrypointType = EntrypointType.FILE_PATH,
339 ) -> UUID:
340 """
341 Provides a flow to the runner to be run based on the provided configuration.
343 Will create a deployment for the provided flow and register the deployment
344 with the runner.
346 Args:
347 flow: A flow for the runner to run.
348 name: The name to give the created deployment. Will default to the name
349 of the runner.
350 interval: An interval on which to execute the current flow. Accepts either a number
351 or a timedelta object. If a number is given, it will be interpreted as seconds.
352 cron: A cron schedule of when to execute runs of this flow.
353 rrule: An rrule schedule of when to execute runs of this flow.
354 paused: Whether or not to set the created deployment as paused.
355 schedule: A schedule object defining when to execute runs of this deployment.
356 Used to provide additional scheduling options like `timezone` or `parameters`.
357 schedules: A list of schedule objects defining when to execute runs of this flow.
358 Used to define multiple schedules or additional scheduling options like `timezone`.
359 concurrency_limit: The maximum number of concurrent runs of this flow to allow.
360 triggers: A list of triggers that should kick off a run of this flow.
361 parameters: A dictionary of default parameter values to pass to runs of this flow.
362 description: A description for the created deployment. Defaults to the flow's
363 description if not provided.
364 tags: A list of tags to associate with the created deployment for organizational
365 purposes.
366 version: A version for the created deployment. Defaults to the flow's version.
367 entrypoint_type: Type of entrypoint to use for the deployment. When using a module path
368 entrypoint, ensure that the module will be importable in the execution environment.
369 """
370 api = PREFECT_API_URL.value()
371 if any([interval, cron, rrule, schedule, schedules]) and not api:
372 self._logger.warning(
373 "Cannot schedule flows on an ephemeral server; run `prefect server"
374 " start` to start the scheduler."
375 )
376 name = self.name if name is None else name
378 to_deployment_coro = flow.to_deployment(
379 name=name,
380 interval=interval,
381 cron=cron,
382 rrule=rrule,
383 schedule=schedule,
384 schedules=schedules,
385 paused=paused,
386 triggers=triggers,
387 parameters=parameters,
388 description=description,
389 tags=tags,
390 version=version,
391 enforce_parameter_schema=enforce_parameter_schema,
392 entrypoint_type=entrypoint_type,
393 concurrency_limit=concurrency_limit,
394 )
395 if TYPE_CHECKING:
396 assert inspect.isawaitable(to_deployment_coro)
397 deployment = await to_deployment_coro
399 add_deployment_coro = self.add_deployment(deployment)
400 if TYPE_CHECKING:
401 assert inspect.isawaitable(add_deployment_coro)
402 deployment_id = await add_deployment_coro
404 # Only add the flow to the map if it is not loaded from storage
405 # Further work is needed to support directly running flows created using `flow.from_source`
406 if not getattr(flow, "_storage", None):
407 self._deployment_flow_map[deployment_id] = flow
408 return deployment_id
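    # Usage sketch for `add_flow` (illustrative only; `my_flow` is an assumed
    # `@flow`-decorated function). The keyword arguments mirror the parameters
    # documented above:
    #
    #     runner = Runner(name="example-runner")
    #     runner.add_flow(my_flow, name="hourly", cron="0 * * * *")
    #     runner.add_flow(my_flow, name="every-minute", interval=60, parameters={"x": 1})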
410 @sync_compatible 1a
411 async def _add_storage(self, storage: RunnerStorage) -> RunnerStorage: 1a
412 """
413 Adds a storage object to the runner. The storage object will be used to pull
414 code to the runner's working directory before the runner starts.
416 Args:
417 storage: The storage object to add to the runner.
418 Returns:
419 The updated storage object that was added to the runner.
420 """
421 if storage not in self._storage_objs:
422 storage_copy = deepcopy(storage)
423 storage_copy.set_base_path(self._tmp_dir)
425 self._logger.debug(
426 f"Adding storage {storage_copy!r} to runner at"
427 f" {str(storage_copy.destination)!r}"
428 )
429 self._storage_objs.append(storage_copy)
431 return storage_copy
432 else:
433 return next(s for s in self._storage_objs if s == storage)
435 def handle_sigterm(self, *args: Any, **kwargs: Any) -> None: 1a
436 """
437 Gracefully shuts down the runner when a SIGTERM is received.
438 """
439 self._logger.info("SIGTERM received, initiating graceful shutdown...")
440 from_sync.call_in_loop_thread(create_call(self.stop))
442 sys.exit(0)
444 async def start( 1a
445 self, run_once: bool = False, webserver: Optional[bool] = None
446 ) -> None:
447 """
448 Starts a runner.
450 The runner will begin monitoring for and executing any scheduled work for all added flows.
452 Args:
453 run_once: If True, the runner will run through one query loop and then exit.
454 webserver: a boolean for whether to start a webserver for this runner. If provided,
455 overrides the default on the runner
457 Examples:
458 Initialize a Runner, add two flows, and serve them by starting the Runner:
460 ```python
461 import asyncio
462 from prefect import flow, Runner
464 @flow
465 def hello_flow(name):
466 print(f"hello {name}")
468 @flow
469 def goodbye_flow(name):
470 print(f"goodbye {name}")
472 if __name__ == "__main__":
473 runner = Runner(name="my-runner")
475 # Will be runnable via the API
476 runner.add_flow(hello_flow)
478 # Run on a cron schedule
479 runner.add_flow(goodbye_flow, schedule={"cron": "0 * * * *"})
481 asyncio.run(runner.start())
482 ```
483 """
484 from prefect.runner.server import start_webserver
486 if threading.current_thread() is threading.main_thread():
487 signal.signal(signal.SIGTERM, self.handle_sigterm)
489 webserver = webserver if webserver is not None else self.webserver
491 if webserver or PREFECT_RUNNER_SERVER_ENABLE.value():
492 # we'll start the ASGI server in a separate thread so that
493 # uvicorn does not block the main thread
494 server_thread = threading.Thread(
495 name="runner-server-thread",
496 target=partial(
497 start_webserver,
498 runner=self,
499 ),
500 daemon=True,
501 )
502 server_thread.start()
504 start_client_metrics_server()
506 async with self as runner:
507 # This task group isn't included in the exit stack because we want to
508 # stay in this function until the runner is told to stop
509 async with self._loops_task_group as loops_task_group:
510 for storage in self._storage_objs:
511 if storage.pull_interval:
512 loops_task_group.start_soon(
513 partial(
514 critical_service_loop,
515 workload=storage.pull_code,
516 interval=storage.pull_interval,
517 run_once=run_once,
518 jitter_range=0.3,
519 )
520 )
521 else:
522 loops_task_group.start_soon(storage.pull_code)
523 loops_task_group.start_soon(
524 partial(
525 critical_service_loop,
526 workload=runner._get_and_submit_flow_runs,
527 interval=self.query_seconds,
528 run_once=run_once,
529 jitter_range=0.3,
530 )
531 )
533 def execute_in_background( 1a
534 self, func: Callable[..., Any], *args: Any, **kwargs: Any
535 ) -> "concurrent.futures.Future[Any]":
536 """
537 Executes an async function in the background on the runner's event loop.
538 """
539 if TYPE_CHECKING:
540 assert self._loop is not None
542 return asyncio.run_coroutine_threadsafe(func(*args, **kwargs), self._loop)
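    # Usage sketch for `execute_in_background` (illustrative only; assumes the runner
    # has already been started so its event loop exists). From another thread, such as
    # the runner's webserver thread, schedule a coroutine function and wait on the
    # returned `concurrent.futures.Future`:
    #
    #     future = runner.execute_in_background(runner.cancel_all)
    #     future.result(timeout=30)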
544 async def cancel_all(self) -> None: 1a
545 runs_to_cancel: list["FlowRun"] = []
547 # done to avoid dictionary size changing during iteration
548 for info in self._flow_run_process_map.values():
549 runs_to_cancel.append(info["flow_run"])
550 if runs_to_cancel:
551 for run in runs_to_cancel:
552 try:
553 await self._cancel_run(run, state_msg="Runner is shutting down.")
554 except Exception:
555 self._logger.exception(
556 f"Exception encountered while cancelling {run.id}",
557 exc_info=True,
558 )
560 @sync_compatible 1a
561 async def stop(self): 1a
562 """Stops the runner's polling cycle."""
563 if not self.started:
564 raise RuntimeError(
565 "Runner has not yet started. Please start the runner by calling"
566 " .start()"
567 )
569 self.started = False
570 self.stopping = True
571 await self.cancel_all()
572 try:
573 self._loops_task_group.cancel_scope.cancel()
574 except Exception:
575 self._logger.exception(
576 "Exception encountered while shutting down", exc_info=True
577 )
579 async def execute_flow_run( 1a
580 self,
581 flow_run_id: UUID,
582 entrypoint: str | None = None,
583 command: str | None = None,
584 cwd: Path | str | None = None,
585 env: dict[str, str | None] | None = None,
586 task_status: anyio.abc.TaskStatus[int] = anyio.TASK_STATUS_IGNORED,
587 stream_output: bool = True,
588 ) -> anyio.abc.Process | multiprocessing.context.SpawnProcess | None:
589 """
590 Executes a single flow run with the given ID.
592 Execution blocks while monitoring for cancellation requests and exits once
593 the flow run process has exited.
595 Returns:
596 The flow run process.
597 """
598 self.pause_on_shutdown = False
599 context = self if not self.started else asyncnullcontext()
601 async with context:
602 if not self._acquire_limit_slot(flow_run_id):
603 return
605 self._submitting_flow_run_ids.add(flow_run_id)
606 flow_run = await self._client.read_flow_run(flow_run_id)
608 process: (
609 anyio.abc.Process | multiprocessing.context.SpawnProcess | Exception
610 ) = await self._runs_task_group.start(
611 partial(
612 self._submit_run_and_capture_errors,
613 flow_run=flow_run,
614 entrypoint=entrypoint,
615 command=command,
616 cwd=cwd,
617 env=env,
618 stream_output=stream_output,
619 ),
620 )
621 if isinstance(process, Exception):
622 return
624 if process.pid is None:
625 raise RuntimeError("Process has no PID")
627 task_status.started(process.pid)
629 if self.heartbeat_seconds is not None:
630 await self._emit_flow_run_heartbeat(flow_run)
632 # Only add the process to the map if it is still running
633 # The process may be a multiprocessing.context.SpawnProcess, in which case it will have an `exitcode` attribute
634 # but no `returncode` attribute
635 if (
636 getattr(process, "returncode", None)
637 or getattr(process, "exitcode", None)
638 ) is None:
639 await self._add_flow_run_process_map_entry(
640 flow_run.id, ProcessMapEntry(pid=process.pid, flow_run=flow_run)
641 )
643 while True:
644 # Wait until flow run execution is complete and the process has been removed from the map
645 await anyio.sleep(0.1)
646 if self._flow_run_process_map.get(flow_run.id) is None:
647 break
649 return process
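    # Usage sketch for `execute_flow_run` (illustrative only; `flow_run_id` is an
    # assumed UUID of an existing flow run):
    #
    #     import asyncio
    #     asyncio.run(Runner().execute_flow_run(flow_run_id))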
651 async def execute_bundle( 1a
652 self,
653 bundle: SerializedBundle,
654 cwd: Path | str | None = None,
655 env: dict[str, str | None] | None = None,
656 ) -> None:
657 """
658 Executes a bundle in a subprocess.
659 """
660 from prefect.client.schemas.objects import FlowRun
662 self.pause_on_shutdown = False
663 context = self if not self.started else asyncnullcontext()
665 flow_run = FlowRun.model_validate(bundle["flow_run"])
667 async with context:
668 if not self._acquire_limit_slot(flow_run.id):
669 return
671 process = execute_bundle_in_subprocess(bundle, cwd=cwd, env=env)
673 if process.pid is None:
674 # This shouldn't happen because `execute_bundle_in_subprocess` starts the process
675 # but we'll handle it gracefully anyway
676 msg = "Failed to start process for flow execution. No PID returned."
677 await self._propose_crashed_state(flow_run, msg)
678 raise RuntimeError(msg)
680 if self.heartbeat_seconds is not None:
681 await self._emit_flow_run_heartbeat(flow_run)
683 await self._add_flow_run_process_map_entry(
684 flow_run.id, ProcessMapEntry(pid=process.pid, flow_run=flow_run)
685 )
686 self._flow_run_bundle_map[flow_run.id] = bundle
688 await anyio.to_thread.run_sync(process.join)
690 await self._remove_flow_run_process_map_entry(flow_run.id)
692 flow_run_logger = self._get_flow_run_logger(flow_run)
693 if process.exitcode is None:
694 raise RuntimeError("Process has no exit code")
696 if process.exitcode:
697 help_message = None
698 level = logging.ERROR
699 if process.exitcode == -9:
700 level = logging.INFO
701 help_message = (
702 "This indicates that the process exited due to a SIGKILL signal. "
703 "Typically, this is either caused by manual cancellation or "
704 "high memory usage causing the operating system to "
705 "terminate the process."
706 )
707 if process.exitcode == -15:
708 level = logging.INFO
709 help_message = (
710 "This indicates that the process exited due to a SIGTERM signal. "
711 "Typically, this is caused by manual cancellation."
712 )
713 elif process.exitcode == 247:
714 help_message = (
715 "This indicates that the process was terminated due to high "
716 "memory usage."
717 )
718 elif (
719 sys.platform == "win32"
720 and process.exitcode == STATUS_CONTROL_C_EXIT
721 ):
722 level = logging.INFO
723 help_message = (
724 "Process was terminated due to a Ctrl+C or Ctrl+Break signal. "
725 "Typically, this is caused by manual cancellation."
726 )
728 flow_run_logger.log(
729 level,
730 f"Process for flow run {flow_run.name!r} exited with status code:"
731 f" {process.exitcode}"
732 + (f"; {help_message}" if help_message else ""),
733 )
734 terminal_state = await self._propose_crashed_state(
735 flow_run, help_message or "Process exited with non-zero exit code"
736 )
737 if terminal_state:
738 await self._run_on_crashed_hooks(
739 flow_run=flow_run, state=terminal_state
740 )
741 else:
742 flow_run_logger.info(
743 f"Process for flow run {flow_run.name!r} exited cleanly."
744 )
746 def _get_flow_run_logger(self, flow_run: "FlowRun") -> PrefectLogAdapter: 1a
747 return flow_run_logger(flow_run=flow_run).getChild(
748 "runner",
749 extra={
750 "runner_name": self.name,
751 },
752 )
754 async def _run_process( 1a
755 self,
756 flow_run: "FlowRun",
757 task_status: anyio.abc.TaskStatus[
758 anyio.abc.Process | multiprocessing.context.SpawnProcess
759 ] = anyio.TASK_STATUS_IGNORED,
760 entrypoint: str | None = None,
761 command: str | None = None,
762 cwd: Path | str | None = None,
763 env: dict[str, str | None] | None = None,
764 stream_output: bool = True,
765 ) -> int | None:
766 """
767 Runs the given flow run in a subprocess.
769 Args:
770 flow_run: Flow run to execute via process. The ID of this flow run
771 is stored in the PREFECT__FLOW_RUN_ID environment variable to
772 allow the engine to retrieve the corresponding flow's code and
773 begin execution.
774 task_status: anyio task status used to send a message to the caller
775 that the flow run process has started.
776 """
777 # If we have an instance of the flow for this deployment, run it directly in a subprocess
778 if flow_run.deployment_id is not None:
779 flow = self._deployment_flow_map.get(flow_run.deployment_id)
780 if flow:
781 process = run_flow_in_subprocess(flow, flow_run=flow_run)
782 task_status.started(process)
783 await anyio.to_thread.run_sync(process.join)
784 return process.exitcode
786 # Otherwise, we'll need to run a `python -m prefect.engine` command to load and run the flow
787 if command is None:
788 runner_command = [get_sys_executable(), "-m", "prefect.engine"]
789 else:
790 runner_command = shlex.split(command, posix=(os.name != "nt"))
792 flow_run_logger = self._get_flow_run_logger(flow_run)
794 # We must add creationflags to a dict so it is only passed as a function
795 # parameter on Windows, because the presence of creationflags causes
796 # errors on Unix even if set to None
797 kwargs: Dict[str, object] = {}
798 if sys.platform == "win32":
799 kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
801 flow_run_logger.info("Opening process...")
803 if env is None:
804 env = {}
805 env.update(get_current_settings().to_environment_variables(exclude_unset=True))
806 env.update(
807 {
808 **{
809 "PREFECT__FLOW_RUN_ID": str(flow_run.id),
810 "PREFECT__STORAGE_BASE_PATH": str(self._tmp_dir),
811 "PREFECT__ENABLE_CANCELLATION_AND_CRASHED_HOOKS": "false",
812 },
813 **({"PREFECT__FLOW_ENTRYPOINT": entrypoint} if entrypoint else {}),
814 }
815 )
816 env.update(**os.environ) # is this really necessary??
818 storage = (
819 self._deployment_storage_map.get(flow_run.deployment_id)
820 if flow_run.deployment_id
821 else None
822 )
823 if storage and storage.pull_interval:
824 # perform an adhoc pull of code before running the flow if an
825 # adhoc pull hasn't been performed in the last pull_interval
826 # TODO: Explore integrating this behavior with global concurrency.
827 last_adhoc_pull = getattr(storage, "last_adhoc_pull", None)
828 if (
829 last_adhoc_pull is None
830 or last_adhoc_pull
831 < datetime.datetime.now()
832 - datetime.timedelta(seconds=storage.pull_interval)
833 ):
834 self._logger.debug(
835 "Performing adhoc pull of code for flow run %s with storage %r",
836 flow_run.id,
837 storage,
838 )
839 await storage.pull_code()
840 setattr(storage, "last_adhoc_pull", datetime.datetime.now())
842 process = await run_process(
843 command=runner_command,
844 stream_output=stream_output,
845 task_status=task_status,
846 task_status_handler=lambda process: process,
847 env=env,
848 cwd=storage.destination if storage else cwd,
849 **kwargs,
850 )
852 return process.returncode
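    # Note on the subprocess contract (summary of the code above, not an exhaustive
    # list): when no in-memory flow is available, the child runs
    # `python -m prefect.engine` and reads these environment variables:
    #
    #     PREFECT__FLOW_RUN_ID        - ID of the flow run to execute
    #     PREFECT__STORAGE_BASE_PATH  - base path for code pulled by runner storage
    #     PREFECT__FLOW_ENTRYPOINT    - optional entrypoint override (only set if provided)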
854 async def _kill_process( 1a
855 self,
856 pid: int,
857 grace_seconds: int = 30,
858 ):
859 """
860 Kills a given flow run process.
862 Args:
863 pid: ID of the process to kill
864 grace_seconds: Number of seconds to wait for the process to end.
865 """
866 # In a non-Windows environment, first send a SIGTERM; then, after
867 # `grace_seconds` seconds have passed, send a SIGKILL. On
868 # Windows we use CTRL_BREAK_EVENT because SIGTERM is useless:
869 # https://bugs.python.org/issue26350
870 if sys.platform == "win32":
871 try:
872 os.kill(pid, signal.CTRL_BREAK_EVENT)
873 except (ProcessLookupError, WindowsError):
874 raise RuntimeError(
875 f"Unable to kill process {pid!r}: The process was not found."
876 )
877 else:
878 try:
879 os.kill(pid, signal.SIGTERM)
880 except ProcessLookupError:
881 raise RuntimeError(
882 f"Unable to kill process {pid!r}: The process was not found."
883 )
885 # Throttle how often we check if the process is still alive to keep
886 # from making too many system calls in a short period of time.
887 check_interval = max(grace_seconds / 10, 1)
889 with anyio.move_on_after(grace_seconds):
890 while True:
891 await anyio.sleep(check_interval)
893 # Detect if the process is still alive. If not do an early
894 # return as the process respected the SIGTERM from above.
895 try:
896 os.kill(pid, 0)
897 except ProcessLookupError:
898 return
900 try:
901 os.kill(pid, signal.SIGKILL)
902 except OSError:
903 # We shouldn't ever end up here, but it's possible that the
904 # process ended right after the check above.
905 return
907 def reschedule_current_flow_runs( 1a
908 self,
909 ) -> None:
910 """
911 Reschedules all flow runs that are currently running.
913 This should only be called when the runner is shutting down because it kills all
914 child processes and short-circuits the crash detection logic.
915 """
916 self._rescheduling = True
917 # Create a new sync client because this will often run in a separate thread
918 # as part of a signal handler.
919 with get_client(sync_client=True) as client:
920 self._logger.info("Rescheduling flow runs...")
921 for process_info in self._flow_run_process_map.values():
922 flow_run = process_info["flow_run"]
923 run_logger = self._get_flow_run_logger(flow_run)
924 run_logger.info(
925 "Rescheduling flow run for resubmission in response to SIGTERM"
926 )
927 try:
928 propose_state_sync(client, AwaitingRetry(), flow_run_id=flow_run.id)
929 os.kill(process_info["pid"], signal.SIGTERM)
930 run_logger.info("Rescheduled flow run for resubmission")
931 except ProcessLookupError:
932 # Process may have already exited
933 pass
934 except Abort as exc:
935 run_logger.info(
936 (
937 "Aborted submission of flow run. "
938 f"Server sent an abort signal: {exc}"
939 ),
940 )
941 except Exception:
942 run_logger.exception(
943 "Failed to reschedule flow run",
944 )
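    # Usage sketch for `reschedule_current_flow_runs` (illustrative only; assumes
    # `runner` is a started Runner instance): wire it to SIGTERM so in-flight runs are
    # rescheduled instead of being marked as crashed during shutdown.
    #
    #     def _handle_sigterm(signum, frame):
    #         runner.reschedule_current_flow_runs()
    #
    #     signal.signal(signal.SIGTERM, _handle_sigterm)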
946 async def _pause_schedules(self): 1a
947 """
948 Pauses all deployment schedules.
949 """
950 self._logger.info("Pausing all deployments...")
951 for deployment_id in self._deployment_ids:
952 await self._client.pause_deployment(deployment_id)
953 self._logger.debug(f"Paused deployment '{deployment_id}'")
955 self._logger.info("All deployments have been paused!")
957 async def _get_and_submit_flow_runs(self): 1a
958 if self.stopping:
959 return
960 runs_response = await self._get_scheduled_flow_runs()
961 self.last_polled: datetime.datetime = now("UTC")
962 return await self._submit_scheduled_flow_runs(flow_run_response=runs_response)
964 async def _cancel_run( 1a
965 self, flow_run: "FlowRun | uuid.UUID", state_msg: Optional[str] = None
966 ):
967 if isinstance(flow_run, uuid.UUID):
968 flow_run = await self._client.read_flow_run(flow_run)
969 run_logger = self._get_flow_run_logger(flow_run)
971 process_map_entry = self._flow_run_process_map.get(flow_run.id)
973 pid = process_map_entry.get("pid") if process_map_entry else None
974 if not pid:
975 self._logger.debug(
976 "Received cancellation request for flow run %s but no process was found.",
977 flow_run.id,
978 )
979 return
981 try:
982 await self._kill_process(pid)
983 except RuntimeError as exc:
984 self._logger.warning(f"{exc} Marking flow run as cancelled.")
985 if flow_run.state:
986 await self._run_on_cancellation_hooks(flow_run, flow_run.state)
987 await self._mark_flow_run_as_cancelled(flow_run)
988 except Exception:
989 run_logger.exception(
990 "Encountered exception while killing process for flow run "
991 f"'{flow_run.id}'. Flow run may not be cancelled."
992 )
993 # We will try again on generic exceptions
994 self._cancelling_flow_run_ids.remove(flow_run.id)
995 else:
996 if flow_run.state:
997 await self._run_on_cancellation_hooks(flow_run, flow_run.state)
998 await self._mark_flow_run_as_cancelled(
999 flow_run,
1000 state_updates={
1001 "message": state_msg or "Flow run was cancelled successfully."
1002 },
1003 )
1005 flow, deployment = await self._get_flow_and_deployment(flow_run)
1006 await self._emit_flow_run_cancelled_event(
1007 flow_run=flow_run, flow=flow, deployment=deployment
1008 )
1009 run_logger.info(f"Cancelled flow run '{flow_run.name}'!")
1011 async def _get_flow_and_deployment( 1a
1012 self, flow_run: "FlowRun"
1013 ) -> tuple[Optional["APIFlow"], Optional["DeploymentResponse"]]:
1014 deployment: Optional["DeploymentResponse"] = (
1015 self._deployment_cache.get(flow_run.deployment_id)
1016 if flow_run.deployment_id
1017 else None
1018 )
1019 flow: Optional["APIFlow"] = self._flow_cache.get(flow_run.flow_id)
1020 if not deployment and flow_run.deployment_id is not None:
1021 try:
1022 deployment = await self._client.read_deployment(flow_run.deployment_id)
1023 self._deployment_cache[flow_run.deployment_id] = deployment
1024 except ObjectNotFound:
1025 deployment = None
1026 if not flow:
1027 try:
1028 flow = await self._client.read_flow(flow_run.flow_id)
1029 self._flow_cache[flow_run.flow_id] = flow
1030 except ObjectNotFound:
1031 flow = None
1032 return flow, deployment
1034 async def _emit_flow_run_heartbeats(self): 1a
1035 coros: list[Coroutine[Any, Any, Any]] = []
1036 for entry in self._flow_run_process_map.values():
1037 coros.append(self._emit_flow_run_heartbeat(entry["flow_run"]))
1038 await asyncio.gather(*coros)
1040 async def _emit_flow_run_heartbeat(self, flow_run: "FlowRun"): 1a
1041 from prefect import __version__
1043 related: list[RelatedResource] = []
1044 tags: list[str] = []
1046 flow, deployment = await self._get_flow_and_deployment(flow_run)
1047 if deployment:
1048 related.append(deployment.as_related_resource())
1049 tags.extend(deployment.tags)
1050 if flow:
1051 related.append(
1052 RelatedResource(
1053 {
1054 "prefect.resource.id": f"prefect.flow.{flow.id}",
1055 "prefect.resource.role": "flow",
1056 "prefect.resource.name": flow.name,
1057 }
1058 )
1059 )
1060 tags.extend(flow_run.tags)
1062 related = [RelatedResource.model_validate(r) for r in related]
1063 related += tags_as_related_resources(set(tags))
1065 await self._events_client.emit(
1066 Event(
1067 event="prefect.flow-run.heartbeat",
1068 resource=Resource(
1069 {
1070 "prefect.resource.id": f"prefect.flow-run.{flow_run.id}",
1071 "prefect.resource.name": flow_run.name,
1072 "prefect.version": __version__,
1073 }
1074 ),
1075 related=related,
1076 )
1077 )
1079 def _event_resource(self): 1a
1080 from prefect import __version__
1082 return {
1083 "prefect.resource.id": f"prefect.runner.{slugify(self.name)}",
1084 "prefect.resource.name": self.name,
1085 "prefect.version": __version__,
1086 }
1088 async def _emit_flow_run_cancelled_event( 1a
1089 self,
1090 flow_run: "FlowRun",
1091 flow: "Optional[APIFlow]",
1092 deployment: "Optional[DeploymentResponse]",
1093 ):
1094 related: list[RelatedResource] = []
1095 tags: list[str] = []
1096 if deployment:
1097 related.append(deployment.as_related_resource())
1098 tags.extend(deployment.tags)
1099 if flow:
1100 related.append(
1101 RelatedResource(
1102 {
1103 "prefect.resource.id": f"prefect.flow.{flow.id}",
1104 "prefect.resource.role": "flow",
1105 "prefect.resource.name": flow.name,
1106 }
1107 )
1108 )
1109 related.append(
1110 RelatedResource(
1111 {
1112 "prefect.resource.id": f"prefect.flow-run.{flow_run.id}",
1113 "prefect.resource.role": "flow-run",
1114 "prefect.resource.name": flow_run.name,
1115 }
1116 )
1117 )
1118 tags.extend(flow_run.tags)
1120 related = [RelatedResource.model_validate(r) for r in related]
1121 related += tags_as_related_resources(set(tags))
1123 await self._events_client.emit(
1124 Event(
1125 event="prefect.runner.cancelled-flow-run",
1126 resource=Resource(self._event_resource()),
1127 related=related,
1128 )
1129 )
1130 self._logger.debug(f"Emitted flow run cancelled event for {flow_run.id}")
1132 async def _get_scheduled_flow_runs( 1a
1133 self,
1134 ) -> list["FlowRun"]:
1135 """
1136 Retrieve scheduled flow runs for this runner.
1137 """
1138 scheduled_before = now("UTC") + datetime.timedelta(
1139 seconds=int(self._prefetch_seconds)
1140 )
1141 self._logger.debug(
1142 f"Querying for flow runs scheduled before {scheduled_before}"
1143 )
1145 scheduled_flow_runs = (
1146 await self._client.get_scheduled_flow_runs_for_deployments(
1147 deployment_ids=list(self._deployment_ids),
1148 scheduled_before=scheduled_before,
1149 )
1150 )
1151 self._logger.debug(f"Discovered {len(scheduled_flow_runs)} scheduled_flow_runs")
1152 return scheduled_flow_runs
1154 def has_slots_available(self) -> bool: 1a
1155 """
1156 Determine if the flow run limit has been reached.
1158 Returns:
1159 - bool: True if the limit has not been reached, False otherwise.
1160 """
1161 if not self._limiter:
1162 return False
1163 return self._limiter.available_tokens > 0
1165 def _acquire_limit_slot(self, flow_run_id: UUID) -> bool: 1a
1166 """
1167 Enforces the flow run limit set on the runner.
1169 Returns:
1170 - bool: True if a slot was acquired, False otherwise.
1171 """
1172 try:
1173 if self._limiter:
1174 self._limiter.acquire_on_behalf_of_nowait(flow_run_id)
1175 self._logger.debug("Limit slot acquired for flow run '%s'", flow_run_id)
1176 return True
1177 except RuntimeError as exc:
1178 if (
1179 "this borrower is already holding one of this CapacityLimiter's tokens"
1180 in str(exc)
1181 ):
1182 self._logger.warning(
1183 f"Duplicate submission of flow run '{flow_run_id}' detected. Runner"
1184 " will not re-submit flow run."
1185 )
1186 return False
1187 else:
1188 raise
1189 except anyio.WouldBlock:
1190 if TYPE_CHECKING:
1191 assert self._limiter is not None
1192 self._logger.debug(
1193 f"Flow run limit reached; {self._limiter.borrowed_tokens} flow runs"
1194 " in progress. You can control this limit by adjusting the "
1195 "PREFECT_RUNNER_PROCESS_LIMIT setting."
1196 )
1197 return False
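    # Behavior sketch for the limit-slot methods (conceptual, assumes a started runner
    # constructed with `limit=1`): the first acquisition takes the only token, the
    # second returns False because the capacity limiter would block.
    #
    #     runner._acquire_limit_slot(uuid4())  # True: token acquired
    #     runner._acquire_limit_slot(uuid4())  # False: "Flow run limit reached" is logged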
1199 def _release_limit_slot(self, flow_run_id: UUID) -> None: 1a
1200 """
1201 Frees up a slot taken by the given flow run id.
1202 """
1203 if self._limiter:
1204 self._limiter.release_on_behalf_of(flow_run_id)
1205 self._logger.debug("Limit slot released for flow run '%s'", flow_run_id)
1207 async def _submit_scheduled_flow_runs( 1a
1208 self,
1209 flow_run_response: list["FlowRun"],
1210 entrypoints: list[str] | None = None,
1211 ) -> list["FlowRun"]:
1212 """
1213 Takes a list of FlowRuns and submits the referenced flow runs
1214 for execution by the runner.
1215 """
1216 submittable_flow_runs = sorted(
1217 flow_run_response,
1218 key=lambda run: run.next_scheduled_start_time or datetime.datetime.max,
1219 )
1221 for i, flow_run in enumerate(submittable_flow_runs):
1222 if flow_run.id in self._submitting_flow_run_ids:
1223 continue
1225 if self._acquire_limit_slot(flow_run.id):
1226 run_logger = self._get_flow_run_logger(flow_run)
1227 run_logger.info(
1228 f"Runner '{self.name}' submitting flow run '{flow_run.id}'"
1229 )
1230 self._submitting_flow_run_ids.add(flow_run.id)
1231 self._runs_task_group.start_soon(
1232 partial(
1233 self._submit_run,
1234 flow_run=flow_run,
1235 entrypoint=(
1236 entrypoints[i] if entrypoints else None
1237 ), # TODO: avoid relying on index
1238 )
1239 )
1240 else:
1241 break
1243 return list(
1244 filter(
1245 lambda run: run.id in self._submitting_flow_run_ids,
1246 submittable_flow_runs,
1247 )
1248 )
1250 async def _submit_run(self, flow_run: "FlowRun", entrypoint: Optional[str] = None): 1a
1251 """
1252 Submits a given flow run for execution by the runner.
1253 """
1254 run_logger = self._get_flow_run_logger(flow_run)
1256 ready_to_submit = await self._propose_pending_state(flow_run)
1258 if ready_to_submit:
1259 readiness_result: (
1260 anyio.abc.Process | Exception
1261 ) = await self._runs_task_group.start(
1262 partial(
1263 self._submit_run_and_capture_errors,
1264 flow_run=flow_run,
1265 entrypoint=entrypoint,
1266 ),
1267 )
1269 if readiness_result and not isinstance(readiness_result, Exception):
1270 await self._add_flow_run_process_map_entry(
1271 flow_run.id,
1272 ProcessMapEntry(pid=readiness_result.pid, flow_run=flow_run),
1273 )
1274 # Heartbeats are opt-in and only emitted if a heartbeat frequency is set
1275 if self.heartbeat_seconds is not None:
1276 await self._emit_flow_run_heartbeat(flow_run)
1278 run_logger.info(f"Completed submission of flow run '{flow_run.id}'")
1279 else:
1280 # If the run is not ready to submit, release the concurrency slot
1281 self._release_limit_slot(flow_run.id)
1283 self._submitting_flow_run_ids.discard(flow_run.id)
1285 async def _submit_run_and_capture_errors( 1a
1286 self,
1287 flow_run: "FlowRun",
1288 task_status: anyio.abc.TaskStatus[
1289 anyio.abc.Process | multiprocessing.context.SpawnProcess | Exception
1290 ],
1291 entrypoint: str | None = None,
1292 command: str | None = None,
1293 cwd: Path | str | None = None,
1294 env: dict[str, str | None] | None = None,
1295 stream_output: bool = True,
1296 ) -> Union[Optional[int], Exception]:
1297 run_logger = self._get_flow_run_logger(flow_run)
1299 try:
1300 exit_code = await self._run_process(
1301 flow_run=flow_run,
1302 task_status=task_status,
1303 entrypoint=entrypoint,
1304 command=command,
1305 cwd=cwd,
1306 env=env,
1307 stream_output=stream_output,
1308 )
1309 flow_run_logger = self._get_flow_run_logger(flow_run)
1310 if exit_code:
1311 help_message = None
1312 level = logging.ERROR
1313 if exit_code == -9:
1314 level = logging.INFO
1315 help_message = (
1316 "This indicates that the process exited due to a SIGKILL signal. "
1317 "Typically, this is either caused by manual cancellation or "
1318 "high memory usage causing the operating system to "
1319 "terminate the process."
1320 )
1321 if exit_code == -15:
1322 level = logging.INFO
1323 help_message = (
1324 "This indicates that the process exited due to a SIGTERM signal. "
1325 "Typically, this is caused by manual cancellation."
1326 )
1327 elif exit_code == 247:
1328 help_message = (
1329 "This indicates that the process was terminated due to high "
1330 "memory usage."
1331 )
1332 elif sys.platform == "win32" and exit_code == STATUS_CONTROL_C_EXIT:
1333 level = logging.INFO
1334 help_message = (
1335 "Process was terminated due to a Ctrl+C or Ctrl+Break signal. "
1336 "Typically, this is caused by manual cancellation."
1337 )
1339 flow_run_logger.log(
1340 level,
1341 f"Process for flow run {flow_run.name!r} exited with status code:"
1342 f" {exit_code}" + (f"; {help_message}" if help_message else ""),
1343 )
1344 else:
1345 flow_run_logger.info(
1346 f"Process for flow run {flow_run.name!r} exited cleanly."
1347 )
1348 except Exception as exc:
1349 if not task_status._future.done(): # type: ignore
1350 # This flow run was being submitted and did not start successfully
1351 run_logger.exception(
1352 f"Failed to start process for flow run '{flow_run.id}'."
1353 )
1354 # Mark the task as started to prevent runner crash
1355 task_status.started(exc)
1356 message = f"Flow run process could not be started:\n{exc!r}"
1357 await self._propose_crashed_state(flow_run, message)
1358 else:
1359 run_logger.exception(
1360 f"An error occurred while monitoring flow run '{flow_run.id}'. "
1361 "The flow run will not be marked as failed, but an issue may have "
1362 "occurred."
1363 )
1364 return exc
1365 finally:
1366 self._release_limit_slot(flow_run.id)
1368 await self._remove_flow_run_process_map_entry(flow_run.id)
1370 if exit_code != 0 and not self._rescheduling:
1371 await self._propose_crashed_state(
1372 flow_run,
1373 f"Flow run process exited with non-zero status code {exit_code}.",
1374 )
1376 try:
1377 api_flow_run = await self._client.read_flow_run(flow_run_id=flow_run.id)
1378 terminal_state = api_flow_run.state
1379 if terminal_state and terminal_state.is_crashed():
1380 await self._run_on_crashed_hooks(
1381 flow_run=flow_run, state=terminal_state
1382 )
1383 except ObjectNotFound:
1384 # Flow run was deleted - log it but don't crash the runner
1385 run_logger = self._get_flow_run_logger(flow_run)
1386 run_logger.debug(
1387 f"Flow run '{flow_run.id}' was deleted before final state could be checked"
1388 )
1390 return exit_code
1392 async def _propose_pending_state(self, flow_run: "FlowRun") -> bool: 1a
1393 run_logger = self._get_flow_run_logger(flow_run)
1394 state = flow_run.state
1395 try:
1396 state = await propose_state(
1397 self._client, Pending(), flow_run_id=flow_run.id
1398 )
1399 except Abort as exc:
1400 run_logger.info(
1401 (
1402 f"Aborted submission of flow run '{flow_run.id}'. "
1403 f"Server sent an abort signal: {exc}"
1404 ),
1405 )
1406 return False
1407 except Exception:
1408 run_logger.exception(
1409 f"Failed to update state of flow run '{flow_run.id}'",
1410 )
1411 return False
1413 if not state.is_pending():
1414 run_logger.info(
1415 (
1416 f"Aborted submission of flow run '{flow_run.id}': "
1417 f"Server returned a non-pending state {state.type.value!r}"
1418 ),
1419 )
1420 return False
1422 return True
1424 async def _propose_failed_state(self, flow_run: "FlowRun", exc: Exception) -> None: 1a
1425 run_logger = self._get_flow_run_logger(flow_run)
1426 try:
1427 await propose_state(
1428 self._client,
1429 await exception_to_failed_state(message="Submission failed.", exc=exc),
1430 flow_run_id=flow_run.id,
1431 )
1432 except Abort:
1433 # We've already failed, no need to note the abort but we don't want it to
1434 # raise in the agent process
1435 pass
1436 except Exception:
1437 run_logger.error(
1438 f"Failed to update state of flow run '{flow_run.id}'",
1439 exc_info=True,
1440 )
1442 async def _propose_crashed_state( 1a
1443 self, flow_run: "FlowRun", message: str
1444 ) -> State[Any] | None:
1445 run_logger = self._get_flow_run_logger(flow_run)
1446 state = None
1447 try:
1448 state = await propose_state(
1449 self._client,
1450 Crashed(message=message),
1451 flow_run_id=flow_run.id,
1452 )
1453 except Abort:
1454 # Flow run already marked as failed
1455 pass
1456 except ObjectNotFound:
1457 # Flow run was deleted - log it but don't crash the runner
1458 run_logger.debug(
1459 f"Flow run '{flow_run.id}' was deleted before state could be updated"
1460 )
1461 except Exception:
1462 run_logger.exception(f"Failed to update state of flow run '{flow_run.id}'")
1463 else:
1464 if state.is_crashed():
1465 run_logger.info(
1466 f"Reported flow run '{flow_run.id}' as crashed: {message}"
1467 )
1468 return state
1470 async def _mark_flow_run_as_cancelled( 1a
1471 self, flow_run: "FlowRun", state_updates: Optional[dict[str, Any]] = None
1472 ) -> None:
1473 state_updates = state_updates or {}
1474 state_updates.setdefault("name", "Cancelled")
1475 state_updates.setdefault("type", StateType.CANCELLED)
1476 state = (
1477 flow_run.state.model_copy(update=state_updates) if flow_run.state else None
1478 )
1479 if not state:
1480 self._logger.warning(
1481 f"Could not find state for flow run {flow_run.id} and cancellation cannot be guaranteed."
1482 )
1483 return
1485 try:
1486 await self._client.set_flow_run_state(flow_run.id, state, force=True)
1487 except ObjectNotFound:
1488 # Flow run was deleted - log it but don't crash the runner
1489 run_logger = self._get_flow_run_logger(flow_run)
1490 run_logger.debug(
1491 f"Flow run '{flow_run.id}' was deleted before it could be marked as cancelled"
1492 )
1494 async def _run_on_cancellation_hooks( 1a
1495 self,
1496 flow_run: "FlowRun",
1497 state: State,
1498 ) -> None:
1499 """
1500 Run the on_cancellation hooks for a flow.
1501 """
1502 run_logger = self._get_flow_run_logger(flow_run)
1503 if state.is_cancelling():
1504 try:
1505 if flow_run.id in self._flow_run_bundle_map:
1506 flow = extract_flow_from_bundle(
1507 self._flow_run_bundle_map[flow_run.id]
1508 )
1509 elif flow_run.deployment_id and self._deployment_flow_map.get(
1510 flow_run.deployment_id
1511 ):
1512 flow = self._deployment_flow_map[flow_run.deployment_id]
1513 else:
1514 run_logger.info("Loading flow to check for on_cancellation hooks")
1515 flow = await load_flow_from_flow_run(
1516 flow_run, storage_base_path=str(self._tmp_dir)
1517 )
1518 hooks = flow.on_cancellation_hooks or []
1520 await _run_hooks(hooks, flow_run, flow, state)
1521 except Exception:
1522 run_logger.warning(
1523 f"Runner failed to retrieve flow to execute on_cancellation hooks for flow run {flow_run.id!r}.",
1524 exc_info=True,
1525 )
1527 async def _run_on_crashed_hooks( 1a
1528 self,
1529 flow_run: "FlowRun",
1530 state: State,
1531 ) -> None:
1532 """
1533 Run the on_crashed hooks for a flow.
1534 """
1535 run_logger = self._get_flow_run_logger(flow_run)
1536 if state.is_crashed():
1537 try:
1538 if flow_run.id in self._flow_run_bundle_map:
1539 flow = extract_flow_from_bundle(
1540 self._flow_run_bundle_map[flow_run.id]
1541 )
1542 elif flow_run.deployment_id and self._deployment_flow_map.get(
1543 flow_run.deployment_id
1544 ):
1545 flow = self._deployment_flow_map[flow_run.deployment_id]
1546 else:
1547 run_logger.info("Loading flow to check for on_crashed hooks")
1548 flow = await load_flow_from_flow_run(
1549 flow_run, storage_base_path=str(self._tmp_dir)
1550 )
1551 hooks = flow.on_crashed_hooks or []
1553 await _run_hooks(hooks, flow_run, flow, state)
1554 except Exception:
1555 run_logger.warning(
1556 f"Runner failed to retrieve flow to execute on_crashed hooks for flow run {flow_run.id!r}.",
1557 exc_info=True,
1558 )
1560 async def __aenter__(self) -> Self: 1a
1561 self._logger.debug("Starting runner...")
1562 self._client = get_client()
1563 # Be tolerant to concurrent/duplicate initialization attempts
1564 self._tmp_dir.mkdir(parents=True, exist_ok=True)
1566 self._limiter = anyio.CapacityLimiter(self.limit) if self.limit else None
1568 if not hasattr(self, "_loop") or not self._loop:
1569 self._loop = asyncio.get_event_loop()
1571 self._cancelling_observer = await self._exit_stack.enter_async_context(
1572 FlowRunCancellingObserver(
1573 on_cancelling=lambda flow_run_id: self._runs_task_group.start_soon(
1574 self._cancel_run, flow_run_id
1575 ),
1576 polling_interval=self.query_seconds,
1577 )
1578 )
1579 await self._exit_stack.enter_async_context(self._client)
1580 await self._exit_stack.enter_async_context(self._events_client)
1582 if not hasattr(self, "_runs_task_group") or not self._runs_task_group:
1583 self._runs_task_group: anyio.abc.TaskGroup = anyio.create_task_group()
1584 await self._exit_stack.enter_async_context(self._runs_task_group)
1586 if not hasattr(self, "_loops_task_group") or not self._loops_task_group:
1587 self._loops_task_group: anyio.abc.TaskGroup = anyio.create_task_group()
1589 if self.heartbeat_seconds is not None:
1590 self._heartbeat_task = asyncio.create_task(
1591 critical_service_loop(
1592 workload=self._emit_flow_run_heartbeats,
1593 interval=self.heartbeat_seconds,
1594 jitter_range=0.3,
1595 )
1596 )
1598 self.started = True
1599 return self
1601 async def __aexit__(self, *exc_info: Any) -> None: 1a
1602 self._logger.debug("Stopping runner...")
1603 if self.pause_on_shutdown:
1604 await self._pause_schedules()
1605 self.started = False
1607 for scope in self._scheduled_task_scopes:
1608 scope.cancel()
1610 await self._exit_stack.__aexit__(*exc_info)
1612 # Be tolerant to already-removed temp directories
1613 shutil.rmtree(str(self._tmp_dir), ignore_errors=True)
1614 del self._runs_task_group, self._loops_task_group
1616 if self._heartbeat_task:
1617 self._heartbeat_task.cancel()
1618 try:
1619 await self._heartbeat_task
1620 except asyncio.CancelledError:
1621 pass
1623 def __repr__(self) -> str: 1a
1624 return f"Runner(name={self.name!r})"
1627if sys.platform == "win32": 1a  # coverage annotation: 1627 ↛ 1629 (didn't jump to line 1629 because the condition on line 1627 was never true)
1628 # exit code indicating that the process was terminated by Ctrl+C or Ctrl+Break
1629 STATUS_CONTROL_C_EXIT = 0xC000013A
1632async def _run_hooks( 1a
1633 hooks: list[FlowStateHook[Any, Any]],
1634 flow_run: "FlowRun",
1635 flow: "Flow[..., Any]",
1636 state: State,
1637):
1638 logger = flow_run_logger(flow_run, flow)
1639 for hook in hooks:
1640 try:
1641 logger.info(
1642 f"Running hook {hook.__name__!r} in response to entering state"
1643 f" {state.name!r}"
1644 )
1645 if is_async_fn(hook):
1646 await hook(flow=flow, flow_run=flow_run, state=state)
1647 else:
1648 await from_async.call_in_new_thread(
1649 create_call(hook, flow=flow, flow_run=flow_run, state=state)
1650 )
1651 except Exception:
1652 logger.error(
1653 f"An error was encountered while running hook {hook.__name__!r}",
1654 exc_info=True,
1655 )
1656 else:
1657 logger.info(f"Hook {hook.__name__!r} finished running successfully")