Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/services/pause_expirations.py: 87%
38 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1"""
2The FailExpiredPauses service. Responsible for putting Paused flow runs in a Failed state if they are not resumed on time.
3"""
5import asyncio 1a
6from typing import Any, Optional 1a
8import sqlalchemy as sa 1a
9from sqlalchemy.ext.asyncio import AsyncSession 1a
11import prefect.server.models as models 1a
12from prefect.server.database import PrefectDBInterface 1a
13from prefect.server.database.dependencies import db_injector 1a
14from prefect.server.database.orm_models import FlowRun 1a
15from prefect.server.schemas import states 1a
16from prefect.server.services.base import LoopService 1a
17from prefect.settings import PREFECT_API_SERVICES_PAUSE_EXPIRATIONS_LOOP_SECONDS 1a
18from prefect.settings.context import get_current_settings 1a
19from prefect.settings.models.server.services import ServicesBaseSetting 1a
20from prefect.types._datetime import now 1a
class FailExpiredPauses(LoopService):
    """
    Fails flow runs that have been paused and never resumed.

    Each pass scans flow runs whose state type is `PAUSED` and moves the ones
    whose `pause_timeout` has already elapsed into a `Failed` state.
    """

    @classmethod
    def service_settings(cls) -> ServicesBaseSetting:
        """Return the `pause_expirations` section of the current server settings."""
        return get_current_settings().server.services.pause_expirations

    def __init__(self, loop_seconds: Optional[float] = None, **kwargs: Any):
        """
        Args:
            loop_seconds: forwarded to `LoopService.__init__` as `loop_seconds`.
                Any falsy value (`None` or `0`) falls back to the
                `PREFECT_API_SERVICES_PAUSE_EXPIRATIONS_LOOP_SECONDS` setting.
            **kwargs: forwarded unchanged to `LoopService.__init__`.
        """
        super().__init__(
            loop_seconds=loop_seconds
            or PREFECT_API_SERVICES_PAUSE_EXPIRATIONS_LOOP_SECONDS.value(),
            **kwargs,
        )

        # query for this many runs to mark failed at once
        self.batch_size = 200

    @db_injector
    async def run_once(self, db: PrefectDBInterface) -> None:
        """
        Mark flow runs as failed by:

        - Querying for flow runs in a Paused state, `batch_size` at a time
        - For any runs past the "expiration" threshold, setting the flow run
          state to a new `Failed` state

        Batches are walked in ascending `id` order using a keyset cursor, so
        every paused run is visited at most once per pass. Paused runs that
        have not expired yet stay in the Paused state; without the cursor a
        full batch of such runs would be fetched again and again and the loop
        would never terminate (nor ever reach runs beyond that first batch).
        """
        # id of the last flow run seen so far; None until the first batch is read
        last_seen_id = None

        while True:
            async with db.session_context(begin_transaction=True) as session:
                query = sa.select(db.FlowRun).where(
                    db.FlowRun.state_type == states.StateType.PAUSED,
                )
                if last_seen_id is not None:
                    # resume strictly after the previous batch
                    query = query.where(db.FlowRun.id > last_seen_id)
                query = query.order_by(db.FlowRun.id).limit(self.batch_size)

                result = await session.execute(query)
                runs = result.scalars().all()

                # advance the cursor before any state changes are applied
                if runs:
                    last_seen_id = runs[-1].id

                # fail each run whose pause has expired
                for run in runs:
                    await self._mark_flow_run_as_failed(session=session, flow_run=run)

                # a short (or empty) batch means there is nothing left to scan
                if len(runs) < self.batch_size:
                    break

        self.logger.info("Finished monitoring for expired pauses.")

    async def _mark_flow_run_as_failed(
        self, session: AsyncSession, flow_run: FlowRun
    ) -> None:
        """
        Mark a flow run as failed if its pause timeout has passed.

        Runs without a state, without a `pause_timeout`, or whose
        `pause_timeout` is still in the future are left untouched.

        Pass-through method for overrides.

        Args:
            session: database session used to write the new state.
            flow_run: the paused flow run to inspect.
        """
        if (
            flow_run.state is not None
            and flow_run.state.state_details.pause_timeout is not None
            and flow_run.state.state_details.pause_timeout < now("UTC")
        ):
            # force=True is passed through to set_flow_run_state as in the
            # original implementation
            await models.flow_runs.set_flow_run_state(
                session=session,
                flow_run_id=flow_run.id,
                state=states.Failed(message="The flow was paused and never resumed."),
                force=True,
            )
if __name__ == "__main__":
    # Script entry point: build the service with signal handling enabled and
    # drive its `start()` coroutine to completion on a fresh event loop.
    service = FailExpiredPauses(handle_signals=True)
    asyncio.run(service.start())