Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/orchestration/instrumentation_policies.py: 75%
24 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1"""
2Orchestration rules related to instrumenting the orchestration engine for Prefect
3Observability
4"""
6from __future__ import annotations 1c
8from sqlalchemy.ext.asyncio import AsyncSession 1c
10from prefect.server.database import orm_models 1c
11from prefect.server.events.clients import PrefectServerEventsClient 1c
12from prefect.server.models.events import ( 1c
13 TRUNCATE_STATE_MESSAGES_AT,
14 flow_run_state_change_event,
15 truncated_to,
16)
17from prefect.server.orchestration.rules import ( 1c
18 FlowOrchestrationContext,
19 FlowRunUniversalTransform,
20 OrchestrationContext,
21)
22from prefect.server.schemas import core 1c
class InstrumentFlowRunStateTransitions(FlowRunUniversalTransform):
    """Fire a Prefect Event each time a Flow Run moves to a new state."""

    async def after_transition(
        self, context: OrchestrationContext[orm_models.FlowRun, core.FlowRunPolicy]
    ) -> None:
        """Emit a flow-run state-change event for the finished transition.

        Returns without emitting when the context has no proposed state or no
        validated state, or when it is not a ``FlowOrchestrationContext``.
        The event is built from copies of the context's states, so the
        message truncation below is applied to the copies only.

        Args:
            context: The orchestration context of the transition.
        """
        if not (context.proposed_state and context.validated_state):
            return

        if not isinstance(context, FlowOrchestrationContext):
            return

        previous_state = None
        if context.initial_state:
            previous_state = context.initial_state.model_copy()
        current_state = context.validated_state.model_copy()

        # Truncate messages so large state payloads are not passed to arq.
        if previous_state and context.initial_state:
            previous_state.timestamp = context.initial_state.timestamp
            previous_state.message = truncated_to(
                TRUNCATE_STATE_MESSAGES_AT, previous_state.message
            )
        if current_state:
            current_state.timestamp = context.validated_state.timestamp
            current_state.message = truncated_to(
                TRUNCATE_STATE_MESSAGES_AT, current_state.message
            )

        assert isinstance(context.session, AsyncSession)

        async with PrefectServerEventsClient() as events:
            state_change_event = await flow_run_state_change_event(
                session=context.session,
                occurred=current_state.timestamp,
                flow_run=context.run,
                initial_state_id=previous_state.id if previous_state else None,
                initial_state=previous_state,
                validated_state_id=current_state.id,
                validated_state=current_state,
            )
            await events.emit(state_change_event)