Coverage for /usr/local/lib/python3.12/site-packages/prefect/engine.py: 23%

63 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1from __future__ import annotations 1a

2 

3import os 1a

4import sys 1a

5from contextlib import contextmanager 1a

6from typing import TYPE_CHECKING, Any, Callable 1a

7from uuid import UUID 1a

8 

9from prefect._internal.compatibility.migration import getattr_migration 1a

10from prefect.exceptions import ( 1a

11 Abort, 

12 Pause, 

13) 

14from prefect.logging.loggers import ( 1a

15 get_logger, 

16) 

17from prefect.utilities.asyncutils import ( 1a

18 run_coro_as_sync, 

19) 

20 

21if TYPE_CHECKING: 21 ↛ 22line 21 didn't jump to line 22 because the condition on line 21 was never true1a

22 import logging 

23 

24 from prefect.client.schemas.objects import FlowRun 

25 from prefect.flows import Flow 

26 from prefect.logging.loggers import LoggingAdapter 

27 

28engine_logger: "logging.Logger" = get_logger("engine") 1a

29 

30 

31@contextmanager 1a

32def handle_engine_signals(flow_run_id: UUID | None = None): 1a

33 """ 

34 Handle signals from the orchestrator to abort or pause the flow run or otherwise 

35 handle unexpected exceptions. 

36 

37 This context manager will handle exiting the process depending on the signal received. 

38 

39 Args: 

40 flow_run_id: The ID of the flow run to handle signals for. 

41 

42 Example: 

43 ```python 

44 from prefect import flow 

45 from prefect.engine import handle_engine_signals 

46 from prefect.flow_engine import run_flow 

47 

48 @flow 

49 def my_flow(): 

50 print("Hello, world!") 

51 

52 with handle_engine_signals(): 

53 run_flow(my_flow) 

54 ``` 

55 """ 

56 try: 

57 yield 

58 except Abort: 

59 if flow_run_id: 

60 msg = f"Execution of flow run '{flow_run_id}' aborted by orchestrator." 

61 else: 

62 msg = "Execution aborted by orchestrator." 

63 engine_logger.info(msg) 

64 exit(0) 

65 except Pause: 

66 if flow_run_id: 

67 msg = f"Execution of flow run '{flow_run_id}' is paused." 

68 else: 

69 msg = "Execution is paused." 

70 engine_logger.info(msg) 

71 exit(0) 

72 except Exception: 

73 if flow_run_id: 

74 msg = f"Execution of flow run '{flow_run_id}' exited with unexpected exception" 

75 else: 

76 msg = "Execution exited with unexpected exception" 

77 engine_logger.error(msg, exc_info=True) 

78 exit(1) 

79 except BaseException: 

80 if flow_run_id: 

81 msg = f"Execution of flow run '{flow_run_id}' interrupted by base exception" 

82 else: 

83 msg = "Execution interrupted by base exception" 

84 engine_logger.error(msg, exc_info=True) 

85 # Let the exit code be determined by the base exception type 

86 raise 

87 

88 

if __name__ == "__main__":
    # Script entry point: resolve the flow run id, load the flow run and its
    # flow, then execute it under `handle_engine_signals`.

    # The flow run id is taken from the first command-line argument, falling
    # back to the PREFECT__FLOW_RUN_ID environment variable.
    try:
        flow_run_id: UUID = UUID(
            sys.argv[1] if len(sys.argv) > 1 else os.environ.get("PREFECT__FLOW_RUN_ID")
        )
    except Exception:
        # Reached for a malformed id as well as a missing one (`UUID(None)` raises).
        engine_logger.error(
            f"Invalid flow run id. Received arguments: {sys.argv}", exc_info=True
        )
        # `sys.exit` rather than the `site`-provided `exit`, which is not
        # guaranteed to exist outside interactive sessions.
        sys.exit(1)

    with handle_engine_signals(flow_run_id):
        # Imported inside the context manager, so an exception raised while
        # importing is also routed through `handle_engine_signals`.
        from prefect.flow_engine import (
            flow_run_logger,
            load_flow,
            load_flow_run,
            run_flow,
        )

        flow_run: "FlowRun" = load_flow_run(flow_run_id=flow_run_id)
        run_logger: "LoggingAdapter" = flow_run_logger(flow_run=flow_run)

        try:
            flow: "Flow[..., Any]" = load_flow(flow_run)
        except Exception:
            # Record the failure on the flow run's own logger, then re-raise so
            # the enclosing context manager decides the process exit code.
            run_logger.error(
                "Unexpected exception encountered when trying to load flow",
                exc_info=True,
            )
            raise

        # run the flow
        if flow.isasync:
            # For async flows the result of `run_flow` is passed to
            # `run_coro_as_sync` to be driven from this synchronous entry point.
            run_coro_as_sync(run_flow(flow, flow_run=flow_run, error_logger=run_logger))
        else:
            run_flow(flow, flow_run=flow_run, error_logger=run_logger)

125 

126 

127__getattr__: Callable[[str], Any] = getattr_migration(__name__) 1a