Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/services/late_runs.py: 71%
51 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1"""
2The MarkLateRuns service. Responsible for putting flow runs in a Late state if they are not started on time.
3The threshold for a late run can be configured by changing `PREFECT_API_SERVICES_LATE_RUNS_AFTER_SECONDS`.
4"""
6from __future__ import annotations 1a
8import asyncio 1a
9import datetime 1a
10from typing import TYPE_CHECKING, Any 1a
12import sqlalchemy as sa 1a
13from sqlalchemy.ext.asyncio import AsyncSession 1a
15import prefect.server.models as models 1a
16from prefect.server.database import PrefectDBInterface, inject_db 1a
17from prefect.server.database.dependencies import db_injector 1a
18from prefect.server.exceptions import ObjectNotFoundError 1a
19from prefect.server.orchestration.core_policy import MarkLateRunsPolicy 1a
20from prefect.server.schemas import states 1a
21from prefect.server.services.base import LoopService 1a
22from prefect.settings import ( 1a
23 PREFECT_API_SERVICES_LATE_RUNS_AFTER_SECONDS,
24 PREFECT_API_SERVICES_LATE_RUNS_LOOP_SECONDS,
25)
26from prefect.settings.context import get_current_settings 1a
27from prefect.settings.models.server.services import ServicesBaseSetting 1a
28from prefect.types._datetime import DateTime, now 1a
30if TYPE_CHECKING: 30 ↛ 31line 30 didn't jump to line 31 because the condition on line 30 was never true1a
31 from uuid import UUID
class MarkLateRuns(LoopService):
    """
    Finds flow runs that are later than their scheduled start time and moves
    them into a `Late` state.

    A flow run is considered "late" when it is still in the `Scheduled` state
    (state type SCHEDULED and state name "Scheduled") and its next scheduled
    start time is at least `mark_late_after` in the past. That threshold is read
    from the `PREFECT_API_SERVICES_LATE_RUNS_AFTER_SECONDS` setting.
    """

    @classmethod
    def service_settings(cls) -> ServicesBaseSetting:
        """Return the late-runs service section of the current Prefect settings."""
        return get_current_settings().server.services.late_runs

    def __init__(self, loop_seconds: float | None = None, **kwargs: Any):
        """
        Args:
            loop_seconds: loop interval handed to `LoopService`. Any falsy value
                (None or 0) falls back to
                `PREFECT_API_SERVICES_LATE_RUNS_LOOP_SECONDS`.
            **kwargs: forwarded unchanged to `LoopService.__init__`.
        """
        super().__init__(
            loop_seconds=loop_seconds
            or PREFECT_API_SERVICES_LATE_RUNS_LOOP_SECONDS.value(),
            **kwargs,
        )

        # mark runs late if they are this far past their expected start time
        self.mark_late_after: datetime.timedelta = (
            PREFECT_API_SERVICES_LATE_RUNS_AFTER_SECONDS.value()
        )

        # maximum number of runs fetched (and marked) per database transaction
        self.batch_size = 400

    @db_injector
    async def run_once(self, db: PrefectDBInterface) -> None:
        """
        Mark flow runs as late by:

        - Querying, in batches of `batch_size`, for flow runs in a scheduled
          state whose next scheduled start time is older than `mark_late_after`
        - Setting each such flow run's state to a new `Late` state

        Each batch is processed in its own transaction. The pass ends when a
        batch comes back short (every matching run has been seen) or when a
        batch contains only runs already attempted during this pass.
        """
        # subtract the timedelta directly; no need to round-trip through seconds
        scheduled_to_start_before = now("UTC") - self.mark_late_after

        # IDs already handed to `_mark_flow_run_as_late` during this pass. If a
        # run is not moved out of the Scheduled state (for example because the
        # transition is not accepted, or an override skips it), the query keeps
        # returning it; without this guard a full batch of such runs, or a
        # `batch_size` of 0, would spin this loop forever.
        attempted: set["UUID"] = set()

        while True:
            async with db.session_context(begin_transaction=True) as session:
                query = self._get_select_late_flow_runs_query(
                    scheduled_to_start_before=scheduled_to_start_before, db=db
                )

                result = await session.execute(query)
                runs = result.all()

                # only attempt runs not already tried during this pass
                new_runs = [run for run in runs if run.id not in attempted]
                for run in new_runs:
                    attempted.add(run.id)
                    await self._mark_flow_run_as_late(session=session, flow_run=run)

                # a short batch means no more matching runs remain; a batch with
                # nothing new means no further progress is possible this pass
                if len(runs) < self.batch_size or not new_runs:
                    break

        self.logger.info("Finished monitoring for late runs.")

    @inject_db
    def _get_select_late_flow_runs_query(
        self, scheduled_to_start_before: datetime.datetime, db: PrefectDBInterface
    ) -> sa.Select[tuple["UUID", DateTime | None]]:
        """
        Returns a sqlalchemy query for late flow runs.

        Args:
            scheduled_to_start_before: the maximum next scheduled start time of
                scheduled flow runs to consider in the returned query
            db: the database interface providing the `FlowRun` model (supplied
                by `@inject_db` when not passed explicitly)

        Returns:
            A select of `(id, next_scheduled_start_time)` for at most
            `batch_size` flow runs in the "Scheduled" state that are due.
        """
        query = (
            sa.select(
                db.FlowRun.id,
                db.FlowRun.next_scheduled_start_time,
            )
            .where(
                # The next scheduled start time is in the past, including the mark late
                # after buffer
                (db.FlowRun.next_scheduled_start_time <= scheduled_to_start_before),
                db.FlowRun.state_type == states.StateType.SCHEDULED,
                db.FlowRun.state_name == "Scheduled",
            )
            .limit(self.batch_size)
        )
        return query

    async def _mark_flow_run_as_late(
        self,
        session: AsyncSession,
        flow_run: sa.Row[tuple["UUID", DateTime | None]],
    ) -> None:
        """
        Mark a flow run as late.

        Pass-through method for overrides.

        Args:
            session: the session whose transaction the state change joins
            flow_run: a row from `_get_select_late_flow_runs_query`; its
                `next_scheduled_start_time` becomes the Late state's
                `scheduled_time`
        """
        try:
            await models.flow_runs.set_flow_run_state(
                session=session,
                flow_run_id=flow_run.id,
                state=states.Late(scheduled_time=flow_run.next_scheduled_start_time),
                flow_policy=MarkLateRunsPolicy,  # type: ignore
            )
        except ObjectNotFoundError:
            return  # flow run was deleted, ignore it
if __name__ == "__main__":
    # Run the late-runs service directly, with signal handling enabled.
    late_runs_service = MarkLateRuns(handle_signals=True)
    asyncio.run(late_runs_service.start())