Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/orchestration/instrumentation_policies.py: 28%

24 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1""" 

2Orchestration rules related to instrumenting the orchestration engine for Prefect 

3Observability 

4""" 

5 

6from __future__ import annotations 1a

7 

8from sqlalchemy.ext.asyncio import AsyncSession 1a

9 

10from prefect.server.database import orm_models 1a

11from prefect.server.events.clients import PrefectServerEventsClient 1a

12from prefect.server.models.events import ( 1a

13 TRUNCATE_STATE_MESSAGES_AT, 

14 flow_run_state_change_event, 

15 truncated_to, 

16) 

17from prefect.server.orchestration.rules import ( 1a

18 FlowOrchestrationContext, 

19 FlowRunUniversalTransform, 

20 OrchestrationContext, 

21) 

22from prefect.server.schemas import core 1a

23 

24 

class InstrumentFlowRunStateTransitions(FlowRunUniversalTransform):
    """Emit a Prefect Event whenever a flow run moves into a new state."""

    async def after_transition(
        self, context: OrchestrationContext[orm_models.FlowRun, core.FlowRunPolicy]
    ) -> None:
        """Publish a flow-run state-change event after a transition.

        Returns without emitting anything when the context has no proposed
        state or no validated state, or when it is not a
        ``FlowOrchestrationContext``.

        Args:
            context: The orchestration context describing the transition.
        """
        if not (context.proposed_state and context.validated_state):
            return
        if not isinstance(context, FlowOrchestrationContext):
            return

        # Work on copies so the states held by the context are not mutated.
        previous = None
        if context.initial_state:
            previous = context.initial_state.model_copy()
        current = context.validated_state.model_copy()

        # Guard against passing large state payloads to arq
        if previous and context.initial_state:
            previous.timestamp = context.initial_state.timestamp
            previous.message = truncated_to(
                TRUNCATE_STATE_MESSAGES_AT, previous.message
            )
        if current:
            current.timestamp = context.validated_state.timestamp
            current.message = truncated_to(
                TRUNCATE_STATE_MESSAGES_AT, current.message
            )

        # Narrow the session type before handing it to the event builder.
        assert isinstance(context.session, AsyncSession)

        async with PrefectServerEventsClient() as client:
            state_change = await flow_run_state_change_event(
                session=context.session,
                occurred=current.timestamp,
                flow_run=context.run,
                initial_state_id=previous.id if previous else None,
                initial_state=previous,
                validated_state_id=current.id,
                validated_state=current,
            )
            await client.emit(state_change)