Coverage for /usr/local/lib/python3.12/site-packages/prefect/events/schemas/automations.py: 52%

146 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1from __future__ import annotations 1a

2 

3import abc 1a

4import textwrap 1a

5from datetime import timedelta 1a

6from enum import Enum 1a

7from typing import ( 1a

8 Annotated, 

9 Any, 

10 Dict, 

11 List, 

12 Literal, 

13 Optional, 

14 Set, 

15 Union, 

16 cast, 

17) 

18from uuid import UUID 1a

19 

20from pydantic import ( 1a

21 Discriminator, 

22 Field, 

23 PrivateAttr, 

24 Tag, 

25 field_validator, 

26 model_validator, 

27) 

28from typing_extensions import Self, TypeAlias 1a

29 

30from prefect._internal.schemas.bases import PrefectBaseModel 1a

31from prefect.events.actions import ActionTypes, RunDeployment 1a

32from prefect.utilities.collections import AutoEnum 1a

33 

34from .events import ResourceSpecification 1a

35 

36 

37class Posture(AutoEnum): 1a

38 Reactive = "Reactive" 1a

39 Proactive = "Proactive" 1a

40 Metric = "Metric" 1a

41 

42 

43class Trigger(PrefectBaseModel, abc.ABC, extra="ignore"): # type: ignore[call-arg] 1a

44 """ 

45 Base class describing a set of criteria that must be satisfied in order to trigger 

46 an automation. 

47 """ 

48 

49 type: str 1a

50 

51 @abc.abstractmethod 1a

52 def describe_for_cli(self, indent: int = 0) -> str: 1a

53 """Return a human-readable description of this trigger for the CLI""" 

54 

55 # The following allows the regular Trigger class to be used when serving or 

56 # deploying flows, analogous to how the Deployment*Trigger classes work 

57 

58 _deployment_id: Optional[UUID] = PrivateAttr(default=None) 1a

59 

60 def set_deployment_id(self, deployment_id: UUID) -> None: 1a

61 self._deployment_id = deployment_id 

62 

63 def owner_resource(self) -> Optional[str]: 1a

64 return f"prefect.deployment.{self._deployment_id}" 

65 

66 def actions(self) -> List[ActionTypes]: 1a

67 assert self._deployment_id 

68 return [ 

69 RunDeployment( 

70 source="selected", 

71 deployment_id=self._deployment_id, 

72 parameters=getattr(self, "parameters", None), 

73 job_variables=getattr(self, "job_variables", None), 

74 schedule_after=getattr(self, "schedule_after", timedelta(0)), 

75 ) 

76 ] 

77 

78 def as_automation(self) -> "AutomationCore": 1a

79 assert self._deployment_id 

80 

81 trigger: TriggerTypes = cast(TriggerTypes, self) 

82 

83 # This is one of the Deployment*Trigger classes, so translate it over to a 

84 # plain Trigger 

85 if hasattr(self, "trigger_type"): 

86 trigger = self.trigger_type(**self.model_dump()) 

87 

88 return AutomationCore( 

89 name=( 

90 getattr(self, "name", None) 

91 or f"Automation for deployment {self._deployment_id}" 

92 ), 

93 description=getattr(self, "description", ""), 

94 enabled=getattr(self, "enabled", True), 

95 trigger=trigger, 

96 actions=self.actions(), 

97 owner_resource=self.owner_resource(), 

98 ) 

99 

100 

101class ResourceTrigger(Trigger, abc.ABC): 1a

102 """ 

103 Base class for triggers that may filter by the labels of resources. 

104 """ 

105 

106 type: str 1a

107 

108 match: ResourceSpecification = Field( 1a

109 default_factory=lambda: ResourceSpecification.model_validate({}), 

110 description="Labels for resources which this trigger will match.", 

111 ) 

112 match_related: Union[ResourceSpecification, list[ResourceSpecification]] = Field( 1a

113 default_factory=lambda: ResourceSpecification.model_validate({}), 

114 description="Labels for related resources which this trigger will match.", 

115 ) 

116 

117 

118class EventTrigger(ResourceTrigger): 1a

119 """ 

120 A trigger that fires based on the presence or absence of events within a given 

121 period of time. 

122 """ 

123 

124 type: Literal["event"] = "event" 1a

125 

126 after: Set[str] = Field( 1a

127 default_factory=set, 

128 description=( 

129 "The event(s) which must first been seen to fire this trigger. If " 

130 "empty, then fire this trigger immediately. Events may include " 

131 "trailing wildcards, like `prefect.flow-run.*`" 

132 ), 

133 ) 

134 expect: Set[str] = Field( 1a

135 default_factory=set, 

136 description=( 

137 "The event(s) this trigger is expecting to see. If empty, this " 

138 "trigger will match any event. Events may include trailing wildcards, " 

139 "like `prefect.flow-run.*`" 

140 ), 

141 ) 

142 

143 for_each: Set[str] = Field( 1a

144 default_factory=set, 

145 description=( 

146 "Evaluate the trigger separately for each distinct value of these labels " 

147 "on the resource. By default, labels refer to the primary resource of the " 

148 "triggering event. You may also refer to labels from related " 

149 "resources by specifying `related:<role>:<label>`. This will use the " 

150 "value of that label for the first related resource in that role. For " 

151 'example, `"for_each": ["related:flow:prefect.resource.id"]` would ' 

152 "evaluate the trigger for each flow." 

153 ), 

154 ) 

155 posture: Literal[Posture.Reactive, Posture.Proactive] = Field( # type: ignore[valid-type] 1a

156 default=Posture.Reactive, 

157 description=( 

158 "The posture of this trigger, either Reactive or Proactive. Reactive " 

159 "triggers respond to the _presence_ of the expected events, while " 

160 "Proactive triggers respond to the _absence_ of those expected events." 

161 ), 

162 ) 

163 threshold: int = Field( 1a

164 default=1, 

165 description=( 

166 "The number of events required for this trigger to fire (for " 

167 "Reactive triggers), or the number of events expected (for Proactive " 

168 "triggers)" 

169 ), 

170 ) 

171 within: timedelta = Field( 1a

172 default=timedelta(seconds=0), 

173 ge=timedelta(seconds=0), 

174 description=( 

175 "The time period over which the events must occur. For Reactive triggers, " 

176 "this may be as low as 0 seconds, but must be at least 10 seconds for " 

177 "Proactive triggers" 

178 ), 

179 ) 

180 

181 @model_validator(mode="before") 1a

182 @classmethod 1a

183 def enforce_minimum_within_for_proactive_triggers( 1a

184 cls, data: Dict[str, Any] 

185 ) -> Dict[str, Any]: 

186 if not isinstance(data, dict): 

187 return data 

188 

189 if "within" in data and data["within"] is None: 

190 raise ValueError("`within` should be a valid timedelta") 

191 

192 posture: Optional[Posture] = data.get("posture") 

193 within: Optional[timedelta] = data.get("within") 

194 

195 if isinstance(within, (int, float)): 

196 within = timedelta(seconds=within) 

197 

198 if posture == Posture.Proactive: 

199 if not within or within == timedelta(0): 

200 within = timedelta(seconds=10.0) 

201 elif within < timedelta(seconds=10.0): 

202 raise ValueError( 

203 "`within` for Proactive triggers must be greater than or equal to " 

204 "10 seconds" 

205 ) 

206 

207 if within: 

208 data = {**data, "within": within} 

209 return data 

210 

211 def describe_for_cli(self, indent: int = 0) -> str: 1a

212 """Return a human-readable description of this trigger for the CLI""" 

213 if self.posture == Posture.Reactive: 

214 return textwrap.indent( 

215 "\n".join( 

216 [ 

217 f"Reactive: expecting {self.threshold} of {self.expect}", 

218 ], 

219 ), 

220 prefix=" " * indent, 

221 ) 

222 else: 

223 return textwrap.indent( 

224 "\n".join( 

225 [ 

226 f"Proactive: expecting {self.threshold} {self.expect} event " 

227 f"within {self.within}", 

228 ], 

229 ), 

230 prefix=" " * indent, 

231 ) 

232 

233 

234class MetricTriggerOperator(Enum): 1a

235 LT = "<" 1a

236 LTE = "<=" 1a

237 GT = ">" 1a

238 GTE = ">=" 1a

239 

240 

241class PrefectMetric(Enum): 1a

242 lateness = "lateness" 1a

243 duration = "duration" 1a

244 successes = "successes" 1a

245 

246 

247class MetricTriggerQuery(PrefectBaseModel): 1a

248 """Defines a subset of the Trigger subclass, which is specific 

249 to Metric automations, that specify the query configurations 

250 and breaching conditions for the Automation""" 

251 

252 name: PrefectMetric = Field( 1a

253 ..., 

254 description="The name of the metric to query.", 

255 ) 

256 threshold: float = Field( 1a

257 ..., 

258 description=( 

259 "The threshold value against which we'll compare the query result." 

260 ), 

261 ) 

262 operator: MetricTriggerOperator = Field( 1a

263 ..., 

264 description=( 

265 "The comparative operator (LT / LTE / GT / GTE) used to compare " 

266 "the query result against the threshold value." 

267 ), 

268 ) 

269 range: timedelta = Field( 1a

270 timedelta(seconds=300), # defaults to 5 minutes 

271 description=( 

272 "The lookback duration (seconds) for a metric query. This duration is " 

273 "used to determine the time range over which the query will be executed. " 

274 "The minimum value is 300 seconds (5 minutes)." 

275 ), 

276 ) 

277 firing_for: timedelta = Field( 1a

278 timedelta(seconds=300), # defaults to 5 minutes 

279 description=( 

280 "The duration (seconds) for which the metric query must breach " 

281 "or resolve continuously before the state is updated and the " 

282 "automation is triggered. " 

283 "The minimum value is 300 seconds (5 minutes)." 

284 ), 

285 ) 

286 

287 @field_validator("range", "firing_for") 1a

288 def enforce_minimum_range(cls, value: timedelta) -> timedelta: 1a

289 if value < timedelta(seconds=300): 

290 raise ValueError("The minimum range is 300 seconds (5 minutes)") 

291 return value 

292 

293 

294class MetricTrigger(ResourceTrigger): 1a

295 """ 

296 A trigger that fires based on the results of a metric query. 

297 """ 

298 

299 type: Literal["metric"] = "metric" 1a

300 

301 posture: Literal[Posture.Metric] = Field( # type: ignore[valid-type] 1a

302 Posture.Metric, 

303 description="Periodically evaluate the configured metric query.", 

304 ) 

305 

306 metric: MetricTriggerQuery = Field( 1a

307 ..., 

308 description="The metric query to evaluate for this trigger. ", 

309 ) 

310 

311 def describe_for_cli(self, indent: int = 0) -> str: 1a

312 """Return a human-readable description of this trigger for the CLI""" 

313 m = self.metric 

314 return textwrap.indent( 

315 "\n".join( 

316 [ 

317 f"Metric: {m.name.value} {m.operator.value} {m.threshold} for {m.range}", 

318 ] 

319 ), 

320 prefix=" " * indent, 

321 ) 

322 

323 

324class CompositeTrigger(Trigger, abc.ABC): 1a

325 """ 

326 Requires some number of triggers to have fired within the given time period. 

327 """ 

328 

329 type: Literal["compound", "sequence"] 1a

330 triggers: List["TriggerTypes"] 1a

331 within: Optional[timedelta] = Field( 1a

332 None, 

333 description=( 

334 "The time period over which the events must occur. For Reactive triggers, " 

335 "this may be as low as 0 seconds, but must be at least 10 seconds for " 

336 "Proactive triggers" 

337 ), 

338 ) 

339 

340 

341class CompoundTrigger(CompositeTrigger): 1a

342 """A composite trigger that requires some number of triggers to have 

343 fired within the given time period""" 

344 

345 type: Literal["compound"] = "compound" 1a

346 require: Union[int, Literal["any", "all"]] 1a

347 

348 @model_validator(mode="after") 1a

349 def validate_require(self) -> Self: 1a

350 if isinstance(self.require, int): 

351 if self.require < 1: 

352 raise ValueError("require must be at least 1") 

353 if self.require > len(self.triggers): 

354 raise ValueError( 

355 "require must be less than or equal to the number of triggers" 

356 ) 

357 

358 return self 

359 

360 def describe_for_cli(self, indent: int = 0) -> str: 1a

361 """Return a human-readable description of this trigger for the CLI""" 

362 return textwrap.indent( 

363 "\n".join( 

364 [ 

365 f"{str(self.require).capitalize()} of:", 

366 "\n".join( 

367 [ 

368 trigger.describe_for_cli(indent=indent + 1) 

369 for trigger in self.triggers 

370 ] 

371 ), 

372 ] 

373 ), 

374 prefix=" " * indent, 

375 ) 

376 

377 

378class SequenceTrigger(CompositeTrigger): 1a

379 """A composite trigger that requires some number of triggers to have fired 

380 within the given time period in a specific order""" 

381 

382 type: Literal["sequence"] = "sequence" 1a

383 

384 def describe_for_cli(self, indent: int = 0) -> str: 1a

385 """Return a human-readable description of this trigger for the CLI""" 

386 return textwrap.indent( 

387 "\n".join( 

388 [ 

389 "In this order:", 

390 "\n".join( 

391 [ 

392 trigger.describe_for_cli(indent=indent + 1) 

393 for trigger in self.triggers 

394 ] 

395 ), 

396 ] 

397 ), 

398 prefix=" " * indent, 

399 ) 

400 

401 

402def trigger_discriminator(value: Any) -> str: 1a

403 """Discriminator for triggers that defaults to 'event' if no type is specified.""" 

404 if isinstance(value, dict): 

405 # Check for explicit type first 

406 if "type" in value: 

407 return value["type"] 

408 # Check for compound/sequence specific fields 

409 if "triggers" in value and "require" in value: 

410 return "compound" 

411 if "triggers" in value and "require" not in value: 

412 return "sequence" 

413 # Check for metric-specific posture 

414 if value.get("posture") == "Metric": 

415 return "metric" 

416 # Default to event 

417 return "event" 

418 return getattr(value, "type", "event") 

419 

420 

421TriggerTypes: TypeAlias = Annotated[ 1a

422 Union[ 

423 Annotated[EventTrigger, Tag("event")], 

424 Annotated[MetricTrigger, Tag("metric")], 

425 Annotated[CompoundTrigger, Tag("compound")], 

426 Annotated[SequenceTrigger, Tag("sequence")], 

427 ], 

428 Discriminator(trigger_discriminator), 

429] 

430"""The union of all concrete trigger types that a user may actually create""" 1a

431 

432CompoundTrigger.model_rebuild() 1a

433SequenceTrigger.model_rebuild() 1a

434 

435 

436class AutomationCore(PrefectBaseModel, extra="ignore"): # type: ignore[call-arg] 1a

437 """Defines an action a user wants to take when a certain number of events 

438 do or don't happen to the matching resources""" 

439 

440 name: str = Field(default=..., description="The name of this automation") 1a

441 description: str = Field( 1a

442 default="", description="A longer description of this automation" 

443 ) 

444 

445 enabled: bool = Field( 1a

446 default=True, description="Whether this automation will be evaluated" 

447 ) 

448 tags: List[str] = Field( 1a

449 default_factory=list, 

450 description="A list of tags associated with this automation", 

451 ) 

452 

453 trigger: TriggerTypes = Field( 1a

454 default=..., 

455 description=( 

456 "The criteria for which events this Automation covers and how it will " 

457 "respond to the presence or absence of those events" 

458 ), 

459 ) 

460 

461 actions: List[ActionTypes] = Field( 1a

462 default=..., 

463 description="The actions to perform when this Automation triggers", 

464 ) 

465 

466 actions_on_trigger: List[ActionTypes] = Field( 1a

467 default_factory=list, 

468 description="The actions to perform when an Automation goes into a triggered state", 

469 ) 

470 

471 actions_on_resolve: List[ActionTypes] = Field( 1a

472 default_factory=list, 

473 description="The actions to perform when an Automation goes into a resolving state", 

474 ) 

475 

476 owner_resource: Optional[str] = Field( 1a

477 default=None, description="The owning resource of this automation" 

478 ) 

479 

480 

481class Automation(AutomationCore): 1a

482 id: UUID = Field(default=..., description="The ID of this automation") 1a