Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/triggers.py: 71%
382 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1"""
2The triggers consumer watches events streaming in from the event bus and decides whether
3to act on them based on the automations that users have set up.
4"""
6import asyncio 1c
7from contextlib import AsyncExitStack, asynccontextmanager 1c
8from datetime import timedelta 1c
9from typing import ( 1c
10 TYPE_CHECKING,
11 AsyncGenerator,
12 Collection,
13 Dict,
14 List,
15 Optional,
16 Tuple,
17)
18from uuid import UUID 1c
20import orjson 1c
21import sqlalchemy as sa 1c
22from sqlalchemy.ext.asyncio import AsyncSession 1c
23from typing_extensions import Literal, TypeAlias 1c
25import prefect.types._datetime 1c
26from prefect._internal.retries import retry_async_fn 1c
27from prefect.logging import get_logger 1c
28from prefect.server.database import PrefectDBInterface, db_injector 1c
29from prefect.server.events import messaging 1c
30from prefect.server.events.actions import ServerActionTypes 1c
31from prefect.server.events.models.automations import ( 1c
32 AUTOMATION_CHANGES_CHANNEL,
33 AutomationChangeEvent,
34 automations_session,
35 read_automation,
36)
37from prefect.server.events.models.composite_trigger_child_firing import ( 1c
38 clear_child_firings,
39 clear_old_child_firings,
40 get_child_firings,
41 upsert_child_firing,
42)
43from prefect.server.events.ordering import ( 1c
44 PRECEDING_EVENT_LOOKBACK,
45 EventArrivedEarly,
46 get_triggers_causal_ordering,
47)
48from prefect.server.events.schemas.automations import ( 1c
49 Automation,
50 CompositeTrigger,
51 EventTrigger,
52 Firing,
53 Posture,
54 Trigger,
55 TriggeredAction,
56 TriggerState,
57)
58from prefect.server.events.schemas.events import ReceivedEvent 1c
59from prefect.server.utilities.messaging import Message, MessageHandler 1c
60from prefect.server.utilities.postgres_listener import ( 1c
61 get_pg_notify_connection,
62 pg_listen,
63)
64from prefect.settings import PREFECT_EVENTS_EXPIRED_BUCKET_BUFFER 1c
65from prefect.settings.context import get_current_settings 1c
67if TYPE_CHECKING: 67 ↛ 68line 67 didn't jump to line 68 because the condition on line 67 was never true1c
68 import logging
70 from prefect.server.database.orm_models import ORMAutomationBucket
73logger: "logging.Logger" = get_logger(__name__) 1c
75AutomationID: TypeAlias = UUID 1c
76TriggerID: TypeAlias = UUID 1c
79AUTOMATION_BUCKET_BATCH_SIZE = 500 1c
async def evaluate(
    session: AsyncSession,
    trigger: EventTrigger,
    bucket: "ORMAutomationBucket",
    now: prefect.types._datetime.DateTime,
    triggering_event: Optional[ReceivedEvent],
) -> "ORMAutomationBucket | None":
    """Evaluates an Automation, either triggered by a specific event or proactively
    on a time interval. Evaluating an Automation updates the associated counters for
    each automation, and will fire the associated action if it has met the threshold.

    Args:
        session: the database session used to read/update/remove buckets
        trigger: the event trigger being evaluated
        bucket: the currently open counting bucket for this trigger and bucketing key
        now: the reference time for the evaluation (the event's `occurred` time for
            reactive evaluations)
        triggering_event: the event that prompted this evaluation, or None for
            proactive (time-driven) evaluations

    Returns:
        The bucket that remains open after this evaluation, or None if no bucket
        should remain open for this trigger/key.
    """
    automation = trigger.automation

    # Shared `extra` payload so every log line in this evaluation carries the same
    # diagnostic context
    logging_context = {
        "automation": automation.id,
        "trigger": trigger.id,
        "bucketing_key": bucket.bucketing_key,
        "bucket_start": bucket.start,
        "bucket_end": bucket.end,
        "bucket_initial_count": bucket.count,
        "bucket_last_operation": bucket.last_operation,
        "now": now,
        "triggering_event": (
            {
                "id": triggering_event.id,
                "event": triggering_event.event,
            }
            if triggering_event
            else None
        ),
    }

    # Implementation notes:
    #
    # This triggering algorithm maintains an invariant that there is exactly one
    # time-based "bucket" open and collecting events for each automation at a time. When
    # an event comes in that matches the automation, one of four things can happen:
    #
    # 1. The event would have matched an older bucket that has either expired or has
    #    already filled up, and thus is no longer relevant;
    # 2. The event matches the current bucket, but the bucket does not meet its
    #    threshold yet;
    # 3. The event matches the current bucket, causes it to meet the threshold, so
    #    we fire immediately and advance the bucket to the next time period.
    # 4. The event matches the current bucket, but the event is for a future time after
    #    the current bucket has expired, so we will start the new bucket and re-evaluate
    #
    # Automations are also evaluated proactively without an event to see if they have
    # met their proactive threshold (where not enough events have happened in the time
    # period)

    # If there was a triggering event, then we need to "spend" this count somewhere,
    # either in the currently open bucket or in the next time period's bucket
    count = 1 if triggering_event else 0

    if now < bucket.start:
        # This is an older out-of-order message or a redundant event for a reactive
        # trigger that has already fired, so it should not affect the current
        # bucket. We can safely ignore this event/timestamp entirely. Case #1
        # from the implementation notes above.
        logger.debug(
            "Automation %s (%r) trigger %s got a late event for keys (%r)",
            automation.id,
            automation.name,
            trigger.id,
            bucket.bucketing_key,
            extra=logging_context,
        )
        return bucket

    if count and (trigger.immediate or bucket.start <= now < bucket.end):
        # we are still within the automation time period, so spend the count in the
        # current bucket
        bucket = await increment_bucket(session, bucket, count, triggering_event)
        count = 0

    # Reactive automations will fire "eagerly", meaning they will fire as _soon_ as the
    # threshold is reached, then wait for the rest of their time period before
    # firing again. This is done by creating a new bucket in the future after the
    # trigger has fired.
    #
    # Proactive automations must wait until the whole bucket has expired before they can
    # fire, because we can't know if we'll get one late event just in time to cause the
    # automation _not_ to fire.
    ready_to_fire = trigger.posture == Posture.Reactive or bucket.end <= now
    meets_threshold = trigger.meets_threshold(bucket.count)

    if ready_to_fire and meets_threshold:
        # Case #3 from the implementation notes above.
        logger.debug(
            (
                "Automation %s (%r) trigger %s triggered for keys (%r) %s, "
                "having occurred %s times between %s and %s"
            ),
            automation.id,
            automation.name,
            trigger.id,
            bucket.bucketing_key,
            "reactively" if triggering_event else "proactively",
            bucket.count,
            bucket.start,
            bucket.end,
            extra=logging_context,
        )

        firing = Firing(
            trigger=trigger,
            trigger_states={TriggerState.Triggered},
            triggered=prefect.types._datetime.now("UTC"),
            # pair each sorted `for_each` label with its position in the bucketing key
            triggering_labels={
                label: value
                for label, value in zip(sorted(trigger.for_each), bucket.bucketing_key)
            },
            triggering_event=triggering_event or bucket.last_event,
        )

        await fire(session, firing)

        # when acting, remove the current bucket from the database immediately to avoid
        # potentially double-acting in the case of a crash between now and the next
        # time we backup buckets to the database
        await remove_bucket(session, bucket)

    elif now < bucket.end:
        # We didn't fire this time, and also the bucket still has more time, so leave
        # before setting up a new future bucket. Case #2 from the implementation notes
        # above.
        logger.debug(
            "Automation %s (%r) trigger %s has more time for keys (%r)",
            automation.id,
            automation.name,
            trigger.id,
            bucket.bucketing_key,
            extra={
                **logging_context,
                "ready_to_fire": ready_to_fire,
                "meets_threshold": meets_threshold,
            },
        )

        # Special case of a proactive trigger for which the same event satisfies
        # both `after` and `expect`. For example, using flow run heartbeats for crash
        # detection, after the first heartbeat we expect a subsequent heartbeat or
        # terminal state within a given time.
        #
        # If we've already reached the proactive threshold, we need to remove the
        # current bucket and start a new one for the latest event.
        if (
            triggering_event
            and trigger.posture == Posture.Proactive
            and not meets_threshold
            and trigger.starts_after(triggering_event.event)
            and trigger.expects(triggering_event.event)
        ):
            await remove_bucket(session, bucket)
            return await start_new_bucket(
                session,
                trigger,
                bucketing_key=bucket.bucketing_key,
                start=triggering_event.occurred,
                end=triggering_event.occurred + trigger.within,
                count=0,
                last_event=triggering_event,
            )

        return bucket
    else:
        # The bucket has expired without meeting its threshold; fall through below to
        # open the bucket for the next time window.
        logger.debug(
            "Automation %s (%r) trigger %s not ready to fire for keys (%r)",
            automation.id,
            automation.name,
            trigger.id,
            bucket.bucketing_key,
            extra={
                **logging_context,
                "ready_to_fire": ready_to_fire,
                "meets_threshold": meets_threshold,
                "bucket_current_count": bucket.count,
            },
        )

    # We are now outside of the automation's time period or we triggered this
    # time. That means it's time to start a new bucket for the next possible time
    # window (if this automation does not require an event to start `after`):
    if trigger.after:
        # remove the bucket because it should only get re-created if we see another
        # appropriate starting event
        return await remove_bucket(session, bucket)
    else:
        if trigger.within == timedelta(seconds=0):
            # zero-width windows never accumulate, so no bucket needs to stay open
            return None

        start = prefect.types._datetime.create_datetime_instance(max(bucket.end, now))
        end = start + trigger.within

        # If we're processing a reactive trigger and leaving the function with a count
        # that we've just spent in the bucket for the next time window, it means that we
        # just processed an event that was in the future. It's possible that this event
        # was sufficient enough to cause the trigger to fire, so we need to evaluate one
        # more time to see if that's the case. This is case #4 from the implementation
        # notes above.
        if trigger.posture == Posture.Reactive and count > 0:
            bucket = await start_new_bucket(
                session,
                trigger,
                bucketing_key=tuple(bucket.bucketing_key),
                start=start,
                end=end,
                count=0,
            )
            return await evaluate(session, trigger, bucket, now, triggering_event)
        else:
            return await start_new_bucket(
                session,
                trigger,
                bucketing_key=tuple(bucket.bucketing_key),
                start=start,
                end=end,
                count=count,
            )
async def fire(session: AsyncSession, firing: Firing) -> None:
    """Dispatch a firing to its parent: a trigger owned directly by an Automation
    acts immediately, while a member of a composite trigger is evaluated against
    its sibling firings."""
    parent = firing.trigger.parent
    if isinstance(parent, Automation):
        await act(firing)
        return
    if isinstance(parent, CompositeTrigger):
        await evaluate_composite_trigger(session, firing)
        return
    raise NotImplementedError(
        f"Cannot fire {firing} with parent trigger type {type(firing.trigger.parent)}"
    )
async def evaluate_composite_trigger(session: AsyncSession, firing: Firing) -> None:
    """Evaluate a child firing against its parent CompositeTrigger, firing the
    parent when enough distinct child firings have been collected.

    Args:
        session: the database session for reading/writing child firings
        firing: the child firing that just occurred
    """
    automation = firing.trigger.automation

    assert isinstance(firing.trigger.parent, CompositeTrigger)
    trigger: CompositeTrigger = firing.trigger.parent

    # If we only need to see 1 child firing,
    # then the parent trigger can fire immediately.
    if trigger.num_expected_firings == 1:
        logger.info(
            "Automation %s (%r) %s trigger %s fired (shortcut)",
            automation.id,
            automation.name,
            trigger.type,
            trigger.id,
            extra={
                "automation": automation.id,
                "trigger": trigger.id,
                "trigger_type": trigger.type,
                "firings": firing.id,
            },
        )
        await fire(
            session,
            Firing(
                trigger=trigger,
                trigger_states={TriggerState.Triggered},
                triggered=prefect.types._datetime.now("UTC"),
                triggering_firings=[firing],
                triggering_event=firing.triggering_event,
            ),
        )
        return

    # If we're only looking within a certain time horizon, remove any older firings that
    # should no longer be considered as satisfying this trigger
    if trigger.within is not None:
        await clear_old_child_firings(
            session, trigger, firing.triggered - trigger.within
        )

    # Otherwise we need N child firings. We'll upsert this firing and then check
    # what the current state of the world is. If we have enough firings, we'll
    # fire the parent trigger.
    await upsert_child_firing(session, firing)
    firings: list[Firing] = [
        cf.child_firing for cf in await get_child_firings(session, trigger)
    ]
    firing_ids: set[UUID] = {f.id for f in firings}

    # If our current firing no longer exists when we read firings,
    # another firing has superseded it, and we should defer to that one
    if firing.id not in firing_ids:
        return

    if trigger.ready_to_fire(firings):
        logger.info(
            "Automation %s (%r) %s trigger %s fired",
            automation.id,
            automation.name,
            trigger.type,
            trigger.id,
            extra={
                "automation": automation.id,
                "trigger": trigger.id,
                "trigger_type": trigger.type,
                "firings": ",".join(str(f.id) for f in firings),
            },
        )

        # clear by firing id so a concurrent insert of a new firing is not lost
        await clear_child_firings(session, trigger, firing_ids=list(firing_ids))

        await fire(
            session,
            Firing(
                trigger=trigger,
                trigger_states={TriggerState.Triggered},
                triggered=prefect.types._datetime.now("UTC"),
                triggering_firings=firings,
                triggering_event=firing.triggering_event,
            ),
        )
async def act(firing: Firing) -> None:
    """Given an Automation that has been triggered, the triggering labels and event
    (if there was one), publish an action for the `actions` service to process.

    Args:
        firing: the firing that describes which trigger fired and why
    """
    automation = firing.trigger.automation

    # Emit one automation state-change event per trigger state in this firing,
    # ordered by the canonical TriggerState enum order
    state_change_events: Dict[TriggerState, ReceivedEvent] = {
        trigger_state: firing.trigger.create_automation_state_change_event(
            firing=firing,
            trigger_state=trigger_state,
        )
        for trigger_state in sorted(firing.trigger_states, key=list(TriggerState).index)
    }
    await messaging.publish(state_change_events.values())

    # By default, all `automation.actions` are fired
    source_actions: List[Tuple[Optional[ReceivedEvent], ServerActionTypes]] = [
        (firing.triggering_event, action) for action in automation.actions
    ]

    # Conditionally add in actions that fire on specific trigger states; these use the
    # corresponding state-change event (not the original event) as their trigger
    if TriggerState.Triggered in firing.trigger_states:
        source_actions += [
            (state_change_events[TriggerState.Triggered], action)
            for action in automation.actions_on_trigger
        ]

    if TriggerState.Resolved in firing.trigger_states:
        source_actions += [
            (state_change_events[TriggerState.Resolved], action)
            for action in automation.actions_on_resolve
        ]

    # Each action carries its index within this firing so consumers can distinguish them
    actions = [
        TriggeredAction(
            automation=automation,
            firing=firing,
            triggered=firing.triggered,
            triggering_labels=firing.triggering_labels,
            triggering_event=action_triggering_event,
            action=action,
            action_index=index,
        )
        for index, (action_triggering_event, action) in enumerate(source_actions)
    ]

    async with messaging.create_actions_publisher() as publisher:
        for action in actions:
            await publisher.publish_data(action.model_dump_json().encode(), {})
# State for the "events clock": the `occurred` timestamp of the most recent event we
# have processed (_events_clock) and the wall-clock time at which we last advanced it
# (_events_clock_updated). All access goes through the lazily created lock below.
__events_clock_lock: Optional[asyncio.Lock] = None
_events_clock: Optional[float] = None
_events_clock_updated: Optional[float] = None
def _events_clock_lock() -> asyncio.Lock:
    """Return the lock guarding the events-clock state, creating it on first use."""
    global __events_clock_lock
    lock = __events_clock_lock
    if lock is None:
        lock = __events_clock_lock = asyncio.Lock()
    return lock
async def update_events_clock(event: ReceivedEvent) -> None:
    """Advance the events clock to this event's `occurred` time (never backwards,
    never into the future) and record the wall-clock time of the update."""
    global _events_clock, _events_clock_updated
    async with _events_clock_lock():
        # we want the offset to be negative to represent that we are always
        # processing events behind realtime...
        now = prefect.types._datetime.now("UTC").timestamp()
        event_timestamp = event.occurred.timestamp()
        offset = event_timestamp - now

        # ...and we should clamp this value to zero so we don't inadvertently look like
        # we are processing the future
        if offset > 0.0:
            event_timestamp = now

        # only ever move the clock forward; older out-of-order events leave it alone
        if not _events_clock or event_timestamp >= _events_clock:
            _events_clock = event_timestamp

        _events_clock_updated = now
async def get_events_clock() -> Optional[float]:
    """Return the `occurred` timestamp of the most recently processed event, if any."""
    global _events_clock
    return _events_clock
async def get_events_clock_offset() -> float:
    """Calculate the current clock offset. This takes into account both the `occurred`
    of the last event, as well as the time we _saw_ the last event. This helps to
    ensure that in low volume environments, we don't end up getting huge offsets."""
    global _events_clock, _events_clock_updated

    async with _events_clock_lock():
        # no events processed yet: report that we are fully caught up
        if _events_clock is None or _events_clock_updated is None:
            return 0.0

        now: float = prefect.types._datetime.now("UTC").timestamp()
        # (_events_clock - now) is how far behind realtime the last event was;
        # (now - _events_clock_updated) credits back the idle time since we saw it
        offset = (_events_clock - now) + (now - _events_clock_updated)

        return offset
async def reset_events_clock() -> None:
    """Clear all events-clock state under the clock lock."""
    global _events_clock, _events_clock_updated
    async with _events_clock_lock():
        _events_clock = _events_clock_updated = None
async def reactive_evaluation(event: ReceivedEvent, depth: int = 0) -> None:
    """
    Evaluate all automations that may apply to this event.

    Args:
        event (ReceivedEvent): The event to evaluate. This object contains all the necessary information
            about the event, including its type, associated resources, and metadata.
        depth (int, optional): The current recursion depth. This is used to prevent infinite recursion
            due to cyclic event dependencies. Defaults to 0 and is incremented with
            each recursive call.

    """
    async with AsyncExitStack() as stack:
        await update_events_clock(event)

        # Enter the causal-ordering context: it will re-invoke `reactive_evaluation`
        # for this event later if a preceding event has not been confirmed yet
        await stack.enter_async_context(
            get_triggers_causal_ordering().preceding_event_confirmed(
                reactive_evaluation, event, depth
            )
        )

        interested_triggers = find_interested_triggers(event)
        if not interested_triggers:
            return

        for trigger in interested_triggers:
            logger.info(
                "Automation %s, trigger %s covers event %r (%s) for %r at %r",
                trigger.automation.id,
                trigger.id,
                event.event,
                event.id,
                event.resource.id,
                event.occurred.isoformat(),
            )

            bucketing_key = trigger.bucketing_key(event)

            # each trigger gets its own transaction so one failure cannot poison
            # evaluation of the others
            async with automations_session(begin_transaction=True) as session:
                try:
                    bucket: Optional["ORMAutomationBucket"] = None

                    if trigger.after and trigger.starts_after(event.event):
                        # When an event matches both the after and expect, each event
                        # can both start a new bucket and increment the bucket that was
                        # started by the previous event. Here we offset the bucket to
                        # start at -1 so that the first event will leave the bucket at 0
                        # after evaluation. See the tests:
                        #
                        #   test_same_event_in_expect_and_after_never_reacts_immediately
                        #   test_same_event_in_expect_and_after_reacts_after_threshold_is_met
                        #   test_same_event_in_expect_and_after_proactively_does_not_fire
                        #   test_same_event_in_expect_and_after_proactively_fires
                        #
                        # in test_triggers_regressions.py for examples of how we expect
                        # this to behave.
                        #
                        # https://github.com/PrefectHQ/nebula/issues/4201
                        initial_count = -1 if trigger.expects(event.event) else 0
                        bucket = await ensure_bucket(
                            session,
                            trigger,
                            bucketing_key,
                            start=event.occurred,
                            end=event.occurred + trigger.within,
                            last_event=event,
                            initial_count=initial_count,
                        )

                    if (
                        not bucket
                        and not trigger.after
                        and trigger.expects(event.event)
                    ):
                        # When ensuring a bucket and _creating it for the first time_,
                        # use an old time so that we can catch any other events flowing
                        # through the system at the same time even if they are out of
                        # order. After the trigger fires and creates its next bucket,
                        # time will start from that point forward. We'll use our
                        # preceding event lookback variable as the horizon that we'll
                        # accept these older events.
                        #
                        # https://github.com/PrefectHQ/nebula/issues/7230
                        start = event.occurred - PRECEDING_EVENT_LOOKBACK

                        bucket = await ensure_bucket(
                            session,
                            trigger,
                            bucketing_key=bucketing_key,
                            start=start,
                            end=event.occurred + trigger.within,
                            last_event=event,
                        )

                    # events that only satisfy `after` (not `expect`) never count
                    # toward the threshold
                    if not trigger.expects(event.event):
                        continue

                    if not bucket:
                        bucket = await read_bucket(session, trigger, bucketing_key)
                        if not bucket:
                            continue

                    await evaluate(
                        session,
                        trigger,
                        bucket,
                        event.occurred,
                        triggering_event=event,
                    )
                finally:
                    await session.commit()
# retry on operational errors to account for db flakiness with sqlite
@retry_async_fn(max_attempts=3, retry_on_exceptions=(sa.exc.OperationalError,))
async def get_lost_followers() -> List[ReceivedEvent]:
    """Get followers that have been sitting around longer than our lookback"""
    return await get_triggers_causal_ordering().get_lost_followers()
async def periodic_evaluation(now: prefect.types._datetime.DateTime) -> None:
    """Periodic tasks that should be run regularly, but not as often as every event"""
    # Shift "now" by the events-clock offset so proactive evaluation tracks the event
    # stream we are actually processing rather than wall-clock time
    offset = await get_events_clock_offset()
    as_of = now + timedelta(seconds=offset)

    logger.debug("Running periodic evaluation as of %s (offset %ss)", as_of, offset)

    # Any followers that have been sitting around longer than our lookback are never
    # going to see their leader event (maybe it was lost or took too long to arrive).
    # These events can just be evaluated now in the order they occurred.
    for event in await get_lost_followers():
        await reactive_evaluation(event)

    async with automations_session() as session:
        await sweep_closed_buckets(
            session,
            as_of - PREFECT_EVENTS_EXPIRED_BUCKET_BUFFER.value(),
        )
        await session.commit()
async def evaluate_periodically(periodic_granularity: timedelta) -> None:
    """Runs periodic evaluation on the given interval"""
    logger.debug(
        "Starting periodic evaluation task every %s seconds",
        periodic_granularity.total_seconds(),
    )
    while True:
        try:
            await periodic_evaluation(prefect.types._datetime.now("UTC"))
        except Exception:
            # keep the loop alive: a single failed evaluation must not kill the service
            logger.exception("Error running periodic evaluation")
        finally:
            # always sleep the full interval, even after an error
            await asyncio.sleep(periodic_granularity.total_seconds())
# The currently loaded automations for this shard, organized both by ID and by
# account and workspace
automations_by_id: Dict[UUID, Automation] = {}
triggers: Dict[TriggerID, EventTrigger] = {}
next_proactive_runs: Dict[TriggerID, prefect.types._datetime.DateTime] = {}

# This lock governs any changes to the set of loaded automations; any routine that will
# add/remove automations must be holding this lock when it does so. It's best to use
# the methods below to access the loaded set of automations.
__automations_lock: Optional[asyncio.Lock] = None
def _automations_lock() -> asyncio.Lock:
    """Return the lock guarding the loaded-automation registries, creating it lazily."""
    global __automations_lock
    lock = __automations_lock
    if lock is None:
        lock = __automations_lock = asyncio.Lock()
    return lock
def find_interested_triggers(event: ReceivedEvent) -> Collection[EventTrigger]:
    """Return every currently loaded event trigger whose filters cover this event."""
    return [candidate for candidate in triggers.values() if candidate.covers(event)]
def load_automation(automation: Optional[Automation]) -> None:
    """Loads the given automation into memory so that it is available for evaluations"""
    if not automation:
        return

    event_triggers = automation.triggers_of_type(EventTrigger)

    if automation.enabled and event_triggers:
        automations_by_id[automation.id] = automation
        for event_trigger in event_triggers:
            triggers[event_trigger.id] = event_trigger
            # a freshly (re)loaded trigger should get its proactive schedule recomputed
            next_proactive_runs.pop(event_trigger.id, None)
    else:
        # disabled automations (or ones without event triggers) must not stay resident,
        # or they would keep matching incoming events
        forget_automation(automation.id)
def forget_automation(automation_id: UUID) -> None:
    """Unloads the given automation from memory"""
    automation = automations_by_id.pop(automation_id, None)
    if automation is None:
        return
    # drop every trigger that belonged to it, along with any pending proactive run
    for trigger in automation.triggers():
        triggers.pop(trigger.id, None)
        next_proactive_runs.pop(trigger.id, None)
async def automation_changed(
    automation_id: UUID,
    event: Literal["automation__created", "automation__updated", "automation__deleted"],
) -> None:
    """Update the in-memory automation registries in response to a change notification."""
    async with _automations_lock():
        # updates are handled as a forget followed by a reload
        if event in ("automation__deleted", "automation__updated"):
            forget_automation(automation_id)

        if event in ("automation__created", "automation__updated"):
            async with automations_session() as session:
                load_automation(await read_automation(session, automation_id))
@db_injector
async def load_automations(db: PrefectDBInterface, session: AsyncSession):
    """Loads all automations for the given set of accounts"""
    query = sa.select(db.Automation)

    logger.debug("Loading automations")

    result = await session.execute(query)
    # validate each ORM row into the schema model and register it in memory
    for automation in result.scalars().all():
        load_automation(Automation.model_validate(automation, from_attributes=True))

    logger.debug(
        "Loaded %s automations with %s triggers", len(automations_by_id), len(triggers)
    )
@db_injector
async def remove_buckets_exceeding_threshold(
    db: PrefectDBInterface, session: AsyncSession, trigger: EventTrigger
):
    """Deletes buckets where the count has already exceeded the threshold"""
    assert isinstance(trigger, EventTrigger), repr(trigger)
    await session.execute(
        sa.delete(db.AutomationBucket).where(
            db.AutomationBucket.automation_id == trigger.automation.id,
            db.AutomationBucket.trigger_id == trigger.id,
            db.AutomationBucket.count >= trigger.threshold,
        )
    )
@db_injector
async def read_buckets_for_automation(
    db: PrefectDBInterface,
    session: AsyncSession,
    trigger: Trigger,
    batch_size: int = AUTOMATION_BUCKET_BATCH_SIZE,
) -> AsyncGenerator["ORMAutomationBucket", None]:
    """Yields buckets for the given automation and trigger in batches.

    Args:
        db: the database interface (injected)
        session: the database session to query with
        trigger: the trigger whose buckets should be read
        batch_size: how many buckets to fetch per query

    Yields:
        Buckets ordered by their start time.
    """
    offset = 0

    # simple LIMIT/OFFSET pagination; NOTE(review): rows deleted concurrently between
    # batches could cause later rows to be skipped — presumably acceptable here
    while True:
        query = (
            sa.select(db.AutomationBucket)
            .where(
                db.AutomationBucket.automation_id == trigger.automation.id,
                db.AutomationBucket.trigger_id == trigger.id,
            )
            .order_by(db.AutomationBucket.start)
            .limit(batch_size)
            .offset(offset)
        )

        result = await session.execute(query)
        buckets = result.scalars().all()

        if not buckets:
            break

        for bucket in buckets:
            yield bucket

        offset += batch_size
@db_injector
async def read_bucket(
    db: PrefectDBInterface,
    session: AsyncSession,
    trigger: Trigger,
    bucketing_key: Tuple[str, ...],
) -> Optional["ORMAutomationBucket"]:
    """Gets the bucket this event would fall into for the given Automation, if there is
    one currently"""
    # thin wrapper that resolves the automation/trigger IDs from the trigger object
    return await read_bucket_by_trigger_id(
        session,
        trigger.automation.id,
        trigger.id,
        bucketing_key,
    )
@db_injector
async def read_bucket_by_trigger_id(
    db: PrefectDBInterface,
    session: AsyncSession,
    automation_id: UUID,
    trigger_id: UUID,
    bucketing_key: Tuple[str, ...],
) -> "ORMAutomationBucket | None":
    """Gets the bucket this event would fall into for the given Automation, if there is
    one currently"""
    query = sa.select(db.AutomationBucket).where(
        db.AutomationBucket.automation_id == automation_id,
        db.AutomationBucket.trigger_id == trigger_id,
        db.AutomationBucket.bucketing_key == bucketing_key,
    )
    result = await session.execute(query)
    bucket = result.scalars().first()
    if bucket:
        # refresh so callers see the latest counts, not a stale identity-map copy
        await session.refresh(bucket)
    return bucket
@db_injector
async def increment_bucket(
    db: PrefectDBInterface,
    session: AsyncSession,
    bucket: "ORMAutomationBucket",
    count: int,
    last_event: Optional[ReceivedEvent],
) -> "ORMAutomationBucket":
    """Adds the given count to the bucket, returning the new bucket"""
    # only overwrite last_event when we actually have one
    additional_updates: dict[str, ReceivedEvent] = (
        {"last_event": last_event} if last_event else {}
    )
    # upsert: insert the bucket with `count`, or add `count` to the existing row
    await session.execute(
        db.queries.insert(db.AutomationBucket)
        .values(
            automation_id=bucket.automation_id,
            trigger_id=bucket.trigger_id,
            bucketing_key=bucket.bucketing_key,
            start=bucket.start,
            end=bucket.end,
            count=count,
            last_operation="increment_bucket[insert]",
        )
        .on_conflict_do_update(
            index_elements=[
                db.AutomationBucket.automation_id,
                db.AutomationBucket.trigger_id,
                db.AutomationBucket.bucketing_key,
            ],
            set_=dict(
                count=db.AutomationBucket.count + count,
                last_operation="increment_bucket[update]",
                updated=prefect.types._datetime.now("UTC"),
                **additional_updates,
            ),
        )
    )

    # read back the row so the caller sees the post-upsert state
    read_bucket = await read_bucket_by_trigger_id(
        session,
        bucket.automation_id,
        bucket.trigger_id,
        tuple(bucket.bucketing_key),
    )

    if TYPE_CHECKING:
        assert read_bucket is not None

    return read_bucket
@db_injector
async def start_new_bucket(
    db: PrefectDBInterface,
    session: AsyncSession,
    trigger: EventTrigger,
    bucketing_key: Tuple[str, ...],
    start: prefect.types._datetime.DateTime,
    end: prefect.types._datetime.DateTime,
    count: int,
    triggered_at: Optional[prefect.types._datetime.DateTime] = None,
    last_event: Optional[ReceivedEvent] = None,
) -> "ORMAutomationBucket":
    """Ensures that a bucket with the given start and end exists with the given count,
    returning the new bucket"""
    automation = trigger.automation

    # upsert: unlike ensure_bucket, an existing row is fully overwritten with the new
    # window, count, and event
    await session.execute(
        db.queries.insert(db.AutomationBucket)
        .values(
            automation_id=automation.id,
            trigger_id=trigger.id,
            bucketing_key=bucketing_key,
            start=start,
            end=end,
            count=count,
            last_operation="start_new_bucket[insert]",
            triggered_at=triggered_at,
            last_event=last_event,
        )
        .on_conflict_do_update(
            index_elements=[
                db.AutomationBucket.automation_id,
                db.AutomationBucket.trigger_id,
                db.AutomationBucket.bucketing_key,
            ],
            set_=dict(
                start=start,
                end=end,
                count=count,
                last_operation="start_new_bucket[update]",
                updated=prefect.types._datetime.now("UTC"),
                triggered_at=triggered_at,
                last_event=last_event,
            ),
        )
    )

    # read back the row so the caller sees the post-upsert state
    read_bucket = await read_bucket_by_trigger_id(
        session,
        automation.id,
        trigger.id,
        tuple(bucketing_key),
    )

    if TYPE_CHECKING:
        assert read_bucket is not None

    return read_bucket
@db_injector
async def ensure_bucket(
    db: PrefectDBInterface,
    session: AsyncSession,
    trigger: EventTrigger,
    bucketing_key: Tuple[str, ...],
    start: prefect.types._datetime.DateTime,
    end: prefect.types._datetime.DateTime,
    last_event: Optional[ReceivedEvent],
    initial_count: int = 0,
) -> "ORMAutomationBucket":
    """Ensures that a bucket has been started for the given automation and key,
    returning the current bucket. Will not modify the existing bucket."""
    automation = trigger.automation

    # On conflict we deliberately leave the bucket untouched; the self-assignment
    # of count is a no-op that still counts as an UPDATE so the statement matches
    # the existing row.  last_event is only recorded when the caller supplied one.
    conflict_updates: dict[str, object] = {"count": db.AutomationBucket.count}
    if last_event:
        conflict_updates["last_event"] = last_event

    statement = (
        db.queries.insert(db.AutomationBucket)
        .values(
            automation_id=automation.id,
            trigger_id=trigger.id,
            bucketing_key=bucketing_key,
            last_event=last_event,
            start=start,
            end=end,
            count=initial_count,
            last_operation="ensure_bucket[insert]",
        )
        .on_conflict_do_update(
            index_elements=[
                db.AutomationBucket.automation_id,
                db.AutomationBucket.trigger_id,
                db.AutomationBucket.bucketing_key,
            ],
            set_=conflict_updates,
        )
    )
    await session.execute(statement)

    # Return whatever is now in the database for this (automation, trigger, key)
    current_bucket = await read_bucket_by_trigger_id(
        session, automation.id, trigger.id, tuple(bucketing_key)
    )

    if TYPE_CHECKING:
        # The upsert above guarantees the row exists
        assert current_bucket is not None

    return current_bucket
@db_injector
async def remove_bucket(
    db: PrefectDBInterface, session: AsyncSession, bucket: "ORMAutomationBucket"
):
    """Removes the given bucket from the database"""
    # Match on the full composite key identifying this bucket
    deletion = sa.delete(db.AutomationBucket).where(
        db.AutomationBucket.automation_id == bucket.automation_id,
        db.AutomationBucket.trigger_id == bucket.trigger_id,
        db.AutomationBucket.bucketing_key == bucket.bucketing_key,
    )
    await session.execute(deletion)
@db_injector
async def sweep_closed_buckets(
    db: PrefectDBInterface,
    session: AsyncSession,
    older_than: prefect.types._datetime.DateTime,
) -> None:
    """Deletes all buckets whose window closed at or before `older_than`."""
    window_has_closed = db.AutomationBucket.end <= older_than
    await session.execute(sa.delete(db.AutomationBucket).where(window_has_closed))
async def reset() -> None:
    """Resets the in-memory state of the service"""
    await reset_events_clock()
    # Drop every in-memory registry the triggers service maintains
    for registry in (automations_by_id, triggers, next_proactive_runs):
        registry.clear()
async def listen_for_automation_changes() -> None:
    """
    Listens for any changes to automations via PostgreSQL NOTIFY/LISTEN,
    and applies those changes to the set of loaded automations.

    Runs until cancelled, reconnecting (after a configurable delay) whenever
    the listen connection fails.  Returns immediately when the server is not
    backed by PostgreSQL, since NOTIFY/LISTEN is unavailable there.
    """
    logger.info("Starting automation change listener")

    while True:
        conn = None
        try:
            conn = await get_pg_notify_connection()
            if not conn:
                # No PostgreSQL connection means no NOTIFY/LISTEN support at
                # all, so there is nothing to listen for -- exit for good
                logger.debug(
                    "PostgreSQL NOTIFY/LISTEN not available (not using PostgreSQL). "
                    "Automation changes will not be synchronized across servers."
                )
                return

            logger.info(
                f"Listening for automation changes on {AUTOMATION_CHANGES_CHANNEL}"
            )

            async for payload in pg_listen(
                conn,
                AUTOMATION_CHANGES_CHANNEL,
                heartbeat_interval=get_current_settings().server.services.triggers.pg_notify_heartbeat_interval_seconds,
            ):
                # Each payload is a JSON document describing one change to a
                # single automation; a malformed payload is logged and skipped
                # so the listener itself keeps running
                try:
                    data = orjson.loads(payload)
                    automation_id = UUID(data["automation_id"])
                    event_type = data["event_type"]

                    logger.info(
                        f"Received automation change notification: {event_type} for {automation_id}"
                    )

                    # Translate the wire-level event type into the internal
                    # AutomationChangeEvent vocabulary
                    event_map: dict[str, AutomationChangeEvent] = {
                        "created": "automation__created",
                        "updated": "automation__updated",
                        "deleted": "automation__deleted",
                    }

                    if event_type in event_map:
                        await automation_changed(automation_id, event_map[event_type])
                    else:
                        logger.warning(f"Unknown automation event type: {event_type}")

                except Exception as e:
                    logger.error(
                        f"Error processing automation change notification: {e}",
                        exc_info=True,
                    )

        except asyncio.CancelledError:
            # Service shutdown: leave the reconnect loop cleanly
            logger.info("Automation change listener cancelled")
            break
        except Exception as e:
            # Connection-level failure: back off, then reconnect on the next
            # pass through the loop
            reconnect_seconds = get_current_settings().server.services.triggers.pg_notify_reconnect_interval_seconds
            logger.error(
                f"Error in automation change listener: {e}. Reconnecting in {reconnect_seconds}s...",
                exc_info=True,
            )
            await asyncio.sleep(reconnect_seconds)
        finally:
            # Always release the listen connection before reconnecting/exiting
            if conn and not conn.is_closed():
                await conn.close()
@asynccontextmanager
async def consumer(
    periodic_granularity: timedelta = timedelta(seconds=5),
) -> AsyncGenerator[MessageHandler, None]:
    """The `triggers.consumer` processes all Events arriving on the event bus to
    determine if they meet the automation criteria, queuing up a corresponding
    `TriggeredAction` for the `actions` service if the automation criteria is met.

    Args:
        periodic_granularity: how often the background proactive evaluation
            loop should run
    """
    # Start the automation change listener task
    sync_task = asyncio.create_task(listen_for_automation_changes())

    # Load the currently defined automations before accepting any messages
    async with automations_session() as session:
        await load_automations(session)

    # Background task that periodically evaluates proactive triggers
    proactive_task = asyncio.create_task(evaluate_periodically(periodic_granularity))

    ordering = get_triggers_causal_ordering()

    async def message_handler(message: Message):
        # Guard clauses: ignore malformed messages and log writes rather than
        # failing, so one bad message can't stall the consumer
        if not message.data:
            logger.warning("Message had no data")

            return

        if not message.attributes:
            logger.warning("Message had no attributes")

            return

        if message.attributes.get("event") == "prefect.log.write":
            return

        try:
            event_id = UUID(message.attributes["id"])
        except (KeyError, ValueError, TypeError):
            logger.warning(
                "Unable to get event ID from message attributes: %s",
                repr(message.attributes),
            )
            return

        # Skip events we've already processed (e.g. redelivered messages)
        if await ordering.event_has_been_seen(event_id):
            return

        event = ReceivedEvent.model_validate_json(message.data)

        try:
            await reactive_evaluation(event)
        except EventArrivedEarly:
            pass  # it's fine to ACK this message, since it is safe in the DB

    try:
        logger.debug("Starting reactive evaluation task")
        yield message_handler
    finally:
        # Tear down both background tasks when the consumer exits
        sync_task.cancel()
        proactive_task.cancel()
        # Wait for tasks to finish
        await asyncio.gather(sync_task, proactive_task, return_exceptions=True)
async def proactive_evaluation(
    trigger: EventTrigger, as_of: prefect.types._datetime.DateTime
) -> prefect.types._datetime.DateTime:
    """The core proactive evaluation operation for a single Automation"""
    assert isinstance(trigger, EventTrigger), repr(trigger)
    automation = trigger.automation

    # Shift the evaluation time by the events clock offset so proactive checks
    # stay aligned with the (possibly lagging) reactive event stream
    offset = await get_events_clock_offset()
    as_of += timedelta(seconds=offset)

    logger.debug(
        "Evaluating automation %s trigger %s proactively as of %s (offset %ss)",
        automation.id,
        trigger.id,
        as_of,
        offset,
    )

    # By default, the next run will come after the full trigger window, but it
    # may be sooner based on the state of the buckets
    next_run = as_of + trigger.within

    async with automations_session() as session:
        try:
            if not trigger.for_each:
                # A trigger with no partitioning keys uses a single shared
                # bucket; make sure it exists before evaluating
                await ensure_bucket(
                    session,
                    trigger,
                    bucketing_key=tuple(),
                    start=as_of,
                    end=as_of + trigger.within,
                    last_event=None,
                )

            # preemptively delete buckets where possible without
            # evaluating them in memory
            await remove_buckets_exceeding_threshold(session, trigger)

            async for bucket in read_buckets_for_automation(session, trigger):
                updated_bucket = await evaluate(
                    session, trigger, bucket, as_of, triggering_event=None
                )
                if not updated_bucket:
                    continue
                # Pull the next evaluation earlier if this bucket's window
                # closes before the currently scheduled time
                if as_of < updated_bucket.end and updated_bucket.end < next_run:
                    next_run = prefect.types._datetime.create_datetime_instance(
                        updated_bucket.end
                    )

            return next_run
        finally:
            # Persist any bucket changes made during evaluation
            await session.commit()
async def evaluate_proactive_triggers() -> None:
    """Runs one proactive pass over every loaded Proactive trigger that is due."""
    for trigger in list(triggers.values()):
        if trigger.posture != Posture.Proactive:
            continue

        # A trigger with no recorded schedule defaults to "due right now"
        scheduled_for = next_proactive_runs.get(
            trigger.id, prefect.types._datetime.now("UTC")
        )
        if scheduled_for > prefect.types._datetime.now("UTC"):
            continue

        try:
            run_again_at = await proactive_evaluation(
                trigger, prefect.types._datetime.now("UTC")
            )
            logger.debug(
                "Automation %s trigger %s will run again at %s",
                trigger.automation.id,
                trigger.id,
                run_again_at,
            )
            # Remember when this trigger next needs attention
            next_proactive_runs[trigger.id] = run_again_at
        except Exception:
            # One failing trigger must not block the rest of the sweep
            logger.exception(
                "Error evaluating automation %s trigger %s proactively",
                trigger.automation.id,
                trigger.id,
            )