Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/schemas/automations.py: 77%

296 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1from __future__ import annotations 1a

2 

3import abc 1a

4import re 1a

5import weakref 1a

6from datetime import timedelta 1a

7from typing import ( 1a

8 TYPE_CHECKING, 

9 Any, 

10 Dict, 

11 List, 

12 Literal, 

13 Optional, 

14 Sequence, 

15 Set, 

16 Tuple, 

17 Type, 

18 TypeVar, 

19 Union, 

20) 

21from uuid import UUID, uuid4 1a

22 

23from pydantic import ( 1a

24 Field, 

25 PrivateAttr, 

26 field_validator, 

27 model_validator, 

28) 

29from typing_extensions import Self, TypeAlias 1a

30 

31from prefect._internal.uuid7 import uuid7 1a

32from prefect.logging import get_logger 1a

33from prefect.server.events.actions import ServerActionTypes 1a

34from prefect.server.events.schemas.events import ( 1a

35 ReceivedEvent, 

36 RelatedResource, 

37 Resource, 

38 ResourceSpecification, 

39 matches, 

40) 

41from prefect.server.schemas.actions import ActionBaseModel 1a

42from prefect.server.utilities.schemas import ORMBaseModel, PrefectBaseModel 1a

43from prefect.types import DateTime 1a

44from prefect.utilities.collections import AutoEnum 1a

45 

46if TYPE_CHECKING: 46 ↛ 47line 46 didn't jump to line 47 because the condition on line 46 was never true1a

47 import logging 

48 

49logger: "logging.Logger" = get_logger(__name__) 1a

50 

51 

52class Posture(AutoEnum): 1a

53 Reactive = "Reactive" 1a

54 Proactive = "Proactive" 1a

55 Metric = "Metric" 1a

56 

57 

58class TriggerState(AutoEnum): 1a

59 Triggered = "Triggered" 1a

60 Resolved = "Resolved" 1a

61 

62 

63class Trigger(PrefectBaseModel, abc.ABC): 1a

64 """ 

65 Base class describing a set of criteria that must be satisfied in order to trigger 

66 an automation. 

67 """ 

68 

69 type: str 1a

70 

71 id: UUID = Field(default_factory=uuid4, description="The unique ID of this trigger") 1a

72 

73 _automation: Optional[weakref.ref[Any]] = PrivateAttr(None) 1a

74 _parent: Optional[weakref.ref[Any]] = PrivateAttr(None) 1a

75 

76 @property 1a

77 def automation(self) -> "Automation": 1a

78 assert self._automation is not None, "Trigger._automation has not been set" 1gceb

79 value = self._automation() 1gceb

80 assert value is not None, "Trigger._automation has been garbage collected" 1gceb

81 return value 1gceb

82 

83 @property 1a

84 def parent(self) -> "Union[Trigger, Automation]": 1a

85 assert self._parent is not None, "Trigger._parent has not been set" 1cb

86 value = self._parent() 1cb

87 assert value is not None, "Trigger._parent has been garbage collected" 1cb

88 return value 1cb

89 

90 def _set_parent(self, value: "Union[Trigger, Automation]"): 1a

91 if isinstance(value, Automation): 91 ↛ 94line 91 didn't jump to line 94 because the condition on line 91 was always true1dcfb

92 self._automation = weakref.ref(value) 1dcfb

93 self._parent = self._automation 1dcfb

94 elif isinstance(value, Trigger): 

95 self._parent = weakref.ref(value) 

96 self._automation = value._automation 

97 else: # pragma: no cover 

98 raise ValueError("parent must be an Automation or a Trigger") 

99 

100 def reset_ids(self) -> None: 1a

101 """Resets the ID of this trigger and all of its children""" 

102 self.id = uuid4() 1dcb

103 for trigger in self.all_triggers(): 1dcb

104 trigger.id = uuid4() 1dcb

105 

106 def all_triggers(self) -> Sequence[Trigger]: 1a

107 """Returns all triggers within this trigger""" 

108 return [self] 1db

109 

110 @abc.abstractmethod 1a

111 def create_automation_state_change_event( 111 ↛ exitline 111 didn't return from function 'create_automation_state_change_event' because 1a

112 self, firing: "Firing", trigger_state: TriggerState 

113 ) -> ReceivedEvent: ... 

114 

115 

116class CompositeTrigger(Trigger, abc.ABC): 1a

117 """ 

118 Requires some number of triggers to have fired within the given time period. 

119 """ 

120 

121 type: Literal["compound", "sequence"] 1a

122 triggers: List["ServerTriggerTypes"] 1a

123 within: Optional[timedelta] 1a

124 

125 def create_automation_state_change_event( 1a

126 self, firing: Firing, trigger_state: TriggerState 

127 ) -> ReceivedEvent: 

128 """Returns a ReceivedEvent for an automation state change 

129 into a triggered or resolved state.""" 

130 automation = firing.trigger.automation 

131 triggering_event = firing.triggering_event 

132 return ReceivedEvent( 

133 occurred=firing.triggered, 

134 event=f"prefect.automation.{trigger_state.value.lower()}", 

135 resource={ 

136 "prefect.resource.id": f"prefect.automation.{automation.id}", 

137 "prefect.resource.name": automation.name, 

138 }, 

139 related=( 

140 [ 

141 { 

142 "prefect.resource.id": f"prefect.event.{triggering_event.id}", 

143 "prefect.resource.role": "triggering-event", 

144 } 

145 ] 

146 if triggering_event 

147 else [] 

148 ), 

149 payload={ 

150 "triggering_labels": firing.triggering_labels, 

151 "triggering_event": ( 

152 triggering_event.model_dump(mode="json") 

153 if triggering_event 

154 else None 

155 ), 

156 }, 

157 id=uuid7(), 

158 ) 

159 

160 def _set_parent(self, value: "Union[Trigger , Automation]"): 1a

161 super()._set_parent(value) 1dcb

162 for trigger in self.triggers: 162 ↛ 163line 162 didn't jump to line 163 because the loop on line 162 never started1dcb

163 trigger._set_parent(self) 

164 

165 def all_triggers(self) -> Sequence[Trigger]: 1a

166 return [self] + [t for child in self.triggers for t in child.all_triggers()] 1dcb

167 

168 @property 1a

169 def child_trigger_ids(self) -> List[UUID]: 1a

170 return [trigger.id for trigger in self.triggers] 

171 

172 @property 1a

173 def num_expected_firings(self) -> int: 1a

174 return len(self.triggers) 

175 

176 @abc.abstractmethod 1a

177 def ready_to_fire(self, firings: Sequence["Firing"]) -> bool: ... 177 ↛ exitline 177 didn't return from function 'ready_to_fire' because 1a

178 

179 

180class CompoundTrigger(CompositeTrigger): 1a

181 """A composite trigger that requires some number of triggers to have 

182 fired within the given time period""" 

183 

184 type: Literal["compound"] = "compound" 1a

185 require: Union[int, Literal["any", "all"]] 1a

186 

187 @property 1a

188 def num_expected_firings(self) -> int: 1a

189 if self.require == "any": 

190 return 1 

191 elif self.require == "all": 

192 return len(self.triggers) 

193 else: 

194 return int(self.require) 

195 

196 def ready_to_fire(self, firings: Sequence["Firing"]) -> bool: 1a

197 return len(firings) >= self.num_expected_firings 

198 

199 @model_validator(mode="after") 1a

200 def validate_require(self) -> Self: 1a

201 if isinstance(self.require, int): 201 ↛ 202line 201 didn't jump to line 202 because the condition on line 201 was never true1dcb

202 if self.require < 1: 

203 raise ValueError("require must be at least 1") 

204 if self.require > len(self.triggers): 

205 raise ValueError( 

206 "require must be less than or equal to the number of triggers" 

207 ) 

208 

209 return self 1dcb

210 

211 

212class SequenceTrigger(CompositeTrigger): 1a

213 """A composite trigger that requires some number of triggers to have fired 

214 within the given time period in a specific order""" 

215 

216 type: Literal["sequence"] = "sequence" 1a

217 

218 @property 1a

219 def expected_firing_order(self) -> List[UUID]: 1a

220 return [trigger.id for trigger in self.triggers] 

221 

222 def ready_to_fire(self, firings: Sequence["Firing"]) -> bool: 1a

223 actual_firing_order = [ 

224 f.trigger.id for f in sorted(firings, key=lambda f: f.triggered) 

225 ] 

226 return actual_firing_order == self.expected_firing_order 

227 

228 

229class ResourceTrigger(Trigger, abc.ABC): 1a

230 """ 

231 Base class for triggers that may filter by the labels of resources. 

232 """ 

233 

234 type: str 1a

235 

236 match: ResourceSpecification = Field( 1a

237 default_factory=lambda: ResourceSpecification.model_validate({}), 

238 description="Labels for resources which this trigger will match.", 

239 ) 

240 match_related: Union[ResourceSpecification, list[ResourceSpecification]] = Field( 1a

241 default_factory=lambda: ResourceSpecification.model_validate({}), 

242 description="Labels for related resources which this trigger will match.", 

243 ) 

244 

245 def covers_resources( 1a

246 self, resource: Resource, related: Sequence[RelatedResource] 

247 ) -> bool: 

248 if not self.match.includes([resource]): 1dceb

249 return False 1dceb

250 

251 match_related = self.match_related 1ceb

252 if not isinstance(match_related, list): 252 ↛ 255line 252 didn't jump to line 255 because the condition on line 252 was always true1ceb

253 match_related = [match_related] 1ceb

254 

255 if not all(match.includes(related) for match in match_related): 255 ↛ 256line 255 didn't jump to line 256 because the condition on line 255 was never true1ceb

256 return False 

257 

258 return True 1ceb

259 

260 

261class EventTrigger(ResourceTrigger): 1a

262 """ 

263 A trigger that fires based on the presence or absence of events within a given 

264 period of time. 

265 """ 

266 

267 type: Literal["event"] = "event" 1a

268 

269 after: Set[str] = Field( 1a

270 default_factory=set, 

271 description=( 

272 "The event(s) which must first been seen to fire this trigger. If " 

273 "empty, then fire this trigger immediately. Events may include " 

274 "trailing wildcards, like `prefect.flow-run.*`" 

275 ), 

276 ) 

277 expect: Set[str] = Field( 1a

278 default_factory=set, 

279 description=( 

280 "The event(s) this trigger is expecting to see. If empty, this " 

281 "trigger will match any event. Events may include trailing wildcards, " 

282 "like `prefect.flow-run.*`" 

283 ), 

284 ) 

285 

286 for_each: Set[str] = Field( 1a

287 default_factory=set, 

288 description=( 

289 "Evaluate the trigger separately for each distinct value of these labels " 

290 "on the resource. By default, labels refer to the primary resource of the " 

291 "triggering event. You may also refer to labels from related " 

292 "resources by specifying `related:<role>:<label>`. This will use the " 

293 "value of that label for the first related resource in that role. For " 

294 'example, `"for_each": ["related:flow:prefect.resource.id"]` would ' 

295 "evaluate the trigger for each flow." 

296 ), 

297 ) 

298 posture: Literal[Posture.Reactive, Posture.Proactive] = Field( # type: ignore[valid-type] 1a

299 ..., 

300 description=( 

301 "The posture of this trigger, either Reactive or Proactive. Reactive " 

302 "triggers respond to the _presence_ of the expected events, while " 

303 "Proactive triggers respond to the _absence_ of those expected events." 

304 ), 

305 ) 

306 threshold: int = Field( 1a

307 1, 

308 description=( 

309 "The number of events required for this trigger to fire (for " 

310 "Reactive triggers), or the number of events expected (for Proactive " 

311 "triggers)" 

312 ), 

313 ) 

314 within: timedelta = Field( 1a

315 timedelta(seconds=0), 

316 ge=timedelta(seconds=0), 

317 description=( 

318 "The time period over which the events must occur. For Reactive triggers, " 

319 "this may be as low as 0 seconds, but must be at least 10 seconds for " 

320 "Proactive triggers" 

321 ), 

322 ) 

323 

324 @model_validator(mode="before") 1a

325 @classmethod 1a

326 def enforce_minimum_within_for_proactive_triggers( 1a

327 cls, data: Dict[str, Any] | Any 

328 ) -> Dict[str, Any]: 

329 if not isinstance(data, dict): 1dcfb

330 return data 1dcb

331 

332 if "within" in data and data["within"] is None: 1dcfb

333 raise ValueError("`within` should be a valid timedelta") 1dcb

334 

335 posture: Optional[Posture] = data.get("posture") 1dcfb

336 within: Optional[timedelta] = data.get("within") 1dcfb

337 

338 if isinstance(within, (int, float)): 1dcfb

339 data["within"] = within = timedelta(seconds=within) 1dcfb

340 

341 if posture == Posture.Proactive: 1dcfb

342 if not within or within == timedelta(0): 1dcfb

343 data["within"] = timedelta(seconds=10.0) 1db

344 elif within < timedelta(seconds=10.0): 344 ↛ 345line 344 didn't jump to line 345 because the condition on line 344 was never true1dcfb

345 raise ValueError( 

346 "`within` for Proactive triggers must be greater than or equal to " 

347 "10 seconds" 

348 ) 

349 

350 return data 1dcfb

351 

352 def covers(self, event: ReceivedEvent) -> bool: 1a

353 if not self.covers_resources(event.resource, event.related): 1dceb

354 return False 1dceb

355 

356 if not self.event_pattern.match(event.event): 356 ↛ 357line 356 didn't jump to line 357 because the condition on line 356 was never true1ceb

357 return False 

358 

359 return True 1ceb

360 

361 @property 1a

362 def immediate(self) -> bool: 1a

363 """Does this reactive trigger fire immediately for all events?""" 

364 return self.posture == Posture.Reactive and self.within == timedelta(0) 1ceb

365 

366 _event_pattern: Optional[re.Pattern[str]] = PrivateAttr(None) 1a

367 

368 @property 1a

369 def event_pattern(self) -> re.Pattern[str]: 1a

370 """A regular expression which may be evaluated against any event string to 

371 determine if this trigger would be interested in the event""" 

372 if self._event_pattern: 1ceb

373 return self._event_pattern 1ceb

374 

375 if not self.expect: 

376 # This preserves the trivial match for `expect`, and matches the behavior 

377 # of expects() below 

378 self._event_pattern = re.compile(".+") 

379 else: 

380 patterns = [ 

381 # escape each pattern, then translate wildcards ('*' -> r'.+') 

382 re.escape(e).replace("\\*", ".+") 

383 for e in self.expect | self.after 

384 ] 

385 self._event_pattern = re.compile("|".join(patterns)) 

386 

387 return self._event_pattern 

388 

389 def starts_after(self, event: str) -> bool: 1a

390 # Warning: Previously we returned 'True' if there was trivial 'after' criteria. 

391 # Although this is not wrong, it led to automations processing more events 

392 # than they should have. 

393 if not self.after: 1ceb

394 return False 1ceb

395 

396 for candidate in self.after: 1ceb

397 if matches(candidate, event): 397 ↛ 398line 397 didn't jump to line 398 because the condition on line 397 was never true1ceb

398 return True 

399 return False 1ceb

400 

401 def expects(self, event: str) -> bool: 1a

402 if not self.expect: 1ceb

403 return True 1ceb

404 

405 for candidate in self.expect: 1ceb

406 if matches(candidate, event): 406 ↛ 407line 406 didn't jump to line 407 because the condition on line 406 was never true1ceb

407 return True 

408 return False 1ceb

409 

410 def bucketing_key(self, event: ReceivedEvent) -> Tuple[str, ...]: 1a

411 return tuple( 1ceb

412 event.find_resource_label(label) or "" for label in sorted(self.for_each) 

413 ) 

414 

415 def meets_threshold(self, event_count: int) -> bool: 1a

416 if self.posture == Posture.Reactive and event_count >= self.threshold: 416 ↛ 417line 416 didn't jump to line 417 because the condition on line 416 was never true1gceb

417 return True 

418 

419 if self.posture == Posture.Proactive and event_count < self.threshold: 1gceb

420 return True 1gceb

421 

422 return False 1ceb

423 

424 def create_automation_state_change_event( 1a

425 self, firing: Firing, trigger_state: TriggerState 

426 ) -> ReceivedEvent: 

427 """Returns a ReceivedEvent for an automation state change 

428 into a triggered or resolved state.""" 

429 automation = firing.trigger.automation 1cb

430 triggering_event = firing.triggering_event 1cb

431 

432 resource_data = Resource( 1cb

433 { 

434 "prefect.resource.id": f"prefect.automation.{automation.id}", 

435 "prefect.resource.name": automation.name, 

436 } 

437 ) 

438 

439 if self.posture.value: 439 ↛ 442line 439 didn't jump to line 442 because the condition on line 439 was always true1cb

440 resource_data["prefect.posture"] = self.posture.value 1cb

441 

442 return ReceivedEvent( 1cb

443 occurred=firing.triggered, 

444 event=f"prefect.automation.{trigger_state.value.lower()}", 

445 resource=resource_data, 

446 related=( 

447 [ 

448 RelatedResource( 

449 { 

450 "prefect.resource.id": f"prefect.event.{triggering_event.id}", 

451 "prefect.resource.role": "triggering-event", 

452 } 

453 ) 

454 ] 

455 if triggering_event 

456 else [] 

457 ), 

458 payload={ 

459 "triggering_labels": firing.triggering_labels, 

460 "triggering_event": ( 

461 triggering_event.model_dump(mode="json") 

462 if triggering_event 

463 else None 

464 ), 

465 }, 

466 id=uuid7(), 

467 ) 

468 

469 

470ServerTriggerTypes: TypeAlias = Union[EventTrigger, CompoundTrigger, SequenceTrigger] 1a

471"""The union of all concrete trigger types that a user may actually create""" 1a

472 

473T = TypeVar("T", bound=Trigger) 1a

474 

475 

476class AutomationCore(PrefectBaseModel, extra="ignore"): 1a

477 """Defines an action a user wants to take when a certain number of events 

478 do or don't happen to the matching resources""" 

479 

480 name: str = Field(default=..., description="The name of this automation") 1a

481 description: str = Field( 1a

482 default="", description="A longer description of this automation" 

483 ) 

484 

485 enabled: bool = Field( 1a

486 default=True, description="Whether this automation will be evaluated" 

487 ) 

488 tags: list[str] = Field( 1a

489 default_factory=list, 

490 description="A list of tags associated with this automation", 

491 ) 

492 

493 trigger: ServerTriggerTypes = Field( 1a

494 default=..., 

495 description=( 

496 "The criteria for which events this Automation covers and how it will " 

497 "respond to the presence or absence of those events" 

498 ), 

499 ) 

500 

501 actions: list[ServerActionTypes] = Field( 1a

502 default=..., 

503 description="The actions to perform when this Automation triggers", 

504 ) 

505 

506 actions_on_trigger: list[ServerActionTypes] = Field( 1a

507 default_factory=list, 

508 description="The actions to perform when an Automation goes into a triggered state", 

509 ) 

510 

511 actions_on_resolve: list[ServerActionTypes] = Field( 1a

512 default_factory=list, 

513 description="The actions to perform when an Automation goes into a resolving state", 

514 ) 

515 

516 def triggers(self) -> Sequence[Trigger]: 1a

517 """Returns all triggers within this automation""" 

518 return self.trigger.all_triggers() 1dcb

519 

520 def triggers_of_type(self, trigger_type: Type[T]) -> Sequence[T]: 1a

521 """Returns all triggers of the specified type within this automation""" 

522 return [t for t in self.triggers() if isinstance(t, trigger_type)] 1dcb

523 

524 def trigger_by_id(self, trigger_id: UUID) -> Optional[Trigger]: 1a

525 """Returns the trigger with the given ID, or None if no such trigger exists""" 

526 for trigger in self.triggers(): 

527 if trigger.id == trigger_id: 

528 return trigger 

529 return None 

530 

531 @model_validator(mode="after") 1a

532 def prevent_run_deployment_loops(self) -> Self: 1a

533 """Detects potential infinite loops in automations with RunDeployment actions""" 

534 from prefect.server.events.actions import RunDeployment 1dcfb

535 

536 if not self.enabled: 1dcfb

537 # Disabled automations can't cause problems 

538 return self 1dcb

539 

540 if ( 

541 not self.trigger 

542 or not isinstance(self.trigger, EventTrigger) 

543 or self.trigger.posture != Posture.Reactive 

544 ): 

545 # Only reactive automations can cause infinite amplification 

546 return self 1dcfb

547 

548 if not any(e.startswith("prefect.flow-run.") for e in self.trigger.expect): 548 ↛ 553line 548 didn't jump to line 553 because the condition on line 548 was always true1dcb

549 # Only flow run events can cause infinite amplification 

550 return self 1dcb

551 

552 # Every flow run created by a Deployment goes through these states 

553 problematic_events = { 

554 "prefect.flow-run.Scheduled", 

555 "prefect.flow-run.Pending", 

556 "prefect.flow-run.Running", 

557 "prefect.flow-run.*", 

558 } 

559 if not problematic_events.intersection(self.trigger.expect): 

560 return self 

561 

562 actions = [a for a in self.actions if isinstance(a, RunDeployment)] 

563 for action in actions: 

564 if action.source == "inferred": 

565 # Inferred deployments for flow run state change events will always 

566 # cause infinite loops, because no matter what filters we place on the 

567 # flow run, we're inferring the deployment from it, so we'll always 

568 # produce a new flow run that matches those filters. 

569 raise ValueError( 

570 "Running an inferred deployment from a flow run state change event " 

571 "will lead to an infinite loop of flow runs. Please choose a " 

572 "specific deployment and add additional filtering labels to the " 

573 "match or match_related for this automation's trigger." 

574 ) 

575 

576 if action.source == "selected": 

577 # Selected deployments for flow run state changes can cause infinite 

578 # loops if there aren't enough filtering labels on the trigger's match 

579 # or match_related. While it's still possible to have infinite loops 

580 # with additional filters, it's less likely. 

581 if self.trigger.match.matches_every_resource_of_kind( 

582 "prefect.flow-run" 

583 ): 

584 relateds = ( 

585 self.trigger.match_related 

586 if isinstance(self.trigger.match_related, list) 

587 else [self.trigger.match_related] 

588 ) 

589 if any( 

590 related.matches_every_resource_of_kind("prefect.flow-run") 

591 for related in relateds 

592 ): 

593 raise ValueError( 

594 "Running a selected deployment from a flow run state " 

595 "change event may lead to an infinite loop of flow runs. " 

596 "Please include additional filtering labels on either " 

597 "match or match_related to narrow down which flow runs " 

598 "will trigger this automation to exclude flow runs from " 

599 "the deployment you've selected." 

600 ) 

601 

602 return self 

603 

604 

605class Automation(ORMBaseModel, AutomationCore, extra="ignore"): 1a

606 def __init__(self, *args: Any, **kwargs: Any): 1a

607 super().__init__(*args, **kwargs) 1dcfb

608 self.trigger._set_parent(self) 1dcfb

609 

610 @classmethod 1a

611 def model_validate( 1a

612 cls: type[Self], 

613 obj: Any, 

614 *, 

615 strict: bool | None = None, 

616 from_attributes: bool | None = None, 

617 context: dict[str, Any] | None = None, 

618 ) -> Self: 

619 automation = super().model_validate( 1dcb

620 obj, strict=strict, from_attributes=from_attributes, context=context 

621 ) 

622 automation.trigger._set_parent(automation) 1dcb

623 return automation 1dcb

624 

625 

626class AutomationCreate(AutomationCore, ActionBaseModel, extra="forbid"): 1a

627 owner_resource: Optional[str] = Field( 1a

628 default=None, description="The resource to which this automation belongs" 

629 ) 

630 

631 

632class AutomationUpdate(AutomationCore, ActionBaseModel, extra="forbid"): 1a

633 pass 1a

634 

635 

636class AutomationPartialUpdate(ActionBaseModel, extra="forbid"): 1a

637 enabled: bool = Field(True, description="Whether this automation will be evaluated") 1a

638 

639 

640class AutomationSort(AutoEnum): 1a

641 """Defines automations sorting options.""" 

642 

643 CREATED_DESC = "CREATED_DESC" 1a

644 UPDATED_DESC = "UPDATED_DESC" 1a

645 NAME_ASC = "NAME_ASC" 1a

646 NAME_DESC = "NAME_DESC" 1a

647 

648 

649class Firing(PrefectBaseModel): 1a

650 """Represents one instance of a trigger firing""" 

651 

652 id: UUID = Field(default_factory=uuid7) 1a

653 

654 trigger: ServerTriggerTypes = Field( 1a

655 default=..., description="The trigger that is firing" 

656 ) 

657 trigger_states: Set[TriggerState] = Field( 1a

658 default=..., 

659 description="The state changes represented by this Firing", 

660 ) 

661 triggered: DateTime = Field( 1a

662 default=..., 

663 description=( 

664 "The time at which this trigger fired, which may differ from the " 

665 "occurred time of the associated event (as events processing may always " 

666 "be slightly delayed)." 

667 ), 

668 ) 

669 triggering_labels: Dict[str, str] = Field( 1a

670 default_factory=dict, 

671 description=( 

672 "The labels associated with this Firing, derived from the underlying " 

673 "for_each values of the trigger. Only used in the context " 

674 "of EventTriggers." 

675 ), 

676 ) 

677 triggering_firings: List[Firing] = Field( 1a

678 default_factory=list, 

679 description=( 

680 "The firings of the triggers that caused this trigger to fire. Only used " 

681 "in the context of CompoundTriggers." 

682 ), 

683 ) 

684 triggering_event: Optional[ReceivedEvent] = Field( 1a

685 default=None, 

686 description=( 

687 "The most recent event associated with this Firing. This may be the " 

688 "event that caused the trigger to fire (for Reactive triggers), or the " 

689 "last event to match the trigger (for Proactive triggers), or the state " 

690 "change event (for a Metric trigger)." 

691 ), 

692 ) 

693 triggering_value: Optional[Any] = Field( 1a

694 default=None, 

695 description=( 

696 "A value associated with this firing of a trigger. Maybe used to " 

697 "convey additional information at the point of firing, like the value of " 

698 "the last query for a MetricTrigger" 

699 ), 

700 ) 

701 

702 @field_validator("trigger_states") 1a

703 @classmethod 1a

704 def validate_trigger_states(cls, value: set[TriggerState]) -> set[TriggerState]: 1a

705 if not value: 705 ↛ 706line 705 didn't jump to line 706 because the condition on line 705 was never true1cfb

706 raise ValueError("At least one trigger state must be provided") 

707 return value 1cfb

708 

709 def all_firings(self) -> Sequence[Firing]: 1a

710 return [self] + [ 1cfb

711 f for child in self.triggering_firings for f in child.all_firings() 

712 ] 

713 

714 def all_events(self) -> Sequence[ReceivedEvent]: 1a

715 events = [self.triggering_event] if self.triggering_event else [] 1cfb

716 return events + [ 1cfb

717 e for child in self.triggering_firings for e in child.all_events() 

718 ] 

719 

720 

721class TriggeredAction(PrefectBaseModel): 1a

722 """An action caused as the result of an automation""" 

723 

724 automation: Automation = Field( 1a

725 ..., description="The Automation that caused this action" 

726 ) 

727 

728 id: UUID = Field( 1a

729 default_factory=uuid7, 

730 description="A unique key representing a single triggering of an action", 

731 ) 

732 

733 firing: Optional[Firing] = Field( 1a

734 default=None, description="The Firing that prompted this action" 

735 ) 

736 

737 triggered: DateTime = Field(..., description="When this action was triggered") 1a

738 triggering_labels: Dict[str, str] = Field( 1a

739 ..., 

740 description=( 

741 "The subset of labels of the Event that triggered this action, " 

742 "corresponding to the Automation's for_each. If no for_each is specified, " 

743 "this will be an empty set of labels" 

744 ), 

745 ) 

746 triggering_event: Optional[ReceivedEvent] = Field( 1a

747 ..., 

748 description=( 

749 "The last Event to trigger this automation, if applicable. For reactive " 

750 "triggers, this will be the event that caused the trigger to fire. For " 

751 "proactive triggers, this will be the last event to match the automation, " 

752 "if there was one." 

753 ), 

754 ) 

755 action: ServerActionTypes = Field( 1a

756 ..., 

757 description="The action to perform", 

758 ) 

759 action_index: int = Field( 1a

760 default=0, 

761 description="The index of the action within the automation", 

762 ) 

763 

764 def idempotency_key(self) -> str: 1a

765 """Produce a human-friendly idempotency key for this action""" 

766 return ", ".join( 1cfb

767 [ 

768 f"automation {self.automation.id}", 

769 f"action {self.action_index}", 

770 f"invocation {self.id}", 

771 ] 

772 ) 

773 

774 def all_firings(self) -> Sequence[Firing]: 1a

775 return self.firing.all_firings() if self.firing else [] 1cfb

776 

777 def all_events(self) -> Sequence[ReceivedEvent]: 1a

778 return self.firing.all_events() if self.firing else [] 1cfb

779 

780 

781CompoundTrigger.model_rebuild() 1a

782SequenceTrigger.model_rebuild() 1a