Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/schemas/automations.py: 40%

296 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1from __future__ import annotations 1a

2 

3import abc 1a

4import re 1a

5import weakref 1a

6from datetime import timedelta 1a

7from typing import ( 1a

8 TYPE_CHECKING, 

9 Any, 

10 Dict, 

11 List, 

12 Literal, 

13 Optional, 

14 Sequence, 

15 Set, 

16 Tuple, 

17 Type, 

18 TypeVar, 

19 Union, 

20) 

21from uuid import UUID, uuid4 1a

22 

23from pydantic import ( 1a

24 Field, 

25 PrivateAttr, 

26 field_validator, 

27 model_validator, 

28) 

29from typing_extensions import Self, TypeAlias 1a

30 

31from prefect._internal.uuid7 import uuid7 1a

32from prefect.logging import get_logger 1a

33from prefect.server.events.actions import ServerActionTypes 1a

34from prefect.server.events.schemas.events import ( 1a

35 ReceivedEvent, 

36 RelatedResource, 

37 Resource, 

38 ResourceSpecification, 

39 matches, 

40) 

41from prefect.server.schemas.actions import ActionBaseModel 1a

42from prefect.server.utilities.schemas import ORMBaseModel, PrefectBaseModel 1a

43from prefect.types import DateTime 1a

44from prefect.utilities.collections import AutoEnum 1a

45 

46if TYPE_CHECKING: 46 ↛ 47line 46 didn't jump to line 47 because the condition on line 46 was never true1a

47 import logging 

48 

49logger: "logging.Logger" = get_logger(__name__) 1a

50 

51 

52class Posture(AutoEnum): 1a

53 Reactive = "Reactive" 1a

54 Proactive = "Proactive" 1a

55 Metric = "Metric" 1a

56 

57 

58class TriggerState(AutoEnum): 1a

59 Triggered = "Triggered" 1a

60 Resolved = "Resolved" 1a

61 

62 

63class Trigger(PrefectBaseModel, abc.ABC): 1a

64 """ 

65 Base class describing a set of criteria that must be satisfied in order to trigger 

66 an automation. 

67 """ 

68 

69 type: str 1a

70 

71 id: UUID = Field(default_factory=uuid4, description="The unique ID of this trigger") 1a

72 

73 _automation: Optional[weakref.ref[Any]] = PrivateAttr(None) 1a

74 _parent: Optional[weakref.ref[Any]] = PrivateAttr(None) 1a

75 

76 @property 1a

77 def automation(self) -> "Automation": 1a

78 assert self._automation is not None, "Trigger._automation has not been set" 

79 value = self._automation() 

80 assert value is not None, "Trigger._automation has been garbage collected" 

81 return value 

82 

83 @property 1a

84 def parent(self) -> "Union[Trigger, Automation]": 1a

85 assert self._parent is not None, "Trigger._parent has not been set" 

86 value = self._parent() 

87 assert value is not None, "Trigger._parent has been garbage collected" 

88 return value 

89 

90 def _set_parent(self, value: "Union[Trigger, Automation]"): 1a

91 if isinstance(value, Automation): 

92 self._automation = weakref.ref(value) 

93 self._parent = self._automation 

94 elif isinstance(value, Trigger): 

95 self._parent = weakref.ref(value) 

96 self._automation = value._automation 

97 else: # pragma: no cover 

98 raise ValueError("parent must be an Automation or a Trigger") 

99 

100 def reset_ids(self) -> None: 1a

101 """Resets the ID of this trigger and all of its children""" 

102 self.id = uuid4() 

103 for trigger in self.all_triggers(): 

104 trigger.id = uuid4() 

105 

106 def all_triggers(self) -> Sequence[Trigger]: 1a

107 """Returns all triggers within this trigger""" 

108 return [self] 

109 

110 @abc.abstractmethod 1a

111 def create_automation_state_change_event( 111 ↛ exitline 111 didn't return from function 'create_automation_state_change_event' because 1a

112 self, firing: "Firing", trigger_state: TriggerState 

113 ) -> ReceivedEvent: ... 

114 

115 

116class CompositeTrigger(Trigger, abc.ABC): 1a

117 """ 

118 Requires some number of triggers to have fired within the given time period. 

119 """ 

120 

121 type: Literal["compound", "sequence"] 1a

122 triggers: List["ServerTriggerTypes"] 1a

123 within: Optional[timedelta] 1a

124 

125 def create_automation_state_change_event( 1a

126 self, firing: Firing, trigger_state: TriggerState 

127 ) -> ReceivedEvent: 

128 """Returns a ReceivedEvent for an automation state change 

129 into a triggered or resolved state.""" 

130 automation = firing.trigger.automation 

131 triggering_event = firing.triggering_event 

132 return ReceivedEvent( 

133 occurred=firing.triggered, 

134 event=f"prefect.automation.{trigger_state.value.lower()}", 

135 resource={ 

136 "prefect.resource.id": f"prefect.automation.{automation.id}", 

137 "prefect.resource.name": automation.name, 

138 }, 

139 related=( 

140 [ 

141 { 

142 "prefect.resource.id": f"prefect.event.{triggering_event.id}", 

143 "prefect.resource.role": "triggering-event", 

144 } 

145 ] 

146 if triggering_event 

147 else [] 

148 ), 

149 payload={ 

150 "triggering_labels": firing.triggering_labels, 

151 "triggering_event": ( 

152 triggering_event.model_dump(mode="json") 

153 if triggering_event 

154 else None 

155 ), 

156 }, 

157 id=uuid7(), 

158 ) 

159 

160 def _set_parent(self, value: "Union[Trigger , Automation]"): 1a

161 super()._set_parent(value) 

162 for trigger in self.triggers: 

163 trigger._set_parent(self) 

164 

165 def all_triggers(self) -> Sequence[Trigger]: 1a

166 return [self] + [t for child in self.triggers for t in child.all_triggers()] 

167 

168 @property 1a

169 def child_trigger_ids(self) -> List[UUID]: 1a

170 return [trigger.id for trigger in self.triggers] 

171 

172 @property 1a

173 def num_expected_firings(self) -> int: 1a

174 return len(self.triggers) 

175 

176 @abc.abstractmethod 1a

177 def ready_to_fire(self, firings: Sequence["Firing"]) -> bool: ... 177 ↛ exitline 177 didn't return from function 'ready_to_fire' because 1a

178 

179 

180class CompoundTrigger(CompositeTrigger): 1a

181 """A composite trigger that requires some number of triggers to have 

182 fired within the given time period""" 

183 

184 type: Literal["compound"] = "compound" 1a

185 require: Union[int, Literal["any", "all"]] 1a

186 

187 @property 1a

188 def num_expected_firings(self) -> int: 1a

189 if self.require == "any": 

190 return 1 

191 elif self.require == "all": 

192 return len(self.triggers) 

193 else: 

194 return int(self.require) 

195 

196 def ready_to_fire(self, firings: Sequence["Firing"]) -> bool: 1a

197 return len(firings) >= self.num_expected_firings 

198 

199 @model_validator(mode="after") 1a

200 def validate_require(self) -> Self: 1a

201 if isinstance(self.require, int): 

202 if self.require < 1: 

203 raise ValueError("require must be at least 1") 

204 if self.require > len(self.triggers): 

205 raise ValueError( 

206 "require must be less than or equal to the number of triggers" 

207 ) 

208 

209 return self 

210 

211 

212class SequenceTrigger(CompositeTrigger): 1a

213 """A composite trigger that requires some number of triggers to have fired 

214 within the given time period in a specific order""" 

215 

216 type: Literal["sequence"] = "sequence" 1a

217 

218 @property 1a

219 def expected_firing_order(self) -> List[UUID]: 1a

220 return [trigger.id for trigger in self.triggers] 

221 

222 def ready_to_fire(self, firings: Sequence["Firing"]) -> bool: 1a

223 actual_firing_order = [ 

224 f.trigger.id for f in sorted(firings, key=lambda f: f.triggered) 

225 ] 

226 return actual_firing_order == self.expected_firing_order 

227 

228 

229class ResourceTrigger(Trigger, abc.ABC): 1a

230 """ 

231 Base class for triggers that may filter by the labels of resources. 

232 """ 

233 

234 type: str 1a

235 

236 match: ResourceSpecification = Field( 1a

237 default_factory=lambda: ResourceSpecification.model_validate({}), 

238 description="Labels for resources which this trigger will match.", 

239 ) 

240 match_related: Union[ResourceSpecification, list[ResourceSpecification]] = Field( 1a

241 default_factory=lambda: ResourceSpecification.model_validate({}), 

242 description="Labels for related resources which this trigger will match.", 

243 ) 

244 

245 def covers_resources( 1a

246 self, resource: Resource, related: Sequence[RelatedResource] 

247 ) -> bool: 

248 if not self.match.includes([resource]): 

249 return False 

250 

251 match_related = self.match_related 

252 if not isinstance(match_related, list): 

253 match_related = [match_related] 

254 

255 if not all(match.includes(related) for match in match_related): 

256 return False 

257 

258 return True 

259 

260 

261class EventTrigger(ResourceTrigger): 1a

262 """ 

263 A trigger that fires based on the presence or absence of events within a given 

264 period of time. 

265 """ 

266 

267 type: Literal["event"] = "event" 1a

268 

269 after: Set[str] = Field( 1a

270 default_factory=set, 

271 description=( 

272 "The event(s) which must first been seen to fire this trigger. If " 

273 "empty, then fire this trigger immediately. Events may include " 

274 "trailing wildcards, like `prefect.flow-run.*`" 

275 ), 

276 ) 

277 expect: Set[str] = Field( 1a

278 default_factory=set, 

279 description=( 

280 "The event(s) this trigger is expecting to see. If empty, this " 

281 "trigger will match any event. Events may include trailing wildcards, " 

282 "like `prefect.flow-run.*`" 

283 ), 

284 ) 

285 

286 for_each: Set[str] = Field( 1a

287 default_factory=set, 

288 description=( 

289 "Evaluate the trigger separately for each distinct value of these labels " 

290 "on the resource. By default, labels refer to the primary resource of the " 

291 "triggering event. You may also refer to labels from related " 

292 "resources by specifying `related:<role>:<label>`. This will use the " 

293 "value of that label for the first related resource in that role. For " 

294 'example, `"for_each": ["related:flow:prefect.resource.id"]` would ' 

295 "evaluate the trigger for each flow." 

296 ), 

297 ) 

298 posture: Literal[Posture.Reactive, Posture.Proactive] = Field( # type: ignore[valid-type] 1a

299 ..., 

300 description=( 

301 "The posture of this trigger, either Reactive or Proactive. Reactive " 

302 "triggers respond to the _presence_ of the expected events, while " 

303 "Proactive triggers respond to the _absence_ of those expected events." 

304 ), 

305 ) 

306 threshold: int = Field( 1a

307 1, 

308 description=( 

309 "The number of events required for this trigger to fire (for " 

310 "Reactive triggers), or the number of events expected (for Proactive " 

311 "triggers)" 

312 ), 

313 ) 

314 within: timedelta = Field( 1a

315 timedelta(seconds=0), 

316 ge=timedelta(seconds=0), 

317 description=( 

318 "The time period over which the events must occur. For Reactive triggers, " 

319 "this may be as low as 0 seconds, but must be at least 10 seconds for " 

320 "Proactive triggers" 

321 ), 

322 ) 

323 

324 @model_validator(mode="before") 1a

325 @classmethod 1a

326 def enforce_minimum_within_for_proactive_triggers( 1a

327 cls, data: Dict[str, Any] | Any 

328 ) -> Dict[str, Any]: 

329 if not isinstance(data, dict): 

330 return data 

331 

332 if "within" in data and data["within"] is None: 

333 raise ValueError("`within` should be a valid timedelta") 

334 

335 posture: Optional[Posture] = data.get("posture") 

336 within: Optional[timedelta] = data.get("within") 

337 

338 if isinstance(within, (int, float)): 

339 data["within"] = within = timedelta(seconds=within) 

340 

341 if posture == Posture.Proactive: 

342 if not within or within == timedelta(0): 

343 data["within"] = timedelta(seconds=10.0) 

344 elif within < timedelta(seconds=10.0): 

345 raise ValueError( 

346 "`within` for Proactive triggers must be greater than or equal to " 

347 "10 seconds" 

348 ) 

349 

350 return data 

351 

352 def covers(self, event: ReceivedEvent) -> bool: 1a

353 if not self.covers_resources(event.resource, event.related): 

354 return False 

355 

356 if not self.event_pattern.match(event.event): 

357 return False 

358 

359 return True 

360 

361 @property 1a

362 def immediate(self) -> bool: 1a

363 """Does this reactive trigger fire immediately for all events?""" 

364 return self.posture == Posture.Reactive and self.within == timedelta(0) 

365 

366 _event_pattern: Optional[re.Pattern[str]] = PrivateAttr(None) 1a

367 

368 @property 1a

369 def event_pattern(self) -> re.Pattern[str]: 1a

370 """A regular expression which may be evaluated against any event string to 

371 determine if this trigger would be interested in the event""" 

372 if self._event_pattern: 

373 return self._event_pattern 

374 

375 if not self.expect: 

376 # This preserves the trivial match for `expect`, and matches the behavior 

377 # of expects() below 

378 self._event_pattern = re.compile(".+") 

379 else: 

380 patterns = [ 

381 # escape each pattern, then translate wildcards ('*' -> r'.+') 

382 re.escape(e).replace("\\*", ".+") 

383 for e in self.expect | self.after 

384 ] 

385 self._event_pattern = re.compile("|".join(patterns)) 

386 

387 return self._event_pattern 

388 

389 def starts_after(self, event: str) -> bool: 1a

390 # Warning: Previously we returned 'True' if there was trivial 'after' criteria. 

391 # Although this is not wrong, it led to automations processing more events 

392 # than they should have. 

393 if not self.after: 

394 return False 

395 

396 for candidate in self.after: 

397 if matches(candidate, event): 

398 return True 

399 return False 

400 

401 def expects(self, event: str) -> bool: 1a

402 if not self.expect: 

403 return True 

404 

405 for candidate in self.expect: 

406 if matches(candidate, event): 

407 return True 

408 return False 

409 

410 def bucketing_key(self, event: ReceivedEvent) -> Tuple[str, ...]: 1a

411 return tuple( 

412 event.find_resource_label(label) or "" for label in sorted(self.for_each) 

413 ) 

414 

415 def meets_threshold(self, event_count: int) -> bool: 1a

416 if self.posture == Posture.Reactive and event_count >= self.threshold: 

417 return True 

418 

419 if self.posture == Posture.Proactive and event_count < self.threshold: 

420 return True 

421 

422 return False 

423 

424 def create_automation_state_change_event( 1a

425 self, firing: Firing, trigger_state: TriggerState 

426 ) -> ReceivedEvent: 

427 """Returns a ReceivedEvent for an automation state change 

428 into a triggered or resolved state.""" 

429 automation = firing.trigger.automation 

430 triggering_event = firing.triggering_event 

431 

432 resource_data = Resource( 

433 { 

434 "prefect.resource.id": f"prefect.automation.{automation.id}", 

435 "prefect.resource.name": automation.name, 

436 } 

437 ) 

438 

439 if self.posture.value: 

440 resource_data["prefect.posture"] = self.posture.value 

441 

442 return ReceivedEvent( 

443 occurred=firing.triggered, 

444 event=f"prefect.automation.{trigger_state.value.lower()}", 

445 resource=resource_data, 

446 related=( 

447 [ 

448 RelatedResource( 

449 { 

450 "prefect.resource.id": f"prefect.event.{triggering_event.id}", 

451 "prefect.resource.role": "triggering-event", 

452 } 

453 ) 

454 ] 

455 if triggering_event 

456 else [] 

457 ), 

458 payload={ 

459 "triggering_labels": firing.triggering_labels, 

460 "triggering_event": ( 

461 triggering_event.model_dump(mode="json") 

462 if triggering_event 

463 else None 

464 ), 

465 }, 

466 id=uuid7(), 

467 ) 

468 

469 

470ServerTriggerTypes: TypeAlias = Union[EventTrigger, CompoundTrigger, SequenceTrigger] 1a

471"""The union of all concrete trigger types that a user may actually create""" 1a

472 

473T = TypeVar("T", bound=Trigger) 1a

474 

475 

476class AutomationCore(PrefectBaseModel, extra="ignore"): 1a

477 """Defines an action a user wants to take when a certain number of events 

478 do or don't happen to the matching resources""" 

479 

480 name: str = Field(default=..., description="The name of this automation") 1a

481 description: str = Field( 1a

482 default="", description="A longer description of this automation" 

483 ) 

484 

485 enabled: bool = Field( 1a

486 default=True, description="Whether this automation will be evaluated" 

487 ) 

488 tags: list[str] = Field( 1a

489 default_factory=list, 

490 description="A list of tags associated with this automation", 

491 ) 

492 

493 trigger: ServerTriggerTypes = Field( 1a

494 default=..., 

495 description=( 

496 "The criteria for which events this Automation covers and how it will " 

497 "respond to the presence or absence of those events" 

498 ), 

499 ) 

500 

501 actions: list[ServerActionTypes] = Field( 1a

502 default=..., 

503 description="The actions to perform when this Automation triggers", 

504 ) 

505 

506 actions_on_trigger: list[ServerActionTypes] = Field( 1a

507 default_factory=list, 

508 description="The actions to perform when an Automation goes into a triggered state", 

509 ) 

510 

511 actions_on_resolve: list[ServerActionTypes] = Field( 1a

512 default_factory=list, 

513 description="The actions to perform when an Automation goes into a resolving state", 

514 ) 

515 

516 def triggers(self) -> Sequence[Trigger]: 1a

517 """Returns all triggers within this automation""" 

518 return self.trigger.all_triggers() 

519 

520 def triggers_of_type(self, trigger_type: Type[T]) -> Sequence[T]: 1a

521 """Returns all triggers of the specified type within this automation""" 

522 return [t for t in self.triggers() if isinstance(t, trigger_type)] 

523 

524 def trigger_by_id(self, trigger_id: UUID) -> Optional[Trigger]: 1a

525 """Returns the trigger with the given ID, or None if no such trigger exists""" 

526 for trigger in self.triggers(): 

527 if trigger.id == trigger_id: 

528 return trigger 

529 return None 

530 

531 @model_validator(mode="after") 1a

532 def prevent_run_deployment_loops(self) -> Self: 1a

533 """Detects potential infinite loops in automations with RunDeployment actions""" 

534 from prefect.server.events.actions import RunDeployment 

535 

536 if not self.enabled: 

537 # Disabled automations can't cause problems 

538 return self 

539 

540 if ( 

541 not self.trigger 

542 or not isinstance(self.trigger, EventTrigger) 

543 or self.trigger.posture != Posture.Reactive 

544 ): 

545 # Only reactive automations can cause infinite amplification 

546 return self 

547 

548 if not any(e.startswith("prefect.flow-run.") for e in self.trigger.expect): 

549 # Only flow run events can cause infinite amplification 

550 return self 

551 

552 # Every flow run created by a Deployment goes through these states 

553 problematic_events = { 

554 "prefect.flow-run.Scheduled", 

555 "prefect.flow-run.Pending", 

556 "prefect.flow-run.Running", 

557 "prefect.flow-run.*", 

558 } 

559 if not problematic_events.intersection(self.trigger.expect): 

560 return self 

561 

562 actions = [a for a in self.actions if isinstance(a, RunDeployment)] 

563 for action in actions: 

564 if action.source == "inferred": 

565 # Inferred deployments for flow run state change events will always 

566 # cause infinite loops, because no matter what filters we place on the 

567 # flow run, we're inferring the deployment from it, so we'll always 

568 # produce a new flow run that matches those filters. 

569 raise ValueError( 

570 "Running an inferred deployment from a flow run state change event " 

571 "will lead to an infinite loop of flow runs. Please choose a " 

572 "specific deployment and add additional filtering labels to the " 

573 "match or match_related for this automation's trigger." 

574 ) 

575 

576 if action.source == "selected": 

577 # Selected deployments for flow run state changes can cause infinite 

578 # loops if there aren't enough filtering labels on the trigger's match 

579 # or match_related. While it's still possible to have infinite loops 

580 # with additional filters, it's less likely. 

581 if self.trigger.match.matches_every_resource_of_kind( 

582 "prefect.flow-run" 

583 ): 

584 relateds = ( 

585 self.trigger.match_related 

586 if isinstance(self.trigger.match_related, list) 

587 else [self.trigger.match_related] 

588 ) 

589 if any( 

590 related.matches_every_resource_of_kind("prefect.flow-run") 

591 for related in relateds 

592 ): 

593 raise ValueError( 

594 "Running a selected deployment from a flow run state " 

595 "change event may lead to an infinite loop of flow runs. " 

596 "Please include additional filtering labels on either " 

597 "match or match_related to narrow down which flow runs " 

598 "will trigger this automation to exclude flow runs from " 

599 "the deployment you've selected." 

600 ) 

601 

602 return self 

603 

604 

605class Automation(ORMBaseModel, AutomationCore, extra="ignore"): 1a

606 def __init__(self, *args: Any, **kwargs: Any): 1a

607 super().__init__(*args, **kwargs) 

608 self.trigger._set_parent(self) 

609 

610 @classmethod 1a

611 def model_validate( 1a

612 cls: type[Self], 

613 obj: Any, 

614 *, 

615 strict: bool | None = None, 

616 from_attributes: bool | None = None, 

617 context: dict[str, Any] | None = None, 

618 ) -> Self: 

619 automation = super().model_validate( 

620 obj, strict=strict, from_attributes=from_attributes, context=context 

621 ) 

622 automation.trigger._set_parent(automation) 

623 return automation 

624 

625 

626class AutomationCreate(AutomationCore, ActionBaseModel, extra="forbid"): 1a

627 owner_resource: Optional[str] = Field( 1a

628 default=None, description="The resource to which this automation belongs" 

629 ) 

630 

631 

632class AutomationUpdate(AutomationCore, ActionBaseModel, extra="forbid"): 1a

633 pass 1a

634 

635 

636class AutomationPartialUpdate(ActionBaseModel, extra="forbid"): 1a

637 enabled: bool = Field(True, description="Whether this automation will be evaluated") 1a

638 

639 

640class AutomationSort(AutoEnum): 1a

641 """Defines automations sorting options.""" 

642 

643 CREATED_DESC = "CREATED_DESC" 1a

644 UPDATED_DESC = "UPDATED_DESC" 1a

645 NAME_ASC = "NAME_ASC" 1a

646 NAME_DESC = "NAME_DESC" 1a

647 

648 

649class Firing(PrefectBaseModel): 1a

650 """Represents one instance of a trigger firing""" 

651 

652 id: UUID = Field(default_factory=uuid7) 1a

653 

654 trigger: ServerTriggerTypes = Field( 1a

655 default=..., description="The trigger that is firing" 

656 ) 

657 trigger_states: Set[TriggerState] = Field( 1a

658 default=..., 

659 description="The state changes represented by this Firing", 

660 ) 

661 triggered: DateTime = Field( 1a

662 default=..., 

663 description=( 

664 "The time at which this trigger fired, which may differ from the " 

665 "occurred time of the associated event (as events processing may always " 

666 "be slightly delayed)." 

667 ), 

668 ) 

669 triggering_labels: Dict[str, str] = Field( 1a

670 default_factory=dict, 

671 description=( 

672 "The labels associated with this Firing, derived from the underlying " 

673 "for_each values of the trigger. Only used in the context " 

674 "of EventTriggers." 

675 ), 

676 ) 

677 triggering_firings: List[Firing] = Field( 1a

678 default_factory=list, 

679 description=( 

680 "The firings of the triggers that caused this trigger to fire. Only used " 

681 "in the context of CompoundTriggers." 

682 ), 

683 ) 

684 triggering_event: Optional[ReceivedEvent] = Field( 1a

685 default=None, 

686 description=( 

687 "The most recent event associated with this Firing. This may be the " 

688 "event that caused the trigger to fire (for Reactive triggers), or the " 

689 "last event to match the trigger (for Proactive triggers), or the state " 

690 "change event (for a Metric trigger)." 

691 ), 

692 ) 

693 triggering_value: Optional[Any] = Field( 1a

694 default=None, 

695 description=( 

696 "A value associated with this firing of a trigger. Maybe used to " 

697 "convey additional information at the point of firing, like the value of " 

698 "the last query for a MetricTrigger" 

699 ), 

700 ) 

701 

702 @field_validator("trigger_states") 1a

703 @classmethod 1a

704 def validate_trigger_states(cls, value: set[TriggerState]) -> set[TriggerState]: 1a

705 if not value: 

706 raise ValueError("At least one trigger state must be provided") 

707 return value 

708 

709 def all_firings(self) -> Sequence[Firing]: 1a

710 return [self] + [ 

711 f for child in self.triggering_firings for f in child.all_firings() 

712 ] 

713 

714 def all_events(self) -> Sequence[ReceivedEvent]: 1a

715 events = [self.triggering_event] if self.triggering_event else [] 

716 return events + [ 

717 e for child in self.triggering_firings for e in child.all_events() 

718 ] 

719 

720 

721class TriggeredAction(PrefectBaseModel): 1a

722 """An action caused as the result of an automation""" 

723 

724 automation: Automation = Field( 1a

725 ..., description="The Automation that caused this action" 

726 ) 

727 

728 id: UUID = Field( 1a

729 default_factory=uuid7, 

730 description="A unique key representing a single triggering of an action", 

731 ) 

732 

733 firing: Optional[Firing] = Field( 1a

734 default=None, description="The Firing that prompted this action" 

735 ) 

736 

737 triggered: DateTime = Field(..., description="When this action was triggered") 1a

738 triggering_labels: Dict[str, str] = Field( 1a

739 ..., 

740 description=( 

741 "The subset of labels of the Event that triggered this action, " 

742 "corresponding to the Automation's for_each. If no for_each is specified, " 

743 "this will be an empty set of labels" 

744 ), 

745 ) 

746 triggering_event: Optional[ReceivedEvent] = Field( 1a

747 ..., 

748 description=( 

749 "The last Event to trigger this automation, if applicable. For reactive " 

750 "triggers, this will be the event that caused the trigger to fire. For " 

751 "proactive triggers, this will be the last event to match the automation, " 

752 "if there was one." 

753 ), 

754 ) 

755 action: ServerActionTypes = Field( 1a

756 ..., 

757 description="The action to perform", 

758 ) 

759 action_index: int = Field( 1a

760 default=0, 

761 description="The index of the action within the automation", 

762 ) 

763 

764 def idempotency_key(self) -> str: 1a

765 """Produce a human-friendly idempotency key for this action""" 

766 return ", ".join( 

767 [ 

768 f"automation {self.automation.id}", 

769 f"action {self.action_index}", 

770 f"invocation {self.id}", 

771 ] 

772 ) 

773 

774 def all_firings(self) -> Sequence[Firing]: 1a

775 return self.firing.all_firings() if self.firing else [] 

776 

777 def all_events(self) -> Sequence[ReceivedEvent]: 1a

778 return self.firing.all_events() if self.firing else [] 

779 

780 

781CompoundTrigger.model_rebuild() 1a

782SequenceTrigger.model_rebuild() 1a