Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/models/events.py: 18%
115 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1from datetime import datetime, timedelta 1a
2from typing import Any, Dict, List, MutableMapping, Optional, Set, Union 1a
3from uuid import UUID 1a
5from cachetools import TTLCache 1a
6from sqlalchemy.ext.asyncio import AsyncSession 1a
8from prefect._internal.uuid7 import uuid7 1a
9from prefect.server import models, schemas 1a
10from prefect.server.database.orm_models import ( 1a
11 ORMDeployment,
12 ORMFlow,
13 ORMFlowRun,
14 ORMFlowRunState,
15 ORMTaskRun,
16 ORMTaskRunState,
17 ORMWorkPool,
18 ORMWorkQueue,
19)
20from prefect.server.events.schemas.events import Event 1a
21from prefect.server.models import deployments 1a
22from prefect.server.schemas.statuses import DeploymentStatus 1a
23from prefect.settings import PREFECT_API_EVENTS_RELATED_RESOURCE_CACHE_TTL 1a
24from prefect.types._datetime import DateTime, now 1a
25from prefect.utilities.text import truncated_to 1a
# Resource attributes grouped by resource kind (for example "flow" or
# "work-pool"); each value is a mapping of attribute name to value.
ResourceData = Dict[str, Dict[str, Any]]

# A list of related-resource label mappings, as passed to ``Event(related=...)``.
RelatedResourceList = List[Dict[str, str]]


# Some users use state messages to convey error messages and large results; let's
# truncate them so they don't blow out the size of a message
TRUNCATE_STATE_MESSAGES_AT = 100_000


# Cache of resource data keyed by flow run ID.  It holds at most 1000 entries
# and each entry expires after PREFECT_API_EVENTS_RELATED_RESOURCE_CACHE_TTL.
_flow_run_resource_data_cache: MutableMapping[UUID, ResourceData] = TTLCache(
    maxsize=1000,
    ttl=PREFECT_API_EVENTS_RELATED_RESOURCE_CACHE_TTL.value().total_seconds(),
)
async def flow_run_state_change_event(
    session: AsyncSession,
    occurred: datetime,
    flow_run: ORMFlowRun,
    initial_state_id: Optional[UUID],
    initial_state: Optional[schemas.states.State],
    validated_state_id: Optional[UUID],
    validated_state: schemas.states.State,
) -> Event:
    """Build the event describing a flow run entering ``validated_state``.

    Args:
        session: database session used to look up the run's related resources.
        occurred: the time reported as the event's ``occurred`` value.
        flow_run: the flow run whose state changed.
        initial_state_id: ID of the state the run was in before the change.
        initial_state: the state the run was in before the change, if any.
        validated_state_id: ID of the new state; reused as the event's ID.
        validated_state: the state the run has moved into.

    Returns:
        An ``Event`` named ``prefect.flow-run.<state name>`` whose resource is
        the flow run, whose related resources come from
        ``_flow_run_related_resources_from_orm``, and whose payload summarizes
        both the initial and the validated state.
    """
    return Event(
        occurred=occurred,
        event=f"prefect.flow-run.{validated_state.name}",
        resource={
            "prefect.resource.id": f"prefect.flow-run.{flow_run.id}",
            "prefect.resource.name": flow_run.name,
            "prefect.run-count": str(flow_run.run_count),
            "prefect.state-message": truncated_to(
                TRUNCATE_STATE_MESSAGES_AT, validated_state.message
            ),
            "prefect.state-name": validated_state.name or "",
            "prefect.state-timestamp": (
                validated_state.timestamp.isoformat()
                if validated_state.timestamp
                else None
            ),
            "prefect.state-type": validated_state.type.value,
        },
        related=await _flow_run_related_resources_from_orm(
            session=session, flow_run=flow_run
        ),
        payload={
            "intended": {
                "from": _state_type(initial_state),
                "to": _state_type(validated_state),
            },
            "initial_state": state_payload(initial_state),
            "validated_state": state_payload(validated_state),
        },
        # Here we use the state's ID as the ID of the event as well, in order to
        # establish the ordering of the state-change events
        id=validated_state_id,
        # Only link to the previous state's event when the two are close in time
        follows=initial_state_id if _timing_is_tight(occurred, initial_state) else None,
    )
async def _flow_run_related_resources_from_orm(
    session: AsyncSession, flow_run: ORMFlowRun
) -> RelatedResourceList:
    """Return the related resources to attach to an event about ``flow_run``.

    The resource data for a flow run is looked up in the module-level
    ``_flow_run_resource_data_cache`` first.  On a miss, the flow, deployment,
    work queue, work pool and parent task run are read through ``session``
    and the assembled data is stored in the cache.

    Args:
        session: database session used for the lookups on a cache miss.
        flow_run: the flow run whose related resources are wanted.

    Returns:
        The related resources for every kind except ``"flow-run"`` itself,
        followed by any resource derived from ``flow_run.created_by``.
    """
    resource_data = _flow_run_resource_data_cache.get(flow_run.id)
    if not resource_data:
        flow = await models.flows.read_flow(session=session, flow_id=flow_run.flow_id)

        deployment: Optional[ORMDeployment] = None
        if flow_run.deployment_id:
            deployment = await deployments.read_deployment(
                session, deployment_id=flow_run.deployment_id
            )

        work_queue = None
        if flow_run.work_queue_id:
            work_queue = await models.work_queues.read_work_queue(
                session, work_queue_id=flow_run.work_queue_id
            )

        # The work pool is reached through the work queue rather than queried
        work_pool = work_queue.work_pool if work_queue is not None else None

        task_run: Optional[ORMTaskRun] = None
        if flow_run.parent_task_run_id:
            task_run = await models.task_runs.read_task_run(
                session,
                task_run_id=flow_run.parent_task_run_id,
            )

        resource_data = _as_resource_data(
            flow_run, flow, deployment, work_queue, work_pool, task_run
        )
        _flow_run_resource_data_cache[flow_run.id] = resource_data

    # The flow run is the event's primary resource, so it is excluded here
    return _resource_data_as_related_resources(
        resource_data,
        excluded_kinds=["flow-run"],
    ) + _provenance_as_related_resources(flow_run.created_by)
def _as_resource_data(
    flow_run: ORMFlowRun,
    flow: Union[ORMFlow, schemas.core.Flow, None],
    deployment: Union[ORMDeployment, schemas.responses.DeploymentResponse, None],
    work_queue: Union[ORMWorkQueue, schemas.responses.WorkQueueResponse, None],
    work_pool: Union[ORMWorkPool, schemas.core.WorkPool, None],
    task_run: Union[ORMTaskRun, schemas.core.TaskRun, None] = None,
) -> ResourceData:
    """Collect the identifying attributes of a flow run and its associated objects.

    Args:
        flow_run: the flow run; always present in the result.
        flow: the run's flow, if any.
        deployment: the run's deployment, if any.
        work_queue: the run's work queue, if any.
        work_pool: the run's work pool, if any.
        task_run: the run's parent task run, if any.

    Returns:
        A mapping with the keys ``"flow-run"``, ``"flow"``, ``"deployment"``,
        ``"work-queue"``, ``"work-pool"`` and ``"task-run"``.  Each value holds
        ``id`` (as a string), ``name``, ``tags`` and ``role``; the work pool
        entry also holds ``type``.  A falsy object yields an empty dict.
    """
    return {
        "flow-run": {
            "id": str(flow_run.id),
            "name": flow_run.name,
            "tags": flow_run.tags if flow_run.tags else [],
            "role": "flow-run",
        },
        "flow": (
            {
                "id": str(flow.id),
                "name": flow.name,
                "tags": flow.tags if flow.tags else [],
                "role": "flow",
            }
            if flow
            else {}
        ),
        "deployment": (
            {
                "id": str(deployment.id),
                "name": deployment.name,
                "tags": deployment.tags if deployment.tags else [],
                "role": "deployment",
            }
            if deployment
            else {}
        ),
        "work-queue": (
            {
                "id": str(work_queue.id),
                "name": work_queue.name,
                "tags": [],
                "role": "work-queue",
            }
            if work_queue
            else {}
        ),
        "work-pool": (
            {
                "id": str(work_pool.id),
                "name": work_pool.name,
                "tags": [],
                "role": "work-pool",
                "type": work_pool.type,
            }
            if work_pool
            else {}
        ),
        "task-run": (
            {
                "id": str(task_run.id),
                "name": task_run.name,
                "tags": task_run.tags if task_run.tags else [],
                "role": "task-run",
            }
            if task_run
            else {}
        ),
    }
def _resource_data_as_related_resources(
    resource_data: ResourceData,
    excluded_kinds: Optional[List[str]] = None,
) -> RelatedResourceList:
    """Convert resource data into a list of related-resource label mappings.

    Args:
        resource_data: attributes grouped by resource kind.
        excluded_kinds: kinds that should not get a related resource of their
            own.  Their tags are still collected.

    Returns:
        One related resource per non-empty, non-excluded kind, in the order of
        ``resource_data``, followed by one ``"tag"`` resource per distinct tag
        in sorted order.
    """
    related = []
    tags: Set[str] = set()

    if excluded_kinds is None:
        excluded_kinds = []

    for kind, data in resource_data.items():
        # Tags are gathered before the exclusion check, so excluded kinds
        # still contribute their tags
        tags |= set(data.get("tags", []))

        if kind in excluded_kinds or not data:
            continue

        related_resource = {
            "prefect.resource.id": f"prefect.{kind}.{data['id']}",
            "prefect.resource.role": data["role"],
            "prefect.resource.name": data["name"],
        }

        if kind == "work-pool":
            related_resource["prefect.work-pool.type"] = data["type"]

        related.append(related_resource)

    related += [
        {
            "prefect.resource.id": f"prefect.tag.{tag}",
            "prefect.resource.role": "tag",
        }
        for tag in sorted(tags)
    ]

    return related
def _provenance_as_related_resources(
    created_by: Optional[schemas.core.CreatedBy],
) -> RelatedResourceList:
    """Describe the creator of an object as a related resource.

    Args:
        created_by: the creator information, if any.

    Returns:
        A single-element list holding a resource with the ``"creator"`` role
        when ``created_by.type`` is ``"DEPLOYMENT"`` or ``"AUTOMATION"``.
        The resource carries a name only when ``display_value`` is set.
        Returns an empty list when there is no creator or its type is
        anything else.
    """
    if not created_by:
        return []

    resource_id: str

    if created_by.type == "DEPLOYMENT":
        resource_id = f"prefect.deployment.{created_by.id}"
    elif created_by.type == "AUTOMATION":
        resource_id = f"prefect.automation.{created_by.id}"
    else:
        return []

    related = {
        "prefect.resource.id": resource_id,
        "prefect.resource.role": "creator",
    }
    if created_by.display_value:
        related["prefect.resource.name"] = created_by.display_value
    return [related]
def _state_type(
    state: Union[ORMFlowRunState, ORMTaskRunState, Optional[schemas.states.State]],
) -> Optional[str]:
    """Return the value of ``state.type`` as a string, or None for a falsy state."""
    return str(state.type.value) if state else None
def state_payload(state: Optional[schemas.states.State]) -> Optional[Dict[str, str]]:
    """Given a State, return the essential string parts of it for use in an
    event payload

    The result always holds ``type``.  ``name`` and ``message`` are added only
    when they are set, with the message truncated to
    ``TRUNCATE_STATE_MESSAGES_AT``.  ``pause_reschedule`` is added as a
    lowercase string when the state is paused.  Returns None for a falsy state.
    """
    if not state:
        return None
    payload: Dict[str, str] = {"type": state.type.value}
    if state.name:
        payload["name"] = state.name
    if state.message:
        payload["message"] = truncated_to(TRUNCATE_STATE_MESSAGES_AT, state.message)
    if state.is_paused():
        payload["pause_reschedule"] = str(state.state_details.pause_reschedule).lower()
    return payload
def _timing_is_tight(
    occurred: datetime,
    initial_state: Union[
        ORMFlowRunState, ORMTaskRunState, Optional[schemas.states.State]
    ],
) -> bool:
    """Report whether ``occurred`` is within five minutes of the initial state.

    Args:
        occurred: the time of the new state change.
        initial_state: the preceding state, if any.

    Returns:
        True when ``initial_state`` has a timestamp that is strictly less than
        five minutes before or after ``occurred``; False otherwise, including
        when there is no initial state or it has no timestamp.
    """
    # Only connect events with event.follows if the timing here is tight, which will
    # help us resolve the order of these events if they happen to be delivered out of
    # order.  If the preceding state change happened a while back, don't worry about
    # it because the order is very likely to be unambiguous.
    TIGHT_TIMING = timedelta(minutes=5)
    if initial_state and initial_state.timestamp:
        return bool(-TIGHT_TIMING < (occurred - initial_state.timestamp) < TIGHT_TIMING)

    return False
async def deployment_status_event(
    session: AsyncSession,
    deployment_id: UUID,
    status: DeploymentStatus,
    occurred: DateTime,
) -> Event:
    """Build the event announcing that a deployment has entered ``status``.

    Args:
        session: database session used to read the deployment and its flow,
            work queue and work pool.
        deployment_id: ID of the deployment the event is about.
        status: the deployment's status; its kebab-case form names the event.
        occurred: the time reported as the event's ``occurred`` value.

    Returns:
        An ``Event`` named ``prefect.deployment.<status>`` with a fresh UUIDv7
        ID.  Its related resources are whichever of the flow, work queue and
        work pool were found.

    Raises:
        AssertionError: if no deployment is found for ``deployment_id``.
    """
    deployment = await models.deployments.read_deployment(
        session=session, deployment_id=deployment_id
    )
    assert deployment

    flow = await models.flows.read_flow(session=session, flow_id=deployment.flow_id)
    work_queue = (
        await models.workers.read_work_queue(
            session=session,
            work_queue_id=deployment.work_queue_id,
        )
        if deployment.work_queue_id
        else None
    )

    work_pool = (
        await models.workers.read_work_pool(
            session=session,
            work_pool_id=work_queue.work_pool_id,
        )
        if work_queue and work_queue.work_pool_id
        else None
    )

    related = []

    if flow is not None:
        related.append(
            {
                "prefect.resource.id": f"prefect.flow.{flow.id}",
                "prefect.resource.name": flow.name,
                "prefect.resource.role": "flow",
            }
        )

    if work_queue is not None:
        related.append(
            {
                "prefect.resource.id": f"prefect.work-queue.{work_queue.id}",
                "prefect.resource.name": work_queue.name,
                "prefect.resource.role": "work-queue",
            }
        )

    if work_pool is not None:
        related.append(
            {
                "prefect.resource.id": f"prefect.work-pool.{work_pool.id}",
                "prefect.resource.name": work_pool.name,
                "prefect.work-pool.type": work_pool.type,
                "prefect.resource.role": "work-pool",
            }
        )

    return Event(
        occurred=occurred,
        event=f"prefect.deployment.{status.in_kebab_case()}",
        resource={
            "prefect.resource.id": f"prefect.deployment.{deployment.id}",
            "prefect.resource.name": f"{deployment.name}",
        },
        related=related,
        id=uuid7(),
    )
async def work_queue_status_event(
    session: AsyncSession,
    work_queue: "ORMWorkQueue",
    occurred: DateTime,
) -> Event:
    """Build the event announcing a work queue's current status.

    Args:
        session: database session used to read the queue's work pool.
        work_queue: the work queue; the kebab-case form of its ``status``
            names the event.
        occurred: the time reported as the event's ``occurred`` value.

    Returns:
        An ``Event`` named ``prefect.work-queue.<status>`` with a fresh UUIDv7
        ID.  The work pool is included as a related resource only when the
        queue has a ``work_pool_id`` and the pool read back has both an ID and
        a name.
    """
    related_work_pool_info: List[Dict[str, Any]] = []

    if work_queue.work_pool_id:
        work_pool = await models.workers.read_work_pool(
            session=session,
            work_pool_id=work_queue.work_pool_id,
        )

        if work_pool and work_pool.id and work_pool.name:
            related_work_pool_info.append(
                {
                    "prefect.resource.id": f"prefect.work-pool.{work_pool.id}",
                    "prefect.resource.name": work_pool.name,
                    "prefect.work-pool.type": work_pool.type,
                    "prefect.resource.role": "work-pool",
                }
            )

    return Event(
        occurred=occurred,
        event=f"prefect.work-queue.{work_queue.status.in_kebab_case()}",
        resource={
            "prefect.resource.id": f"prefect.work-queue.{work_queue.id}",
            "prefect.resource.name": work_queue.name,
            "prefect.resource.role": "work-queue",
        },
        related=related_work_pool_info,
        id=uuid7(),
    )
async def work_pool_status_event(
    event_id: UUID,
    occurred: DateTime,
    pre_update_work_pool: Optional["ORMWorkPool"],
    work_pool: "ORMWorkPool",
) -> Event:
    """Build the event announcing a work pool's current status.

    Args:
        event_id: the ID to give the event.
        occurred: the time reported as the event's ``occurred`` value.
        pre_update_work_pool: the work pool as it was before the update, used
            to decide which event this one follows; may be None.
        work_pool: the work pool; the kebab-case form of its ``status`` names
            the event.

    Returns:
        An ``Event`` named ``prefect.work-pool.<status>`` whose ``follows``
        value comes from ``_get_recent_preceding_work_pool_event_id``.

    Raises:
        AssertionError: if ``work_pool.status`` is falsy.
    """
    assert work_pool.status

    return Event(
        id=event_id,
        occurred=occurred,
        event=f"prefect.work-pool.{work_pool.status.in_kebab_case()}",
        resource={
            "prefect.resource.id": f"prefect.work-pool.{work_pool.id}",
            "prefect.resource.name": work_pool.name,
            "prefect.work-pool.type": work_pool.type,
        },
        follows=_get_recent_preceding_work_pool_event_id(pre_update_work_pool),
    )
def _get_recent_preceding_work_pool_event_id(
    work_pool: Optional["ORMWorkPool"],
) -> Optional[UUID]:
    """
    Returns the preceding event ID if the work pool transitioned status
    recently to help ensure correct event ordering.

    "Recently" means less than ten minutes before the current UTC time.
    Returns None when there is no work pool, when it has no recorded
    transition time, or when the transition is ten minutes old or more.
    """
    if not work_pool:
        return None

    # Default to an age well past the cutoff, so a pool with no recorded
    # transition time is never treated as recent
    time_since_last_event = timedelta(hours=24)
    if work_pool.last_transitioned_status_at:
        time_since_last_event = now("UTC") - work_pool.last_transitioned_status_at

    return (
        work_pool.last_status_event_id
        if time_since_last_event < timedelta(minutes=10)
        else None
    )