Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/services/late_runs.py: 71%

51 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1""" 

2The MarkLateRuns service. Responsible for putting flow runs in a Late state if they are not started on time. 

3The threshold for a late run can be configured by changing `PREFECT_API_SERVICES_LATE_RUNS_AFTER_SECONDS`. 

4""" 

5 

6from __future__ import annotations 1a

7 

8import asyncio 1a

9import datetime 1a

10from typing import TYPE_CHECKING, Any 1a

11 

12import sqlalchemy as sa 1a

13from sqlalchemy.ext.asyncio import AsyncSession 1a

14 

15import prefect.server.models as models 1a

16from prefect.server.database import PrefectDBInterface, inject_db 1a

17from prefect.server.database.dependencies import db_injector 1a

18from prefect.server.exceptions import ObjectNotFoundError 1a

19from prefect.server.orchestration.core_policy import MarkLateRunsPolicy 1a

20from prefect.server.schemas import states 1a

21from prefect.server.services.base import LoopService 1a

22from prefect.settings import ( 1a

23 PREFECT_API_SERVICES_LATE_RUNS_AFTER_SECONDS, 

24 PREFECT_API_SERVICES_LATE_RUNS_LOOP_SECONDS, 

25) 

26from prefect.settings.context import get_current_settings 1a

27from prefect.settings.models.server.services import ServicesBaseSetting 1a

28from prefect.types._datetime import DateTime, now 1a

29 

30if TYPE_CHECKING: 30 ↛ 31line 30 didn't jump to line 31 because the condition on line 30 was never true1a

31 from uuid import UUID 

32 

33 

class MarkLateRuns(LoopService):
    """
    Loop service that moves overdue scheduled flow runs into a `Late` state.

    A flow run is picked up when it is still in the `Scheduled` state and its
    next scheduled start time lies at least `mark_late_after` in the past.
    That grace period is read from
    `PREFECT_API_SERVICES_LATE_RUNS_AFTER_SECONDS`.
    """

    @classmethod
    def service_settings(cls) -> ServicesBaseSetting:
        """Return the `late_runs` entry of the current server services settings."""
        services = get_current_settings().server.services
        return services.late_runs

    def __init__(self, loop_seconds: float | None = None, **kwargs: Any):
        """
        Args:
            loop_seconds: value passed to `LoopService` as `loop_seconds`; any
                falsy value (`None` or `0`) selects
                `PREFECT_API_SERVICES_LATE_RUNS_LOOP_SECONDS` instead
            **kwargs: forwarded unchanged to `LoopService.__init__`
        """
        interval = loop_seconds or PREFECT_API_SERVICES_LATE_RUNS_LOOP_SECONDS.value()
        super().__init__(loop_seconds=interval, **kwargs)

        # how far past its scheduled start a run must be before it is marked late
        self.mark_late_after: datetime.timedelta = (
            PREFECT_API_SERVICES_LATE_RUNS_AFTER_SECONDS.value()
        )

        # upper bound on the number of runs selected and marked per query
        self.batch_size = 400

    @db_injector
    async def run_once(self, db: PrefectDBInterface) -> None:
        """
        Perform one pass of late-run marking.

        The cutoff time (current UTC time minus `mark_late_after`) is computed
        once. Batches of at most `batch_size` matching runs are then selected
        and marked, each batch inside its own session opened with
        `begin_transaction=True`, until a batch comes back with fewer than
        `batch_size` rows.
        """
        current_time = now("UTC")
        grace_period = datetime.timedelta(
            seconds=self.mark_late_after.total_seconds()
        )
        cutoff = current_time - grace_period

        batch_was_full = True
        while batch_was_full:
            async with db.session_context(begin_transaction=True) as session:
                late_query = self._get_select_late_flow_runs_query(
                    scheduled_to_start_before=cutoff, db=db
                )
                rows = (await session.execute(late_query)).all()

                # transition every selected run, one at a time, in this session
                for row in rows:
                    await self._mark_flow_run_as_late(session=session, flow_run=row)

                # a batch smaller than the limit is the last one; the loop
                # ends once this session context has been exited
                batch_was_full = len(rows) >= self.batch_size

        self.logger.info("Finished monitoring for late runs.")

    @inject_db
    def _get_select_late_flow_runs_query(
        self, scheduled_to_start_before: datetime.datetime, db: PrefectDBInterface
    ) -> sa.Select[tuple["UUID", DateTime | None]]:
        """
        Build the SELECT statement used to find late flow runs.

        The statement returns the id and next scheduled start time of at most
        `batch_size` flow runs.

        Args:
            scheduled_to_start_before: only flow runs whose next scheduled
                start time is at or before this moment are selected
        """
        flow_run = db.FlowRun
        selected_columns = (flow_run.id, flow_run.next_scheduled_start_time)
        late_conditions = (
            # due at or before the cutoff, which already includes the
            # mark-late-after buffer
            flow_run.next_scheduled_start_time <= scheduled_to_start_before,
            flow_run.state_type == states.StateType.SCHEDULED,
            flow_run.state_name == "Scheduled",
        )

        statement = sa.select(*selected_columns).where(*late_conditions)
        return statement.limit(self.batch_size)

    async def _mark_flow_run_as_late(
        self,
        session: AsyncSession,
        flow_run: sa.Row[tuple["UUID", DateTime | None]],
    ) -> None:
        """
        Set a single flow run's state to `Late`.

        Kept as a separate method so that overrides can hook in here.

        Args:
            session: session in which the state change is made
            flow_run: row providing `id` and `next_scheduled_start_time`; the
                latter becomes the `scheduled_time` of the new `Late` state
        """
        try:
            late_state = states.Late(
                scheduled_time=flow_run.next_scheduled_start_time
            )
            await models.flow_runs.set_flow_run_state(
                session=session,
                flow_run_id=flow_run.id,
                state=late_state,
                flow_policy=MarkLateRunsPolicy,  # type: ignore
            )
        except ObjectNotFoundError:
            # the flow run was deleted in the meantime; nothing to mark
            pass

139 

140 

# Script entry point: start a MarkLateRuns service created with
# handle_signals=True and run it on a fresh asyncio event loop.
if __name__ == "__main__":
    late_runs_service = MarkLateRuns(handle_signals=True)
    asyncio.run(late_runs_service.start())