Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/orchestration/instrumentation_policies.py: 28%
24 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1"""
2Orchestration rules related to instrumenting the orchestration engine for Prefect
3Observability
4"""
6from __future__ import annotations 1a
8from sqlalchemy.ext.asyncio import AsyncSession 1a
10from prefect.server.database import orm_models 1a
11from prefect.server.events.clients import PrefectServerEventsClient 1a
12from prefect.server.models.events import ( 1a
13 TRUNCATE_STATE_MESSAGES_AT,
14 flow_run_state_change_event,
15 truncated_to,
16)
17from prefect.server.orchestration.rules import ( 1a
18 FlowOrchestrationContext,
19 FlowRunUniversalTransform,
20 OrchestrationContext,
21)
22from prefect.server.schemas import core 1a
class InstrumentFlowRunStateTransitions(FlowRunUniversalTransform):
    """Fire a Prefect Event whenever a Flow Run moves from one state to another."""

    async def after_transition(
        self, context: OrchestrationContext[orm_models.FlowRun, core.FlowRunPolicy]
    ) -> None:
        """Emit a flow-run state-change event for the transition in ``context``.

        Returns without emitting anything when the context has no proposed
        state, no validated state, or is not a ``FlowOrchestrationContext``.

        Args:
            context: The orchestration context describing the transition; its
                ``initial_state``, ``validated_state``, ``session`` and ``run``
                are read to build the event.
        """
        if not (context.proposed_state and context.validated_state):
            return
        if not isinstance(context, FlowOrchestrationContext):
            return

        # Mutate copies rather than the state objects held by the context.
        source_initial = context.initial_state
        previous = source_initial.model_copy() if source_initial else None
        current = context.validated_state.model_copy()

        # Guard against passing large state payloads to arq: state messages
        # are truncated to TRUNCATE_STATE_MESSAGES_AT before the event is built.
        if previous and source_initial:
            previous.timestamp = source_initial.timestamp
            previous.message = truncated_to(
                TRUNCATE_STATE_MESSAGES_AT, previous.message
            )
        if current:
            current.timestamp = context.validated_state.timestamp
            current.message = truncated_to(
                TRUNCATE_STATE_MESSAGES_AT, current.message
            )

        # Narrow the session type before handing it to the event builder.
        assert isinstance(context.session, AsyncSession)

        async with PrefectServerEventsClient() as client:
            state_change = await flow_run_state_change_event(
                session=context.session,
                occurred=current.timestamp,
                flow_run=context.run,
                initial_state_id=previous.id if previous else None,
                initial_state=previous,
                validated_state_id=current.id,
                validated_state=current,
            )
            await client.emit(state_change)