Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/schemas/automations.py: 77%
296 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1from __future__ import annotations 1a
3import abc 1a
4import re 1a
5import weakref 1a
6from datetime import timedelta 1a
7from typing import ( 1a
8 TYPE_CHECKING,
9 Any,
10 Dict,
11 List,
12 Literal,
13 Optional,
14 Sequence,
15 Set,
16 Tuple,
17 Type,
18 TypeVar,
19 Union,
20)
21from uuid import UUID, uuid4 1a
23from pydantic import ( 1a
24 Field,
25 PrivateAttr,
26 field_validator,
27 model_validator,
28)
29from typing_extensions import Self, TypeAlias 1a
31from prefect._internal.uuid7 import uuid7 1a
32from prefect.logging import get_logger 1a
33from prefect.server.events.actions import ServerActionTypes 1a
34from prefect.server.events.schemas.events import ( 1a
35 ReceivedEvent,
36 RelatedResource,
37 Resource,
38 ResourceSpecification,
39 matches,
40)
41from prefect.server.schemas.actions import ActionBaseModel 1a
42from prefect.server.utilities.schemas import ORMBaseModel, PrefectBaseModel 1a
43from prefect.types import DateTime 1a
44from prefect.utilities.collections import AutoEnum 1a
if TYPE_CHECKING:
    # `logging` is only referenced by the string annotation below, so it is
    # imported for type checkers alone.
    import logging

# Module-level logger, obtained from prefect's `get_logger` with this module's name.
logger: "logging.Logger" = get_logger(__name__)
class Posture(AutoEnum):
    # The stance a trigger takes; every member's value is spelled the same as
    # its name.  EventTrigger only accepts Reactive and Proactive.
    Reactive = "Reactive"
    Proactive = "Proactive"
    Metric = "Metric"
class TriggerState(AutoEnum):
    # The states reported in automation state-change events; the lower-cased
    # value is used as the suffix of the `prefect.automation.*` event name.
    Triggered = "Triggered"
    Resolved = "Resolved"
class Trigger(PrefectBaseModel, abc.ABC):
    """
    Base class describing a set of criteria that must be satisfied in order to trigger
    an automation.
    """

    type: str

    id: UUID = Field(default_factory=uuid4, description="The unique ID of this trigger")

    # Weak references to the owning Automation and to the direct parent (an
    # Automation or another Trigger).  Both start as None and are filled in by
    # `_set_parent`.
    _automation: Optional[weakref.ref[Any]] = PrivateAttr(None)
    _parent: Optional[weakref.ref[Any]] = PrivateAttr(None)

    @property
    def automation(self) -> "Automation":
        """The Automation this trigger ultimately belongs to.

        Raises:
            AssertionError: If no automation reference has been set, or the
                weakly referenced automation is no longer alive.
        """
        ref = self._automation
        assert ref is not None, "Trigger._automation has not been set"
        owner = ref()
        assert owner is not None, "Trigger._automation has been garbage collected"
        return owner

    @property
    def parent(self) -> "Union[Trigger, Automation]":
        """The object directly containing this trigger.

        Raises:
            AssertionError: If no parent reference has been set, or the weakly
                referenced parent is no longer alive.
        """
        ref = self._parent
        assert ref is not None, "Trigger._parent has not been set"
        container = ref()
        assert container is not None, "Trigger._parent has been garbage collected"
        return container

    def _set_parent(self, value: "Union[Trigger, Automation]"):
        """Record `value` as this trigger's parent, using weak references only.

        An Automation parent is also the owning automation; a Trigger parent
        hands down whatever automation reference it currently holds.

        Raises:
            ValueError: If `value` is neither an Automation nor a Trigger.
        """
        if isinstance(value, Automation):
            # One weakref serves as both the automation and the parent link.
            ref = weakref.ref(value)
            self._automation = ref
            self._parent = ref
        elif isinstance(value, Trigger):
            self._parent = weakref.ref(value)
            self._automation = value._automation
        else:  # pragma: no cover
            raise ValueError("parent must be an Automation or a Trigger")

    def reset_ids(self) -> None:
        """Resets the ID of this trigger and all of its children"""
        self.id = uuid4()
        # `all_triggers()` is walked as well so nested triggers get new IDs.
        for member in self.all_triggers():
            member.id = uuid4()

    def all_triggers(self) -> Sequence[Trigger]:
        """Returns all triggers within this trigger"""
        # The base implementation has no children, so it is just this trigger.
        return [self]

    @abc.abstractmethod
    def create_automation_state_change_event(
        self, firing: "Firing", trigger_state: TriggerState
    ) -> ReceivedEvent:
        """Build the event describing a state change of the automation.

        Concrete trigger types must implement this.
        """
class CompositeTrigger(Trigger, abc.ABC):
    """
    Requires some number of triggers to have fired within the given time period.
    """

    type: Literal["compound", "sequence"]
    triggers: List["ServerTriggerTypes"]
    within: Optional[timedelta]

    def create_automation_state_change_event(
        self, firing: Firing, trigger_state: TriggerState
    ) -> ReceivedEvent:
        """Build the event announcing that the automation changed state.

        Args:
            firing: Supplies the automation (through its trigger), the
                timestamp, the triggering labels and the optional triggering
                event.
            trigger_state: The state being entered; its lower-cased value is
                the suffix of the event name.

        Returns:
            A ReceivedEvent with a freshly generated `uuid7` id.
        """
        owner = firing.trigger.automation
        cause = firing.triggering_event

        # The triggering event is optional: without one there is no related
        # resource and the payload carries None in its place.
        if cause:
            related = [
                {
                    "prefect.resource.id": f"prefect.event.{cause.id}",
                    "prefect.resource.role": "triggering-event",
                }
            ]
            serialized_cause = cause.model_dump(mode="json")
        else:
            related = []
            serialized_cause = None

        return ReceivedEvent(
            occurred=firing.triggered,
            event=f"prefect.automation.{trigger_state.value.lower()}",
            resource={
                "prefect.resource.id": f"prefect.automation.{owner.id}",
                "prefect.resource.name": owner.name,
            },
            related=related,
            payload={
                "triggering_labels": firing.triggering_labels,
                "triggering_event": serialized_cause,
            },
            id=uuid7(),
        )

    def _set_parent(self, value: "Union[Trigger , Automation]"):
        """Attach this trigger to `value`, then attach every child to this trigger."""
        super()._set_parent(value)
        for child in self.triggers:
            child._set_parent(self)

    def all_triggers(self) -> Sequence[Trigger]:
        """Return this trigger followed by everything its children report, in order."""
        collected: List[Trigger] = [self]
        for child in self.triggers:
            collected.extend(child.all_triggers())
        return collected

    @property
    def child_trigger_ids(self) -> List[UUID]:
        """IDs of the direct child triggers, in declaration order."""
        ids = []
        for child in self.triggers:
            ids.append(child.id)
        return ids

    @property
    def num_expected_firings(self) -> int:
        """How many firings are expected; by default, one per direct child."""
        return len(self.triggers)

    @abc.abstractmethod
    def ready_to_fire(self, firings: Sequence["Firing"]) -> bool:
        """Decide whether the given child firings are enough for this trigger to fire."""
class CompoundTrigger(CompositeTrigger):
    """A composite trigger that requires some number of triggers to have
    fired within the given time period"""

    type: Literal["compound"] = "compound"
    require: Union[int, Literal["any", "all"]]

    @property
    def num_expected_firings(self) -> int:
        """Translate `require` into a count: "any" is 1, "all" is every child."""
        requirement = self.require
        if requirement == "all":
            return len(self.triggers)
        if requirement == "any":
            return 1
        return int(requirement)

    def ready_to_fire(self, firings: Sequence["Firing"]) -> bool:
        """True once at least `num_expected_firings` firings have been collected."""
        needed = self.num_expected_firings
        return len(firings) >= needed

    @model_validator(mode="after")
    def validate_require(self) -> Self:
        """Check that a numeric `require` lies between 1 and the number of triggers.

        Raises:
            ValueError: If `require` is an integer outside that range.
        """
        required = self.require
        if not isinstance(required, int):
            # "any" / "all" need no range check.
            return self

        if required < 1:
            raise ValueError("require must be at least 1")
        if required > len(self.triggers):
            raise ValueError(
                "require must be less than or equal to the number of triggers"
            )

        return self
class SequenceTrigger(CompositeTrigger):
    """A composite trigger that requires some number of triggers to have fired
    within the given time period in a specific order"""

    type: Literal["sequence"] = "sequence"

    @property
    def expected_firing_order(self) -> List[UUID]:
        """IDs of the child triggers in the order they are declared."""
        order = []
        for child in self.triggers:
            order.append(child.id)
        return order

    def ready_to_fire(self, firings: Sequence["Firing"]) -> bool:
        """True when the firings, sorted by `triggered` time, match the expected order."""
        chronological = sorted(firings, key=lambda firing: firing.triggered)
        observed_order = [firing.trigger.id for firing in chronological]
        return observed_order == self.expected_firing_order
class ResourceTrigger(Trigger, abc.ABC):
    """
    Base class for triggers that may filter by the labels of resources.
    """

    type: str

    match: ResourceSpecification = Field(
        default_factory=lambda: ResourceSpecification.model_validate({}),
        description="Labels for resources which this trigger will match.",
    )
    match_related: Union[ResourceSpecification, list[ResourceSpecification]] = Field(
        default_factory=lambda: ResourceSpecification.model_validate({}),
        description="Labels for related resources which this trigger will match.",
    )

    def covers_resources(
        self, resource: Resource, related: Sequence[RelatedResource]
    ) -> bool:
        """Check an event's resources against this trigger's specifications.

        Args:
            resource: The primary resource, tested against `match`.
            related: The related resources, tested against every
                `match_related` specification.

        Returns:
            True only if `match` includes the primary resource and each
            `match_related` specification includes the related resources.
        """
        if not self.match.includes([resource]):
            return False

        # `match_related` may be one specification or a list of them.
        specifications = (
            self.match_related
            if isinstance(self.match_related, list)
            else [self.match_related]
        )
        return all(
            specification.includes(related) for specification in specifications
        )
class EventTrigger(ResourceTrigger):
    """
    A trigger that fires based on the presence or absence of events within a given
    period of time.
    """

    type: Literal["event"] = "event"

    after: Set[str] = Field(
        default_factory=set,
        description=(
            "The event(s) which must first been seen to fire this trigger.  If "
            "empty, then fire this trigger immediately.  Events may include "
            "trailing wildcards, like `prefect.flow-run.*`"
        ),
    )
    expect: Set[str] = Field(
        default_factory=set,
        description=(
            "The event(s) this trigger is expecting to see.  If empty, this "
            "trigger will match any event.  Events may include trailing wildcards, "
            "like `prefect.flow-run.*`"
        ),
    )

    for_each: Set[str] = Field(
        default_factory=set,
        description=(
            "Evaluate the trigger separately for each distinct value of these labels "
            "on the resource.  By default, labels refer to the primary resource of the "
            "triggering event.  You may also refer to labels from related "
            "resources by specifying `related:<role>:<label>`.  This will use the "
            "value of that label for the first related resource in that role.  For "
            'example, `"for_each": ["related:flow:prefect.resource.id"]` would '
            "evaluate the trigger for each flow."
        ),
    )
    posture: Literal[Posture.Reactive, Posture.Proactive] = Field(  # type: ignore[valid-type]
        ...,
        description=(
            "The posture of this trigger, either Reactive or Proactive.  Reactive "
            "triggers respond to the _presence_ of the expected events, while "
            "Proactive triggers respond to the _absence_ of those expected events."
        ),
    )
    threshold: int = Field(
        1,
        description=(
            "The number of events required for this trigger to fire (for "
            "Reactive triggers), or the number of events expected (for Proactive "
            "triggers)"
        ),
    )
    within: timedelta = Field(
        timedelta(seconds=0),
        ge=timedelta(seconds=0),
        description=(
            "The time period over which the events must occur.  For Reactive triggers, "
            "this may be as low as 0 seconds, but must be at least 10 seconds for "
            "Proactive triggers"
        ),
    )

    @model_validator(mode="before")
    @classmethod
    def enforce_minimum_within_for_proactive_triggers(
        cls, data: Dict[str, Any] | Any
    ) -> Dict[str, Any] | Any:
        """Normalize `within` and enforce the 10 second floor for Proactive triggers.

        Args:
            data: The raw input being validated.  Anything that is not a dict
                is returned untouched.

        Returns:
            For dict input, a shallow copy in which a numeric `within` has been
            converted to a timedelta of that many seconds and, for Proactive
            triggers, a missing or zero `within` has been replaced with 10
            seconds.  The caller's dict is never modified.

        Raises:
            ValueError: If `within` is present but None, or if a Proactive
                trigger specifies a `within` shorter than 10 seconds.
        """
        if not isinstance(data, dict):
            return data

        if "within" in data and data["within"] is None:
            raise ValueError("`within` should be a valid timedelta")

        # Work on a shallow copy.  This model is one member of a union of
        # trigger types, and this validator can still raise further down;
        # editing the input in place would leak the changes to the caller and
        # to whichever other models are tried with the same input.
        data = dict(data)

        posture: Optional[Posture] = data.get("posture")
        within: Optional[timedelta] = data.get("within")

        if isinstance(within, (int, float)):
            # Plain numbers are interpreted as a number of seconds.
            data["within"] = within = timedelta(seconds=within)

        if posture == Posture.Proactive:
            minimum = timedelta(seconds=10.0)
            if not within or within == timedelta(0):
                data["within"] = minimum
            elif within < minimum:
                raise ValueError(
                    "`within` for Proactive triggers must be greater than or equal to "
                    "10 seconds"
                )

        return data

    def covers(self, event: ReceivedEvent) -> bool:
        """True if the event's resources are covered and its name matches `event_pattern`."""
        if not self.covers_resources(event.resource, event.related):
            return False

        if not self.event_pattern.match(event.event):
            return False

        return True

    @property
    def immediate(self) -> bool:
        """Does this reactive trigger fire immediately for all events?"""
        return self.posture == Posture.Reactive and self.within == timedelta(0)

    # Lazily compiled cache backing `event_pattern`.
    _event_pattern: Optional[re.Pattern[str]] = PrivateAttr(None)

    @property
    def event_pattern(self) -> re.Pattern[str]:
        """A regular expression which may be evaluated against any event string to
        determine if this trigger would be interested in the event"""
        if self._event_pattern:
            return self._event_pattern

        if not self.expect:
            # This preserves the trivial match for `expect`, and matches the behavior
            # of expects() below
            self._event_pattern = re.compile(".+")
        else:
            patterns = [
                # escape each pattern, then translate wildcards ('*' -> r'.+')
                re.escape(e).replace("\\*", ".+")
                for e in self.expect | self.after
            ]
            self._event_pattern = re.compile("|".join(patterns))

        return self._event_pattern

    def starts_after(self, event: str) -> bool:
        """True if `event` matches any of the `after` entries; False when `after` is empty."""
        # Warning: Previously we returned 'True' if there was trivial 'after' criteria.
        # Although this is not wrong, it led to automations processing more events
        # than they should have.
        if not self.after:
            return False

        for candidate in self.after:
            if matches(candidate, event):
                return True
        return False

    def expects(self, event: str) -> bool:
        """True if `expect` is empty or `event` matches any of its entries."""
        if not self.expect:
            return True

        for candidate in self.expect:
            if matches(candidate, event):
                return True
        return False

    def bucketing_key(self, event: ReceivedEvent) -> Tuple[str, ...]:
        """The event's values for the `for_each` labels, in sorted label order.

        Labels for which `find_resource_label` returns nothing contribute "".
        """
        return tuple(
            event.find_resource_label(label) or "" for label in sorted(self.for_each)
        )

    def meets_threshold(self, event_count: int) -> bool:
        """Compare `event_count` to `threshold` according to the posture.

        Reactive triggers are satisfied at or above the threshold, Proactive
        triggers below it.
        """
        if self.posture == Posture.Reactive and event_count >= self.threshold:
            return True

        if self.posture == Posture.Proactive and event_count < self.threshold:
            return True

        return False

    def create_automation_state_change_event(
        self, firing: Firing, trigger_state: TriggerState
    ) -> ReceivedEvent:
        """Returns a ReceivedEvent for an automation state change
        into a triggered or resolved state."""
        automation = firing.trigger.automation
        triggering_event = firing.triggering_event

        resource_data = Resource(
            {
                "prefect.resource.id": f"prefect.automation.{automation.id}",
                "prefect.resource.name": automation.name,
            }
        )

        # The posture is recorded as an extra label on the automation resource.
        if self.posture.value:
            resource_data["prefect.posture"] = self.posture.value

        return ReceivedEvent(
            occurred=firing.triggered,
            event=f"prefect.automation.{trigger_state.value.lower()}",
            resource=resource_data,
            related=(
                [
                    RelatedResource(
                        {
                            "prefect.resource.id": f"prefect.event.{triggering_event.id}",
                            "prefect.resource.role": "triggering-event",
                        }
                    )
                ]
                if triggering_event
                else []
            ),
            payload={
                "triggering_labels": firing.triggering_labels,
                "triggering_event": (
                    triggering_event.model_dump(mode="json")
                    if triggering_event
                    else None
                ),
            },
            id=uuid7(),
        )
ServerTriggerTypes: TypeAlias = Union[EventTrigger, CompoundTrigger, SequenceTrigger]
"""The union of all concrete trigger types that a user may actually create"""

# Type variable bound to Trigger, so a lookup by trigger class can return
# that same class.
T = TypeVar("T", bound=Trigger)
class AutomationCore(PrefectBaseModel, extra="ignore"):
    """Defines an action a user wants to take when a certain number of events
    do or don't happen to the matching resources"""

    name: str = Field(default=..., description="The name of this automation")
    description: str = Field(
        default="", description="A longer description of this automation"
    )

    enabled: bool = Field(
        default=True, description="Whether this automation will be evaluated"
    )
    tags: list[str] = Field(
        default_factory=list,
        description="A list of tags associated with this automation",
    )

    trigger: ServerTriggerTypes = Field(
        default=...,
        description=(
            "The criteria for which events this Automation covers and how it will "
            "respond to the presence or absence of those events"
        ),
    )

    actions: list[ServerActionTypes] = Field(
        default=...,
        description="The actions to perform when this Automation triggers",
    )

    actions_on_trigger: list[ServerActionTypes] = Field(
        default_factory=list,
        description="The actions to perform when an Automation goes into a triggered state",
    )

    actions_on_resolve: list[ServerActionTypes] = Field(
        default_factory=list,
        description="The actions to perform when an Automation goes into a resolving state",
    )

    def triggers(self) -> Sequence[Trigger]:
        """Returns all triggers within this automation"""
        root = self.trigger
        return root.all_triggers()

    def triggers_of_type(self, trigger_type: Type[T]) -> Sequence[T]:
        """Returns all triggers of the specified type within this automation"""
        selected = []
        for candidate in self.triggers():
            if isinstance(candidate, trigger_type):
                selected.append(candidate)
        return selected

    def trigger_by_id(self, trigger_id: UUID) -> Optional[Trigger]:
        """Returns the trigger with the given ID, or None if no such trigger exists"""
        return next(
            (
                candidate
                for candidate in self.triggers()
                if candidate.id == trigger_id
            ),
            None,
        )

    @model_validator(mode="after")
    def prevent_run_deployment_loops(self) -> Self:
        """Detects potential infinite loops in automations with RunDeployment actions"""
        from prefect.server.events.actions import RunDeployment

        if not self.enabled:
            # Disabled automations can't cause problems
            return self

        trigger = self.trigger
        if (
            not trigger
            or not isinstance(trigger, EventTrigger)
            or trigger.posture != Posture.Reactive
        ):
            # Only reactive automations can cause infinite amplification
            return self

        expected = trigger.expect
        if not any(name.startswith("prefect.flow-run.") for name in expected):
            # Only flow run events can cause infinite amplification
            return self

        # Every flow run created by a Deployment goes through these states
        problematic_events = {
            "prefect.flow-run.Scheduled",
            "prefect.flow-run.Pending",
            "prefect.flow-run.Running",
            "prefect.flow-run.*",
        }
        if not problematic_events.intersection(expected):
            return self

        for action in self.actions:
            if not isinstance(action, RunDeployment):
                continue

            if action.source == "inferred":
                # Inferred deployments for flow run state change events will always
                # cause infinite loops, because no matter what filters we place on the
                # flow run, we're inferring the deployment from it, so we'll always
                # produce a new flow run that matches those filters.
                raise ValueError(
                    "Running an inferred deployment from a flow run state change event "
                    "will lead to an infinite loop of flow runs.  Please choose a "
                    "specific deployment and add additional filtering labels to the "
                    "match or match_related for this automation's trigger."
                )

            if action.source != "selected":
                continue

            # Selected deployments for flow run state changes can cause infinite
            # loops if there aren't enough filtering labels on the trigger's match
            # or match_related.  While it's still possible to have infinite loops
            # with additional filters, it's less likely.
            if not trigger.match.matches_every_resource_of_kind("prefect.flow-run"):
                continue

            if isinstance(trigger.match_related, list):
                relateds = trigger.match_related
            else:
                relateds = [trigger.match_related]

            if any(
                related.matches_every_resource_of_kind("prefect.flow-run")
                for related in relateds
            ):
                raise ValueError(
                    "Running a selected deployment from a flow run state "
                    "change event may lead to an infinite loop of flow runs. "
                    "Please include additional filtering labels on either "
                    "match or match_related to narrow down which flow runs "
                    "will trigger this automation to exclude flow runs from "
                    "the deployment you've selected."
                )

        return self
class Automation(ORMBaseModel, AutomationCore, extra="ignore"):
    # Both construction paths below finish by pointing the trigger tree back at
    # the new automation, so triggers can reach it through their `automation`
    # and `parent` properties.

    def __init__(self, *args: Any, **kwargs: Any):
        super().__init__(*args, **kwargs)
        root_trigger = self.trigger
        root_trigger._set_parent(self)

    @classmethod
    def model_validate(
        cls: type[Self],
        obj: Any,
        *,
        strict: bool | None = None,
        from_attributes: bool | None = None,
        context: dict[str, Any] | None = None,
    ) -> Self:
        """Validate `obj` as usual, then attach the trigger tree to the result."""
        validated = super().model_validate(
            obj, strict=strict, from_attributes=from_attributes, context=context
        )
        validated.trigger._set_parent(validated)
        return validated
class AutomationCreate(AutomationCore, ActionBaseModel, extra="forbid"):
    # Adds an optional owner to the core automation fields; unknown fields are
    # rejected (extra="forbid").
    owner_resource: Optional[str] = Field(
        default=None,
        description="The resource to which this automation belongs",
    )
class AutomationUpdate(AutomationCore, ActionBaseModel, extra="forbid"):
    # Same fields as AutomationCore; unknown fields are rejected (extra="forbid").
    pass
class AutomationPartialUpdate(ActionBaseModel, extra="forbid"):
    # Carries only the `enabled` flag, which defaults to True.
    enabled: bool = Field(
        True,
        description="Whether this automation will be evaluated",
    )
class AutomationSort(AutoEnum):
    """Defines automations sorting options."""

    # Each member's value is spelled the same as its name.
    CREATED_DESC = "CREATED_DESC"
    UPDATED_DESC = "UPDATED_DESC"
    NAME_ASC = "NAME_ASC"
    NAME_DESC = "NAME_DESC"
class Firing(PrefectBaseModel):
    """Represents one instance of a trigger firing"""

    id: UUID = Field(default_factory=uuid7)

    trigger: ServerTriggerTypes = Field(
        default=..., description="The trigger that is firing"
    )
    trigger_states: Set[TriggerState] = Field(
        default=...,
        description="The state changes represented by this Firing",
    )
    triggered: DateTime = Field(
        default=...,
        description=(
            "The time at which this trigger fired, which may differ from the "
            "occurred time of the associated event (as events processing may always "
            "be slightly delayed)."
        ),
    )
    triggering_labels: Dict[str, str] = Field(
        default_factory=dict,
        description=(
            "The labels associated with this Firing, derived from the underlying "
            "for_each values of the trigger.  Only used in the context "
            "of EventTriggers."
        ),
    )
    triggering_firings: List[Firing] = Field(
        default_factory=list,
        description=(
            "The firings of the triggers that caused this trigger to fire.  Only used "
            "in the context of CompoundTriggers."
        ),
    )
    triggering_event: Optional[ReceivedEvent] = Field(
        default=None,
        description=(
            "The most recent event associated with this Firing.  This may be the "
            "event that caused the trigger to fire (for Reactive triggers), or the "
            "last event to match the trigger (for Proactive triggers), or the state "
            "change event (for a Metric trigger)."
        ),
    )
    triggering_value: Optional[Any] = Field(
        default=None,
        description=(
            "A value associated with this firing of a trigger.  Maybe used to "
            "convey additional information at the point of firing, like the value of "
            "the last query for a MetricTrigger"
        ),
    )

    @field_validator("trigger_states")
    @classmethod
    def validate_trigger_states(cls, value: set[TriggerState]) -> set[TriggerState]:
        """Reject an empty set of trigger states.

        Raises:
            ValueError: If `value` is empty.
        """
        if value:
            return value
        raise ValueError("At least one trigger state must be provided")

    def all_firings(self) -> Sequence[Firing]:
        """Return this firing followed by all firings nested under it, depth-first."""
        collected: List[Firing] = [self]
        for child in self.triggering_firings:
            collected.extend(child.all_firings())
        return collected

    def all_events(self) -> Sequence[ReceivedEvent]:
        """Return this firing's event (if any) followed by those of nested firings."""
        collected: List[ReceivedEvent] = []
        if self.triggering_event:
            collected.append(self.triggering_event)
        for child in self.triggering_firings:
            collected.extend(child.all_events())
        return collected
class TriggeredAction(PrefectBaseModel):
    """An action caused as the result of an automation"""

    automation: Automation = Field(
        ..., description="The Automation that caused this action"
    )

    id: UUID = Field(
        default_factory=uuid7,
        description="A unique key representing a single triggering of an action",
    )

    firing: Optional[Firing] = Field(
        default=None, description="The Firing that prompted this action"
    )

    triggered: DateTime = Field(..., description="When this action was triggered")
    triggering_labels: Dict[str, str] = Field(
        ...,
        description=(
            "The subset of labels of the Event that triggered this action, "
            "corresponding to the Automation's for_each.  If no for_each is specified, "
            "this will be an empty set of labels"
        ),
    )
    triggering_event: Optional[ReceivedEvent] = Field(
        ...,
        description=(
            "The last Event to trigger this automation, if applicable.  For reactive "
            "triggers, this will be the event that caused the trigger to fire.  For "
            "proactive triggers, this will be the last event to match the automation, "
            "if there was one."
        ),
    )
    action: ServerActionTypes = Field(
        ...,
        description="The action to perform",
    )
    action_index: int = Field(
        default=0,
        description="The index of the action within the automation",
    )

    def idempotency_key(self) -> str:
        """Produce a human-friendly idempotency key for this action"""
        # Automation id, action position and this invocation's id, comma-separated.
        parts = (
            f"automation {self.automation.id}",
            f"action {self.action_index}",
            f"invocation {self.id}",
        )
        return ", ".join(parts)

    def all_firings(self) -> Sequence[Firing]:
        """All firings reachable from `firing`, or an empty list when there is none."""
        source = self.firing
        if not source:
            return []
        return source.all_firings()

    def all_events(self) -> Sequence[ReceivedEvent]:
        """All events reachable from `firing`, or an empty list when there is none."""
        source = self.firing
        if not source:
            return []
        return source.all_events()
# Both composite trigger models have a `triggers` field annotated with the
# forward reference "ServerTriggerTypes"; rebuild them now that the alias is
# defined.
CompoundTrigger.model_rebuild()
SequenceTrigger.model_rebuild()