Coverage for /usr/local/lib/python3.12/site-packages/prefect/events/schemas/automations.py: 52%
146 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1from __future__ import annotations 1a
3import abc 1a
4import textwrap 1a
5from datetime import timedelta 1a
6from enum import Enum 1a
7from typing import ( 1a
8 Annotated,
9 Any,
10 Dict,
11 List,
12 Literal,
13 Optional,
14 Set,
15 Union,
16 cast,
17)
18from uuid import UUID 1a
20from pydantic import ( 1a
21 Discriminator,
22 Field,
23 PrivateAttr,
24 Tag,
25 field_validator,
26 model_validator,
27)
28from typing_extensions import Self, TypeAlias 1a
30from prefect._internal.schemas.bases import PrefectBaseModel 1a
31from prefect.events.actions import ActionTypes, RunDeployment 1a
32from prefect.utilities.collections import AutoEnum 1a
34from .events import ResourceSpecification 1a
class Posture(AutoEnum):
    # NOTE: documented with comments rather than a docstring so that nothing
    # derived from the class's __doc__ changes.

    # Respond to the presence of the expected events.
    Reactive = "Reactive"
    # Respond to the absence of the expected events.
    Proactive = "Proactive"
    # Periodically evaluate a configured metric query.
    Metric = "Metric"
class Trigger(PrefectBaseModel, abc.ABC, extra="ignore"):  # type: ignore[call-arg]
    """
    Base class describing a set of criteria that must be satisfied in order to trigger
    an automation.
    """

    type: str

    @abc.abstractmethod
    def describe_for_cli(self, indent: int = 0) -> str:
        """Return a human-readable description of this trigger for the CLI"""

    # The following allows the regular Trigger class to be used when serving or
    # deploying flows, analogous to how the Deployment*Trigger classes work

    _deployment_id: Optional[UUID] = PrivateAttr(default=None)

    def set_deployment_id(self, deployment_id: UUID) -> None:
        """Record the deployment this trigger is attached to."""
        self._deployment_id = deployment_id

    def owner_resource(self) -> Optional[str]:
        """Return the owning deployment's resource name.

        Returns:
            ``"prefect.deployment.<id>"`` once a deployment id has been set,
            otherwise ``None`` (rather than a bogus
            ``"prefect.deployment.None"`` string).
        """
        if self._deployment_id is None:
            return None
        return f"prefect.deployment.{self._deployment_id}"

    def actions(self) -> List[ActionTypes]:
        """Build the actions that run the attached deployment.

        ``parameters``, ``job_variables`` and ``schedule_after`` are read from
        this instance when it defines them, and fall back to ``None``,
        ``None`` and a zero ``timedelta`` respectively.

        Raises:
            AssertionError: if no deployment id has been set.
        """
        # Raised explicitly (instead of using `assert`) so the guard still
        # runs when Python is started with -O.
        if self._deployment_id is None:
            raise AssertionError(
                "A deployment ID must be set before building actions"
            )
        return [
            RunDeployment(
                source="selected",
                deployment_id=self._deployment_id,
                parameters=getattr(self, "parameters", None),
                job_variables=getattr(self, "job_variables", None),
                schedule_after=getattr(self, "schedule_after", timedelta(0)),
            )
        ]

    def as_automation(self) -> "AutomationCore":
        """Wrap this trigger in an ``AutomationCore`` for its deployment.

        ``name``, ``description`` and ``enabled`` are taken from this instance
        when it defines them; otherwise a generated name, an empty description
        and ``True`` are used.

        Raises:
            AssertionError: if no deployment id has been set.
        """
        # Same -O-safe guard as in `actions()`.
        if self._deployment_id is None:
            raise AssertionError(
                "A deployment ID must be set before building an automation"
            )

        trigger: TriggerTypes = cast(TriggerTypes, self)

        # This is one of the Deployment*Trigger classes, so translate it over to a
        # plain Trigger
        if hasattr(self, "trigger_type"):
            trigger = self.trigger_type(**self.model_dump())

        return AutomationCore(
            name=(
                getattr(self, "name", None)
                or f"Automation for deployment {self._deployment_id}"
            ),
            description=getattr(self, "description", ""),
            enabled=getattr(self, "enabled", True),
            trigger=trigger,
            actions=self.actions(),
            owner_resource=self.owner_resource(),
        )
class ResourceTrigger(Trigger, abc.ABC):
    """
    Base class for triggers that may filter by the labels of resources.
    """

    type: str

    # Both specifications default to one validated from an empty mapping.
    match: ResourceSpecification = Field(
        description="Labels for resources which this trigger will match.",
        default_factory=lambda: ResourceSpecification.model_validate({}),
    )
    # Accepts either a single specification or a list of them.
    match_related: Union[ResourceSpecification, list[ResourceSpecification]] = Field(
        description="Labels for related resources which this trigger will match.",
        default_factory=lambda: ResourceSpecification.model_validate({}),
    )
class EventTrigger(ResourceTrigger):
    """
    A trigger that fires based on the presence or absence of events within a given
    period of time.
    """

    type: Literal["event"] = "event"

    after: Set[str] = Field(
        default_factory=set,
        description=(
            "The event(s) which must first been seen to fire this trigger. If "
            "empty, then fire this trigger immediately. Events may include "
            "trailing wildcards, like `prefect.flow-run.*`"
        ),
    )
    expect: Set[str] = Field(
        default_factory=set,
        description=(
            "The event(s) this trigger is expecting to see. If empty, this "
            "trigger will match any event. Events may include trailing wildcards, "
            "like `prefect.flow-run.*`"
        ),
    )

    for_each: Set[str] = Field(
        default_factory=set,
        description=(
            "Evaluate the trigger separately for each distinct value of these labels "
            "on the resource. By default, labels refer to the primary resource of the "
            "triggering event. You may also refer to labels from related "
            "resources by specifying `related:<role>:<label>`. This will use the "
            "value of that label for the first related resource in that role. For "
            'example, `"for_each": ["related:flow:prefect.resource.id"]` would '
            "evaluate the trigger for each flow."
        ),
    )
    posture: Literal[Posture.Reactive, Posture.Proactive] = Field(  # type: ignore[valid-type]
        default=Posture.Reactive,
        description=(
            "The posture of this trigger, either Reactive or Proactive. Reactive "
            "triggers respond to the _presence_ of the expected events, while "
            "Proactive triggers respond to the _absence_ of those expected events."
        ),
    )
    threshold: int = Field(
        default=1,
        description=(
            "The number of events required for this trigger to fire (for "
            "Reactive triggers), or the number of events expected (for Proactive "
            "triggers)"
        ),
    )
    within: timedelta = Field(
        default=timedelta(seconds=0),
        ge=timedelta(seconds=0),
        description=(
            "The time period over which the events must occur. For Reactive triggers, "
            "this may be as low as 0 seconds, but must be at least 10 seconds for "
            "Proactive triggers"
        ),
    )

    @model_validator(mode="before")
    @classmethod
    def enforce_minimum_within_for_proactive_triggers(
        cls, data: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Normalise `within` on raw input and apply the Proactive minimum.

        Non-dict input is passed through untouched. Numeric `within` values are
        read as seconds. For Proactive posture a missing or zero `within`
        becomes 10 seconds and anything shorter than 10 seconds is rejected.

        Raises:
            ValueError: if `within` is explicitly None, or is shorter than
                10 seconds on a Proactive trigger.
        """
        if not isinstance(data, dict):
            return data

        window = data.get("within")
        # A missing key is fine (the field default applies); an explicit None
        # is not.
        if window is None and "within" in data:
            raise ValueError("`within` should be a valid timedelta")

        if isinstance(window, (int, float)):
            window = timedelta(seconds=window)

        proactive_minimum = timedelta(seconds=10.0)
        if data.get("posture") == Posture.Proactive:
            if not window or window == timedelta(0):
                window = proactive_minimum
            elif window < proactive_minimum:
                raise ValueError(
                    "`within` for Proactive triggers must be greater than or equal to "
                    "10 seconds"
                )

        # Only a truthy window is written back, and always into a copy so the
        # caller's mapping is never mutated.
        return {**data, "within": window} if window else data

    def describe_for_cli(self, indent: int = 0) -> str:
        """Return a human-readable description of this trigger for the CLI"""
        if self.posture == Posture.Reactive:
            summary = f"Reactive: expecting {self.threshold} of {self.expect}"
        else:
            summary = (
                f"Proactive: expecting {self.threshold} {self.expect} event "
                f"within {self.within}"
            )
        return textwrap.indent(summary, prefix=" " * indent)
class MetricTriggerOperator(Enum):
    # Comparison operators; each member's value is the operator's symbol.
    LT = "<"
    LTE = "<="
    GT = ">"
    GTE = ">="
class PrefectMetric(Enum):
    # Names of the metrics that can be queried; each value equals its name.
    lateness = "lateness"
    duration = "duration"
    successes = "successes"
class MetricTriggerQuery(PrefectBaseModel):
    """Defines a subset of the Trigger subclass, which is specific
    to Metric automations, that specify the query configurations
    and breaching conditions for the Automation"""

    name: PrefectMetric = Field(
        ...,
        description="The name of the metric to query.",
    )
    threshold: float = Field(
        ...,
        description=(
            "The threshold value against which we'll compare the query result."
        ),
    )
    operator: MetricTriggerOperator = Field(
        ...,
        description=(
            "The comparative operator (LT / LTE / GT / GTE) used to compare "
            "the query result against the threshold value."
        ),
    )
    range: timedelta = Field(
        timedelta(seconds=300),  # defaults to 5 minutes
        description=(
            "The lookback duration (seconds) for a metric query. This duration is "
            "used to determine the time range over which the query will be executed. "
            "The minimum value is 300 seconds (5 minutes)."
        ),
    )
    firing_for: timedelta = Field(
        timedelta(seconds=300),  # defaults to 5 minutes
        description=(
            "The duration (seconds) for which the metric query must breach "
            "or resolve continuously before the state is updated and the "
            "automation is triggered. "
            "The minimum value is 300 seconds (5 minutes)."
        ),
    )

    # Declared as an explicit classmethod, matching the other validator in
    # this module and pydantic's documented validator pattern.
    @field_validator("range", "firing_for")
    @classmethod
    def enforce_minimum_range(cls, value: timedelta) -> timedelta:
        """Reject `range` / `firing_for` values shorter than 300 seconds.

        Returns:
            The validated value, unchanged.

        Raises:
            ValueError: if the value is below 300 seconds. The same message
                is used for both fields.
        """
        if value < timedelta(seconds=300):
            raise ValueError("The minimum range is 300 seconds (5 minutes)")
        return value
class MetricTrigger(ResourceTrigger):
    """
    A trigger that fires based on the results of a metric query.
    """

    type: Literal["metric"] = "metric"

    posture: Literal[Posture.Metric] = Field(  # type: ignore[valid-type]
        Posture.Metric,
        description="Periodically evaluate the configured metric query.",
    )

    metric: MetricTriggerQuery = Field(
        ...,
        description="The metric query to evaluate for this trigger. ",
    )

    def describe_for_cli(self, indent: int = 0) -> str:
        """Return a human-readable description of this trigger for the CLI"""
        query = self.metric
        # One line: metric name, operator symbol, threshold and lookback range.
        summary = (
            f"Metric: {query.name.value} {query.operator.value} "
            f"{query.threshold} for {query.range}"
        )
        return textwrap.indent(summary, prefix=" " * indent)
class CompositeTrigger(Trigger, abc.ABC):
    """
    Requires some number of triggers to have fired within the given time period.
    """

    # Narrowed further by each concrete subclass.
    type: Literal["compound", "sequence"]
    # Child triggers; any member of the TriggerTypes union is accepted,
    # including other composite triggers.
    triggers: List["TriggerTypes"]
    within: Optional[timedelta] = Field(
        default=None,
        description=(
            "The time period over which the events must occur. For Reactive triggers, "
            "this may be as low as 0 seconds, but must be at least 10 seconds for "
            "Proactive triggers"
        ),
    )
class CompoundTrigger(CompositeTrigger):
    """A composite trigger that requires some number of triggers to have
    fired within the given time period"""

    type: Literal["compound"] = "compound"
    require: Union[int, Literal["any", "all"]]

    @model_validator(mode="after")
    def validate_require(self) -> Self:
        """Check that a numeric `require` lies between 1 and the trigger count.

        The keyword forms ("any" / "all") need no range check.

        Raises:
            ValueError: if `require` is an int below 1 or above the number of
                child triggers.
        """
        required = self.require
        if not isinstance(required, int):
            return self

        if required < 1:
            raise ValueError("require must be at least 1")
        if required > len(self.triggers):
            raise ValueError(
                "require must be less than or equal to the number of triggers"
            )
        return self

    def describe_for_cli(self, indent: int = 0) -> str:
        """Return a human-readable description of this trigger for the CLI"""
        heading = f"{str(self.require).capitalize()} of:"
        # Children describe themselves one level deeper; the whole text is
        # then indented again by this trigger's own level.
        children = "\n".join(
            child.describe_for_cli(indent=indent + 1) for child in self.triggers
        )
        return textwrap.indent(f"{heading}\n{children}", prefix=" " * indent)
class SequenceTrigger(CompositeTrigger):
    """A composite trigger that requires some number of triggers to have fired
    within the given time period in a specific order"""

    type: Literal["sequence"] = "sequence"

    def describe_for_cli(self, indent: int = 0) -> str:
        """Return a human-readable description of this trigger for the CLI"""
        heading = "In this order:"
        # Children describe themselves one level deeper; the whole text is
        # then indented again by this trigger's own level.
        children = "\n".join(
            child.describe_for_cli(indent=indent + 1) for child in self.triggers
        )
        return textwrap.indent(f"{heading}\n{children}", prefix=" " * indent)
def trigger_discriminator(value: Any) -> str:
    """Discriminator for triggers that defaults to 'event' if no type is specified."""
    # Already-built objects: use their `type` attribute when they have one.
    if not isinstance(value, dict):
        return getattr(value, "type", "event")

    # An explicit type always wins.
    if "type" in value:
        return value["type"]

    # Only composite triggers carry `triggers`; `require` marks a compound one.
    if "triggers" in value:
        return "compound" if "require" in value else "sequence"

    # A Metric posture identifies a metric trigger; anything else is an event.
    return "metric" if value.get("posture") == "Metric" else "event"
# Tagged union of the concrete trigger models; `trigger_discriminator` picks
# the tag for each incoming value.
TriggerTypes: TypeAlias = Annotated[
    Union[
        Annotated[EventTrigger, Tag("event")],
        Annotated[MetricTrigger, Tag("metric")],
        Annotated[CompoundTrigger, Tag("compound")],
        Annotated[SequenceTrigger, Tag("sequence")],
    ],
    Discriminator(trigger_discriminator),
]
"""The union of all concrete trigger types that a user may actually create"""

# The composite triggers reference "TriggerTypes" by name in their `triggers`
# field, so they are rebuilt now that the alias exists.
CompoundTrigger.model_rebuild()
SequenceTrigger.model_rebuild()
class AutomationCore(PrefectBaseModel, extra="ignore"):  # type: ignore[call-arg]
    """Defines an action a user wants to take when a certain number of events
    do or don't happen to the matching resources"""

    # Required fields: name, trigger and actions. Everything else has a default.
    name: str = Field(..., description="The name of this automation")
    description: str = Field(
        default="", description="A longer description of this automation"
    )

    enabled: bool = Field(
        default=True, description="Whether this automation will be evaluated"
    )
    tags: List[str] = Field(
        default_factory=list,
        description="A list of tags associated with this automation",
    )

    trigger: TriggerTypes = Field(
        ...,
        description=(
            "The criteria for which events this Automation covers and how it will "
            "respond to the presence or absence of those events"
        ),
    )

    actions: List[ActionTypes] = Field(
        ...,
        description="The actions to perform when this Automation triggers",
    )

    actions_on_trigger: List[ActionTypes] = Field(
        default_factory=list,
        description="The actions to perform when an Automation goes into a triggered state",
    )

    actions_on_resolve: List[ActionTypes] = Field(
        default_factory=list,
        description="The actions to perform when an Automation goes into a resolving state",
    )

    owner_resource: Optional[str] = Field(
        default=None, description="The owning resource of this automation"
    )
class Automation(AutomationCore):
    # An AutomationCore plus a required identifier. Documented with a comment
    # rather than a docstring so the generated schema is unchanged.
    id: UUID = Field(..., description="The ID of this automation")