Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/models/events.py: 58%
115 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1from datetime import datetime, timedelta 1c
2from typing import Any, Dict, List, MutableMapping, Optional, Set, Union 1c
3from uuid import UUID 1c
5from cachetools import TTLCache 1c
6from sqlalchemy.ext.asyncio import AsyncSession 1c
8from prefect._internal.uuid7 import uuid7 1c
9from prefect.server import models, schemas 1c
10from prefect.server.database.orm_models import ( 1c
11 ORMDeployment,
12 ORMFlow,
13 ORMFlowRun,
14 ORMFlowRunState,
15 ORMTaskRun,
16 ORMTaskRunState,
17 ORMWorkPool,
18 ORMWorkQueue,
19)
20from prefect.server.events.schemas.events import Event 1c
21from prefect.server.models import deployments 1c
22from prefect.server.schemas.statuses import DeploymentStatus 1c
23from prefect.settings import PREFECT_API_EVENTS_RELATED_RESOURCE_CACHE_TTL 1c
24from prefect.types._datetime import DateTime, now 1c
25from prefect.utilities.text import truncated_to 1c
27ResourceData = Dict[str, Dict[str, Any]] 1c
28RelatedResourceList = List[Dict[str, str]] 1c
31# Some users use state messages to convey error messages and large results; let's
32# truncate them so they don't blow out the size of a message
33TRUNCATE_STATE_MESSAGES_AT = 100_000 1c
36_flow_run_resource_data_cache: MutableMapping[UUID, ResourceData] = TTLCache( 1c
37 maxsize=1000,
38 ttl=PREFECT_API_EVENTS_RELATED_RESOURCE_CACHE_TTL.value().total_seconds(),
39)
42async def flow_run_state_change_event( 1c
43 session: AsyncSession,
44 occurred: datetime,
45 flow_run: ORMFlowRun,
46 initial_state_id: Optional[UUID],
47 initial_state: Optional[schemas.states.State],
48 validated_state_id: Optional[UUID],
49 validated_state: schemas.states.State,
50) -> Event:
51 return Event( 1ab
52 occurred=occurred,
53 event=f"prefect.flow-run.{validated_state.name}",
54 resource={
55 "prefect.resource.id": f"prefect.flow-run.{flow_run.id}",
56 "prefect.resource.name": flow_run.name,
57 "prefect.run-count": str(flow_run.run_count),
58 "prefect.state-message": truncated_to(
59 TRUNCATE_STATE_MESSAGES_AT, validated_state.message
60 ),
61 "prefect.state-name": validated_state.name or "",
62 "prefect.state-timestamp": (
63 validated_state.timestamp.isoformat()
64 if validated_state.timestamp
65 else None
66 ),
67 "prefect.state-type": validated_state.type.value,
68 },
69 related=await _flow_run_related_resources_from_orm(
70 session=session, flow_run=flow_run
71 ),
72 payload={
73 "intended": {
74 "from": _state_type(initial_state),
75 "to": _state_type(validated_state),
76 },
77 "initial_state": state_payload(initial_state),
78 "validated_state": state_payload(validated_state),
79 },
80 # Here we use the state's ID as the ID of the event as well, in order to
81 # establish the ordering of the state-change events
82 id=validated_state_id,
83 follows=initial_state_id if _timing_is_tight(occurred, initial_state) else None,
84 )
87async def _flow_run_related_resources_from_orm( 1c
88 session: AsyncSession, flow_run: ORMFlowRun
89) -> RelatedResourceList:
90 resource_data = _flow_run_resource_data_cache.get(flow_run.id) 1ab
91 if not resource_data: 91 ↛ 119line 91 didn't jump to line 119 because the condition on line 91 was always true1ab
92 flow = await models.flows.read_flow(session=session, flow_id=flow_run.flow_id) 1ab
93 deployment: Optional[ORMDeployment] = None
94 if flow_run.deployment_id:
95 deployment = await deployments.read_deployment(
96 session, deployment_id=flow_run.deployment_id
97 )
99 work_queue = None
100 if flow_run.work_queue_id:
101 work_queue = await models.work_queues.read_work_queue(
102 session, work_queue_id=flow_run.work_queue_id
103 )
105 work_pool = work_queue.work_pool if work_queue is not None else None
107 task_run: Optional[ORMTaskRun] = None
108 if flow_run.parent_task_run_id:
109 task_run = await models.task_runs.read_task_run(
110 session,
111 task_run_id=flow_run.parent_task_run_id,
112 )
114 resource_data = _as_resource_data(
115 flow_run, flow, deployment, work_queue, work_pool, task_run
116 )
117 _flow_run_resource_data_cache[flow_run.id] = resource_data
119 return _resource_data_as_related_resources(
120 resource_data,
121 excluded_kinds=["flow-run"],
122 ) + _provenance_as_related_resources(flow_run.created_by)
125def _as_resource_data( 1c
126 flow_run: ORMFlowRun,
127 flow: Union[ORMFlow, schemas.core.Flow, None],
128 deployment: Union[ORMDeployment, schemas.responses.DeploymentResponse, None],
129 work_queue: Union[ORMWorkQueue, schemas.responses.WorkQueueResponse, None],
130 work_pool: Union[ORMWorkPool, schemas.core.WorkPool, None],
131 task_run: Union[ORMTaskRun, schemas.core.TaskRun, None] = None,
132) -> ResourceData:
133 return { 1ab
134 "flow-run": {
135 "id": str(flow_run.id),
136 "name": flow_run.name,
137 "tags": flow_run.tags if flow_run.tags else [],
138 "role": "flow-run",
139 },
140 "flow": (
141 {
142 "id": str(flow.id),
143 "name": flow.name,
144 "tags": flow.tags if flow.tags else [],
145 "role": "flow",
146 }
147 if flow
148 else {}
149 ),
150 "deployment": (
151 {
152 "id": str(deployment.id),
153 "name": deployment.name,
154 "tags": deployment.tags if deployment.tags else [],
155 "role": "deployment",
156 }
157 if deployment
158 else {}
159 ),
160 "work-queue": (
161 {
162 "id": str(work_queue.id),
163 "name": work_queue.name,
164 "tags": [],
165 "role": "work-queue",
166 }
167 if work_queue
168 else {}
169 ),
170 "work-pool": (
171 {
172 "id": str(work_pool.id),
173 "name": work_pool.name,
174 "tags": [],
175 "role": "work-pool",
176 "type": work_pool.type,
177 }
178 if work_pool
179 else {}
180 ),
181 "task-run": (
182 {
183 "id": str(task_run.id),
184 "name": task_run.name,
185 "tags": task_run.tags if task_run.tags else [],
186 "role": "task-run",
187 }
188 if task_run
189 else {}
190 ),
191 }
194def _resource_data_as_related_resources( 1c
195 resource_data: ResourceData,
196 excluded_kinds: Optional[List[str]] = None,
197) -> RelatedResourceList:
198 related = [] 1ab
199 tags: Set[str] = set() 1ab
201 if excluded_kinds is None: 201 ↛ 202line 201 didn't jump to line 202 because the condition on line 201 was never true1ab
202 excluded_kinds = []
204 for kind, data in resource_data.items(): 1ab
205 tags |= set(data.get("tags", [])) 1ab
207 if kind in excluded_kinds or not data: 1ab
208 continue 1ab
210 related_resource = { 1ab
211 "prefect.resource.id": f"prefect.{kind}.{data['id']}",
212 "prefect.resource.role": data["role"],
213 "prefect.resource.name": data["name"],
214 }
216 if kind == "work-pool": 216 ↛ 217line 216 didn't jump to line 217 because the condition on line 216 was never true1ab
217 related_resource["prefect.work-pool.type"] = data["type"]
219 related.append(related_resource) 1ab
221 related += [ 1ab
222 {
223 "prefect.resource.id": f"prefect.tag.{tag}",
224 "prefect.resource.role": "tag",
225 }
226 for tag in sorted(tags)
227 ]
229 return related 1ab
232def _provenance_as_related_resources( 1c
233 created_by: Optional[schemas.core.CreatedBy],
234) -> RelatedResourceList:
235 if not created_by: 235 ↛ 240line 235 didn't jump to line 240 because the condition on line 235 was always true1ab
236 return [] 1ab
238 resource_id: str
240 if created_by.type == "DEPLOYMENT":
241 resource_id = f"prefect.deployment.{created_by.id}"
242 elif created_by.type == "AUTOMATION":
243 resource_id = f"prefect.automation.{created_by.id}"
244 else:
245 return []
247 related = {
248 "prefect.resource.id": resource_id,
249 "prefect.resource.role": "creator",
250 }
251 if created_by.display_value:
252 related["prefect.resource.name"] = created_by.display_value
253 return [related]
256def _state_type( 1c
257 state: Union[ORMFlowRunState, ORMTaskRunState, Optional[schemas.states.State]],
258) -> Optional[str]:
259 return str(state.type.value) if state else None 1ab
262def state_payload(state: Optional[schemas.states.State]) -> Optional[Dict[str, str]]: 1c
263 """Given a State, return the essential string parts of it for use in an
264 event payload"""
265 if not state: 1ab
266 return None 1ab
267 payload: Dict[str, str] = {"type": state.type.value} 1ab
268 if state.name: 268 ↛ 270line 268 didn't jump to line 270 because the condition on line 268 was always true1ab
269 payload["name"] = state.name 1ab
270 if state.message: 270 ↛ 271line 270 didn't jump to line 271 because the condition on line 270 was never true1ab
271 payload["message"] = truncated_to(TRUNCATE_STATE_MESSAGES_AT, state.message)
272 if state.is_paused(): 272 ↛ 273line 272 didn't jump to line 273 because the condition on line 272 was never true1ab
273 payload["pause_reschedule"] = str(state.state_details.pause_reschedule).lower()
274 return payload 1ab
277def _timing_is_tight( 1c
278 occurred: datetime,
279 initial_state: Union[
280 ORMFlowRunState, ORMTaskRunState, Optional[schemas.states.State]
281 ],
282) -> bool:
283 # Only connect events with event.follows if the timing here is tight, which will
284 # help us resolve the order of these events if they happen to be delivered out of
285 # order. If the preceding state change happened a while back, don't worry about
286 # it because the order is very likely to be unambiguous.
287 TIGHT_TIMING = timedelta(minutes=5) 1ab
288 if initial_state and initial_state.timestamp: 288 ↛ 289line 288 didn't jump to line 289 because the condition on line 288 was never true1ab
289 return bool(-TIGHT_TIMING < (occurred - initial_state.timestamp) < TIGHT_TIMING)
291 return False 1ab
294async def deployment_status_event( 1c
295 session: AsyncSession,
296 deployment_id: UUID,
297 status: DeploymentStatus,
298 occurred: DateTime,
299) -> Event:
300 deployment = await models.deployments.read_deployment(
301 session=session, deployment_id=deployment_id
302 )
303 assert deployment
304 flow = await models.flows.read_flow(session=session, flow_id=deployment.flow_id)
305 work_queue = (
306 await models.workers.read_work_queue(
307 session=session,
308 work_queue_id=deployment.work_queue_id,
309 )
310 if deployment.work_queue_id
311 else None
312 )
314 work_pool = (
315 await models.workers.read_work_pool(
316 session=session,
317 work_pool_id=work_queue.work_pool_id,
318 )
319 if work_queue and work_queue.work_pool_id
320 else None
321 )
323 related_work_queue_and_pool_info = []
325 if flow is not None:
326 related_work_queue_and_pool_info.append(
327 {
328 "prefect.resource.id": f"prefect.flow.{flow.id}",
329 "prefect.resource.name": flow.name,
330 "prefect.resource.role": "flow",
331 }
332 )
334 if work_queue is not None:
335 related_work_queue_and_pool_info.append(
336 {
337 "prefect.resource.id": f"prefect.work-queue.{work_queue.id}",
338 "prefect.resource.name": work_queue.name,
339 "prefect.resource.role": "work-queue",
340 }
341 )
343 if work_pool is not None:
344 related_work_queue_and_pool_info.append(
345 {
346 "prefect.resource.id": f"prefect.work-pool.{work_pool.id}",
347 "prefect.resource.name": work_pool.name,
348 "prefect.work-pool.type": work_pool.type,
349 "prefect.resource.role": "work-pool",
350 }
351 )
353 return Event(
354 occurred=occurred,
355 event=f"prefect.deployment.{status.in_kebab_case()}",
356 resource={
357 "prefect.resource.id": f"prefect.deployment.{deployment.id}",
358 "prefect.resource.name": f"{deployment.name}",
359 },
360 related=related_work_queue_and_pool_info,
361 id=uuid7(),
362 )
365async def work_queue_status_event( 1c
366 session: AsyncSession,
367 work_queue: "ORMWorkQueue",
368 occurred: DateTime,
369) -> Event:
370 related_work_pool_info: List[Dict[str, Any]] = [] 1a
372 if work_queue.work_pool_id: 372 ↛ 388line 372 didn't jump to line 388 because the condition on line 372 was always true1a
373 work_pool = await models.workers.read_work_pool( 1a
374 session=session,
375 work_pool_id=work_queue.work_pool_id,
376 )
378 if work_pool and work_pool.id and work_pool.name: 378 ↛ 388line 378 didn't jump to line 388 because the condition on line 378 was always true1a
379 related_work_pool_info.append( 1a
380 {
381 "prefect.resource.id": f"prefect.work-pool.{work_pool.id}",
382 "prefect.resource.name": work_pool.name,
383 "prefect.work-pool.type": work_pool.type,
384 "prefect.resource.role": "work-pool",
385 }
386 )
388 return Event( 1a
389 occurred=occurred,
390 event=f"prefect.work-queue.{work_queue.status.in_kebab_case()}",
391 resource={
392 "prefect.resource.id": f"prefect.work-queue.{work_queue.id}",
393 "prefect.resource.name": work_queue.name,
394 "prefect.resource.role": "work-queue",
395 },
396 related=related_work_pool_info,
397 id=uuid7(),
398 )
401async def work_pool_status_event( 1c
402 event_id: UUID,
403 occurred: DateTime,
404 pre_update_work_pool: Optional["ORMWorkPool"],
405 work_pool: "ORMWorkPool",
406) -> Event:
407 assert work_pool.status 1ab
409 return Event( 1ab
410 id=event_id,
411 occurred=occurred,
412 event=f"prefect.work-pool.{work_pool.status.in_kebab_case()}",
413 resource={
414 "prefect.resource.id": f"prefect.work-pool.{work_pool.id}",
415 "prefect.resource.name": work_pool.name,
416 "prefect.work-pool.type": work_pool.type,
417 },
418 follows=_get_recent_preceding_work_pool_event_id(pre_update_work_pool),
419 )
422def _get_recent_preceding_work_pool_event_id( 1c
423 work_pool: Optional["ORMWorkPool"],
424) -> Optional[UUID]:
425 """
426 Returns the preceding event ID if the work pool transitioned status
427 recently to help ensure correct event ordering.
428 """
429 if not work_pool: 1ab
430 return None 1a
432 time_since_last_event = timedelta(hours=24) 1ab
433 if work_pool.last_transitioned_status_at: 1ab
434 time_since_last_event = now("UTC") - work_pool.last_transitioned_status_at
436 return ( 1ab
437 work_pool.last_status_event_id
438 if time_since_last_event < timedelta(minutes=10)
439 else None
440 )