Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/models/composite_trigger_child_firing.py: 58%
29 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1from typing import TYPE_CHECKING, Sequence 1a
2from uuid import UUID 1a
4import sqlalchemy as sa 1a
5from sqlalchemy.dialects import postgresql 1a
6from sqlalchemy.ext.asyncio import AsyncSession 1a
8from prefect.server.database import PrefectDBInterface, db_injector 1a
9from prefect.server.events.schemas.automations import CompositeTrigger, Firing 1a
10from prefect.types._datetime import DateTime, now 1a
12if TYPE_CHECKING: 12 ↛ 13line 12 didn't jump to line 13 because the condition on line 12 was never true1a
13 from prefect.server.database.orm_models import ORMCompositeTriggerChildFiring
@db_injector
async def upsert_child_firing(
    db: PrefectDBInterface,
    session: AsyncSession,
    firing: Firing,
) -> "ORMCompositeTriggerChildFiring":
    """Insert or refresh the stored firing of a composite trigger's child.

    A row is identified by the combination of automation id, parent trigger
    id and child trigger id.  When a row with that combination already
    exists, its firing id, fired-at time, serialized firing and ``updated``
    timestamp are overwritten; otherwise a new row is inserted.

    Args:
        db: Database interface supplying the ``CompositeTriggerChildFiring``
            model.
        session: Session on which the statements are executed.
        firing: The child trigger's firing to record.
            NOTE(review): ``firing.trigger.parent`` is dereferenced without a
            check, so this assumes the firing's trigger has a parent --
            confirm against callers.

    Returns:
        The row for this (automation, parent, child) combination, selected
        back from the session after the upsert has been executed.
    """
    model = db.CompositeTriggerChildFiring

    automation_id = firing.trigger.automation.id
    parent_trigger_id = firing.trigger.parent.id
    child_trigger_id = firing.trigger.id

    # Columns written on both the insert and the conflict-update path.  Built
    # once so the firing is only serialized a single time.
    firing_columns = dict(
        child_firing_id=firing.id,
        child_fired_at=firing.triggered,
        child_firing=firing.model_dump(),
    )

    upsert = (
        postgresql.insert(model)
        .values(
            automation_id=automation_id,
            parent_trigger_id=parent_trigger_id,
            child_trigger_id=child_trigger_id,
            **firing_columns,
        )
        .on_conflict_do_update(
            index_elements=[
                model.automation_id,
                model.parent_trigger_id,
                model.child_trigger_id,
            ],
            set_=dict(**firing_columns, updated=now("UTC")),
        )
    )
    await session.execute(upsert)

    # Read the row back so the caller receives the ORM object rather than
    # the raw result of the insert statement.
    result = await session.execute(
        sa.select(model).filter(
            model.automation_id == automation_id,
            model.parent_trigger_id == parent_trigger_id,
            model.child_trigger_id == child_trigger_id,
        )
    )
    return result.scalars().one()
@db_injector
async def get_child_firings(
    db: PrefectDBInterface,
    session: AsyncSession,
    trigger: CompositeTrigger,
) -> Sequence["ORMCompositeTriggerChildFiring"]:
    """Fetch the recorded child firings that belong to a composite trigger.

    Rows are matched on the trigger's automation id, on the trigger itself
    as the parent, and on the child trigger id being one of
    ``trigger.child_trigger_ids``.

    Args:
        db: Database interface supplying the ``CompositeTriggerChildFiring``
            model.
        session: Session on which the query is executed.
        trigger: The composite (parent) trigger whose child firings are
            wanted.

    Returns:
        The matching rows, de-duplicated via ``unique()``.
    """
    model = db.CompositeTriggerChildFiring

    query = sa.select(model).filter(
        model.automation_id == trigger.automation.id,
        model.parent_trigger_id == trigger.id,
        model.child_trigger_id.in_(trigger.child_trigger_ids),
    )

    rows = (await session.execute(query)).scalars()
    return rows.unique().all()
@db_injector
async def clear_old_child_firings(
    db: PrefectDBInterface,
    session: AsyncSession,
    trigger: CompositeTrigger,
    fired_before: DateTime,
) -> None:
    """Delete a composite trigger's child firings older than a cutoff.

    Args:
        db: Database interface supplying the ``CompositeTriggerChildFiring``
            model.
        session: Session on which the delete is executed.
        trigger: The composite (parent) trigger whose child firings are
            pruned.
        fired_before: Cutoff; rows whose ``child_fired_at`` is strictly
            earlier than this value are removed.
    """
    model = db.CompositeTriggerChildFiring

    stale_rows = sa.delete(model).filter(
        model.automation_id == trigger.automation.id,
        model.parent_trigger_id == trigger.id,
        model.child_fired_at < fired_before,
    )
    await session.execute(stale_rows)
@db_injector
async def clear_child_firings(
    db: PrefectDBInterface,
    session: AsyncSession,
    trigger: CompositeTrigger,
    firing_ids: Sequence[UUID],
) -> None:
    """Delete specific child firings of a composite trigger.

    Args:
        db: Database interface supplying the ``CompositeTriggerChildFiring``
            model.
        session: Session on which the delete is executed.
        trigger: The composite (parent) trigger the firings belong to.
        firing_ids: Values matched against ``child_firing_id``; only rows
            whose firing id is in this collection are removed.
    """
    model = db.CompositeTriggerChildFiring

    selected_rows = sa.delete(model).filter(
        model.automation_id == trigger.automation.id,
        model.parent_trigger_id == trigger.id,
        model.child_firing_id.in_(firing_ids),
    )
    await session.execute(selected_rows)