Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/services/cancellation_cleanup.py: 37%
77 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1"""
2The CancellationCleanup service. Responsible for cancelling tasks and subflows that haven't finished.
3"""
5import asyncio 1a
6import datetime 1a
7from typing import Any, Optional 1a
8from uuid import UUID 1a
10import sqlalchemy as sa 1a
11from sqlalchemy.sql.expression import or_ 1a
13import prefect.server.models as models 1a
14from prefect.server.database import PrefectDBInterface, orm_models 1a
15from prefect.server.database.dependencies import db_injector 1a
16from prefect.server.schemas import filters, states 1a
17from prefect.server.services.base import LoopService 1a
18from prefect.settings import PREFECT_API_SERVICES_CANCELLATION_CLEANUP_LOOP_SECONDS 1a
19from prefect.settings.context import get_current_settings 1a
20from prefect.settings.models.server.services import ServicesBaseSetting 1a
21from prefect.types._datetime import now 1a
# Every state type that is not in TERMINAL_STATES. Built by iterating the enum
# (definition order) instead of via a set difference, whose iteration order is
# arbitrary, so the list -- and any filter built from it -- is deterministic.
NON_TERMINAL_STATES = [
    state_type
    for state_type in states.StateType
    if state_type not in states.TERMINAL_STATES
]
class CancellationCleanup(LoopService):
    """
    Cancels tasks and subflows of flow runs that have been cancelled
    """

    @classmethod
    def service_settings(cls) -> ServicesBaseSetting:
        """Return the settings section that configures this service."""
        return get_current_settings().server.services.cancellation_cleanup

    def __init__(self, loop_seconds: Optional[float] = None, **kwargs: Any):
        """
        Args:
            loop_seconds: seconds between service iterations. When falsy, the
                value of PREFECT_API_SERVICES_CANCELLATION_CLEANUP_LOOP_SECONDS
                is used instead.
            **kwargs: forwarded unchanged to `LoopService.__init__`.
        """
        super().__init__(
            loop_seconds=loop_seconds
            or PREFECT_API_SERVICES_CANCELLATION_CLEANUP_LOOP_SECONDS.value(),
            **kwargs,
        )

        # page size used when querying for candidate flow runs to clean up
        self.batch_size = 200

    @db_injector
    async def run_once(self, db: PrefectDBInterface) -> None:
        """
        - cancels active tasks belonging to recently cancelled flow runs
        - cancels any active subflow that belongs to a cancelled flow
        """
        # cancels active tasks belonging to recently cancelled flow runs
        await self.clean_up_cancelled_flow_run_task_runs(db)

        # cancels any active subflow run that belongs to a cancelled flow run
        await self.clean_up_cancelled_subflow_runs(db)

        self.logger.info("Finished cleaning up cancelled flow runs.")

    async def clean_up_cancelled_flow_run_task_runs(
        self, db: PrefectDBInterface
    ) -> None:
        """
        Cancel non-terminal task runs of flow runs cancelled within the last day.

        Cancelled flow runs are paged in ascending id order using a high-water
        mark, `self.batch_size` rows per query; each one is handed to
        `_cancel_child_runs`.
        """
        # Compute the cutoff once so every page of this pass uses the same window.
        cutoff = now("UTC") - datetime.timedelta(days=1)
        high_water_mark = UUID(int=0)
        while True:
            cancelled_flow_query = (
                sa.select(db.FlowRun)
                .where(
                    db.FlowRun.state_type == states.StateType.CANCELLED,
                    db.FlowRun.end_time.is_not(None),
                    db.FlowRun.end_time >= cutoff,
                    db.FlowRun.id > high_water_mark,
                )
                .order_by(db.FlowRun.id)
                .limit(self.batch_size)
            )

            async with db.session_context() as session:
                flow_run_result = await session.execute(cancelled_flow_query)
                flow_runs = flow_run_result.scalars().all()

            for run in flow_runs:
                await self._cancel_child_runs(db=db, flow_run=run)
                # rows are ordered by id, so the last id seen is the largest
                high_water_mark = run.id

            # a short (or empty) page means no candidates remain past the mark
            if len(flow_runs) < self.batch_size:
                break

    async def clean_up_cancelled_subflow_runs(self, db: PrefectDBInterface) -> None:
        """
        Cancel active subflow runs whose containing flow run was cancelled.

        Candidate subflow runs (in an active state and with a parent task run)
        are paged in ascending id order using a high-water mark; each candidate
        is checked and, if appropriate, cancelled by `_cancel_subflow`.
        """
        high_water_mark = UUID(int=0)
        while True:
            # Performance optimization: Load only required columns while maintaining ORM functionality
            # Required columns:
            # - id: for high water mark tracking
            # - state_type: for state validation
            # - parent_task_run_id: for parent task run checks
            # - deployment_id: for determining cancellation state type
            subflow_query = (
                sa.select(db.FlowRun)
                .options(
                    sa.orm.load_only(
                        db.FlowRun.id,
                        db.FlowRun.state_type,
                        db.FlowRun.parent_task_run_id,
                        db.FlowRun.deployment_id,
                    ),
                )
                .where(
                    db.FlowRun.state_type.in_(
                        [
                            states.StateType.PENDING,
                            states.StateType.SCHEDULED,
                            states.StateType.RUNNING,
                            states.StateType.PAUSED,
                            states.StateType.CANCELLING,
                        ]
                    ),
                    db.FlowRun.id > high_water_mark,
                    db.FlowRun.parent_task_run_id.is_not(None),
                )
                .order_by(db.FlowRun.id)
                .limit(self.batch_size)
            )

            async with db.session_context() as session:
                subflow_run_result = await session.execute(subflow_query)
                subflow_runs = subflow_run_result.scalars().all()

            for subflow_run in subflow_runs:
                await self._cancel_subflow(db=db, flow_run=subflow_run)
                # rows are ordered by id, so the last id seen is the largest
                high_water_mark = subflow_run.id

            # a short (or empty) page means no candidates remain past the mark
            if len(subflow_runs) < self.batch_size:
                break

    async def _cancel_child_runs(
        self, db: PrefectDBInterface, flow_run: orm_models.FlowRun
    ) -> None:
        """
        Force every non-terminal task run of `flow_run` into a Cancelled state.

        At most 100 task runs are handled per call; any remaining ones are left
        for a later service iteration, which selects the flow run again while it
        is still inside the one-day window.
        """
        async with db.session_context() as session:
            child_task_runs = await models.task_runs.read_task_runs(
                session,
                flow_run_filter=filters.FlowRunFilter(
                    id=filters.FlowRunFilterId(any_=[flow_run.id])
                ),
                task_run_filter=filters.TaskRunFilter(
                    state=filters.TaskRunFilterState(
                        type=filters.TaskRunFilterStateType(any_=NON_TERMINAL_STATES)
                    )
                ),
                limit=100,
            )

        # each state change is committed in its own transaction
        for task_run in child_task_runs:
            async with db.session_context(begin_transaction=True) as session:
                await models.task_runs.set_task_run_state(
                    session=session,
                    task_run_id=task_run.id,
                    state=states.Cancelled(
                        message="The parent flow run was cancelled."
                    ),
                    force=True,
                )

    async def _cancel_subflow(
        self, db: PrefectDBInterface, flow_run: orm_models.FlowRun
    ) -> Optional[bool]:
        """
        Cancel `flow_run` if it is an active subflow whose containing flow run
        is cancelled. If the containing flow run cannot be read or has no state,
        the subflow is cancelled as well.

        Returns:
            True if a cancellation state was proposed for `flow_run`,
            False if nothing was done.
        """
        # Validate via `state_type`: it is one of the columns the batch query in
        # `clean_up_cancelled_subflow_runs` loads, whereas the `state`
        # relationship is not listed in its `load_only`.
        if not flow_run.parent_task_run_id or flow_run.state_type is None:
            return False

        if flow_run.state_type in states.TERMINAL_STATES:
            return False

        async with db.session_context() as session:
            parent_task_run = await models.task_runs.read_task_run(
                session, task_run_id=flow_run.parent_task_run_id
            )

            if not parent_task_run or not parent_task_run.flow_run_id:
                # Global orchestration policy will prevent further orchestration
                return False

            containing_flow_run = await models.flow_runs.read_flow_run(
                session, flow_run_id=parent_task_run.flow_run_id
            )

            if (
                containing_flow_run
                and containing_flow_run.state
                and containing_flow_run.state.type != states.StateType.CANCELLED
            ):
                # Nothing to do here; the parent is not cancelled
                return False

        # Deployed subflows go to CANCELLING rather than straight to CANCELLED;
        # presumably so whatever is executing the deployment can observe the
        # request and shut down -- NOTE(review): confirm.
        if flow_run.deployment_id:
            state = states.Cancelling(message="The parent flow run was cancelled.")
        else:
            state = states.Cancelled(message="The parent flow run was cancelled.")

        async with db.session_context(begin_transaction=True) as session:
            await models.flow_runs.set_flow_run_state(
                session=session,
                flow_run_id=flow_run.id,
                state=state,
            )

        return True
if __name__ == "__main__":
    # Run the service standalone, letting it install its own signal handling.
    service = CancellationCleanup(handle_signals=True)
    asyncio.run(service.start())