"""
The triggers consumer watches events streaming in from the event bus and decides whether
to act on them based on the automations that users have set up.
"""

import asyncio
from contextlib import AsyncExitStack, asynccontextmanager
from datetime import timedelta
from typing import (
    TYPE_CHECKING,
    AsyncGenerator,
    Collection,
    Dict,
    List,
    Optional,
    Tuple,
)
from uuid import UUID

import orjson
import sqlalchemy as sa
from sqlalchemy.ext.asyncio import AsyncSession
from typing_extensions import Literal, TypeAlias

import prefect.types._datetime
from prefect._internal.retries import retry_async_fn
from prefect.logging import get_logger
from prefect.server.database import PrefectDBInterface, db_injector
from prefect.server.events import messaging
from prefect.server.events.actions import ServerActionTypes
from prefect.server.events.models.automations import (
    AUTOMATION_CHANGES_CHANNEL,
    AutomationChangeEvent,
    automations_session,
    read_automation,
)
from prefect.server.events.models.composite_trigger_child_firing import (
    clear_child_firings,
    clear_old_child_firings,
    get_child_firings,
    upsert_child_firing,
)
from prefect.server.events.ordering import (
    PRECEDING_EVENT_LOOKBACK,
    EventArrivedEarly,
    get_triggers_causal_ordering,
)
from prefect.server.events.schemas.automations import (
    Automation,
    CompositeTrigger,
    EventTrigger,
    Firing,
    Posture,
    Trigger,
    TriggeredAction,
    TriggerState,
)
from prefect.server.events.schemas.events import ReceivedEvent
from prefect.server.utilities.messaging import Message, MessageHandler
from prefect.server.utilities.postgres_listener import (
    get_pg_notify_connection,
    pg_listen,
)
from prefect.settings import PREFECT_EVENTS_EXPIRED_BUCKET_BUFFER
from prefect.settings.context import get_current_settings

if TYPE_CHECKING:
    import logging

    from prefect.server.database.orm_models import ORMAutomationBucket


logger: "logging.Logger" = get_logger(__name__)

AutomationID: TypeAlias = UUID
TriggerID: TypeAlias = UUID


AUTOMATION_BUCKET_BATCH_SIZE = 500


async def evaluate(
    session: AsyncSession,
    trigger: EventTrigger,
    bucket: "ORMAutomationBucket",
    now: prefect.types._datetime.DateTime,
    triggering_event: Optional[ReceivedEvent],
) -> "ORMAutomationBucket | None":
    """Evaluates an Automation, either triggered by a specific event or proactively
    on a time interval. Evaluating an Automation updates the associated counters for
    each automation, and will fire the associated action if it has met the threshold."""
    automation = trigger.automation

    logging_context = {
        "automation": automation.id,
        "trigger": trigger.id,
        "bucketing_key": bucket.bucketing_key,
        "bucket_start": bucket.start,
        "bucket_end": bucket.end,
        "bucket_initial_count": bucket.count,
        "bucket_last_operation": bucket.last_operation,
        "now": now,
        "triggering_event": (
            {
                "id": triggering_event.id,
                "event": triggering_event.event,
            }
            if triggering_event
            else None
        ),
    }

    # Implementation notes:
    #
    # This triggering algorithm maintains an invariant that there is exactly one
    # time-based "bucket" open and collecting events for each automation at a time. When
    # an event comes in that matches the automation, one of four things can happen:
    #
    # 1. The event would have matched an older bucket that has either expired or has
    #    already filled up, and thus is no longer relevant;
    # 2. The event matches the current bucket, but the bucket does not meet its
    #    threshold yet;
    # 3. The event matches the current bucket and causes it to meet the threshold, so
    #    we fire immediately and advance the bucket to the next time period;
    # 4. The event matches the current bucket, but the event is for a future time after
    #    the current bucket has expired, so we will start the new bucket and re-evaluate.
    #
    # Automations are also evaluated proactively without an event to see if they have
    # met their proactive threshold (where not enough events have happened in the time
    # period)
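
    # An illustrative timeline (invented values, assuming a reactive trigger with
    # threshold 3 and within=60s, and an open bucket [12:00:00, 12:01:00)):
    #
    #   - an event occurring at 11:59:30 is case #1 (late; ignored)
    #   - events at 12:00:10 and 12:00:20 are case #2 (counted, below threshold)
    #   - a third event at 12:00:45 is case #3 (fire now and advance the bucket)
    #   - an event at 12:01:05 is case #4 (open the next bucket and re-evaluate)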

    # If there was a triggering event, then we need to "spend" this count somewhere,
    # either in the currently open bucket or in the next time period's bucket
    count = 1 if triggering_event else 0

    if now < bucket.start:
        # This is an older out-of-order message or a redundant event for a reactive
        # trigger that has already fired, so it should not affect the current
        # bucket. We can safely ignore this event/timestamp entirely. Case #1
        # from the implementation notes above.
        logger.debug(
            "Automation %s (%r) trigger %s got a late event for keys (%r)",
            automation.id,
            automation.name,
            trigger.id,
            bucket.bucketing_key,
            extra=logging_context,
        )
        return bucket

    if count and (trigger.immediate or bucket.start <= now < bucket.end):
        # we are still within the automation time period, so spend the count in the
        # current bucket
        bucket = await increment_bucket(session, bucket, count, triggering_event)
        count = 0

    # Reactive automations will fire "eagerly", meaning they will fire as _soon_ as the
    # threshold is reached, then wait for the rest of their time period before
    # firing again. This is done by creating a new bucket in the future after the
    # trigger has fired.

    # Proactive automations must wait until the whole bucket has expired before they can
    # fire, because we can't know if we'll get one late event just in time to cause the
    # automation _not_ to fire.

    ready_to_fire = trigger.posture == Posture.Reactive or bucket.end <= now
    meets_threshold = trigger.meets_threshold(bucket.count)

    if ready_to_fire and meets_threshold:
        logger.debug(
            (
                "Automation %s (%r) trigger %s triggered for keys (%r) %s, "
                "having occurred %s times between %s and %s"
            ),
            automation.id,
            automation.name,
            trigger.id,
            bucket.bucketing_key,
            "reactively" if triggering_event else "proactively",
            bucket.count,
            bucket.start,
            bucket.end,
            extra=logging_context,
        )

        firing = Firing(
            trigger=trigger,
            trigger_states={TriggerState.Triggered},
            triggered=prefect.types._datetime.now("UTC"),
            triggering_labels={
                label: value
                for label, value in zip(sorted(trigger.for_each), bucket.bucketing_key)
            },
            triggering_event=triggering_event or bucket.last_event,
        )

        await fire(session, firing)

        # when acting, remove the current bucket from the database immediately to avoid
        # potentially double-acting in the case of a crash between now and the next
        # time we back up buckets to the database
        await remove_bucket(session, bucket)

    elif now < bucket.end:
        # We didn't fire this time, and the bucket still has more time, so leave
        # before setting up a new future bucket. Case #2 from the implementation notes
        # above.
        logger.debug(
            "Automation %s (%r) trigger %s has more time for keys (%r)",
            automation.id,
            automation.name,
            trigger.id,
            bucket.bucketing_key,
            extra={
                **logging_context,
                "ready_to_fire": ready_to_fire,
                "meets_threshold": meets_threshold,
            },
        )

        # Special case of a proactive trigger for which the same event satisfies
        # both `after` and `expect`. For example, using flow run heartbeats for crash
        # detection, after the first heartbeat we expect a subsequent heartbeat or
        # terminal state within a given time.
        #
        # If we've already reached the proactive threshold, we need to remove the
        # current bucket and start a new one for the latest event.
        if (
            triggering_event
            and trigger.posture == Posture.Proactive
            and not meets_threshold
            and trigger.starts_after(triggering_event.event)
            and trigger.expects(triggering_event.event)
        ):
            await remove_bucket(session, bucket)
            return await start_new_bucket(
                session,
                trigger,
                bucketing_key=bucket.bucketing_key,
                start=triggering_event.occurred,
                end=triggering_event.occurred + trigger.within,
                count=0,
                last_event=triggering_event,
            )

        return bucket
    else:
        # Case #2 from the implementation notes above.
        logger.debug(
            "Automation %s (%r) trigger %s not ready to fire for keys (%r)",
            automation.id,
            automation.name,
            trigger.id,
            bucket.bucketing_key,
            extra={
                **logging_context,
                "ready_to_fire": ready_to_fire,
                "meets_threshold": meets_threshold,
                "bucket_current_count": bucket.count,
            },
        )

    # We are now outside of the automation's time period or we triggered this
    # time. That means it's time to start a new bucket for the next possible time
    # window (if this automation does not require an event to start `after`):
    if trigger.after:
        # remove the bucket because it should only get re-created if we see another
        # appropriate starting event
        return await remove_bucket(session, bucket)
    else:
        if trigger.within == timedelta(seconds=0):
            return None

        start = prefect.types._datetime.create_datetime_instance(max(bucket.end, now))
        end = start + trigger.within

        # If we're processing a reactive trigger and leaving the function with a count
        # that we've just spent in the bucket for the next time window, it means that we
        # just processed an event that was in the future. It's possible that this event
        # was sufficient to cause the trigger to fire, so we need to evaluate one
        # more time to see if that's the case. This is case #4 from the implementation
        # notes above.
        if trigger.posture == Posture.Reactive and count > 0:
            bucket = await start_new_bucket(
                session,
                trigger,
                bucketing_key=tuple(bucket.bucketing_key),
                start=start,
                end=end,
                count=0,
            )
            return await evaluate(session, trigger, bucket, now, triggering_event)
        else:
            return await start_new_bucket(
                session,
                trigger,
                bucketing_key=tuple(bucket.bucketing_key),
                start=start,
                end=end,
                count=count,
            )


async def fire(session: AsyncSession, firing: Firing) -> None:
    if isinstance(firing.trigger.parent, Automation):
        await act(firing)
    elif isinstance(firing.trigger.parent, CompositeTrigger):
        await evaluate_composite_trigger(session, firing)
    else:
        raise NotImplementedError(
            f"Cannot fire {firing} with parent trigger type {type(firing.trigger.parent)}"
        )


async def evaluate_composite_trigger(session: AsyncSession, firing: Firing) -> None:
    automation = firing.trigger.automation

    assert isinstance(firing.trigger.parent, CompositeTrigger)
    trigger: CompositeTrigger = firing.trigger.parent
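
    # Illustrative example (invented numbers): a composite trigger that requires
    # 2 of its 3 child triggers to fire within 1 hour would have
    # num_expected_firings == 2; each child Firing is recorded below, and the
    # parent fires once enough distinct child firings fall within that window.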

    # If we only need to see 1 child firing,
    # then the parent trigger can fire immediately.
    if trigger.num_expected_firings == 1:
        logger.info(
            "Automation %s (%r) %s trigger %s fired (shortcut)",
            automation.id,
            automation.name,
            trigger.type,
            trigger.id,
            extra={
                "automation": automation.id,
                "trigger": trigger.id,
                "trigger_type": trigger.type,
                "firings": firing.id,
            },
        )
        await fire(
            session,
            Firing(
                trigger=trigger,
                trigger_states={TriggerState.Triggered},
                triggered=prefect.types._datetime.now("UTC"),
                triggering_firings=[firing],
                triggering_event=firing.triggering_event,
            ),
        )
        return

    # If we're only looking within a certain time horizon, remove any older firings that
    # should no longer be considered as satisfying this trigger
    if trigger.within is not None:
        await clear_old_child_firings(
            session, trigger, firing.triggered - trigger.within
        )

    # Otherwise we need N child firings. We'll upsert this firing and then check
    # what the current state of the world is. If we have enough firings, we'll
    # fire the parent trigger.
    await upsert_child_firing(session, firing)
    firings: list[Firing] = [
        cf.child_firing for cf in await get_child_firings(session, trigger)
    ]
    firing_ids: set[UUID] = {f.id for f in firings}

    # If our current firing no longer exists when we read the firings back, another
    # firing has superseded it, and we should defer to that one
    if firing.id not in firing_ids:
        return

    if trigger.ready_to_fire(firings):
        logger.info(
            "Automation %s (%r) %s trigger %s fired",
            automation.id,
            automation.name,
            trigger.type,
            trigger.id,
            extra={
                "automation": automation.id,
                "trigger": trigger.id,
                "trigger_type": trigger.type,
                "firings": ",".join(str(f.id) for f in firings),
            },
        )

        # clear by firing id
        await clear_child_firings(session, trigger, firing_ids=list(firing_ids))

        await fire(
            session,
            Firing(
                trigger=trigger,
                trigger_states={TriggerState.Triggered},
                triggered=prefect.types._datetime.now("UTC"),
                triggering_firings=firings,
                triggering_event=firing.triggering_event,
            ),
        )


async def act(firing: Firing) -> None:
    """Given an Automation that has been triggered, the triggering labels and event
    (if there was one), publish an action for the `actions` service to process."""
    automation = firing.trigger.automation

    state_change_events: Dict[TriggerState, ReceivedEvent] = {
        trigger_state: firing.trigger.create_automation_state_change_event(
            firing=firing,
            trigger_state=trigger_state,
        )
        for trigger_state in sorted(firing.trigger_states, key=list(TriggerState).index)
    }
    await messaging.publish(state_change_events.values())

    # By default, all `automation.actions` are fired
    source_actions: List[Tuple[Optional[ReceivedEvent], ServerActionTypes]] = [
        (firing.triggering_event, action) for action in automation.actions
    ]

    # Conditionally add in actions that fire on specific trigger states
    if TriggerState.Triggered in firing.trigger_states:
        source_actions += [
            (state_change_events[TriggerState.Triggered], action)
            for action in automation.actions_on_trigger
        ]

    if TriggerState.Resolved in firing.trigger_states:
        source_actions += [
            (state_change_events[TriggerState.Resolved], action)
            for action in automation.actions_on_resolve
        ]
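
    # For example (illustrative): an automation with two `actions`, one entry in
    # `actions_on_trigger`, and a firing whose trigger_states include Triggered
    # would yield three TriggeredActions below, with action_index running 0-2.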

    actions = [
        TriggeredAction(
            automation=automation,
            firing=firing,
            triggered=firing.triggered,
            triggering_labels=firing.triggering_labels,
            triggering_event=action_triggering_event,
            action=action,
            action_index=index,
        )
        for index, (action_triggering_event, action) in enumerate(source_actions)
    ]

    async with messaging.create_actions_publisher() as publisher:
        for action in actions:
            await publisher.publish_data(action.model_dump_json().encode(), {})


__events_clock_lock: Optional[asyncio.Lock] = None
_events_clock: Optional[float] = None
_events_clock_updated: Optional[float] = None


def _events_clock_lock() -> asyncio.Lock:
    global __events_clock_lock
    if __events_clock_lock is None:
        __events_clock_lock = asyncio.Lock()
    return __events_clock_lock


async def update_events_clock(event: ReceivedEvent) -> None:
    global _events_clock, _events_clock_updated
    async with _events_clock_lock():
        # we want the offset to be negative to represent that we are always
        # processing events behind realtime...
        now = prefect.types._datetime.now("UTC").timestamp()
        event_timestamp = event.occurred.timestamp()
        offset = event_timestamp - now

        # ...and we should clamp this value to zero so we don't inadvertently look like
        # we are processing the future
        if offset > 0.0:
            event_timestamp = now

        if not _events_clock or event_timestamp >= _events_clock:
            _events_clock = event_timestamp

        _events_clock_updated = now


async def get_events_clock() -> Optional[float]:
    global _events_clock
    return _events_clock


async def get_events_clock_offset() -> float:
    """Calculate the current clock offset. This takes into account both the `occurred`
    of the last event, as well as the time we _saw_ the last event. This helps to
    ensure that in low volume environments, we don't end up getting huge offsets."""
    global _events_clock, _events_clock_updated

    async with _events_clock_lock():
        if _events_clock is None or _events_clock_updated is None:
            return 0.0

        now: float = prefect.types._datetime.now("UTC").timestamp()
        offset = (_events_clock - now) + (now - _events_clock_updated)
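
        # Worked example (illustrative numbers): if the last event occurred 10s
        # ago (_events_clock == now - 10) but we only processed it 4s ago
        # (_events_clock_updated == now - 4), then offset == -10 + 4 == -6,
        # i.e. evaluations are running about 6 seconds behind realtime.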

    return offset


async def reset_events_clock() -> None:
    global _events_clock, _events_clock_updated
    async with _events_clock_lock():
        _events_clock = None
        _events_clock_updated = None


async def reactive_evaluation(event: ReceivedEvent, depth: int = 0) -> None:
    """
    Evaluate all automations that may apply to this event.

    Args:
        event (ReceivedEvent): The event to evaluate. This object contains all the
            necessary information about the event, including its type, associated
            resources, and metadata.
        depth (int, optional): The current recursion depth. This is used to prevent
            infinite recursion due to cyclic event dependencies. Defaults to 0 and
            is incremented with each recursive call.

    """
    async with AsyncExitStack() as stack:
        await update_events_clock(event)
        await stack.enter_async_context(
            get_triggers_causal_ordering().preceding_event_confirmed(
                reactive_evaluation, event, depth
            )
        )

        interested_triggers = find_interested_triggers(event)
        if not interested_triggers:
            return

        for trigger in interested_triggers:
            logger.info(
                "Automation %s, trigger %s covers event %r (%s) for %r at %r",
                trigger.automation.id,
                trigger.id,
                event.event,
                event.id,
                event.resource.id,
                event.occurred.isoformat(),
            )

            bucketing_key = trigger.bucketing_key(event)

            async with automations_session(begin_transaction=True) as session:
                try:
                    bucket: Optional["ORMAutomationBucket"] = None

                    if trigger.after and trigger.starts_after(event.event):
                        # When an event matches both the after and expect, each event
                        # can both start a new bucket and increment the bucket that was
                        # started by the previous event. Here we offset the bucket to
                        # start at -1 so that the first event will leave the bucket at 0
                        # after evaluation. See the tests:
                        #
                        # test_same_event_in_expect_and_after_never_reacts_immediately
                        # test_same_event_in_expect_and_after_reacts_after_threshold_is_met
                        # test_same_event_in_expect_and_after_proactively_does_not_fire
                        # test_same_event_in_expect_and_after_proactively_fires
                        #
                        # in test_triggers_regressions.py for examples of how we expect
                        # this to behave.
                        #
                        # https://github.com/PrefectHQ/nebula/issues/4201
                        initial_count = -1 if trigger.expects(event.event) else 0
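                        # Concretely (an illustrative case): when the same
                        # heartbeat event is both `after` and `expect`, the first
                        # heartbeat opens the bucket at -1 and is immediately
                        # counted back up to 0 during evaluation, so one event
                        # alone never satisfies the trigger.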

                        bucket = await ensure_bucket(
                            session,
                            trigger,
                            bucketing_key,
                            start=event.occurred,
                            end=event.occurred + trigger.within,
                            last_event=event,
                            initial_count=initial_count,
                        )

                    if (
                        not bucket
                        and not trigger.after
                        and trigger.expects(event.event)
                    ):
                        # When ensuring a bucket and _creating it for the first time_,
                        # use an old time so that we can catch any other events flowing
                        # through the system at the same time even if they are out of
                        # order. After the trigger fires and creates its next bucket,
                        # time will start from that point forward. We'll use our
                        # preceding event lookback variable as the horizon within which
                        # we'll accept these older events.
                        #
                        # https://github.com/PrefectHQ/nebula/issues/7230
                        start = event.occurred - PRECEDING_EVENT_LOOKBACK

                        bucket = await ensure_bucket(
                            session,
                            trigger,
                            bucketing_key=bucketing_key,
                            start=start,
                            end=event.occurred + trigger.within,
                            last_event=event,
                        )

                    if not trigger.expects(event.event):
                        continue

                    if not bucket:
                        bucket = await read_bucket(session, trigger, bucketing_key)
                        if not bucket:
                            continue

                    await evaluate(
                        session,
                        trigger,
                        bucket,
                        event.occurred,
                        triggering_event=event,
                    )
                finally:
                    await session.commit()


# retry on operational errors to account for db flakiness with sqlite
@retry_async_fn(max_attempts=3, retry_on_exceptions=(sa.exc.OperationalError,))
async def get_lost_followers() -> List[ReceivedEvent]:
    """Get followers that have been sitting around longer than our lookback"""
    return await get_triggers_causal_ordering().get_lost_followers()


async def periodic_evaluation(now: prefect.types._datetime.DateTime) -> None:
    """Periodic tasks that should be run regularly, but not as often as every event"""
    offset = await get_events_clock_offset()
    as_of = now + timedelta(seconds=offset)

    logger.debug("Running periodic evaluation as of %s (offset %ss)", as_of, offset)

    # Any followers that have been sitting around longer than our lookback are never
    # going to see their leader event (maybe it was lost or took too long to arrive).
    # These events can just be evaluated now in the order they occurred.
    for event in await get_lost_followers():
        await reactive_evaluation(event)

    async with automations_session() as session:
        await sweep_closed_buckets(
            session,
            as_of - PREFECT_EVENTS_EXPIRED_BUCKET_BUFFER.value(),
        )
        await session.commit()


async def evaluate_periodically(periodic_granularity: timedelta) -> None:
    """Runs periodic evaluation on the given interval"""
    logger.debug(
        "Starting periodic evaluation task every %s seconds",
        periodic_granularity.total_seconds(),
    )
    while True:
        try:
            await periodic_evaluation(prefect.types._datetime.now("UTC"))
        except Exception:
            logger.exception("Error running periodic evaluation")
        finally:
            await asyncio.sleep(periodic_granularity.total_seconds())


# The currently loaded automations for this shard, organized both by ID and by
# account and workspace
automations_by_id: Dict[UUID, Automation] = {}
triggers: Dict[TriggerID, EventTrigger] = {}
next_proactive_runs: Dict[TriggerID, prefect.types._datetime.DateTime] = {}

# This lock governs any changes to the set of loaded automations; any routine that will
# add/remove automations must be holding this lock when it does so. It's best to use
# the methods below to access the loaded set of automations.
__automations_lock: Optional[asyncio.Lock] = None


def _automations_lock() -> asyncio.Lock:
    global __automations_lock
    if __automations_lock is None:
        __automations_lock = asyncio.Lock()
    return __automations_lock


def find_interested_triggers(event: ReceivedEvent) -> Collection[EventTrigger]:
    candidates = triggers.values()
    return [trigger for trigger in candidates if trigger.covers(event)]


def load_automation(automation: Optional[Automation]) -> None:
    """Loads the given automation into memory so that it is available for evaluations"""
    if not automation:
        return

    event_triggers = automation.triggers_of_type(EventTrigger)

    if not automation.enabled or not event_triggers:
        forget_automation(automation.id)
        return

    automations_by_id[automation.id] = automation

    for trigger in event_triggers:
        triggers[trigger.id] = trigger
        next_proactive_runs.pop(trigger.id, None)


def forget_automation(automation_id: UUID) -> None:
    """Unloads the given automation from memory"""
    if automation := automations_by_id.pop(automation_id, None):
        for trigger in automation.triggers():
            triggers.pop(trigger.id, None)
            next_proactive_runs.pop(trigger.id, None)


async def automation_changed(
    automation_id: UUID,
    event: Literal["automation__created", "automation__updated", "automation__deleted"],
) -> None:
    async with _automations_lock():
        if event in ("automation__deleted", "automation__updated"):
            forget_automation(automation_id)

        if event in ("automation__created", "automation__updated"):
            async with automations_session() as session:
                automation = await read_automation(session, automation_id)
                load_automation(automation)


@db_injector
async def load_automations(db: PrefectDBInterface, session: AsyncSession):
    """Loads all automations for the given set of accounts"""
    query = sa.select(db.Automation)

    logger.debug("Loading automations")

    result = await session.execute(query)
    for automation in result.scalars().all():
        load_automation(Automation.model_validate(automation, from_attributes=True))

    logger.debug(
        "Loaded %s automations with %s triggers", len(automations_by_id), len(triggers)
    )


@db_injector
async def remove_buckets_exceeding_threshold(
    db: PrefectDBInterface, session: AsyncSession, trigger: EventTrigger
):
    """Deletes buckets where the count has already exceeded the threshold"""
    assert isinstance(trigger, EventTrigger), repr(trigger)
    await session.execute(
        sa.delete(db.AutomationBucket).where(
            db.AutomationBucket.automation_id == trigger.automation.id,
            db.AutomationBucket.trigger_id == trigger.id,
            db.AutomationBucket.count >= trigger.threshold,
        )
    )


@db_injector
async def read_buckets_for_automation(
    db: PrefectDBInterface,
    session: AsyncSession,
    trigger: Trigger,
    batch_size: int = AUTOMATION_BUCKET_BATCH_SIZE,
) -> AsyncGenerator["ORMAutomationBucket", None]:
    """Yields buckets for the given automation and trigger in batches."""
    offset = 0

    while True:
        query = (
            sa.select(db.AutomationBucket)
            .where(
                db.AutomationBucket.automation_id == trigger.automation.id,
                db.AutomationBucket.trigger_id == trigger.id,
            )
            .order_by(db.AutomationBucket.start)
            .limit(batch_size)
            .offset(offset)
        )

        result = await session.execute(query)
        buckets = result.scalars().all()

        if not buckets:
            break

        for bucket in buckets:
            yield bucket

        offset += batch_size


@db_injector
async def read_bucket(
    db: PrefectDBInterface,
    session: AsyncSession,
    trigger: Trigger,
    bucketing_key: Tuple[str, ...],
) -> Optional["ORMAutomationBucket"]:
    """Gets the bucket this event would fall into for the given Automation, if there
    is one currently"""
    return await read_bucket_by_trigger_id(
        session,
        trigger.automation.id,
        trigger.id,
        bucketing_key,
    )


@db_injector
async def read_bucket_by_trigger_id(
    db: PrefectDBInterface,
    session: AsyncSession,
    automation_id: UUID,
    trigger_id: UUID,
    bucketing_key: Tuple[str, ...],
) -> "ORMAutomationBucket | None":
    """Gets the bucket this event would fall into for the given Automation, if there
    is one currently"""
    query = sa.select(db.AutomationBucket).where(
        db.AutomationBucket.automation_id == automation_id,
        db.AutomationBucket.trigger_id == trigger_id,
        db.AutomationBucket.bucketing_key == bucketing_key,
    )
    result = await session.execute(query)
    bucket = result.scalars().first()
    if bucket:
        await session.refresh(bucket)

    return bucket


@db_injector
async def increment_bucket(
    db: PrefectDBInterface,
    session: AsyncSession,
    bucket: "ORMAutomationBucket",
    count: int,
    last_event: Optional[ReceivedEvent],
) -> "ORMAutomationBucket":
    """Adds the given count to the bucket, returning the new bucket"""
    additional_updates: dict[str, ReceivedEvent] = (
        {"last_event": last_event} if last_event else {}
    )
    await session.execute(
        db.queries.insert(db.AutomationBucket)
        .values(
            automation_id=bucket.automation_id,
            trigger_id=bucket.trigger_id,
            bucketing_key=bucket.bucketing_key,
            start=bucket.start,
            end=bucket.end,
            count=count,
            last_operation="increment_bucket[insert]",
        )
        .on_conflict_do_update(
            index_elements=[
                db.AutomationBucket.automation_id,
                db.AutomationBucket.trigger_id,
                db.AutomationBucket.bucketing_key,
            ],
            set_=dict(
                count=db.AutomationBucket.count + count,
                last_operation="increment_bucket[update]",
                updated=prefect.types._datetime.now("UTC"),
                **additional_updates,
            ),
        )
    )

    read_bucket = await read_bucket_by_trigger_id(
        session,
        bucket.automation_id,
        bucket.trigger_id,
        tuple(bucket.bucketing_key),
    )

    if TYPE_CHECKING:
        assert read_bucket is not None

    return read_bucket


@db_injector
async def start_new_bucket(
    db: PrefectDBInterface,
    session: AsyncSession,
    trigger: EventTrigger,
    bucketing_key: Tuple[str, ...],
    start: prefect.types._datetime.DateTime,
    end: prefect.types._datetime.DateTime,
    count: int,
    triggered_at: Optional[prefect.types._datetime.DateTime] = None,
    last_event: Optional[ReceivedEvent] = None,
) -> "ORMAutomationBucket":
    """Ensures that a bucket with the given start and end exists with the given count,
    returning the new bucket"""
    automation = trigger.automation

    await session.execute(
        db.queries.insert(db.AutomationBucket)
        .values(
            automation_id=automation.id,
            trigger_id=trigger.id,
            bucketing_key=bucketing_key,
            start=start,
            end=end,
            count=count,
            last_operation="start_new_bucket[insert]",
            triggered_at=triggered_at,
            last_event=last_event,
        )
        .on_conflict_do_update(
            index_elements=[
                db.AutomationBucket.automation_id,
                db.AutomationBucket.trigger_id,
                db.AutomationBucket.bucketing_key,
            ],
            set_=dict(
                start=start,
                end=end,
                count=count,
                last_operation="start_new_bucket[update]",
                updated=prefect.types._datetime.now("UTC"),
                triggered_at=triggered_at,
                last_event=last_event,
            ),
        )
    )

    read_bucket = await read_bucket_by_trigger_id(
        session,
        automation.id,
        trigger.id,
        tuple(bucketing_key),
    )

    if TYPE_CHECKING:
        assert read_bucket is not None

    return read_bucket


@db_injector
async def ensure_bucket(
    db: PrefectDBInterface,
    session: AsyncSession,
    trigger: EventTrigger,
    bucketing_key: Tuple[str, ...],
    start: prefect.types._datetime.DateTime,
    end: prefect.types._datetime.DateTime,
    last_event: Optional[ReceivedEvent],
    initial_count: int = 0,
) -> "ORMAutomationBucket":
    """Ensures that a bucket has been started for the given automation and key,
    returning the current bucket. Will not modify the existing bucket."""
    automation = trigger.automation
    additional_updates: dict[str, ReceivedEvent] = (
        {"last_event": last_event} if last_event else {}
    )
    await session.execute(
        db.queries.insert(db.AutomationBucket)
        .values(
            automation_id=automation.id,
            trigger_id=trigger.id,
            bucketing_key=bucketing_key,
            last_event=last_event,
            start=start,
            end=end,
            count=initial_count,
            last_operation="ensure_bucket[insert]",
        )
        .on_conflict_do_update(
            index_elements=[
                db.AutomationBucket.automation_id,
                db.AutomationBucket.trigger_id,
                db.AutomationBucket.bucketing_key,
            ],
            set_=dict(
                # no-op, but this counts as an update so the query returns a row
                count=db.AutomationBucket.count,
                **additional_updates,
            ),
        )
    )
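
    # Roughly the statement above in PostgreSQL terms (an illustrative sketch;
    # the real table and column names come from the ORM models, and the SQLite
    # dialect differs):
    #
    #   INSERT INTO automation_bucket (automation_id, trigger_id, bucketing_key, ...)
    #   VALUES (...)
    #   ON CONFLICT (automation_id, trigger_id, bucketing_key)
    #   DO UPDATE SET count = automation_bucket.count  -- no-op, but returns a row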

    read_bucket = await read_bucket_by_trigger_id(
        session, automation.id, trigger.id, tuple(bucketing_key)
    )

    if TYPE_CHECKING:
        assert read_bucket is not None

    return read_bucket


@db_injector
async def remove_bucket(
    db: PrefectDBInterface, session: AsyncSession, bucket: "ORMAutomationBucket"
):
    """Removes the given bucket from the database"""
    await session.execute(
        sa.delete(db.AutomationBucket).where(
            db.AutomationBucket.automation_id == bucket.automation_id,
            db.AutomationBucket.trigger_id == bucket.trigger_id,
            db.AutomationBucket.bucketing_key == bucket.bucketing_key,
        )
    )


@db_injector
async def sweep_closed_buckets(
    db: PrefectDBInterface,
    session: AsyncSession,
    older_than: prefect.types._datetime.DateTime,
) -> None:
    await session.execute(
        sa.delete(db.AutomationBucket).where(db.AutomationBucket.end <= older_than)
    )


async def reset() -> None:
    """Resets the in-memory state of the service"""
    await reset_events_clock()
    automations_by_id.clear()
    triggers.clear()
    next_proactive_runs.clear()


async def listen_for_automation_changes() -> None:
    """
    Listens for any changes to automations via PostgreSQL NOTIFY/LISTEN,
    and applies those changes to the set of loaded automations.
    """
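    # Notification payloads are JSON documents shaped like (illustrative values):
    #
    #   {"automation_id": "<uuid>", "event_type": "updated"}
    #
    # where event_type is one of "created", "updated", or "deleted".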

    logger.info("Starting automation change listener")

    while True:
        conn = None
        try:
            conn = await get_pg_notify_connection()
            if not conn:
                logger.debug(
                    "PostgreSQL NOTIFY/LISTEN not available (not using PostgreSQL). "
                    "Automation changes will not be synchronized across servers."
                )
                return

            logger.info(
                f"Listening for automation changes on {AUTOMATION_CHANGES_CHANNEL}"
            )

            async for payload in pg_listen(
                conn,
                AUTOMATION_CHANGES_CHANNEL,
                heartbeat_interval=get_current_settings().server.services.triggers.pg_notify_heartbeat_interval_seconds,
            ):
                try:
                    data = orjson.loads(payload)
                    automation_id = UUID(data["automation_id"])
                    event_type = data["event_type"]

                    logger.info(
                        f"Received automation change notification: {event_type} for {automation_id}"
                    )

                    event_map: dict[str, AutomationChangeEvent] = {
                        "created": "automation__created",
                        "updated": "automation__updated",
                        "deleted": "automation__deleted",
                    }

                    if event_type in event_map:
                        await automation_changed(automation_id, event_map[event_type])
                    else:
                        logger.warning(f"Unknown automation event type: {event_type}")

                except Exception as e:
                    logger.error(
                        f"Error processing automation change notification: {e}",
                        exc_info=True,
                    )

        except asyncio.CancelledError:
            logger.info("Automation change listener cancelled")
            break
        except Exception as e:
            reconnect_seconds = get_current_settings().server.services.triggers.pg_notify_reconnect_interval_seconds
            logger.error(
                f"Error in automation change listener: {e}. Reconnecting in {reconnect_seconds}s...",
                exc_info=True,
            )
            await asyncio.sleep(reconnect_seconds)
        finally:
            if conn and not conn.is_closed():
                await conn.close()


@asynccontextmanager
async def consumer(
    periodic_granularity: timedelta = timedelta(seconds=5),
) -> AsyncGenerator[MessageHandler, None]:
    """The `triggers.consumer` processes all Events arriving on the event bus to
    determine if they meet the automation criteria, queuing up a corresponding
    `TriggeredAction` for the `actions` service if the automation criteria are met."""
    # Start the automation change listener task
    sync_task = asyncio.create_task(listen_for_automation_changes())

    async with automations_session() as session:
        await load_automations(session)

    proactive_task = asyncio.create_task(evaluate_periodically(periodic_granularity))

    ordering = get_triggers_causal_ordering()

    async def message_handler(message: Message):
        if not message.data:
            logger.warning("Message had no data")

            return

        if not message.attributes:
            logger.warning("Message had no attributes")

            return

        if message.attributes.get("event") == "prefect.log.write":
            return

        try:
            event_id = UUID(message.attributes["id"])
        except (KeyError, ValueError, TypeError):
            logger.warning(
                "Unable to get event ID from message attributes: %s",
                repr(message.attributes),
            )
            return

        if await ordering.event_has_been_seen(event_id):
            return

        event = ReceivedEvent.model_validate_json(message.data)

        try:
            await reactive_evaluation(event)
        except EventArrivedEarly:
            pass  # it's fine to ACK this message, since it is safe in the DB

    try:
        logger.debug("Starting reactive evaluation task")
        yield message_handler
    finally:
        sync_task.cancel()
        proactive_task.cancel()
        # Wait for tasks to finish
        await asyncio.gather(sync_task, proactive_task, return_exceptions=True)
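

# A minimal usage sketch (hypothetical wiring; in the real server, the messaging
# consumer calls the yielded handler for each message arriving on the event bus):
#
#   async with consumer() as handler:
#       await handler(message)  # one Message at a time, in arrival order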


async def proactive_evaluation(
    trigger: EventTrigger, as_of: prefect.types._datetime.DateTime
) -> prefect.types._datetime.DateTime:
    """The core proactive evaluation operation for a single Automation"""
    assert isinstance(trigger, EventTrigger), repr(trigger)
    automation = trigger.automation

    offset = await get_events_clock_offset()
    as_of += timedelta(seconds=offset)

    logger.debug(
        "Evaluating automation %s trigger %s proactively as of %s (offset %ss)",
        automation.id,
        trigger.id,
        as_of,
        offset,
    )

    # By default, the next run will come after the full trigger window, but it
    # may be sooner based on the state of the buckets
    run_again_at = as_of + trigger.within

    async with automations_session() as session:
        try:
            if not trigger.for_each:
                await ensure_bucket(
                    session,
                    trigger,
                    bucketing_key=tuple(),
                    start=as_of,
                    end=as_of + trigger.within,
                    last_event=None,
                )

            # preemptively delete buckets where possible without
            # evaluating them in memory
            await remove_buckets_exceeding_threshold(session, trigger)

            async for bucket in read_buckets_for_automation(session, trigger):
                next_bucket = await evaluate(
                    session, trigger, bucket, as_of, triggering_event=None
                )
                if next_bucket and as_of < next_bucket.end < run_again_at:
                    run_again_at = prefect.types._datetime.create_datetime_instance(
                        next_bucket.end
                    )

            return run_again_at
        finally:
            await session.commit()


async def evaluate_proactive_triggers() -> None:
    for trigger in list(triggers.values()):
        if trigger.posture != Posture.Proactive:
            continue

        next_run = next_proactive_runs.get(
            trigger.id, prefect.types._datetime.now("UTC")
        )
        if next_run > prefect.types._datetime.now("UTC"):
            continue

        try:
            run_again_at = await proactive_evaluation(
                trigger, prefect.types._datetime.now("UTC")
            )
            logger.debug(
                "Automation %s trigger %s will run again at %s",
                trigger.automation.id,
                trigger.id,
                run_again_at,
            )
            next_proactive_runs[trigger.id] = run_again_at
        except Exception:
            logger.exception(
                "Error evaluating automation %s trigger %s proactively",
                trigger.automation.id,
                trigger.id,
            )