Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/models/composite_trigger_child_firing.py: 58%

29 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1from typing import TYPE_CHECKING, Sequence 1a

2from uuid import UUID 1a

3 

4import sqlalchemy as sa 1a

5from sqlalchemy.dialects import postgresql 1a

6from sqlalchemy.ext.asyncio import AsyncSession 1a

7 

8from prefect.server.database import PrefectDBInterface, db_injector 1a

9from prefect.server.events.schemas.automations import CompositeTrigger, Firing 1a

10from prefect.types._datetime import DateTime, now 1a

11 

12if TYPE_CHECKING: 12 ↛ 13line 12 didn't jump to line 13 because the condition on line 12 was never true1a

13 from prefect.server.database.orm_models import ORMCompositeTriggerChildFiring 

14 

15 

16@db_injector 1a

17async def upsert_child_firing( 1a

18 db: PrefectDBInterface, 

19 session: AsyncSession, 

20 firing: Firing, 

21): 

22 automation_id = firing.trigger.automation.id 

23 parent_trigger_id = firing.trigger.parent.id 

24 child_trigger_id = firing.trigger.id 

25 

26 upsert = ( 

27 postgresql.insert(db.CompositeTriggerChildFiring) 

28 .values( 

29 automation_id=automation_id, 

30 parent_trigger_id=parent_trigger_id, 

31 child_trigger_id=child_trigger_id, 

32 child_firing_id=firing.id, 

33 child_fired_at=firing.triggered, 

34 child_firing=firing.model_dump(), 

35 ) 

36 .on_conflict_do_update( 

37 index_elements=[ 

38 db.CompositeTriggerChildFiring.automation_id, 

39 db.CompositeTriggerChildFiring.parent_trigger_id, 

40 db.CompositeTriggerChildFiring.child_trigger_id, 

41 ], 

42 set_=dict( 

43 child_firing_id=firing.id, 

44 child_fired_at=firing.triggered, 

45 child_firing=firing.model_dump(), 

46 updated=now("UTC"), 

47 ), 

48 ) 

49 ) 

50 

51 await session.execute(upsert) 

52 

53 result = await session.execute( 

54 sa.select(db.CompositeTriggerChildFiring).filter( 

55 db.CompositeTriggerChildFiring.automation_id == automation_id, 

56 db.CompositeTriggerChildFiring.parent_trigger_id == parent_trigger_id, 

57 db.CompositeTriggerChildFiring.child_trigger_id == child_trigger_id, 

58 ) 

59 ) 

60 

61 return result.scalars().one() 

62 

63 

64@db_injector 1a

65async def get_child_firings( 1a

66 db: PrefectDBInterface, 

67 session: AsyncSession, 

68 trigger: CompositeTrigger, 

69) -> Sequence["ORMCompositeTriggerChildFiring"]: 

70 result = await session.execute( 

71 sa.select(db.CompositeTriggerChildFiring).filter( 

72 db.CompositeTriggerChildFiring.automation_id == trigger.automation.id, 

73 db.CompositeTriggerChildFiring.parent_trigger_id == trigger.id, 

74 db.CompositeTriggerChildFiring.child_trigger_id.in_( 

75 trigger.child_trigger_ids 

76 ), 

77 ) 

78 ) 

79 

80 return result.scalars().unique().all() 

81 

82 

83@db_injector 1a

84async def clear_old_child_firings( 1a

85 db: PrefectDBInterface, 

86 session: AsyncSession, 

87 trigger: CompositeTrigger, 

88 fired_before: DateTime, 

89) -> None: 

90 await session.execute( 

91 sa.delete(db.CompositeTriggerChildFiring).filter( 

92 db.CompositeTriggerChildFiring.automation_id == trigger.automation.id, 

93 db.CompositeTriggerChildFiring.parent_trigger_id == trigger.id, 

94 db.CompositeTriggerChildFiring.child_fired_at < fired_before, 

95 ) 

96 ) 

97 

98 

99@db_injector 1a

100async def clear_child_firings( 1a

101 db: PrefectDBInterface, 

102 session: AsyncSession, 

103 trigger: CompositeTrigger, 

104 firing_ids: Sequence[UUID], 

105) -> None: 

106 await session.execute( 

107 sa.delete(db.CompositeTriggerChildFiring).filter( 

108 db.CompositeTriggerChildFiring.automation_id == trigger.automation.id, 

109 db.CompositeTriggerChildFiring.parent_trigger_id == trigger.id, 

110 db.CompositeTriggerChildFiring.child_firing_id.in_(firing_ids), 

111 ) 

112 )