Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/orchestration/instrumentation_policies.py: 75%

24 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1""" 

2Orchestration rules related to instrumenting the orchestration engine for Prefect 

3Observability 

4""" 

5 

6from __future__ import annotations 1c

7 

8from sqlalchemy.ext.asyncio import AsyncSession 1c

9 

10from prefect.server.database import orm_models 1c

11from prefect.server.events.clients import PrefectServerEventsClient 1c

12from prefect.server.models.events import ( 1c

13 TRUNCATE_STATE_MESSAGES_AT, 

14 flow_run_state_change_event, 

15 truncated_to, 

16) 

17from prefect.server.orchestration.rules import ( 1c

18 FlowOrchestrationContext, 

19 FlowRunUniversalTransform, 

20 OrchestrationContext, 

21) 

22from prefect.server.schemas import core 1c

23 

24 

25class InstrumentFlowRunStateTransitions(FlowRunUniversalTransform): 1c

26 """When a Flow Run changes states, fire a Prefect Event for the state change""" 

27 

28 async def after_transition( 1c

29 self, context: OrchestrationContext[orm_models.FlowRun, core.FlowRunPolicy] 

30 ) -> None: 

31 if not context.proposed_state or not context.validated_state: 31 ↛ 32line 31 didn't jump to line 32 because the condition on line 31 was never true1ab

32 return 

33 

34 if not isinstance(context, FlowOrchestrationContext): 34 ↛ 35line 34 didn't jump to line 35 because the condition on line 34 was never true1ab

35 return 

36 

37 initial_state = ( 1ab

38 context.initial_state.model_copy() if context.initial_state else None 

39 ) 

40 validated_state = context.validated_state.model_copy() 1ab

41 

42 # Guard against passing large state payloads to arq 

43 if initial_state and context.initial_state: 43 ↛ 44line 43 didn't jump to line 44 because the condition on line 43 was never true1ab

44 initial_state.timestamp = context.initial_state.timestamp 

45 initial_state.message = truncated_to( 

46 TRUNCATE_STATE_MESSAGES_AT, initial_state.message 

47 ) 

48 if validated_state: 48 ↛ 54line 48 didn't jump to line 54 because the condition on line 48 was always true1ab

49 validated_state.timestamp = context.validated_state.timestamp 1ab

50 validated_state.message = truncated_to( 1ab

51 TRUNCATE_STATE_MESSAGES_AT, validated_state.message 

52 ) 

53 

54 assert isinstance(context.session, AsyncSession) 1ab

55 

56 async with PrefectServerEventsClient() as events: 1ab

57 await events.emit( 1ab

58 await flow_run_state_change_event( 

59 session=context.session, 

60 occurred=validated_state.timestamp, 

61 flow_run=context.run, 

62 initial_state_id=initial_state.id if initial_state else None, 

63 initial_state=initial_state, 

64 validated_state_id=validated_state.id, 

65 validated_state=validated_state, 

66 ) 

67 )