Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/services/pause_expirations.py: 67%

38 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1""" 

2The FailExpiredPauses service. Responsible for putting Paused flow runs in a Failed state if they are not resumed on time. 

3""" 

4 

5import asyncio 1a

6from typing import Any, Optional 1a

7 

8import sqlalchemy as sa 1a

9from sqlalchemy.ext.asyncio import AsyncSession 1a

10 

11import prefect.server.models as models 1a

12from prefect.server.database import PrefectDBInterface 1a

13from prefect.server.database.dependencies import db_injector 1a

14from prefect.server.database.orm_models import FlowRun 1a

15from prefect.server.schemas import states 1a

16from prefect.server.services.base import LoopService 1a

17from prefect.settings import PREFECT_API_SERVICES_PAUSE_EXPIRATIONS_LOOP_SECONDS 1a

18from prefect.settings.context import get_current_settings 1a

19from prefect.settings.models.server.services import ServicesBaseSetting 1a

20from prefect.types._datetime import now 1a

21 

22 

class FailExpiredPauses(LoopService):
    """
    Fails flow runs that have been paused and never resumed.

    Each loop iteration scans flow runs whose state type is `PAUSED` and moves
    every run whose `pause_timeout` lies in the past into a `Failed` state.
    """

    @classmethod
    def service_settings(cls) -> ServicesBaseSetting:
        """Return the `pause_expirations` section of the current server settings."""
        return get_current_settings().server.services.pause_expirations

    def __init__(self, loop_seconds: Optional[float] = None, **kwargs: Any):
        """
        Args:
            loop_seconds: Interval passed to the base loop service. When it is
                `None` (or otherwise falsy) the value of
                `PREFECT_API_SERVICES_PAUSE_EXPIRATIONS_LOOP_SECONDS` is used.
            **kwargs: Forwarded unchanged to `LoopService.__init__`.
        """
        super().__init__(
            loop_seconds=loop_seconds
            or PREFECT_API_SERVICES_PAUSE_EXPIRATIONS_LOOP_SECONDS.value(),
            **kwargs,
        )

        # query for this many runs to mark failed at once
        self.batch_size = 200

    @db_injector
    async def run_once(self, db: PrefectDBInterface) -> None:
        """
        Mark flow runs as failed by:

        - Querying for flow runs in a Paused state, one batch at a time
        - For any runs past the "expiration" threshold, setting the flow run
          state to a new `Failed` state

        Batches are paged by flow run id (keyset pagination) so that every
        paused run is visited at most once per invocation. Without a cursor,
        a full batch of paused-but-not-yet-expired runs would be returned again
        on every pass: the loop would never terminate and expired runs outside
        that first batch would never be reached.
        """
        # id of the last run seen in the previous batch; None before the first batch
        last_seen_id = None

        while True:
            async with db.session_context(begin_transaction=True) as session:
                query = sa.select(db.FlowRun).where(
                    db.FlowRun.state_type == states.StateType.PAUSED,
                )
                if last_seen_id is not None:
                    # resume strictly after the previous batch
                    query = query.where(db.FlowRun.id > last_seen_id)
                # a stable ordering is required for the cursor to be meaningful
                query = query.order_by(db.FlowRun.id).limit(self.batch_size)

                result = await session.execute(query)
                runs = result.scalars().all()

                # Advance the cursor before any state transition touches the
                # loaded rows.
                if runs:
                    last_seen_id = runs[-1].id

                # mark each run as failed (the helper skips runs that have not
                # expired yet)
                for run in runs:
                    await self._mark_flow_run_as_failed(session=session, flow_run=run)

                # An empty or short batch means there is nothing left to scan.
                if not runs or len(runs) < self.batch_size:
                    break

        self.logger.info("Finished monitoring for expired pauses.")

    async def _mark_flow_run_as_failed(
        self, session: AsyncSession, flow_run: FlowRun
    ) -> None:
        """
        Mark a flow run as failed if its pause has timed out.

        Pass-through method for overrides.

        The run is only transitioned when it has a state, that state carries a
        `pause_timeout`, and the timeout is earlier than the current UTC time;
        otherwise this method does nothing.

        Args:
            session: Database session used to write the new state.
            flow_run: The paused flow run to inspect.
        """
        state = flow_run.state
        if state is None:
            return

        pause_timeout = state.state_details.pause_timeout
        if pause_timeout is None or not pause_timeout < now("UTC"):
            return

        await models.flow_runs.set_flow_run_state(
            session=session,
            flow_run_id=flow_run.id,
            state=states.Failed(message="The flow was paused and never resumed."),
            # force=True is passed through exactly as in the original call
            force=True,
        )

93 

94 

if __name__ == "__main__":
    # Script entry point: build the service with handle_signals=True and drive
    # its start() coroutine to completion on a fresh asyncio event loop.
    service = FailExpiredPauses(handle_signals=True)
    asyncio.run(service.start())