Coverage for /usr/local/lib/python3.12/site-packages/prefect/engine.py: 23%
63 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1from __future__ import annotations 1a
3import os 1a
4import sys 1a
5from contextlib import contextmanager 1a
6from typing import TYPE_CHECKING, Any, Callable 1a
7from uuid import UUID 1a
9from prefect._internal.compatibility.migration import getattr_migration 1a
10from prefect.exceptions import ( 1a
11 Abort,
12 Pause,
13)
14from prefect.logging.loggers import ( 1a
15 get_logger,
16)
17from prefect.utilities.asyncutils import ( 1a
18 run_coro_as_sync,
19)
21if TYPE_CHECKING: 21 ↛ 22line 21 didn't jump to line 22 because the condition on line 21 was never true1a
22 import logging
24 from prefect.client.schemas.objects import FlowRun
25 from prefect.flows import Flow
26 from prefect.logging.loggers import LoggingAdapter
# Module-level logger obtained via `get_logger("engine")`; shared by the signal
# handler and the `__main__` entry point in this module.
engine_logger: "logging.Logger" = get_logger("engine")
31@contextmanager 1a
32def handle_engine_signals(flow_run_id: UUID | None = None): 1a
33 """
34 Handle signals from the orchestrator to abort or pause the flow run or otherwise
35 handle unexpected exceptions.
37 This context manager will handle exiting the process depending on the signal received.
39 Args:
40 flow_run_id: The ID of the flow run to handle signals for.
42 Example:
43 ```python
44 from prefect import flow
45 from prefect.engine import handle_engine_signals
46 from prefect.flow_engine import run_flow
48 @flow
49 def my_flow():
50 print("Hello, world!")
52 with handle_engine_signals():
53 run_flow(my_flow)
54 ```
55 """
56 try:
57 yield
58 except Abort:
59 if flow_run_id:
60 msg = f"Execution of flow run '{flow_run_id}' aborted by orchestrator."
61 else:
62 msg = "Execution aborted by orchestrator."
63 engine_logger.info(msg)
64 exit(0)
65 except Pause:
66 if flow_run_id:
67 msg = f"Execution of flow run '{flow_run_id}' is paused."
68 else:
69 msg = "Execution is paused."
70 engine_logger.info(msg)
71 exit(0)
72 except Exception:
73 if flow_run_id:
74 msg = f"Execution of flow run '{flow_run_id}' exited with unexpected exception"
75 else:
76 msg = "Execution exited with unexpected exception"
77 engine_logger.error(msg, exc_info=True)
78 exit(1)
79 except BaseException:
80 if flow_run_id:
81 msg = f"Execution of flow run '{flow_run_id}' interrupted by base exception"
82 else:
83 msg = "Execution interrupted by base exception"
84 engine_logger.error(msg, exc_info=True)
85 # Let the exit code be determined by the base exception type
86 raise
if __name__ == "__main__":
    # Script entry point: resolve the flow run ID, load the flow run and its flow,
    # then execute it under `handle_engine_signals`.
    try:
        # The first CLI argument takes precedence; otherwise fall back to the
        # PREFECT__FLOW_RUN_ID environment variable. A missing or malformed value
        # makes `UUID(...)` raise, which is handled below.
        flow_run_id: UUID = UUID(
            sys.argv[1] if len(sys.argv) > 1 else os.environ.get("PREFECT__FLOW_RUN_ID")
        )
    except Exception:
        engine_logger.error(
            f"Invalid flow run id. Received arguments: {sys.argv}", exc_info=True
        )
        # `sys.exit` rather than the `site`-provided `exit` builtin, which is not
        # guaranteed to exist (e.g. `python -S`, frozen interpreters).
        sys.exit(1)

    with handle_engine_signals(flow_run_id):
        # Imported inside the managed block, so an exception raised by this
        # import is also handled by `handle_engine_signals`.
        from prefect.flow_engine import (
            flow_run_logger,
            load_flow,
            load_flow_run,
            run_flow,
        )

        flow_run: "FlowRun" = load_flow_run(flow_run_id=flow_run_id)
        run_logger: "LoggingAdapter" = flow_run_logger(flow_run=flow_run)

        try:
            flow: "Flow[..., Any]" = load_flow(flow_run)
        except Exception:
            # Record the failure on the flow run's own logger, then let
            # `handle_engine_signals` decide the process exit code.
            run_logger.error(
                "Unexpected exception encountered when trying to load flow",
                exc_info=True,
            )
            raise

        # run the flow
        if flow.isasync:
            run_coro_as_sync(run_flow(flow, flow_run=flow_run, error_logger=run_logger))
        else:
            run_flow(flow, flow_run=flow_run, error_logger=run_logger)
# Module-level `__getattr__` hook (PEP 562): Python calls it for attribute names
# not found on this module. The handler is built by `getattr_migration(__name__)`;
# presumably it redirects names that moved to other modules — confirm in
# `prefect._internal.compatibility.migration`.
__getattr__: Callable[[str], Any] = getattr_migration(__name__)