Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/services/cancellation_cleanup.py: 37%

77 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1""" 

2The CancellationCleanup service. Responsible for cancelling tasks and subflows that haven't finished. 

3""" 

4 

5import asyncio 1a

6import datetime 1a

7from typing import Any, Optional 1a

8from uuid import UUID 1a

9 

10import sqlalchemy as sa 1a

11from sqlalchemy.sql.expression import or_ 1a

12 

13import prefect.server.models as models 1a

14from prefect.server.database import PrefectDBInterface, orm_models 1a

15from prefect.server.database.dependencies import db_injector 1a

16from prefect.server.schemas import filters, states 1a

17from prefect.server.services.base import LoopService 1a

18from prefect.settings import PREFECT_API_SERVICES_CANCELLATION_CLEANUP_LOOP_SECONDS 1a

19from prefect.settings.context import get_current_settings 1a

20from prefect.settings.models.server.services import ServicesBaseSetting 1a

21from prefect.types._datetime import now 1a

22 

# Every member of StateType that is not listed in TERMINAL_STATES.
NON_TERMINAL_STATES = list(
    set(states.StateType).difference(states.TERMINAL_STATES)
)

24 

25 

class CancellationCleanup(LoopService):
    """
    Loop service that propagates flow run cancellation to child work.

    Each pass cancels the non-terminal task runs of recently cancelled flow
    runs, then cancels non-terminal subflow runs whose containing flow run
    has been cancelled.
    """

    @classmethod
    def service_settings(cls) -> ServicesBaseSetting:
        """Return the `cancellation_cleanup` section of the current server service settings."""
        services = get_current_settings().server.services
        return services.cancellation_cleanup

    def __init__(self, loop_seconds: Optional[float] = None, **kwargs: Any):
        """
        Args:
            loop_seconds: loop interval handed to the base class; a falsy
                value falls back to the
                PREFECT_API_SERVICES_CANCELLATION_CLEANUP_LOOP_SECONDS setting.
            **kwargs: forwarded unchanged to the base class constructor.
        """
        interval = (
            loop_seconds
            or PREFECT_API_SERVICES_CANCELLATION_CLEANUP_LOOP_SECONDS.value()
        )
        super().__init__(loop_seconds=interval, **kwargs)

        # page size: how many flow runs each scan query fetches at once
        self.batch_size = 200

    @db_injector
    async def run_once(self, db: PrefectDBInterface) -> None:
        """
        Run a single cleanup pass:

        - cancel active task runs that belong to recently cancelled flow runs
        - cancel active subflow runs that belong to a cancelled flow run
        """
        # step 1: task runs of recently cancelled flow runs
        await self.clean_up_cancelled_flow_run_task_runs(db)

        # step 2: subflow runs whose containing flow run is cancelled
        await self.clean_up_cancelled_subflow_runs(db)

        self.logger.info("Finished cleaning up cancelled flow runs.")

    async def clean_up_cancelled_flow_run_task_runs(
        self, db: PrefectDBInterface
    ) -> None:
        """
        Page through flow runs that are CANCELLED with an end time inside the
        last day and cancel their non-terminal task runs.

        Pages are ordered by flow run id and resumed from the last id seen,
        `batch_size` rows at a time.
        """
        last_seen_id = UUID(int=0)
        page_was_full = True
        while page_was_full:
            # only flow runs that ended within the last 24 hours are considered
            cutoff = now("UTC") - datetime.timedelta(days=1)
            criteria = (
                db.FlowRun.state_type == states.StateType.CANCELLED,
                db.FlowRun.end_time.is_not(None),
                db.FlowRun.end_time >= cutoff,
                db.FlowRun.id > last_seen_id,
            )
            page_query = (
                sa.select(db.FlowRun)
                .where(*criteria)
                .order_by(db.FlowRun.id)
                .limit(self.batch_size)
            )

            async with db.session_context() as session:
                rows = await session.execute(page_query)
                cancelled_runs = rows.scalars().all()

            for cancelled_run in cancelled_runs:
                await self._cancel_child_runs(db=db, flow_run=cancelled_run)
                last_seen_id = cancelled_run.id

            # a short page means there is nothing further to fetch
            page_was_full = len(cancelled_runs) >= self.batch_size

    async def clean_up_cancelled_subflow_runs(self, db: PrefectDBInterface) -> None:
        """
        Page through subflow runs (flow runs with a parent task run) that are
        PENDING, SCHEDULED, RUNNING, PAUSED or CANCELLING and hand each one to
        `_cancel_subflow`.

        Pages are ordered by flow run id and resumed from the highest id seen,
        `batch_size` rows at a time.
        """
        active_types = (
            states.StateType.PENDING,
            states.StateType.SCHEDULED,
            states.StateType.RUNNING,
            states.StateType.PAUSED,
            states.StateType.CANCELLING,
        )
        last_seen_id = UUID(int=0)
        page_was_full = True
        while page_was_full:
            # Performance: load only the columns this service reads while
            # still returning ORM objects:
            #   - id: paging cursor
            #   - state_type: state validation
            #   - parent_task_run_id: parent task run lookup
            #   - deployment_id: picks the cancellation state type
            needed_columns = sa.orm.load_only(
                db.FlowRun.id,
                db.FlowRun.state_type,
                db.FlowRun.parent_task_run_id,
                db.FlowRun.deployment_id,
            )
            is_active = or_(
                *[db.FlowRun.state_type == state_type for state_type in active_types]
            )
            page_query = (
                sa.select(db.FlowRun)
                .options(needed_columns)
                .where(
                    is_active,
                    db.FlowRun.id > last_seen_id,
                    db.FlowRun.parent_task_run_id.is_not(None),
                )
                .order_by(db.FlowRun.id)
                .limit(self.batch_size)
            )

            async with db.session_context() as session:
                rows = await session.execute(page_query)
                candidates = rows.scalars().all()

            for candidate in candidates:
                await self._cancel_subflow(db=db, flow_run=candidate)
                last_seen_id = max(last_seen_id, candidate.id)

            # a short page means there is nothing further to fetch
            page_was_full = len(candidates) >= self.batch_size

    async def _cancel_child_runs(
        self, db: PrefectDBInterface, flow_run: orm_models.FlowRun
    ) -> None:
        """
        Force up to 100 non-terminal task runs of `flow_run` into a Cancelled
        state, each in its own transaction.
        """
        only_this_flow_run = filters.FlowRunFilter(
            id=filters.FlowRunFilterId(any_=[flow_run.id])
        )
        only_non_terminal = filters.TaskRunFilter(
            state=filters.TaskRunFilterState(
                type=filters.TaskRunFilterStateType(any_=NON_TERMINAL_STATES)
            )
        )

        async with db.session_context() as session:
            pending_task_runs = await models.task_runs.read_task_runs(
                session,
                flow_run_filter=only_this_flow_run,
                task_run_filter=only_non_terminal,
                limit=100,
            )

        for pending_task_run in pending_task_runs:
            # a fresh state object and a fresh transaction per task run
            async with db.session_context(begin_transaction=True) as session:
                cancelled = states.Cancelled(
                    message="The parent flow run was cancelled."
                )
                await models.task_runs.set_task_run_state(
                    session=session,
                    task_run_id=pending_task_run.id,
                    state=cancelled,
                    force=True,
                )

    async def _cancel_subflow(
        self, db: PrefectDBInterface, flow_run: orm_models.FlowRun
    ) -> Optional[bool]:
        """
        Cancel `flow_run` unless its containing flow run is known to be in a
        state other than CANCELLED.

        Returns:
            False when the run is skipped (no parent task run, no state,
            already terminal, parent task run missing or detached from a flow
            run, or containing flow run not cancelled); None after a state
            change has been submitted.
        """
        if (
            not flow_run.parent_task_run_id
            or not flow_run.state
            or flow_run.state.type in states.TERMINAL_STATES
        ):
            return False

        async with db.session_context() as session:
            parent_task_run = await models.task_runs.read_task_run(
                session, task_run_id=flow_run.parent_task_run_id
            )

            if not (parent_task_run and parent_task_run.flow_run_id):
                # Global orchestration policy will prevent further orchestration
                return False

            containing_flow_run = await models.flow_runs.read_flow_run(
                session, flow_run_id=parent_task_run.flow_run_id
            )

            containing_state = (
                containing_flow_run.state if containing_flow_run else None
            )
            if containing_state and containing_state.type != states.StateType.CANCELLED:
                # Nothing to do here; the parent is not cancelled
                return False

        # runs tied to a deployment go to Cancelling; all others go straight
        # to Cancelled
        state_factory = states.Cancelling if flow_run.deployment_id else states.Cancelled
        new_state = state_factory(message="The parent flow run was cancelled.")

        async with db.session_context(begin_transaction=True) as session:
            await models.flow_runs.set_flow_run_state(
                session=session,
                flow_run_id=flow_run.id,
                state=new_state,
            )

204 

if __name__ == "__main__":
    # Script entry point: build the service with signal handling enabled and
    # drive its start() coroutine to completion.
    service = CancellationCleanup(handle_signals=True)
    asyncio.run(service.start())