Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/triggers.py: 71%

382 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1""" 

2The triggers consumer watches events streaming in from the event bus and decides whether 

3to act on them based on the automations that users have set up. 

4""" 

5 

6import asyncio 1c

7from contextlib import AsyncExitStack, asynccontextmanager 1c

8from datetime import timedelta 1c

9from typing import ( 1c

10 TYPE_CHECKING, 

11 AsyncGenerator, 

12 Collection, 

13 Dict, 

14 List, 

15 Optional, 

16 Tuple, 

17) 

18from uuid import UUID 1c

19 

20import orjson 1c

21import sqlalchemy as sa 1c

22from sqlalchemy.ext.asyncio import AsyncSession 1c

23from typing_extensions import Literal, TypeAlias 1c

24 

25import prefect.types._datetime 1c

26from prefect._internal.retries import retry_async_fn 1c

27from prefect.logging import get_logger 1c

28from prefect.server.database import PrefectDBInterface, db_injector 1c

29from prefect.server.events import messaging 1c

30from prefect.server.events.actions import ServerActionTypes 1c

31from prefect.server.events.models.automations import ( 1c

32 AUTOMATION_CHANGES_CHANNEL, 

33 AutomationChangeEvent, 

34 automations_session, 

35 read_automation, 

36) 

37from prefect.server.events.models.composite_trigger_child_firing import ( 1c

38 clear_child_firings, 

39 clear_old_child_firings, 

40 get_child_firings, 

41 upsert_child_firing, 

42) 

43from prefect.server.events.ordering import ( 1c

44 PRECEDING_EVENT_LOOKBACK, 

45 EventArrivedEarly, 

46 get_triggers_causal_ordering, 

47) 

48from prefect.server.events.schemas.automations import ( 1c

49 Automation, 

50 CompositeTrigger, 

51 EventTrigger, 

52 Firing, 

53 Posture, 

54 Trigger, 

55 TriggeredAction, 

56 TriggerState, 

57) 

58from prefect.server.events.schemas.events import ReceivedEvent 1c

59from prefect.server.utilities.messaging import Message, MessageHandler 1c

60from prefect.server.utilities.postgres_listener import ( 1c

61 get_pg_notify_connection, 

62 pg_listen, 

63) 

64from prefect.settings import PREFECT_EVENTS_EXPIRED_BUCKET_BUFFER 1c

65from prefect.settings.context import get_current_settings 1c

66 

67if TYPE_CHECKING: 67 ↛ 68line 67 didn't jump to line 68 because the condition on line 67 was never true1c

68 import logging 

69 

70 from prefect.server.database.orm_models import ORMAutomationBucket 

71 

72 

73logger: "logging.Logger" = get_logger(__name__) 1c

74 

75AutomationID: TypeAlias = UUID 1c

76TriggerID: TypeAlias = UUID 1c

77 

78 

79AUTOMATION_BUCKET_BATCH_SIZE = 500 1c

80 

81 

82async def evaluate( 1c

83 session: AsyncSession, 

84 trigger: EventTrigger, 

85 bucket: "ORMAutomationBucket", 

86 now: prefect.types._datetime.DateTime, 

87 triggering_event: Optional[ReceivedEvent], 

88) -> "ORMAutomationBucket | None": 

89 """Evaluates an Automation, either triggered by a specific event or proactively 

90 on a time interval. Evaluating a Automation updates the associated counters for 

91 each automation, and will fire the associated action if it has met the threshold.""" 

92 automation = trigger.automation 1gbda

93 

94 logging_context = { 1gbda

95 "automation": automation.id, 

96 "trigger": trigger.id, 

97 "bucketing_key": bucket.bucketing_key, 

98 "bucket_start": bucket.start, 

99 "bucket_end": bucket.end, 

100 "bucket_initial_count": bucket.count, 

101 "bucket_last_operation": bucket.last_operation, 

102 "now": now, 

103 "triggering_event": ( 

104 { 

105 "id": triggering_event.id, 

106 "event": triggering_event.event, 

107 } 

108 if triggering_event 

109 else None 

110 ), 

111 } 

112 

113 # Implementation notes: 

114 # 

115 # This triggering algorithm maintains an invariant that there is exactly one 

116 # time-based "bucket" open and collecting events for each automation at a time. When 

117 # an event comes in that matches the automation, one of four things can happen: 

118 # 

119 # 1. The event would have matched an older bucket that has either expired or has 

120 # already filled up, and thus is no longer relevant; 

121 # 2. The event matches the current bucket, but the bucket does not meet its 

122 # threshold yet; 

123 # 3. The event matches the current bucket, causes it to meet the the threshold, so 

124 # we fire immediately and advance the bucket to the next time period. 

125 # 4. The event matches the current bucket, but the event is for a future time after 

126 # the current bucket has expired, so we will start the new bucket and re-evaluate 

127 # 

128 # Automations are also evaluated proactively without an event to see if they have 

129 # met their proactive threshold (where not enough events have happened in the time 

130 # period) 

131 

132 # If there was a triggering event, then we need to "spend" this count somewhere, 

133 # either in the currently open bucket or in the next time period's bucket 

134 count = 1 if triggering_event else 0 1gbda

135 

136 if now < bucket.start: 1gbda

137 # This is an older out-of-order message or a redundant event for a reactive 

138 # trigger that has has already fired, so it should not should not affect the 

139 # current bucket. We can safely ignore this event/timestamp entirely. Case #1 

140 # from the implementation notes above. 

141 logger.debug( 1da

142 "Automation %s (%r) trigger %s got a late event for keys (%r)", 

143 automation.id, 

144 automation.name, 

145 trigger.id, 

146 bucket.bucketing_key, 

147 extra=logging_context, 

148 ) 

149 return bucket 1da

150 

151 if count and (trigger.immediate or bucket.start <= now < bucket.end): 1gbda

152 # we are still within the automation time period, so spend the count in the 

153 # current bucket 

154 bucket = await increment_bucket(session, bucket, count, triggering_event) 1bda

155 count = 0 1bda

156 

157 # Reactive automations will fire "eagerly", meaning they will fire as _soon_ as the 

158 # threshold is reached, then wait for the rest of their time period before 

159 # firing again. This is done by creating a new bucket in the future after the 

160 # trigger has fired. 

161 

162 # Proactive automations must wait until the whole bucket has expired before they can 

163 # fire, because we can't know if we'll get one late event just in time to cause the 

164 # automation _not_ to fire. 

165 

166 ready_to_fire = trigger.posture == Posture.Reactive or bucket.end <= now 1gbda

167 meets_threshold = trigger.meets_threshold(bucket.count) 1gbda

168 

169 if ready_to_fire and meets_threshold: 1gbda

170 logger.debug( 1ba

171 ( 

172 "Automation %s (%r) trigger %s triggered for keys (%r) %s, " 

173 "having occurred %s times between %s and %s" 

174 ), 

175 automation.id, 

176 automation.name, 

177 trigger.id, 

178 bucket.bucketing_key, 

179 "reactively" if triggering_event else "proactively", 

180 bucket.count, 

181 bucket.start, 

182 bucket.end, 

183 extra=logging_context, 

184 ) 

185 

186 firing = Firing( 1ba

187 trigger=trigger, 

188 trigger_states={TriggerState.Triggered}, 

189 triggered=prefect.types._datetime.now("UTC"), 

190 triggering_labels={ 

191 label: value 

192 for label, value in zip(sorted(trigger.for_each), bucket.bucketing_key) 

193 }, 

194 triggering_event=triggering_event or bucket.last_event, 

195 ) 

196 

197 await fire(session, firing) 1ba

198 

199 # when acting, remove the current bucket from the database immediately to avoid 

200 # potentially double-acting in the case of a crash between now and the next 

201 # time we backup buckets to the database 

202 await remove_bucket(session, bucket) 1bda

203 

204 elif now < bucket.end: 204 ↛ 249line 204 didn't jump to line 249 because the condition on line 204 was always true1gbda

205 # We didn't fire this time, and also the bucket still has more time, so leave 

206 # before setting up a new future bucket. Case #2 from the implementation notes 

207 # above. 

208 logger.debug( 1gbda

209 "Automation %s (%r) trigger %s has more time for keys (%r)", 

210 automation.id, 

211 automation.name, 

212 trigger.id, 

213 bucket.bucketing_key, 

214 extra={ 

215 **logging_context, 

216 "ready_to_fire": ready_to_fire, 

217 "meets_threshold": meets_threshold, 

218 }, 

219 ) 

220 

221 # Special case of a proactive trigger for which the same event satisfies 

222 # both `after` and `expect`. For example, using flow run heartbeats for crash 

223 # detection, after the first heartbeat we expect a subsequent heartbeat or 

224 # terminal state within a given time. 

225 # 

226 # If we've already reached the proactive threshold, we need to remove the 

227 # current bucket and start a new one for the latest event. 

228 if ( 228 ↛ 235line 228 didn't jump to line 235 because the condition on line 228 was never true

229 triggering_event 

230 and trigger.posture == Posture.Proactive 

231 and not meets_threshold 

232 and trigger.starts_after(triggering_event.event) 

233 and trigger.expects(triggering_event.event) 

234 ): 

235 await remove_bucket(session, bucket) 

236 return await start_new_bucket( 

237 session, 

238 trigger, 

239 bucketing_key=bucket.bucketing_key, 

240 start=triggering_event.occurred, 

241 end=triggering_event.occurred + trigger.within, 

242 count=0, 

243 last_event=triggering_event, 

244 ) 

245 

246 return bucket 1gbda

247 else: 

248 # Case #2 from the implementation notes above. 

249 logger.debug( 

250 "Automation %s (%r) trigger %s not ready to fire for keys (%r)", 

251 automation.id, 

252 automation.name, 

253 trigger.id, 

254 bucket.bucketing_key, 

255 extra={ 

256 **logging_context, 

257 "ready_to_fire": ready_to_fire, 

258 "meets_threshold": meets_threshold, 

259 "bucket_current_count": bucket.count, 

260 }, 

261 ) 

262 

263 # We are now outside of the automation's time period or we triggered this 

264 # time. That means it's time to start a new bucket for the next possible time 

265 # window (if this automation does not require an event to start `after`): 

266 if trigger.after: 266 ↛ 271line 266 didn't jump to line 271 because the condition on line 266 was always true

267 # remove the bucket because it should only get re-created if we see another 

268 # appropriate starting event 

269 return await remove_bucket(session, bucket) 1ba

270 else: 

271 if trigger.within == timedelta(seconds=0): 

272 return None 

273 

274 start = prefect.types._datetime.create_datetime_instance(max(bucket.end, now)) 

275 end = start + trigger.within 

276 

277 # If we're processing a reactive trigger and leaving the function with a count 

278 # that we've just spent in the bucket for the next time window, it means that we 

279 # just processed an event that was in the future. It's possible that this event 

280 # was sufficient enough to cause the trigger to fire, so we need to evaluate one 

281 # more time to see if that's the case. This is case #4 from the implementation 

282 # notes above. 

283 if trigger.posture == Posture.Reactive and count > 0: 

284 bucket = await start_new_bucket( 

285 session, 

286 trigger, 

287 bucketing_key=tuple(bucket.bucketing_key), 

288 start=start, 

289 end=end, 

290 count=0, 

291 ) 

292 return await evaluate(session, trigger, bucket, now, triggering_event) 

293 else: 

294 return await start_new_bucket( 1b

295 session, 

296 trigger, 

297 bucketing_key=tuple(bucket.bucketing_key), 

298 start=start, 

299 end=end, 

300 count=count, 

301 ) 

302 

303 

304async def fire(session: AsyncSession, firing: Firing) -> None: 1c

305 if isinstance(firing.trigger.parent, Automation): 305 ↛ 307line 305 didn't jump to line 307 because the condition on line 305 was always true1ba

306 await act(firing) 1ba

307 elif isinstance(firing.trigger.parent, CompositeTrigger): 

308 await evaluate_composite_trigger(session, firing) 

309 else: 

310 raise NotImplementedError( 

311 f"Cannot fire {firing} with parent trigger type {type(firing.trigger.parent)}" 

312 ) 

313 

314 

315async def evaluate_composite_trigger(session: AsyncSession, firing: Firing) -> None: 1c

316 automation = firing.trigger.automation 

317 

318 assert isinstance(firing.trigger.parent, CompositeTrigger) 

319 trigger: CompositeTrigger = firing.trigger.parent 

320 

321 # If we only need to see 1 child firing, 

322 # then the parent trigger can fire immediately. 

323 if trigger.num_expected_firings == 1: 

324 logger.info( 

325 "Automation %s (%r) %s trigger %s fired (shortcut)", 

326 automation.id, 

327 automation.name, 

328 trigger.type, 

329 trigger.id, 

330 extra={ 

331 "automation": automation.id, 

332 "trigger": trigger.id, 

333 "trigger_type": trigger.type, 

334 "firings": firing.id, 

335 }, 

336 ) 

337 await fire( 

338 session, 

339 Firing( 

340 trigger=trigger, 

341 trigger_states={TriggerState.Triggered}, 

342 triggered=prefect.types._datetime.now("UTC"), 

343 triggering_firings=[firing], 

344 triggering_event=firing.triggering_event, 

345 ), 

346 ) 

347 return 

348 

349 # If we're only looking within a certain time horizon, remove any older firings that 

350 # should no longer be considered as satisfying this trigger 

351 if trigger.within is not None: 

352 await clear_old_child_firings( 

353 session, trigger, firing.triggered - trigger.within 

354 ) 

355 

356 # Otherwise we need N child firings. We'll upsert this firing and then check 

357 # what the current state of the world is. If we have enough firings, we'll 

358 # fire the parent trigger. 

359 await upsert_child_firing(session, firing) 

360 firings: list[Firing] = [ 

361 cf.child_firing for cf in await get_child_firings(session, trigger) 

362 ] 

363 firing_ids: set[UUID] = {f.id for f in firings} 

364 

365 # If our current firing no longer exists when we read firings 

366 # another firing has superseded it, and we should defer to that one 

367 if firing.id not in firing_ids: 

368 return 

369 

370 if trigger.ready_to_fire(firings): 

371 logger.info( 

372 "Automation %s (%r) %s trigger %s fired", 

373 automation.id, 

374 automation.name, 

375 trigger.type, 

376 trigger.id, 

377 extra={ 

378 "automation": automation.id, 

379 "trigger": trigger.id, 

380 "trigger_type": trigger.type, 

381 "firings": ",".join(str(f.id) for f in firings), 

382 }, 

383 ) 

384 

385 # clear by firing id 

386 await clear_child_firings(session, trigger, firing_ids=list(firing_ids)) 

387 

388 await fire( 

389 session, 

390 Firing( 

391 trigger=trigger, 

392 trigger_states={TriggerState.Triggered}, 

393 triggered=prefect.types._datetime.now("UTC"), 

394 triggering_firings=firings, 

395 triggering_event=firing.triggering_event, 

396 ), 

397 ) 

398 

399 

400async def act(firing: Firing) -> None: 1c

401 """Given a Automation that has been triggered, the triggering labels and event 

402 (if there was one), publish an action for the `actions` service to process.""" 

403 automation = firing.trigger.automation 1ba

404 

405 state_change_events: Dict[TriggerState, ReceivedEvent] = { 1ba

406 trigger_state: firing.trigger.create_automation_state_change_event( 

407 firing=firing, 

408 trigger_state=trigger_state, 

409 ) 

410 for trigger_state in sorted(firing.trigger_states, key=list(TriggerState).index) 

411 } 

412 await messaging.publish(state_change_events.values()) 1ba

413 

414 # By default, all `automation.actions` are fired 

415 source_actions: List[Tuple[Optional[ReceivedEvent], ServerActionTypes]] = [ 1ba

416 (firing.triggering_event, action) for action in automation.actions 

417 ] 

418 

419 # Conditionally add in actions that fire on specific trigger states 

420 if TriggerState.Triggered in firing.trigger_states: 420 ↛ 426line 420 didn't jump to line 426 because the condition on line 420 was always true1ba

421 source_actions += [ 1ba

422 (state_change_events[TriggerState.Triggered], action) 

423 for action in automation.actions_on_trigger 

424 ] 

425 

426 if TriggerState.Resolved in firing.trigger_states: 426 ↛ 427line 426 didn't jump to line 427 because the condition on line 426 was never true1ba

427 source_actions += [ 

428 (state_change_events[TriggerState.Resolved], action) 

429 for action in automation.actions_on_resolve 

430 ] 

431 

432 actions = [ 1ba

433 TriggeredAction( 

434 automation=automation, 

435 firing=firing, 

436 triggered=firing.triggered, 

437 triggering_labels=firing.triggering_labels, 

438 triggering_event=action_triggering_event, 

439 action=action, 

440 action_index=index, 

441 ) 

442 for index, (action_triggering_event, action) in enumerate(source_actions) 

443 ] 

444 

445 async with messaging.create_actions_publisher() as publisher: 1ba

446 for action in actions: 1ba

447 await publisher.publish_data(action.model_dump_json().encode(), {}) 1ba

448 

449 

450__events_clock_lock: Optional[asyncio.Lock] = None 1c

451_events_clock: Optional[float] = None 1c

452_events_clock_updated: Optional[float] = None 1c

453 

454 

455def _events_clock_lock() -> asyncio.Lock: 1c

456 global __events_clock_lock 

457 if __events_clock_lock is None: 1febhda

458 __events_clock_lock = asyncio.Lock() 1f

459 return __events_clock_lock 1febhda

460 

461 

462async def update_events_clock(event: ReceivedEvent) -> None: 1c

463 global _events_clock, _events_clock_updated 

464 async with _events_clock_lock(): 1ebda

465 # we want the offset to be negative to represent that we are always 

466 # processing events behind realtime... 

467 now = prefect.types._datetime.now("UTC").timestamp() 1ebda

468 event_timestamp = event.occurred.timestamp() 1ebda

469 offset = event_timestamp - now 1ebda

470 

471 # ...and we should clamp this value to zero so we don't inadvertently look like 

472 # we are processing the future 

473 if offset > 0.0: 473 ↛ 474line 473 didn't jump to line 474 because the condition on line 473 was never true1ebda

474 event_timestamp = now 

475 

476 if not _events_clock or event_timestamp >= _events_clock: 1ebda

477 _events_clock = event_timestamp 1ebda

478 

479 _events_clock_updated = now 1ebda

480 

481 

482async def get_events_clock() -> Optional[float]: 1c

483 global _events_clock 

484 return _events_clock 1f

485 

486 

487async def get_events_clock_offset() -> float: 1c

488 """Calculate the current clock offset. This takes into account both the `occurred` 

489 of the last event, as well as the time we _saw_ the last event. This helps to 

490 ensure that in low volume environments, we don't end up getting huge offsets.""" 

491 global _events_clock, _events_clock_updated 

492 

493 async with _events_clock_lock(): 1febha

494 if _events_clock is None or _events_clock_updated is None: 1febha

495 return 0.0 1fea

496 

497 now: float = prefect.types._datetime.now("UTC").timestamp() 1febha

498 offset = (_events_clock - now) + (now - _events_clock_updated) 1ebha

499 

500 return offset 1febha

501 

502 

503async def reset_events_clock() -> None: 1c

504 global _events_clock, _events_clock_updated 

505 async with _events_clock_lock(): 

506 _events_clock = None 

507 _events_clock_updated = None 1f

508 

509 

510async def reactive_evaluation(event: ReceivedEvent, depth: int = 0) -> None: 1c

511 """ 

512 Evaluate all automations that may apply to this event. 

513 

514 Args: 

515 event (ReceivedEvent): The event to evaluate. This object contains all the necessary information 

516 about the event, including its type, associated resources, and metadata. 

517 depth (int, optional): The current recursion depth. This is used to prevent infinite recursion 

518 due to cyclic event dependencies. Defaults to 0 and is incremented with 

519 each recursive call. 

520 

521 """ 

522 async with AsyncExitStack() as stack: 1ebda

523 await update_events_clock(event) 1ebda

524 await stack.enter_async_context( 1ebda

525 get_triggers_causal_ordering().preceding_event_confirmed( 

526 reactive_evaluation, event, depth 

527 ) 

528 ) 

529 

530 interested_triggers = find_interested_triggers(event) 1ebda

531 if not interested_triggers: 1ebda

532 return 1e

533 

534 for trigger in interested_triggers: 1bda

535 logger.info( 1bda

536 "Automation %s, trigger %s covers event %r (%s) for %r at %r", 

537 trigger.automation.id, 

538 trigger.id, 

539 event.event, 

540 event.id, 

541 event.resource.id, 

542 event.occurred.isoformat(), 

543 ) 

544 

545 bucketing_key = trigger.bucketing_key(event) 1bda

546 

547 async with automations_session(begin_transaction=True) as session: 1bda

548 try: 1bda

549 bucket: Optional["ORMAutomationBucket"] = None 1bda

550 

551 if trigger.after and trigger.starts_after(event.event): 551 ↛ 567line 551 didn't jump to line 567 because the condition on line 551 was never true1bda

552 # When an event matches both the after and expect, each event 

553 # can both start a new bucket and increment the bucket that was 

554 # started by the previous event. Here we offset the bucket to 

555 # start at -1 so that the first event will leave the bucket at 0 

556 # after evaluation. See the tests: 

557 # 

558 # test_same_event_in_expect_and_after_never_reacts_immediately 

559 # test_same_event_in_expect_and_after_reacts_after_threshold_is_met 

560 # test_same_event_in_expect_and_after_proactively_does_not_fire 

561 # test_same_event_in_expect_and_after_proactively_fires 

562 # 

563 # in test_triggers_regressions.py for examples of how we expect 

564 # this to behave. 

565 # 

566 # https://github.com/PrefectHQ/nebula/issues/4201 

567 initial_count = -1 if trigger.expects(event.event) else 0 

568 bucket = await ensure_bucket( 

569 session, 

570 trigger, 

571 bucketing_key, 

572 start=event.occurred, 

573 end=event.occurred + trigger.within, 

574 last_event=event, 

575 initial_count=initial_count, 

576 ) 

577 

578 if ( 

579 not bucket 

580 and not trigger.after 

581 and trigger.expects(event.event) 

582 ): 

583 # When ensuring a bucket and _creating it for the first time_, 

584 # use an old time so that we can catch any other events flowing 

585 # through the system at the same time even if they are out of 

586 # order. After the trigger fires and creates its next bucket, 

587 # time will start from that point forward. We'll use our 

588 # preceding event lookback variable as the horizon that we'll 

589 # accept these older events. 

590 # 

591 # https://github.com/PrefectHQ/nebula/issues/7230 

592 start = event.occurred - PRECEDING_EVENT_LOOKBACK 1bda

593 

594 bucket = await ensure_bucket( 1bhda

595 session, 

596 trigger, 

597 bucketing_key=bucketing_key, 

598 start=start, 

599 end=event.occurred + trigger.within, 

600 last_event=event, 

601 ) 

602 

603 if not trigger.expects(event.event): 1bda

604 continue 1bda

605 

606 if not bucket: 1bda

607 bucket = await read_bucket(session, trigger, bucketing_key) 1bda

608 if not bucket: 608 ↛ 611line 608 didn't jump to line 611 because the condition on line 608 was always true1bda

609 continue 1bda

610 

611 await evaluate( 1bda

612 session, 

613 trigger, 

614 bucket, 

615 event.occurred, 

616 triggering_event=event, 

617 ) 

618 finally: 

619 await session.commit() 1bda

620 

621 

622# retry on operational errors to account for db flakiness with sqlite 

623@retry_async_fn(max_attempts=3, retry_on_exceptions=(sa.exc.OperationalError,)) 1c

624async def get_lost_followers() -> List[ReceivedEvent]: 1c

625 """Get followers that have been sitting around longer than our lookback""" 

626 return await get_triggers_causal_ordering().get_lost_followers() 1febha

627 

628 

629async def periodic_evaluation(now: prefect.types._datetime.DateTime) -> None: 1c

630 """Periodic tasks that should be run regularly, but not as often as every event""" 

631 offset = await get_events_clock_offset() 1febha

632 as_of = now + timedelta(seconds=offset) 1febha

633 

634 logger.debug("Running periodic evaluation as of %s (offset %ss)", as_of, offset) 1febha

635 

636 # Any followers that have been sitting around longer than our lookback are never 

637 # going to see their leader event (maybe it was lost or took too long to arrive). 

638 # These events can just be evaluated now in the order they occurred. 

639 for event in await get_lost_followers(): 639 ↛ 640line 639 didn't jump to line 640 because the loop on line 639 never started1febha

640 await reactive_evaluation(event) 

641 

642 async with automations_session() as session: 1febhda

643 await sweep_closed_buckets( 1febhda

644 session, 

645 as_of - PREFECT_EVENTS_EXPIRED_BUCKET_BUFFER.value(), 

646 ) 

647 await session.commit() 1febda

648 

649 

650async def evaluate_periodically(periodic_granularity: timedelta) -> None: 1c

651 """Runs periodic evaluation on the given interval""" 

652 logger.debug( 1f

653 "Starting periodic evaluation task every %s seconds", 

654 periodic_granularity.total_seconds(), 

655 ) 

656 while True: 1febha

657 try: 1febha

658 await periodic_evaluation(prefect.types._datetime.now("UTC")) 1febhda

659 except Exception: 

660 logger.exception("Error running periodic evaluation") 

661 finally: 

662 await asyncio.sleep(periodic_granularity.total_seconds()) 1ebhda

663 

664 

665# The currently loaded automations for this shard, organized both by ID and by 

666# account and workspace 

667automations_by_id: Dict[UUID, Automation] = {} 1c

668triggers: Dict[TriggerID, EventTrigger] = {} 1c

669next_proactive_runs: Dict[TriggerID, prefect.types._datetime.DateTime] = {} 1c

670 

671# This lock governs any changes to the set of loaded automations; any routine that will 

672# add/remove automations must be holding this lock when it does so. It's best to use 

673# the methods below to access the loaded set of automations. 

674__automations_lock: Optional[asyncio.Lock] = None 1c

675 

676 

677def _automations_lock() -> asyncio.Lock: 1c

678 global __automations_lock 

679 if __automations_lock is None: 1ieba

680 __automations_lock = asyncio.Lock() 1i

681 return __automations_lock 1ieba

682 

683 

684def find_interested_triggers(event: ReceivedEvent) -> Collection[EventTrigger]: 1c

685 candidates = triggers.values() 1ebda

686 return [trigger for trigger in candidates if trigger.covers(event)] 1ebda

687 

688 

689def load_automation(automation: Optional[Automation]) -> None: 1c

690 """Loads the given automation into memory so that it is available for evaluations""" 

691 if not automation: 691 ↛ 692line 691 didn't jump to line 692 because the condition on line 691 was never true1eba

692 return 

693 

694 event_triggers = automation.triggers_of_type(EventTrigger) 1eba

695 

696 if not automation.enabled or not event_triggers: 1eba

697 forget_automation(automation.id) 1eba

698 return 1eba

699 

700 automations_by_id[automation.id] = automation 1ea

701 

702 for trigger in event_triggers: 1ea

703 triggers[trigger.id] = trigger 1ea

704 next_proactive_runs.pop(trigger.id, None) 1ea

705 

706 

707def forget_automation(automation_id: UUID) -> None: 1c

708 """Unloads the given automation from memory""" 

709 if automation := automations_by_id.pop(automation_id, None): 1eba

710 for trigger in automation.triggers(): 

711 triggers.pop(trigger.id, None) 

712 next_proactive_runs.pop(trigger.id, None) 

713 

714 

715async def automation_changed( 1c

716 automation_id: UUID, 

717 event: Literal["automation__created", "automation__updated", "automation__deleted"], 

718) -> None: 

719 async with _automations_lock(): 1ieba

720 if event in ("automation__deleted", "automation__updated"): 1ieba

721 forget_automation(automation_id) 1ea

722 

723 if event in ("automation__created", "automation__updated"): 1ieba

724 async with automations_session() as session: 1ieba

725 automation = await read_automation(session, automation_id) 1ieba

726 load_automation(automation) 1eba

727 

728 

729@db_injector 1c

730async def load_automations(db: PrefectDBInterface, session: AsyncSession): 1c

731 """Loads all automations for the given set of accounts""" 

732 query = sa.select(db.Automation) 1c

733 

734 logger.debug("Loading automations") 1c

735 

736 result = await session.execute(query) 1cf

737 for automation in result.scalars().all(): 

738 load_automation(Automation.model_validate(automation, from_attributes=True)) 

739 

740 logger.debug( 

741 "Loaded %s automations with %s triggers", len(automations_by_id), len(triggers) 

742 ) 

743 

744 

745@db_injector 1c

746async def remove_buckets_exceeding_threshold( 1c

747 db: PrefectDBInterface, session: AsyncSession, trigger: EventTrigger 

748): 

749 """Deletes bucket where the count has already exceeded the threshold""" 

750 assert isinstance(trigger, EventTrigger), repr(trigger) 1ba

751 await session.execute( 1gba

752 sa.delete(db.AutomationBucket).where( 

753 db.AutomationBucket.automation_id == trigger.automation.id, 

754 db.AutomationBucket.trigger_id == trigger.id, 

755 db.AutomationBucket.count >= trigger.threshold, 

756 ) 

757 ) 

758 

759 

760@db_injector 1c

761async def read_buckets_for_automation( 1c

762 db: PrefectDBInterface, 

763 session: AsyncSession, 

764 trigger: Trigger, 

765 batch_size: int = AUTOMATION_BUCKET_BATCH_SIZE, 

766) -> AsyncGenerator["ORMAutomationBucket", None]: 

767 """Yields buckets for the given automation and trigger in batches.""" 

768 offset = 0 1gba

769 

770 while True: 1gba

771 query = ( 1gba

772 sa.select(db.AutomationBucket) 

773 .where( 

774 db.AutomationBucket.automation_id == trigger.automation.id, 

775 db.AutomationBucket.trigger_id == trigger.id, 

776 ) 

777 .order_by(db.AutomationBucket.start) 

778 .limit(batch_size) 

779 .offset(offset) 

780 ) 

781 

782 result = await session.execute(query) 1gba

783 buckets = result.scalars().all() 1gba

784 

785 if not buckets: 1gba

786 break 1gba

787 

788 for bucket in buckets: 1gba

789 yield bucket 1gba

790 

791 offset += batch_size 1gba

792 

793 

794@db_injector 1c

795async def read_bucket( 1c

796 db: PrefectDBInterface, 

797 session: AsyncSession, 

798 trigger: Trigger, 

799 bucketing_key: Tuple[str, ...], 

800) -> Optional["ORMAutomationBucket"]: 

801 """Gets the bucket this event would fall into for the given Automation, if there is 

802 one currently""" 

803 return await read_bucket_by_trigger_id( 1bda

804 session, 

805 trigger.automation.id, 

806 trigger.id, 

807 bucketing_key, 

808 ) 

809 

810 

811@db_injector 1c

812async def read_bucket_by_trigger_id( 1c

813 db: PrefectDBInterface, 

814 session: AsyncSession, 

815 automation_id: UUID, 

816 trigger_id: UUID, 

817 bucketing_key: Tuple[str, ...], 

818) -> "ORMAutomationBucket | None": 

819 """Gets the bucket this event would fall into for the given Automation, if there is 

820 one currently""" 

821 query = sa.select(db.AutomationBucket).where( 1bhda

822 db.AutomationBucket.automation_id == automation_id, 

823 db.AutomationBucket.trigger_id == trigger_id, 

824 db.AutomationBucket.bucketing_key == bucketing_key, 

825 ) 

826 result = await session.execute(query) 1gbhda

827 bucket = result.scalars().first() 1bda

828 if bucket: 1bda

829 await session.refresh(bucket) 1bda

830 

831 return bucket 1bda

832 

833 

@db_injector
async def increment_bucket(
    db: PrefectDBInterface,
    session: AsyncSession,
    bucket: "ORMAutomationBucket",
    count: int,
    last_event: Optional[ReceivedEvent],
) -> "ORMAutomationBucket":
    """Adds the given count to the bucket, returning the new bucket"""
    # Only overwrite last_event on conflict when we actually have one.
    conflict_updates: dict[str, object] = {
        "count": db.AutomationBucket.count + count,
        "last_operation": "increment_bucket[update]",
        "updated": prefect.types._datetime.now("UTC"),
    }
    if last_event:
        conflict_updates["last_event"] = last_event

    upsert = (
        db.queries.insert(db.AutomationBucket)
        .values(
            automation_id=bucket.automation_id,
            trigger_id=bucket.trigger_id,
            bucketing_key=bucket.bucketing_key,
            start=bucket.start,
            end=bucket.end,
            count=count,
            last_operation="increment_bucket[insert]",
        )
        .on_conflict_do_update(
            index_elements=[
                db.AutomationBucket.automation_id,
                db.AutomationBucket.trigger_id,
                db.AutomationBucket.bucketing_key,
            ],
            set_=conflict_updates,
        )
    )
    await session.execute(upsert)

    # Read back the post-upsert row so the caller gets the accumulated count.
    refreshed = await read_bucket_by_trigger_id(
        session,
        bucket.automation_id,
        bucket.trigger_id,
        tuple(bucket.bucketing_key),
    )

    if TYPE_CHECKING:
        assert refreshed is not None

    return refreshed

883 

884 

@db_injector
async def start_new_bucket(
    db: PrefectDBInterface,
    session: AsyncSession,
    trigger: EventTrigger,
    bucketing_key: Tuple[str, ...],
    start: prefect.types._datetime.DateTime,
    end: prefect.types._datetime.DateTime,
    count: int,
    triggered_at: Optional[prefect.types._datetime.DateTime] = None,
    last_event: Optional[ReceivedEvent] = None,
) -> "ORMAutomationBucket":
    """Ensures that a bucket with the given start and end exists with the given count,
    returning the new bucket"""
    automation = trigger.automation

    # On conflict, reset the bucket to the new window/count rather than
    # accumulating (contrast with increment_bucket).
    replacement = {
        "start": start,
        "end": end,
        "count": count,
        "last_operation": "start_new_bucket[update]",
        "updated": prefect.types._datetime.now("UTC"),
        "triggered_at": triggered_at,
        "last_event": last_event,
    }

    upsert = (
        db.queries.insert(db.AutomationBucket)
        .values(
            automation_id=automation.id,
            trigger_id=trigger.id,
            bucketing_key=bucketing_key,
            start=start,
            end=end,
            count=count,
            last_operation="start_new_bucket[insert]",
            triggered_at=triggered_at,
            last_event=last_event,
        )
        .on_conflict_do_update(
            index_elements=[
                db.AutomationBucket.automation_id,
                db.AutomationBucket.trigger_id,
                db.AutomationBucket.bucketing_key,
            ],
            set_=replacement,
        )
    )
    await session.execute(upsert)

    # Return the freshly written row.
    refreshed = await read_bucket_by_trigger_id(
        session,
        automation.id,
        trigger.id,
        tuple(bucketing_key),
    )

    if TYPE_CHECKING:
        assert refreshed is not None

    return refreshed

943 

944 

@db_injector
async def ensure_bucket(
    db: PrefectDBInterface,
    session: AsyncSession,
    trigger: EventTrigger,
    bucketing_key: Tuple[str, ...],
    start: prefect.types._datetime.DateTime,
    end: prefect.types._datetime.DateTime,
    last_event: Optional[ReceivedEvent],
    initial_count: int = 0,
) -> "ORMAutomationBucket":
    """Ensures that a bucket has been started for the given automation and key,
    returning the current bucket. Will not modify the existing bucket."""
    automation = trigger.automation

    conflict_updates: dict[str, object] = {
        # no-op, but this counts as an update so the query returns a row
        "count": db.AutomationBucket.count,
    }
    if last_event:
        conflict_updates["last_event"] = last_event

    upsert = (
        db.queries.insert(db.AutomationBucket)
        .values(
            automation_id=automation.id,
            trigger_id=trigger.id,
            bucketing_key=bucketing_key,
            last_event=last_event,
            start=start,
            end=end,
            count=initial_count,
            last_operation="ensure_bucket[insert]",
        )
        .on_conflict_do_update(
            index_elements=[
                db.AutomationBucket.automation_id,
                db.AutomationBucket.trigger_id,
                db.AutomationBucket.bucketing_key,
            ],
            set_=conflict_updates,
        )
    )
    await session.execute(upsert)

    # Whether we inserted or hit an existing row, hand back the current bucket.
    current = await read_bucket_by_trigger_id(
        session, automation.id, trigger.id, tuple(bucketing_key)
    )

    if TYPE_CHECKING:
        assert current is not None

    return current

996 

997 

@db_injector
async def remove_bucket(
    db: PrefectDBInterface, session: AsyncSession, bucket: "ORMAutomationBucket"
):
    """Removes the given bucket from the database"""
    # Delete by the bucket's composite key (automation, trigger, bucketing key).
    deletion = sa.delete(db.AutomationBucket).where(
        db.AutomationBucket.automation_id == bucket.automation_id,
        db.AutomationBucket.trigger_id == bucket.trigger_id,
        db.AutomationBucket.bucketing_key == bucket.bucketing_key,
    )
    await session.execute(deletion)

1010 

1011 

@db_injector
async def sweep_closed_buckets(
    db: PrefectDBInterface,
    session: AsyncSession,
    older_than: prefect.types._datetime.DateTime,
) -> None:
    """Deletes all buckets whose window ended at or before the given cutoff."""
    expired = db.AutomationBucket.end <= older_than
    await session.execute(sa.delete(db.AutomationBucket).where(expired))

1021 

1022 

async def reset() -> None:
    """Resets the in-memory state of the service"""
    await reset_events_clock()
    # Drop all cached automation/trigger state and proactive scheduling info.
    for state in (automations_by_id, triggers, next_proactive_runs):
        state.clear()

1029 

1030 

async def listen_for_automation_changes() -> None:
    """
    Listens for any changes to automations via PostgreSQL NOTIFY/LISTEN,
    and applies those changes to the set of loaded automations.

    Runs until cancelled. If the server is not backed by PostgreSQL (no NOTIFY
    connection is available), returns immediately; on any other error, logs and
    reconnects after a configurable interval.
    """
    logger.info("Starting automation change listener")

    # Hoisted out of the notification loop: this mapping is constant, so there
    # is no reason to rebuild the dict for every message received.
    event_map: dict[str, AutomationChangeEvent] = {
        "created": "automation__created",
        "updated": "automation__updated",
        "deleted": "automation__deleted",
    }

    while True:
        conn = None
        try:
            conn = await get_pg_notify_connection()
            if not conn:
                # Not running on PostgreSQL; cross-server sync is unavailable.
                logger.debug(
                    "PostgreSQL NOTIFY/LISTEN not available (not using PostgreSQL). "
                    "Automation changes will not be synchronized across servers."
                )
                return

            logger.info(
                f"Listening for automation changes on {AUTOMATION_CHANGES_CHANNEL}"
            )

            async for payload in pg_listen(
                conn,
                AUTOMATION_CHANGES_CHANNEL,
                heartbeat_interval=get_current_settings().server.services.triggers.pg_notify_heartbeat_interval_seconds,
            ):
                # A bad payload must not tear down the listener, so each
                # notification is handled in its own try/except.
                try:
                    data = orjson.loads(payload)
                    automation_id = UUID(data["automation_id"])
                    event_type = data["event_type"]

                    logger.info(
                        f"Received automation change notification: {event_type} for {automation_id}"
                    )

                    if event_type in event_map:
                        await automation_changed(automation_id, event_map[event_type])
                    else:
                        logger.warning(f"Unknown automation event type: {event_type}")

                except Exception as e:
                    logger.error(
                        f"Error processing automation change notification: {e}",
                        exc_info=True,
                    )

        except asyncio.CancelledError:
            # Normal shutdown path (the consumer cancels this task).
            logger.info("Automation change listener cancelled")
            break
        except Exception as e:
            reconnect_seconds = get_current_settings().server.services.triggers.pg_notify_reconnect_interval_seconds
            logger.error(
                f"Error in automation change listener: {e}. Reconnecting in {reconnect_seconds}s...",
                exc_info=True,
            )
            await asyncio.sleep(reconnect_seconds)
        finally:
            # Always release the LISTEN connection before looping/exiting.
            if conn and not conn.is_closed():
                await conn.close()

1097 

1098 

@asynccontextmanager
async def consumer(
    periodic_granularity: timedelta = timedelta(seconds=5),
) -> AsyncGenerator[MessageHandler, None]:
    """The `triggers.consumer` processes all Events arriving on the event bus to
    determine if they meet the automation criteria, queuing up a corresponding
    `TriggeredAction` for the `actions` service if the automation criteria is met.

    Args:
        periodic_granularity: how often the proactive-evaluation loop runs.

    Yields:
        A `MessageHandler` that evaluates each incoming event message reactively.
    """
    # Start the automation change listener task
    sync_task = asyncio.create_task(listen_for_automation_changes())

    # Load the current set of automations before accepting any messages.
    async with automations_session() as session:
        await load_automations(session)

    # Background loop for proactive (time-based) trigger evaluation.
    proactive_task = asyncio.create_task(evaluate_periodically(periodic_granularity))

    ordering = get_triggers_causal_ordering()

    async def message_handler(message: Message):
        # Guard clauses: skip malformed messages rather than failing the consumer.
        if not message.data:
            logger.warning("Message had no data")

            return

        if not message.attributes:
            logger.warning("Message had no attributes")

            return

        # Log-write events are never trigger-relevant; drop them early.
        if message.attributes.get("event") == "prefect.log.write":
            return

        try:
            event_id = UUID(message.attributes["id"])
        except (KeyError, ValueError, TypeError):
            logger.warning(
                "Unable to get event ID from message attributes: %s",
                repr(message.attributes),
            )
            return

        # Deduplicate: events already processed by the causal ordering are skipped.
        if await ordering.event_has_been_seen(event_id):
            return

        event = ReceivedEvent.model_validate_json(message.data)

        try:
            await reactive_evaluation(event)
        except EventArrivedEarly:
            pass  # it's fine to ACK this message, since it is safe in the DB

    try:
        logger.debug("Starting reactive evaluation task")
        yield message_handler
    finally:
        # On exit, stop both background tasks and wait for them to settle;
        # return_exceptions=True swallows their CancelledError.
        sync_task.cancel()
        proactive_task.cancel()
        # Wait for tasks to finish
        await asyncio.gather(sync_task, proactive_task, return_exceptions=True)

1157 

1158 

async def proactive_evaluation(
    trigger: EventTrigger, as_of: prefect.types._datetime.DateTime
) -> prefect.types._datetime.DateTime:
    """The core proactive evaluation operation for a single Automation

    Args:
        trigger: the Proactive event trigger to evaluate.
        as_of: the wall-clock instant to evaluate against (adjusted below by
            the events clock offset).

    Returns:
        The time at which this trigger should next be evaluated proactively.
    """
    assert isinstance(trigger, EventTrigger), repr(trigger)
    automation = trigger.automation

    # Shift "now" by the events clock offset so evaluation aligns with event
    # timestamps rather than raw wall-clock time.
    offset = await get_events_clock_offset()
    as_of += timedelta(seconds=offset)

    logger.debug(
        "Evaluating automation %s trigger %s proactively as of %s (offset %ss)",
        automation.id,
        trigger.id,
        as_of,
        offset,
    )

    # By default, the next run will come after the full trigger window, but it
    # may be sooner based on the state of the buckets
    run_again_at = as_of + trigger.within

    async with automations_session() as session:
        try:
            # Triggers without for_each use a single shared bucket; make sure
            # it exists so the evaluation below has something to examine.
            if not trigger.for_each:
                await ensure_bucket(
                    session,
                    trigger,
                    bucketing_key=tuple(),
                    start=as_of,
                    end=as_of + trigger.within,
                    last_event=None,
                )

            # preemptively delete buckets where possible without
            # evaluating them in memory
            await remove_buckets_exceeding_threshold(session, trigger)

            async for bucket in read_buckets_for_automation(session, trigger):
                next_bucket = await evaluate(
                    session, trigger, bucket, as_of, triggering_event=None
                )
                # If a bucket closes before the default horizon, evaluate
                # again at that earlier moment.
                if next_bucket and as_of < next_bucket.end < run_again_at:
                    run_again_at = prefect.types._datetime.create_datetime_instance(
                        next_bucket.end
                    )

            return run_again_at
        finally:
            # Commit even on error so bucket deletions/updates are persisted.
            await session.commit()

1209 

1210 

async def evaluate_proactive_triggers() -> None:
    """Evaluates every due Proactive trigger and records when each should next run.

    Errors from a single trigger are logged and do not stop evaluation of the
    remaining triggers.
    """
    # Iterate over a snapshot so concurrent changes to `triggers` don't break
    # the loop.
    for trigger in list(triggers.values()):
        if trigger.posture != Posture.Proactive:
            continue

        # Take a single timestamp per trigger so the "is it due?" check and
        # the evaluation anchor agree.  The original code called now("UTC")
        # up to three times here, producing slightly different instants for
        # the default next-run, the comparison, and the evaluation.
        now = prefect.types._datetime.now("UTC")

        next_run = next_proactive_runs.get(trigger.id, now)
        if next_run > now:
            continue

        try:
            run_again_at = await proactive_evaluation(trigger, now)
            logger.debug(
                "Automation %s trigger %s will run again at %s",
                trigger.automation.id,
                trigger.id,
                run_again_at,
            )
            next_proactive_runs[trigger.id] = run_again_at
        except Exception:
            # Keep going: one failing automation must not starve the others.
            logger.exception(
                "Error evaluating automation %s trigger %s proactively",
                trigger.automation.id,
                trigger.id,
            )