Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/actions.py: 24%
704 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1"""
2The actions consumer watches for actions that have been triggered by Automations
3and carries them out. Also includes the various concrete subtypes of Actions
4"""
6from __future__ import annotations 1a
8import abc 1a
9import asyncio 1a
10import copy 1a
11from base64 import b64encode 1a
12from contextlib import asynccontextmanager 1a
13from datetime import datetime, timedelta, timezone 1a
14from typing import ( 1a
15 TYPE_CHECKING,
16 Any,
17 AsyncGenerator,
18 Awaitable,
19 Callable,
20 ClassVar,
21 Coroutine,
22 Dict,
23 List,
24 Literal,
25 MutableMapping,
26 Optional,
27 Tuple,
28 Type,
29 Union,
30 cast,
31)
32from uuid import UUID 1a
34import jinja2 1a
35import orjson 1a
36from cachetools import TTLCache 1a
37from httpx import Response 1a
38from pydantic import ( 1a
39 Field,
40 PrivateAttr,
41 ValidationInfo,
42 field_validator,
43 model_validator,
44)
45from typing_extensions import Self, TypeAlias 1a
47from prefect._internal.uuid7 import uuid7 1a
48from prefect.blocks.abstract import NotificationBlock, NotificationError 1a
49from prefect.blocks.core import Block 1a
50from prefect.blocks.webhook import Webhook 1a
51from prefect.logging import get_logger 1a
52from prefect.server.events.clients import ( 1a
53 PrefectServerEventsAPIClient,
54 PrefectServerEventsClient,
55)
56from prefect.server.events.schemas.events import Event, RelatedResource, Resource 1a
57from prefect.server.events.schemas.labelling import LabelDiver 1a
58from prefect.server.schemas.actions import DeploymentFlowRunCreate, StateCreate 1a
59from prefect.server.schemas.core import ( 1a
60 BlockDocument,
61 ConcurrencyLimitV2,
62 Flow,
63 TaskRun,
64 WorkPool,
65)
66from prefect.server.schemas.responses import ( 1a
67 DeploymentResponse,
68 FlowRunResponse,
69 OrchestrationResult,
70 StateAcceptDetails,
71 WorkQueueWithStatus,
72)
73from prefect.server.schemas.states import Scheduled, State, StateType, Suspended 1a
74from prefect.server.utilities.http import should_redact_header 1a
75from prefect.server.utilities.messaging import Message, MessageHandler 1a
76from prefect.server.utilities.schemas import PrefectBaseModel 1a
77from prefect.server.utilities.user_templates import ( 1a
78 TemplateSecurityError,
79 matching_types_in_templates,
80 maybe_template,
81 register_user_template_filters,
82 render_user_template,
83 validate_user_template,
84)
85from prefect.types import DateTime, NonNegativeTimeDelta, StrictVariableValue 1a
86from prefect.types._datetime import now, parse_datetime 1a
87from prefect.utilities.schema_tools.hydration import ( 1a
88 HydrationContext,
89 HydrationError,
90 Placeholder,
91 ValidJinja,
92 WorkspaceVariable,
93 hydrate,
94)
95from prefect.utilities.text import truncated_to 1a
97if TYPE_CHECKING: # pragma: no cover 97 ↛ 98line 97 didn't jump to line 98 because the condition on line 97 was never true1a
98 import logging
100 from prefect.server.api.clients import OrchestrationClient
101 from prefect.server.events.schemas.automations import TriggeredAction
103 Parameters: TypeAlias = dict[str, Any | dict[str, Any] | list[Any | dict[str, Any]]]
105logger: "logging.Logger" = get_logger(__name__) 1a
108class ActionFailed(Exception): 1a
109 def __init__(self, reason: str): 1a
110 self.reason = reason
113class Action(PrefectBaseModel, abc.ABC): 1a
114 """An Action that may be performed when an Automation is triggered"""
116 type: str 1a
118 # Captures any additional information about the result of the action we'd like to
119 # make available in the payload of the executed or failed events
120 _result_details: Dict[str, Any] = PrivateAttr(default_factory=dict) 1a
121 _resulting_related_resources: List[RelatedResource] = PrivateAttr( 1a
122 default_factory=list
123 )
125 @abc.abstractmethod 1a
126 async def act(self, triggered_action: "TriggeredAction") -> None: 1a
127 """Perform the requested Action"""
129 async def fail(self, triggered_action: "TriggeredAction", reason: str) -> None: 1a
130 from prefect.server.events.schemas.automations import EventTrigger
132 automation = triggered_action.automation
133 action = triggered_action.action
134 action_index = triggered_action.action_index
136 automation_resource_id = f"prefect.automation.{automation.id}"
138 action_details = {
139 "action_index": action_index,
140 "action_type": action.type,
141 "invocation": str(triggered_action.id),
142 }
143 resource = Resource(
144 {
145 "prefect.resource.id": automation_resource_id,
146 "prefect.resource.name": automation.name,
147 "prefect.trigger-type": automation.trigger.type,
148 }
149 )
150 if isinstance(automation.trigger, EventTrigger):
151 resource["prefect.posture"] = automation.trigger.posture
153 logger.warning(
154 "Action failed: %r",
155 reason,
156 extra={**self.logging_context(triggered_action)},
157 )
159 async with PrefectServerEventsClient() as events:
160 triggered_event_id = uuid7()
161 # Link to the triggering event if available and recent to establish causal chain.
162 # Only set follows if timing is tight (within 5 minutes) to avoid unnecessary
163 # waiting at CausalOrdering when events arrive >15 min after their follows event.
164 follows_id = None
165 if (
166 triggered_action.triggering_event
167 and triggered_action.triggering_event.occurred
168 ):
169 time_since_trigger = (
170 triggered_action.triggered
171 - triggered_action.triggering_event.occurred
172 )
173 TIGHT_TIMING = timedelta(minutes=5)
174 if abs(time_since_trigger) < TIGHT_TIMING:
175 follows_id = triggered_action.triggering_event.id
177 # Build related resources including triggering event reference
178 related_resources = list(self._resulting_related_resources)
179 if triggered_action.triggering_event:
180 related_resources.append(
181 RelatedResource(
182 {
183 "prefect.resource.id": f"prefect.event.{triggered_action.triggering_event.id}",
184 "prefect.resource.role": "triggering-event",
185 }
186 )
187 )
188 await events.emit(
189 Event(
190 occurred=triggered_action.triggered,
191 event="prefect.automation.action.triggered",
192 resource=resource,
193 related=related_resources,
194 payload=action_details,
195 id=triggered_event_id,
196 follows=follows_id,
197 )
198 )
199 await events.emit(
200 Event(
201 occurred=now("UTC"),
202 event="prefect.automation.action.failed",
203 resource=resource,
204 related=self._resulting_related_resources,
205 payload={
206 **action_details,
207 "reason": reason,
208 **self._result_details,
209 },
210 follows=triggered_event_id,
211 id=uuid7(),
212 )
213 )
215 async def succeed(self, triggered_action: "TriggeredAction") -> None: 1a
216 from prefect.server.events.schemas.automations import EventTrigger
218 automation = triggered_action.automation
219 action = triggered_action.action
220 action_index = triggered_action.action_index
222 automation_resource_id = f"prefect.automation.{automation.id}"
224 action_details = {
225 "action_index": action_index,
226 "action_type": action.type,
227 "invocation": str(triggered_action.id),
228 }
229 resource = Resource(
230 {
231 "prefect.resource.id": automation_resource_id,
232 "prefect.resource.name": automation.name,
233 "prefect.trigger-type": automation.trigger.type,
234 }
235 )
236 if isinstance(automation.trigger, EventTrigger):
237 resource["prefect.posture"] = automation.trigger.posture
239 async with PrefectServerEventsClient() as events:
240 triggered_event_id = uuid7()
241 # Link to the triggering event if available and recent to establish causal chain.
242 # Only set follows if timing is tight (within 5 minutes) to avoid unnecessary
243 # waiting at CausalOrdering when events arrive >15 min after their follows event.
244 follows_id = None
245 if (
246 triggered_action.triggering_event
247 and triggered_action.triggering_event.occurred
248 ):
249 time_since_trigger = (
250 triggered_action.triggered
251 - triggered_action.triggering_event.occurred
252 )
253 TIGHT_TIMING = timedelta(minutes=5)
254 if abs(time_since_trigger) < TIGHT_TIMING:
255 follows_id = triggered_action.triggering_event.id
257 # Build related resources including triggering event reference
258 related_resources = list(self._resulting_related_resources)
259 if triggered_action.triggering_event:
260 related_resources.append(
261 RelatedResource(
262 {
263 "prefect.resource.id": f"prefect.event.{triggered_action.triggering_event.id}",
264 "prefect.resource.role": "triggering-event",
265 }
266 )
267 )
268 await events.emit(
269 Event(
270 occurred=triggered_action.triggered,
271 event="prefect.automation.action.triggered",
272 resource=Resource(
273 {
274 "prefect.resource.id": automation_resource_id,
275 "prefect.resource.name": automation.name,
276 "prefect.trigger-type": automation.trigger.type,
277 }
278 ),
279 related=related_resources,
280 payload=action_details,
281 id=triggered_event_id,
282 follows=follows_id,
283 )
284 )
285 await events.emit(
286 Event(
287 occurred=now("UTC"),
288 event="prefect.automation.action.executed",
289 resource=Resource(
290 {
291 "prefect.resource.id": automation_resource_id,
292 "prefect.resource.name": automation.name,
293 "prefect.trigger-type": automation.trigger.type,
294 }
295 ),
296 related=self._resulting_related_resources,
297 payload={
298 **action_details,
299 **self._result_details,
300 },
301 id=uuid7(),
302 follows=triggered_event_id,
303 )
304 )
306 def logging_context(self, triggered_action: "TriggeredAction") -> Dict[str, Any]: 1a
307 """Common logging context for all actions"""
308 return {
309 "automation": str(triggered_action.automation.id),
310 "action": self.model_dump(mode="json"),
311 "triggering_event": (
312 {
313 "id": triggered_action.triggering_event.id,
314 "event": triggered_action.triggering_event.event,
315 }
316 if triggered_action.triggering_event
317 else None
318 ),
319 "triggering_labels": triggered_action.triggering_labels,
320 }
323class DoNothing(Action): 1a
324 """Do nothing when an Automation is triggered"""
326 type: Literal["do-nothing"] = "do-nothing" 1a
328 async def act(self, triggered_action: "TriggeredAction") -> None: 1a
329 logger.info(
330 "Doing nothing",
331 extra={**self.logging_context(triggered_action)},
332 )
335class EmitEventAction(Action): 1a
336 async def act(self, triggered_action: "TriggeredAction") -> None: 1a
337 event = await self.create_event(triggered_action)
339 self._result_details["emitted_event"] = str(event.id)
341 async with PrefectServerEventsClient() as events:
342 await events.emit(event)
344 @abc.abstractmethod 1a
345 async def create_event(self, triggered_action: "TriggeredAction") -> "Event": 1a
346 """Create an event from the TriggeredAction"""
349class ExternalDataAction(Action): 1a
350 """Base class for Actions that require data from an external source such as
351 the Orchestration API"""
353 async def orchestration_client( 1a
354 self, triggered_action: "TriggeredAction"
355 ) -> "OrchestrationClient":
356 from prefect.server.api.clients import OrchestrationClient
358 return OrchestrationClient(
359 additional_headers={
360 "Prefect-Automation-ID": str(triggered_action.automation.id),
361 "Prefect-Automation-Name": (
362 b64encode(triggered_action.automation.name.encode()).decode()
363 ),
364 },
365 )
367 async def events_api_client( 1a
368 self, triggered_action: "TriggeredAction"
369 ) -> PrefectServerEventsAPIClient:
370 return PrefectServerEventsAPIClient(
371 additional_headers={
372 "Prefect-Automation-ID": str(triggered_action.automation.id),
373 "Prefect-Automation-Name": (
374 b64encode(triggered_action.automation.name.encode()).decode()
375 ),
376 },
377 )
379 def reason_from_response(self, response: Response) -> str: 1a
380 error_detail = None
381 if response.status_code in {409, 422}:
382 try:
383 error_detail = response.json().get("detail")
384 except Exception:
385 pass
387 if response.status_code == 422 or error_detail:
388 return f"Validation error occurred for {self.type!r}" + (
389 f" - {error_detail}" if error_detail else ""
390 )
391 else:
392 return f"Conflict (409) occurred for {self.type!r} - {error_detail or response.text!r}"
393 else:
394 return (
395 f"Unexpected status from {self.type!r} action: {response.status_code}"
396 )
399def _first_resource_of_kind(event: "Event", expected_kind: str) -> Optional["Resource"]: 1a
400 for resource in event.involved_resources:
401 kind, _, _ = resource.id.rpartition(".")
402 if kind == expected_kind:
403 return resource
405 return None
408def _kind_and_id_from_resource( 1a
409 resource: Resource,
410) -> tuple[str, UUID] | tuple[None, None]:
411 kind, _, id = resource.id.rpartition(".")
413 try:
414 return kind, UUID(id)
415 except ValueError:
416 pass
418 return None, None
421def _id_from_resource_id(resource_id: str, expected_kind: str) -> Optional[UUID]: 1a
422 kind, _, id = resource_id.rpartition(".")
423 if kind == expected_kind:
424 try:
425 return UUID(id)
426 except ValueError:
427 pass
428 return None
431def _id_of_first_resource_of_kind(event: "Event", expected_kind: str) -> Optional[UUID]: 1a
432 resource = _first_resource_of_kind(event, expected_kind)
433 if resource:
434 if id := _id_from_resource_id(resource.id, expected_kind):
435 return id
436 return None
439WorkspaceVariables: TypeAlias = Dict[str, StrictVariableValue] 1a
440TemplateContextObject: TypeAlias = Union[PrefectBaseModel, WorkspaceVariables, None] 1a
443class JinjaTemplateAction(ExternalDataAction): 1a
444 """Base class for Actions that use Jinja templates supplied by the user and
445 are rendered with a context containing data from the triggered action,
446 and the orchestration API."""
448 _object_cache: Dict[str, TemplateContextObject] = PrivateAttr(default_factory=dict) 1a
450 _registered_filters: ClassVar[bool] = False 1a
452 @classmethod 1a
453 def _register_filters_if_needed(cls) -> None: 1a
454 if not cls._registered_filters:
455 # Register our event-related filters
456 from prefect.server.events.jinja_filters import all_filters
458 register_user_template_filters(all_filters)
459 cls._registered_filters = True
461 @classmethod 1a
462 def validate_template(cls, template: str, field_name: str) -> str: 1a
463 cls._register_filters_if_needed()
465 try:
466 validate_user_template(template)
467 except (jinja2.exceptions.TemplateSyntaxError, TemplateSecurityError) as exc:
468 raise ValueError(f"{field_name!r} is not a valid template: {exc}")
470 return template
472 @classmethod 1a
473 def templates_in_dictionary( 1a
474 cls, dict_: dict[Any, Any | dict[Any, Any]]
475 ) -> list[tuple[dict[Any, Any], dict[Any, str]]]:
476 to_traverse: list[dict[Any, Any]] = []
477 templates_at_layer: dict[Any, str] = {}
478 for key, value in dict_.items():
479 if isinstance(value, str) and maybe_template(value):
480 templates_at_layer[key] = value
481 elif isinstance(value, dict):
482 to_traverse.append(value)
484 templates: list[tuple[dict[Any, Any], dict[Any, str]]] = []
486 if templates_at_layer:
487 templates.append((dict_, templates_at_layer))
489 for item in to_traverse:
490 templates += cls.templates_in_dictionary(item)
492 return templates
494 def instantiate_object( 1a
495 self,
496 model: Type[PrefectBaseModel],
497 data: Dict[str, Any],
498 triggered_action: "TriggeredAction",
499 resource: Optional["Resource"] = None,
500 ) -> PrefectBaseModel:
501 object = model.model_validate(data)
503 if isinstance(object, FlowRunResponse) or isinstance(object, TaskRun):
504 # The flow/task run was fetched from the API, but between when its
505 # state changed and now it's possible that the state in the API has
506 # changed again from what's contained in the event. Use the event's
507 # data to rebuild the state object and attach it to the object
508 # received from the API.
509 # https://github.com/PrefectHQ/nebula/issues/3310
510 state_fields = [
511 "prefect.state-message",
512 "prefect.state-name",
513 "prefect.state-timestamp",
514 "prefect.state-type",
515 ]
517 if resource and all(field in resource for field in state_fields):
518 try:
519 timestamp = parse_datetime(resource["prefect.state-timestamp"])
520 if TYPE_CHECKING:
521 assert isinstance(timestamp, DateTime)
522 object.state = State(
523 message=resource["prefect.state-message"],
524 name=resource["prefect.state-name"],
525 timestamp=timestamp,
526 type=StateType(resource["prefect.state-type"]),
527 )
528 except Exception:
529 logger.exception(
530 "Failed to parse state from event resource",
531 extra={
532 **self.logging_context(triggered_action),
533 },
534 )
536 return object
538 async def _get_object_from_prefect_api( 1a
539 self,
540 orchestration_client: "OrchestrationClient",
541 triggered_action: "TriggeredAction",
542 resource: Optional["Resource"],
543 ) -> Optional[PrefectBaseModel]:
544 if not resource:
545 return None
547 kind, obj_id = _kind_and_id_from_resource(resource)
549 if not obj_id:
550 return None
552 kind_to_model_and_methods: Dict[
553 str,
554 Tuple[
555 Type[PrefectBaseModel],
556 List[Callable[..., Coroutine[Any, Any, Response]]],
557 ],
558 ] = {
559 "prefect.deployment": (
560 DeploymentResponse,
561 [orchestration_client.read_deployment_raw],
562 ),
563 "prefect.flow": (Flow, [orchestration_client.read_flow_raw]),
564 "prefect.flow-run": (
565 FlowRunResponse,
566 [orchestration_client.read_flow_run_raw],
567 ),
568 "prefect.task-run": (TaskRun, [orchestration_client.read_task_run_raw]),
569 "prefect.work-pool": (
570 WorkPool,
571 [orchestration_client.read_work_pool_raw],
572 ),
573 "prefect.work-queue": (
574 WorkQueueWithStatus,
575 [
576 orchestration_client.read_work_queue_raw,
577 orchestration_client.read_work_queue_status_raw,
578 ],
579 ),
580 "prefect.concurrency-limit": (
581 ConcurrencyLimitV2,
582 [orchestration_client.read_concurrency_limit_v2_raw],
583 ),
584 }
586 if kind not in kind_to_model_and_methods:
587 return None
589 model, client_methods = kind_to_model_and_methods[kind]
591 responses = await asyncio.gather(
592 *[client_method(obj_id) for client_method in client_methods]
593 )
595 if any(response.status_code >= 300 for response in responses):
596 return None
598 combined_response: dict[Any, Any] = {}
599 for response in responses:
600 data: Any | list[Any] = response.json()
602 # Sometimes we have to call filter endpoints that return a list of 0..1
603 if isinstance(data, list):
604 if len(data) == 0:
605 return None
606 data = data[0]
608 combined_response.update(data)
610 return self.instantiate_object(
611 model, combined_response, triggered_action, resource=resource
612 )
614 async def _relevant_native_objects( 1a
615 self, templates: List[str], triggered_action: "TriggeredAction"
616 ) -> Dict[str, TemplateContextObject]:
617 if not triggered_action.triggering_event:
618 return {}
620 orchestration_types = {
621 "deployment",
622 "flow",
623 "flow_run",
624 "task_run",
625 "work_pool",
626 "work_queue",
627 "concurrency_limit",
628 }
629 special_types = {"variables"}
631 types = matching_types_in_templates(
632 templates, types=orchestration_types | special_types
633 )
634 if not types:
635 return {}
637 needed_types = list(set(types) - set(self._object_cache.keys()))
639 async with await self.orchestration_client(triggered_action) as orchestration:
640 calls: List[Awaitable[TemplateContextObject]] = []
641 for type_ in needed_types:
642 if type_ in orchestration_types:
643 calls.append(
644 self._get_object_from_prefect_api(
645 orchestration,
646 triggered_action,
647 _first_resource_of_kind(
648 triggered_action.triggering_event,
649 f"prefect.{type_.replace('_', '-')}",
650 ),
651 )
652 )
653 elif type_ == "variables":
654 calls.append(orchestration.read_workspace_variables())
656 objects = await asyncio.gather(*calls)
658 self._object_cache.update(dict(zip(needed_types, objects)))
660 return self._object_cache
662 async def _template_context( 1a
663 self, templates: List[str], triggered_action: "TriggeredAction"
664 ) -> dict[str, Any]:
665 context: dict[str, Any] = {
666 "automation": triggered_action.automation,
667 "event": triggered_action.triggering_event,
668 "labels": LabelDiver(triggered_action.triggering_labels),
669 "firing": triggered_action.firing,
670 "firings": triggered_action.all_firings(),
671 "events": triggered_action.all_events(),
672 }
673 context.update(await self._relevant_native_objects(templates, triggered_action))
674 return context
676 async def _render( 1a
677 self, templates: List[str], triggered_action: "TriggeredAction"
678 ) -> List[str]:
679 self._register_filters_if_needed()
681 context = await self._template_context(templates, triggered_action)
683 return await asyncio.gather(
684 *[render_user_template(template, context) for template in templates]
685 )
688class DeploymentAction(Action): 1a
689 """Base class for Actions that operate on Deployments and need to infer them from
690 events"""
692 source: Literal["selected", "inferred"] = Field( 1a
693 "selected",
694 description=(
695 "Whether this Action applies to a specific selected "
696 "deployment (given by `deployment_id`), or to a deployment that is "
697 "inferred from the triggering event. If the source is 'inferred', "
698 "the `deployment_id` may not be set. If the source is 'selected', the "
699 "`deployment_id` must be set."
700 ),
701 )
702 deployment_id: Optional[UUID] = Field( 1a
703 None, description="The identifier of the deployment"
704 )
706 @model_validator(mode="after") 1a
707 def selected_deployment_requires_id(self) -> Self: 1a
708 wants_selected_deployment = self.source == "selected"
709 has_deployment_id = bool(self.deployment_id)
710 if wants_selected_deployment != has_deployment_id:
711 raise ValueError(
712 "deployment_id is "
713 + ("not allowed" if has_deployment_id else "required")
714 )
715 return self
717 async def deployment_id_to_use(self, triggered_action: "TriggeredAction") -> UUID: 1a
718 if self.source == "selected":
719 assert self.deployment_id
720 return self.deployment_id
722 event = triggered_action.triggering_event
723 if not event:
724 raise ActionFailed("No event to infer the deployment")
726 assert event
727 if id := _id_of_first_resource_of_kind(event, "prefect.deployment"):
728 return id
730 raise ActionFailed("No deployment could be inferred")
733class DeploymentCommandAction(DeploymentAction, ExternalDataAction): 1a
734 """Executes a command against a matching deployment"""
736 _action_description: ClassVar[str] 1a
738 async def act(self, triggered_action: "TriggeredAction") -> None: 1a
739 deployment_id = await self.deployment_id_to_use(triggered_action)
741 self._resulting_related_resources.append(
742 RelatedResource.model_validate(
743 {
744 "prefect.resource.id": f"prefect.deployment.{deployment_id}",
745 "prefect.resource.role": "target",
746 }
747 )
748 )
750 logger.info(
751 self._action_description,
752 extra={
753 "deployment_id": deployment_id,
754 **self.logging_context(triggered_action),
755 },
756 )
758 async with await self.orchestration_client(triggered_action) as orchestration:
759 response = await self.command(
760 orchestration, deployment_id, triggered_action
761 )
763 self._result_details["status_code"] = response.status_code
764 if response.status_code >= 300:
765 raise ActionFailed(self.reason_from_response(response))
767 @abc.abstractmethod 1a
768 async def command( 1a
769 self,
770 orchestration: "OrchestrationClient",
771 deployment_id: UUID,
772 triggered_action: "TriggeredAction",
773 ) -> Response:
774 """Execute the deployment command"""
777class RunDeployment(JinjaTemplateAction, DeploymentCommandAction): 1a
778 """Runs the given deployment with the given parameters"""
780 type: Literal["run-deployment"] = "run-deployment" 1a
782 parameters: Optional[Dict[str, Any]] = Field( 1a
783 None,
784 description=(
785 "The parameters to pass to the deployment, or None to use the "
786 "deployment's default parameters"
787 ),
788 )
789 job_variables: Optional[Dict[str, Any]] = Field( 1a
790 None,
791 description=(
792 "The job variables to pass to the created flow run, or None "
793 "to use the deployment's default job variables"
794 ),
795 )
796 schedule_after: NonNegativeTimeDelta = Field( 1a
797 default_factory=lambda: timedelta(0),
798 description=(
799 "The amount of time to wait before running the deployment. "
800 "Defaults to running the deployment immediately."
801 ),
802 )
804 _action_description: ClassVar[str] = "Running deployment" 1a
806 async def command( 1a
807 self,
808 orchestration: "OrchestrationClient",
809 deployment_id: UUID,
810 triggered_action: "TriggeredAction",
811 ) -> Response:
812 # Calculate when to schedule the deployment
813 scheduled_time = datetime.now(timezone.utc) + self.schedule_after
814 state = Scheduled(scheduled_time=scheduled_time)
816 try:
817 flow_run_create = DeploymentFlowRunCreate( # type: ignore
818 state=StateCreate(
819 type=state.type,
820 name=state.name,
821 message=state.message,
822 state_details=state.state_details,
823 ),
824 parameters=await self.render_parameters(triggered_action),
825 idempotency_key=triggered_action.idempotency_key(),
826 job_variables=self.job_variables,
827 )
828 except Exception as exc:
829 raise ActionFailed(f"Unable to create flow run from deployment: {exc!r}")
831 response = await orchestration.create_flow_run(deployment_id, flow_run_create)
833 if response.status_code < 300:
834 flow_run = FlowRunResponse.model_validate(response.json())
836 self._resulting_related_resources.append(
837 RelatedResource.model_validate(
838 {
839 "prefect.resource.id": f"prefect.flow-run.{flow_run.id}",
840 "prefect.resource.role": "flow-run",
841 "prefect.resource.name": flow_run.name,
842 }
843 )
844 )
846 logger.info(
847 "Started flow run",
848 extra={
849 "flow_run": {
850 "id": str(flow_run.id),
851 "name": flow_run.name,
852 },
853 **self.logging_context(triggered_action),
854 },
855 )
857 if response.status_code == 409:
858 self._result_details["validation_error"] = response.json().get("detail")
860 return response
862 @field_validator("parameters") 1a
863 def validate_parameters(cls, value: dict[str, Any] | None) -> dict[str, Any] | None: 1a
864 if not value:
865 return value
867 for_testing = copy.deepcopy(value) or {}
868 cls._upgrade_v1_templates(for_testing)
870 problems = cls._collect_errors(
871 hydrate(
872 for_testing,
873 HydrationContext(
874 raise_on_error=False,
875 render_workspace_variables=False,
876 render_jinja=False,
877 ),
878 )
879 )
880 if not problems:
881 return value
883 raise ValueError(
884 "Invalid parameters: \n"
885 + "\n ".join(
886 f"{k + ':' if k else ''} {e.message}" for k, e in problems.items()
887 )
888 )
890 @classmethod 1a
891 def _collect_errors( 1a
892 cls,
893 hydrated: Union[dict[str, Any | dict[str, Any] | list[Any]], Placeholder],
894 prefix: str = "",
895 ) -> dict[str, HydrationError]:
896 problems: dict[str, HydrationError] = {}
898 if isinstance(hydrated, HydrationError):
899 problems[prefix] = hydrated
901 if isinstance(hydrated, Placeholder):
902 return problems
904 for key, value in hydrated.items():
905 if isinstance(value, dict):
906 problems.update(cls._collect_errors(value, f"{prefix}{key}."))
907 elif isinstance(value, list):
908 for item, index in enumerate(value):
909 if isinstance(item, dict):
910 problems.update(
911 cls._collect_errors(item, f"{prefix}{key}[{index}].")
912 )
913 elif isinstance(item, HydrationError):
914 problems[f"{prefix}{key}[{index}]"] = item
915 elif isinstance(value, HydrationError):
916 problems[f"{prefix}{key}"] = value
918 return problems
920 async def render_parameters( 1a
921 self, triggered_action: "TriggeredAction"
922 ) -> Dict[str, Any]:
923 parameters = copy.deepcopy(self.parameters) or {}
925 # pre-process the parameters to upgrade any v1-style template values to v2
926 self._upgrade_v1_templates(parameters)
928 # first-pass, hydrate parameters without rendering in order to collect all of
929 # the embedded Jinja templates, workspace variables, etc
930 placeholders = self._collect_placeholders(
931 hydrate(
932 parameters,
933 HydrationContext(
934 raise_on_error=False,
935 render_jinja=False,
936 render_workspace_variables=False,
937 ),
938 )
939 )
941 # collect all templates so we can build up the context variables they need
942 templates = [p.template for p in placeholders if isinstance(p, ValidJinja)]
943 template_context = await self._template_context(templates, triggered_action)
945 # collect any referenced workspace variables so we can fetch them
946 variable_names = [
947 p.variable_name for p in placeholders if isinstance(p, WorkspaceVariable)
948 ]
949 workspace_variables: Dict[str, StrictVariableValue] = {}
950 if variable_names:
951 async with await self.orchestration_client(triggered_action) as client:
952 workspace_variables = await client.read_workspace_variables(
953 variable_names
954 )
956 # second-pass, render the parameters with the full context
957 parameters = hydrate(
958 parameters,
959 HydrationContext(
960 raise_on_error=True,
961 render_jinja=True,
962 jinja_context=template_context,
963 render_workspace_variables=True,
964 workspace_variables=workspace_variables,
965 ),
966 )
968 return parameters
970 @classmethod 1a
971 def _upgrade_v1_templates(cls, parameters: Parameters): 1a
972 """
973 Upgrades all v1-style template values from the parameters dictionary, changing
974 the values in the given dictionary. v1-style templates are any plain strings
975 that include Jinja2 template syntax.
976 """
977 for key, value in parameters.items():
978 if isinstance(value, dict):
979 # if we already have a __prefect_kind, don't upgrade or recurse
980 if "__prefect_kind" in value:
981 continue
982 cls._upgrade_v1_templates(value)
983 elif isinstance(value, list):
984 for i, item in enumerate(value):
985 if isinstance(item, dict):
986 cls._upgrade_v1_templates(item)
987 elif isinstance(item, str) and maybe_template(item):
988 value[i] = {"__prefect_kind": "jinja", "template": item}
989 elif isinstance(value, str) and maybe_template(value): # pyright: ignore[reportUnnecessaryIsInstance]
990 parameters[key] = {"__prefect_kind": "jinja", "template": value}
992 def _collect_placeholders( 1a
993 self, parameters: Parameters | Placeholder
994 ) -> list[Placeholder]:
995 """
996 Recursively collects all placeholder values embedded within the parameters
997 dictionary, including templates and workspace variables
998 """
999 placeholders: list[Placeholder] = []
1001 if isinstance(parameters, Placeholder):
1002 return [parameters]
1004 for _, value in parameters.items():
1005 if isinstance(value, dict):
1006 placeholders += self._collect_placeholders(value)
1007 elif isinstance(value, list):
1008 for item in value:
1009 if isinstance(item, dict):
1010 placeholders += self._collect_placeholders(item)
1011 elif isinstance(item, Placeholder):
1012 placeholders.append(item)
1013 elif isinstance(value, Placeholder):
1014 placeholders.append(value)
1015 return placeholders
1018class PauseDeployment(DeploymentCommandAction): 1a
1019 """Pauses the given Deployment"""
1021 type: Literal["pause-deployment"] = "pause-deployment" 1a
1023 _action_description: ClassVar[str] = "Pausing deployment" 1a
1025 async def command( 1a
1026 self,
1027 orchestration: "OrchestrationClient",
1028 deployment_id: UUID,
1029 triggered_action: "TriggeredAction",
1030 ) -> Response:
1031 return await orchestration.pause_deployment(deployment_id)
1034class ResumeDeployment(DeploymentCommandAction): 1a
1035 """Resumes the given Deployment"""
1037 type: Literal["resume-deployment"] = "resume-deployment" 1a
1039 _action_description: ClassVar[str] = "Resuming deployment" 1a
1041 async def command( 1a
1042 self,
1043 orchestration: "OrchestrationClient",
1044 deployment_id: UUID,
1045 triggered_action: "TriggeredAction",
1046 ) -> Response:
1047 return await orchestration.resume_deployment(deployment_id)
1050class FlowRunAction(ExternalDataAction): 1a
1051 """An action that operates on a flow run"""
1053 async def flow_run(self, triggered_action: "TriggeredAction") -> UUID: 1a
1054 # Proactive triggers won't have an event, but they might be tracking
1055 # buckets per-resource, so check for that first
1056 labels = triggered_action.triggering_labels
1057 if triggering_resource_id := labels.get("prefect.resource.id"):
1058 if id := _id_from_resource_id(triggering_resource_id, "prefect.flow-run"):
1059 return id
1061 event = triggered_action.triggering_event
1062 if event:
1063 if id := _id_of_first_resource_of_kind(event, "prefect.flow-run"):
1064 return id
1066 raise ActionFailed("No flow run could be inferred")
1069class FlowRunStateChangeAction(FlowRunAction): 1a
1070 """Changes the state of a flow run associated with the trigger"""
1072 @abc.abstractmethod 1a
1073 async def new_state(self, triggered_action: "TriggeredAction") -> StateCreate: 1a
1074 """Return the new state for the flow run"""
1076 async def act(self, triggered_action: "TriggeredAction") -> None: 1a
1077 flow_run_id = await self.flow_run(triggered_action)
1079 self._resulting_related_resources.append(
1080 RelatedResource.model_validate(
1081 {
1082 "prefect.resource.id": f"prefect.flow-run.{flow_run_id}",
1083 "prefect.resource.role": "target",
1084 }
1085 )
1086 )
1088 logger.info(
1089 "Changing flow run state",
1090 extra={
1091 "flow_run_id": str(flow_run_id),
1092 **self.logging_context(triggered_action),
1093 },
1094 )
1096 async with await self.orchestration_client(triggered_action) as orchestration:
1097 response = await orchestration.set_flow_run_state(
1098 flow_run_id, await self.new_state(triggered_action=triggered_action)
1099 )
1101 self._result_details["status_code"] = response.status_code
1102 if response.status_code >= 300:
1103 raise ActionFailed(self.reason_from_response(response))
1105 result = OrchestrationResult.model_validate(response.json())
1106 if not isinstance(result.details, StateAcceptDetails):
1107 raise ActionFailed(f"Failed to set state: {result.details.reason}")
1110class ChangeFlowRunState(FlowRunStateChangeAction): 1a
1111 """Changes the state of a flow run associated with the trigger"""
1113 type: Literal["change-flow-run-state"] = "change-flow-run-state" 1a
1115 name: Optional[str] = Field( 1a
1116 None,
1117 description="The name of the state to change the flow run to",
1118 )
1119 state: StateType = Field( 1a
1120 ...,
1121 description="The type of the state to change the flow run to",
1122 )
1123 message: Optional[str] = Field( 1a
1124 None,
1125 description="An optional message to associate with the state change",
1126 )
1128 async def new_state(self, triggered_action: "TriggeredAction") -> StateCreate: 1a
1129 message = (
1130 self.message
1131 or f"State changed by Automation {triggered_action.automation.id}"
1132 )
1134 return StateCreate(
1135 name=self.name,
1136 type=self.state,
1137 message=message,
1138 )
1141class CancelFlowRun(FlowRunStateChangeAction): 1a
1142 """Cancels a flow run associated with the trigger"""
1144 type: Literal["cancel-flow-run"] = "cancel-flow-run" 1a
1146 async def new_state(self, triggered_action: "TriggeredAction") -> StateCreate: 1a
1147 return StateCreate(
1148 type=StateType.CANCELLING,
1149 message=f"Cancelled by Automation {triggered_action.automation.id}",
1150 )
1153class SuspendFlowRun(FlowRunStateChangeAction): 1a
1154 """Suspends a flow run associated with the trigger"""
1156 type: Literal["suspend-flow-run"] = "suspend-flow-run" 1a
1158 async def new_state(self, triggered_action: "TriggeredAction") -> StateCreate: 1a
1159 state = Suspended(
1160 timeout_seconds=3600,
1161 message=f"Suspended by Automation {triggered_action.automation.id}",
1162 )
1164 return StateCreate(
1165 type=state.type,
1166 name=state.name,
1167 message=state.message,
1168 state_details=state.state_details,
1169 )
1172class ResumeFlowRun(FlowRunAction): 1a
1173 """Resumes a paused or suspended flow run associated with the trigger"""
1175 type: Literal["resume-flow-run"] = "resume-flow-run" 1a
1177 async def act(self, triggered_action: "TriggeredAction") -> None: 1a
1178 flow_run_id = await self.flow_run(triggered_action)
1180 self._resulting_related_resources.append(
1181 RelatedResource.model_validate(
1182 {
1183 "prefect.resource.id": f"prefect.flow-run.{flow_run_id}",
1184 "prefect.resource.role": "target",
1185 }
1186 )
1187 )
1189 logger.debug(
1190 "Resuming flow run",
1191 extra={
1192 "flow_run_id": str(flow_run_id),
1193 **self.logging_context(triggered_action),
1194 },
1195 )
1197 async with await self.orchestration_client(triggered_action) as orchestration:
1198 result = await orchestration.resume_flow_run(flow_run_id)
1200 if not isinstance(result.details, StateAcceptDetails):
1201 raise ActionFailed(
1202 f"Failed to resume flow run: {result.details.reason}"
1203 )
1206class CallWebhook(JinjaTemplateAction): 1a
1207 """Call a webhook when an Automation is triggered."""
1209 type: Literal["call-webhook"] = "call-webhook" 1a
1210 block_document_id: UUID = Field( 1a
1211 description="The identifier of the webhook block to use"
1212 )
1213 payload: str = Field( 1a
1214 default="",
1215 description="An optional templatable payload to send when calling the webhook.",
1216 )
1218 @field_validator("payload", mode="before") 1a
1219 @classmethod 1a
1220 def ensure_payload_is_a_string( 1a
1221 cls, value: Union[str, Dict[str, Any], None]
1222 ) -> Optional[str]:
1223 """Temporary measure while we migrate payloads from being a dictionary to
1224 a string template. This covers both reading from the database where values
1225 may currently be a dictionary, as well as the API, where older versions of the
1226 frontend may be sending a JSON object with the single `"message"` key."""
1227 if value is None:
1228 return value
1230 if isinstance(value, str):
1231 return value
1233 return orjson.dumps(value, option=orjson.OPT_INDENT_2).decode()
1235 @field_validator("payload") 1a
1236 @classmethod 1a
1237 def validate_payload_templates(cls, value: Optional[str]) -> Optional[str]: 1a
1238 """
1239 Validate user-provided payload template.
1240 """
1241 if not value:
1242 return value
1244 cls.validate_template(value, "payload")
1246 return value
1248 async def _get_webhook_block(self, triggered_action: "TriggeredAction") -> Webhook: 1a
1249 async with await self.orchestration_client(triggered_action) as orchestration:
1250 response = await orchestration.read_block_document_raw(
1251 self.block_document_id
1252 )
1253 if response.status_code >= 300:
1254 raise ActionFailed(self.reason_from_response(response))
1256 try:
1257 block_document = BlockDocument.model_validate(response.json())
1258 block = await _load_block_from_block_document(block_document)
1259 except Exception as e:
1260 raise ActionFailed(f"The webhook block was invalid: {e!r}")
1262 if not isinstance(block, Webhook):
1263 raise ActionFailed("The referenced block was not a webhook block")
1265 self._resulting_related_resources += [
1266 RelatedResource.model_validate(
1267 {
1268 "prefect.resource.id": f"prefect.block-document.{self.block_document_id}",
1269 "prefect.resource.role": "block",
1270 "prefect.resource.name": block_document.name,
1271 }
1272 ),
1273 RelatedResource.model_validate(
1274 {
1275 "prefect.resource.id": f"prefect.block-type.{block.get_block_type_slug()}",
1276 "prefect.resource.role": "block-type",
1277 }
1278 ),
1279 ]
1281 return block
1283 async def act(self, triggered_action: "TriggeredAction") -> None: 1a
1284 block = await self._get_webhook_block(triggered_action=triggered_action)
1286 (payload,) = await self._render([self.payload], triggered_action)
1288 try:
1289 response = await block.call(payload=payload)
1291 ok_headers = {
1292 k: v for k, v in response.headers.items() if not should_redact_header(k)
1293 }
1295 self._result_details.update(
1296 {
1297 "status_code": response.status_code,
1298 "response_body": truncated_to(1000, response.text),
1299 "response_headers": {**(ok_headers or {})},
1300 }
1301 )
1302 except Exception as e:
1303 raise ActionFailed(f"Webhook call failed: {e!r}")
1306class SendNotification(JinjaTemplateAction): 1a
1307 """Send a notification when an Automation is triggered"""
1309 type: Literal["send-notification"] = "send-notification" 1a
1310 block_document_id: UUID = Field( 1a
1311 description="The identifier of the notification block to use"
1312 )
1313 subject: str = Field("Prefect automated notification") 1a
1314 body: str = Field(description="The text of the notification to send") 1a
1316 @field_validator("subject", "body") 1a
1317 def is_valid_template(cls, value: str, info: ValidationInfo) -> str: 1a
1318 if TYPE_CHECKING:
1319 assert isinstance(info.field_name, str)
1320 return cls.validate_template(value, info.field_name)
1322 async def _get_notification_block( 1a
1323 self, triggered_action: "TriggeredAction"
1324 ) -> NotificationBlock:
1325 async with await self.orchestration_client(triggered_action) as orion:
1326 response = await orion.read_block_document_raw(self.block_document_id)
1327 if response.status_code >= 300:
1328 raise ActionFailed(self.reason_from_response(response))
1330 try:
1331 block_document = BlockDocument.model_validate(response.json())
1332 block = await _load_block_from_block_document(block_document)
1333 except Exception as e:
1334 raise ActionFailed(f"The notification block was invalid: {e!r}")
1336 if "notify" not in block.get_block_capabilities():
1337 raise ActionFailed("The referenced block was not a notification block")
1339 self._resulting_related_resources += [
1340 RelatedResource.model_validate(
1341 {
1342 "prefect.resource.id": f"prefect.block-document.{self.block_document_id}",
1343 "prefect.resource.role": "block",
1344 "prefect.resource.name": block_document.name,
1345 }
1346 ),
1347 RelatedResource.model_validate(
1348 {
1349 "prefect.resource.id": f"prefect.block-type.{block.get_block_type_slug()}",
1350 "prefect.resource.role": "block-type",
1351 }
1352 ),
1353 ]
1355 return cast(NotificationBlock, block)
1357 async def act(self, triggered_action: "TriggeredAction") -> None: 1a
1358 block = await self._get_notification_block(triggered_action=triggered_action)
1360 subject, body = await self.render(triggered_action)
1362 with block.raise_on_failure():
1363 try:
1364 await block.notify(subject=subject, body=body)
1365 except NotificationError as e:
1366 self._result_details["notification_log"] = e.log
1367 raise ActionFailed("Notification failed")
1369 async def render(self, triggered_action: "TriggeredAction") -> List[str]: 1a
1370 return await self._render([self.subject, self.body], triggered_action)
1373class WorkPoolAction(Action): 1a
1374 """Base class for Actions that operate on Work Pools and need to infer them from
1375 events"""
1377 source: Literal["selected", "inferred"] = Field( 1a
1378 "selected",
1379 description=(
1380 "Whether this Action applies to a specific selected "
1381 "work pool (given by `work_pool_id`), or to a work pool that is "
1382 "inferred from the triggering event. If the source is 'inferred', "
1383 "the `work_pool_id` may not be set. If the source is 'selected', the "
1384 "`work_pool_id` must be set."
1385 ),
1386 )
1387 work_pool_id: Optional[UUID] = Field( 1a
1388 None,
1389 description="The identifier of the work pool to pause",
1390 )
1392 @model_validator(mode="after") 1a
1393 def selected_work_pool_requires_id(self) -> Self: 1a
1394 wants_selected_work_pool = self.source == "selected"
1395 has_work_pool_id = bool(self.work_pool_id)
1396 if wants_selected_work_pool != has_work_pool_id:
1397 raise ValueError(
1398 "work_pool_id is " + ("not allowed" if has_work_pool_id else "required")
1399 )
1400 return self
1402 async def work_pool_id_to_use(self, triggered_action: "TriggeredAction") -> UUID: 1a
1403 if self.source == "selected":
1404 assert self.work_pool_id
1405 return self.work_pool_id
1407 event = triggered_action.triggering_event
1408 if not event:
1409 raise ActionFailed("No event to infer the work pool")
1411 assert event
1412 if id := _id_of_first_resource_of_kind(event, "prefect.work-pool"):
1413 return id
1415 raise ActionFailed("No work pool could be inferred")
1418class WorkPoolCommandAction(WorkPoolAction, ExternalDataAction): 1a
1419 _action_description: ClassVar[str] 1a
1421 _target_work_pool: Optional[WorkPool] = PrivateAttr(default=None) 1a
1423 async def target_work_pool(self, triggered_action: "TriggeredAction") -> WorkPool: 1a
1424 if not self._target_work_pool:
1425 work_pool_id = await self.work_pool_id_to_use(triggered_action)
1427 async with await self.orchestration_client(
1428 triggered_action
1429 ) as orchestration:
1430 work_pool = await orchestration.read_work_pool(work_pool_id)
1432 if not work_pool:
1433 raise ActionFailed(f"Work pool {work_pool_id} not found")
1434 self._target_work_pool = work_pool
1435 return self._target_work_pool
1437 async def act(self, triggered_action: "TriggeredAction") -> None: 1a
1438 work_pool = await self.target_work_pool(triggered_action)
1440 self._resulting_related_resources += [
1441 RelatedResource.model_validate(
1442 {
1443 "prefect.resource.id": f"prefect.work-pool.{work_pool.id}",
1444 "prefect.resource.name": work_pool.name,
1445 "prefect.resource.role": "target",
1446 }
1447 )
1448 ]
1450 logger.info(
1451 self._action_description,
1452 extra={
1453 "work_pool_id": work_pool.id,
1454 **self.logging_context(triggered_action),
1455 },
1456 )
1458 async with await self.orchestration_client(triggered_action) as orchestration:
1459 response = await self.command(orchestration, work_pool, triggered_action)
1461 self._result_details["status_code"] = response.status_code
1462 if response.status_code >= 300:
1463 raise ActionFailed(self.reason_from_response(response))
1465 @abc.abstractmethod 1a
1466 async def command( 1a
1467 self,
1468 orchestration: "OrchestrationClient",
1469 work_pool: WorkPool,
1470 triggered_action: "TriggeredAction",
1471 ) -> Response:
1472 """Issue the command to the Work Pool"""
1475class PauseWorkPool(WorkPoolCommandAction): 1a
1476 """Pauses a Work Pool"""
1478 type: Literal["pause-work-pool"] = "pause-work-pool" 1a
1480 _action_description: ClassVar[str] = "Pausing work pool" 1a
1482 async def command( 1a
1483 self,
1484 orchestration: "OrchestrationClient",
1485 work_pool: WorkPool,
1486 triggered_action: "TriggeredAction",
1487 ) -> Response:
1488 return await orchestration.pause_work_pool(work_pool.name)
1491class ResumeWorkPool(WorkPoolCommandAction): 1a
1492 """Resumes a Work Pool"""
1494 type: Literal["resume-work-pool"] = "resume-work-pool" 1a
1496 _action_description: ClassVar[str] = "Resuming work pool" 1a
1498 async def command( 1a
1499 self,
1500 orchestration: "OrchestrationClient",
1501 work_pool: WorkPool,
1502 triggered_action: "TriggeredAction",
1503 ) -> Response:
1504 return await orchestration.resume_work_pool(work_pool.name)
1507class WorkQueueAction(Action): 1a
1508 """Base class for Actions that operate on Work Queues and need to infer them from
1509 events"""
1511 source: Literal["selected", "inferred"] = Field( 1a
1512 "selected",
1513 description=(
1514 "Whether this Action applies to a specific selected "
1515 "work queue (given by `work_queue_id`), or to a work queue that is "
1516 "inferred from the triggering event. If the source is 'inferred', "
1517 "the `work_queue_id` may not be set. If the source is 'selected', the "
1518 "`work_queue_id` must be set."
1519 ),
1520 )
1521 work_queue_id: Optional[UUID] = Field( 1a
1522 None, description="The identifier of the work queue to pause"
1523 )
1525 @model_validator(mode="after") 1a
1526 def selected_work_queue_requires_id(self) -> Self: 1a
1527 wants_selected_work_queue = self.source == "selected"
1528 has_work_queue_id = bool(self.work_queue_id)
1529 if wants_selected_work_queue != has_work_queue_id:
1530 raise ValueError(
1531 "work_queue_id is "
1532 + ("not allowed" if has_work_queue_id else "required")
1533 )
1534 return self
1536 async def work_queue_id_to_use(self, triggered_action: "TriggeredAction") -> UUID: 1a
1537 if self.source == "selected":
1538 assert self.work_queue_id
1539 return self.work_queue_id
1541 event = triggered_action.triggering_event
1542 if not event:
1543 raise ActionFailed("No event to infer the work queue")
1545 assert event
1546 if id := _id_of_first_resource_of_kind(event, "prefect.work-queue"):
1547 return id
1549 raise ActionFailed("No work queue could be inferred")
1552class WorkQueueCommandAction(WorkQueueAction, ExternalDataAction): 1a
1553 _action_description: ClassVar[str] 1a
1555 async def act(self, triggered_action: "TriggeredAction") -> None: 1a
1556 work_queue_id = await self.work_queue_id_to_use(triggered_action)
1558 self._resulting_related_resources += [
1559 RelatedResource.model_validate(
1560 {
1561 "prefect.resource.id": f"prefect.work-queue.{work_queue_id}",
1562 "prefect.resource.role": "target",
1563 }
1564 )
1565 ]
1567 logger.info(
1568 self._action_description,
1569 extra={
1570 "work_queue_id": work_queue_id,
1571 **self.logging_context(triggered_action),
1572 },
1573 )
1575 async with await self.orchestration_client(triggered_action) as orchestration:
1576 response = await self.command(
1577 orchestration, work_queue_id, triggered_action
1578 )
1580 self._result_details["status_code"] = response.status_code
1581 if response.status_code >= 300:
1582 raise ActionFailed(self.reason_from_response(response))
1584 @abc.abstractmethod 1a
1585 async def command( 1a
1586 self,
1587 orchestration: "OrchestrationClient",
1588 work_queue_id: UUID,
1589 triggered_action: "TriggeredAction",
1590 ) -> Response:
1591 """Issue the command to the Work Queue"""
1594class PauseWorkQueue(WorkQueueCommandAction): 1a
1595 """Pauses a Work Queue"""
1597 type: Literal["pause-work-queue"] = "pause-work-queue" 1a
1599 _action_description: ClassVar[str] = "Pausing work queue" 1a
1601 async def command( 1a
1602 self,
1603 orchestration: "OrchestrationClient",
1604 work_queue_id: UUID,
1605 triggered_action: "TriggeredAction",
1606 ) -> Response:
1607 return await orchestration.pause_work_queue(work_queue_id)
1610class ResumeWorkQueue(WorkQueueCommandAction): 1a
1611 """Resumes a Work Queue"""
1613 type: Literal["resume-work-queue"] = "resume-work-queue" 1a
1615 _action_description: ClassVar[str] = "Resuming work queue" 1a
1617 async def command( 1a
1618 self,
1619 orchestration: "OrchestrationClient",
1620 work_queue_id: UUID,
1621 triggered_action: "TriggeredAction",
1622 ) -> Response:
1623 return await orchestration.resume_work_queue(work_queue_id)
1626class AutomationAction(Action): 1a
1627 """Base class for Actions that operate on Automations and need to infer them from
1628 events"""
1630 source: Literal["selected", "inferred"] = Field( 1a
1631 "selected",
1632 description=(
1633 "Whether this Action applies to a specific selected "
1634 "automation (given by `automation_id`), or to an automation that is "
1635 "inferred from the triggering event. If the source is 'inferred', "
1636 "the `automation_id` may not be set. If the source is 'selected', the "
1637 "`automation_id` must be set."
1638 ),
1639 )
1640 automation_id: Optional[UUID] = Field( 1a
1641 None, description="The identifier of the automation to act on"
1642 )
1644 @model_validator(mode="after") 1a
1645 def selected_automation_requires_id(self) -> Self: 1a
1646 wants_selected_automation = self.source == "selected"
1647 has_automation_id = bool(self.automation_id)
1648 if wants_selected_automation != has_automation_id:
1649 raise ValueError(
1650 "automation_id is "
1651 + ("not allowed" if has_automation_id else "required")
1652 )
1653 return self
1655 async def automation_id_to_use(self, triggered_action: "TriggeredAction") -> UUID: 1a
1656 if self.source == "selected":
1657 assert self.automation_id
1658 return self.automation_id
1660 event = triggered_action.triggering_event
1661 if not event:
1662 raise ActionFailed("No event to infer the automation")
1664 assert event
1665 if id := _id_of_first_resource_of_kind(event, "prefect.automation"):
1666 return id
1668 raise ActionFailed("No automation could be inferred")
1671class AutomationCommandAction(AutomationAction, ExternalDataAction): 1a
1672 _action_description: ClassVar[str] 1a
1674 async def act(self, triggered_action: "TriggeredAction") -> None: 1a
1675 automation_id = await self.automation_id_to_use(triggered_action)
1677 self._resulting_related_resources += [
1678 RelatedResource.model_validate(
1679 {
1680 "prefect.resource.id": f"prefect.automation.{automation_id}",
1681 "prefect.resource.role": "target",
1682 }
1683 )
1684 ]
1686 logger.info(
1687 self._action_description,
1688 extra={
1689 "automation_id": automation_id,
1690 **self.logging_context(triggered_action),
1691 },
1692 )
1694 async with await self.events_api_client(triggered_action) as events:
1695 response = await self.command(events, automation_id, triggered_action)
1697 self._result_details["status_code"] = response.status_code
1698 if response.status_code >= 300:
1699 raise ActionFailed(self.reason_from_response(response))
1701 @abc.abstractmethod 1a
1702 async def command( 1a
1703 self,
1704 events: PrefectServerEventsAPIClient,
1705 automation_id: UUID,
1706 triggered_action: "TriggeredAction",
1707 ) -> Response:
1708 """Issue the command to the Work Queue"""
1711class PauseAutomation(AutomationCommandAction): 1a
1712 """Pauses a Work Queue"""
1714 type: Literal["pause-automation"] = "pause-automation" 1a
1716 _action_description: ClassVar[str] = "Pausing automation" 1a
1718 async def command( 1a
1719 self,
1720 events: PrefectServerEventsAPIClient,
1721 automation_id: UUID,
1722 triggered_action: "TriggeredAction",
1723 ) -> Response:
1724 return await events.pause_automation(automation_id)
1727class ResumeAutomation(AutomationCommandAction): 1a
1728 """Resumes a Work Queue"""
1730 type: Literal["resume-automation"] = "resume-automation" 1a
1732 _action_description: ClassVar[str] = "Resuming auitomation" 1a
1734 async def command( 1a
1735 self,
1736 events: PrefectServerEventsAPIClient,
1737 automation_id: UUID,
1738 triggered_action: "TriggeredAction",
1739 ) -> Response:
1740 return await events.resume_automation(automation_id)
1743# The actual action types that we support. It's important to update this
1744# Union when adding new subclasses of Action so that they are available for clients
1745# and in the OpenAPI docs
1746ServerActionTypes: TypeAlias = Union[ 1a
1747 DoNothing,
1748 RunDeployment,
1749 PauseDeployment,
1750 ResumeDeployment,
1751 CancelFlowRun,
1752 ChangeFlowRunState,
1753 PauseWorkQueue,
1754 ResumeWorkQueue,
1755 SendNotification,
1756 CallWebhook,
1757 PauseAutomation,
1758 ResumeAutomation,
1759 SuspendFlowRun,
1760 ResumeFlowRun,
1761 PauseWorkPool,
1762 ResumeWorkPool,
1763]
1766_recent_actions: MutableMapping[UUID, bool] = TTLCache(maxsize=10000, ttl=3600) 1a
1769async def record_action_happening(id: UUID) -> None: 1a
1770 """Record that an action has happened, with an expiration of an hour."""
1771 _recent_actions[id] = True
1774async def action_has_already_happened(id: UUID) -> bool: 1a
1775 """Check if the action has already happened"""
1776 return _recent_actions.get(id, False)
1779@asynccontextmanager 1a
1780async def consumer() -> AsyncGenerator[MessageHandler, None]: 1a
1781 from prefect.server.events.schemas.automations import TriggeredAction
1783 async def message_handler(message: Message):
1784 if not message.data:
1785 return
1787 triggered_action = TriggeredAction.model_validate_json(message.data)
1788 action = triggered_action.action
1790 if await action_has_already_happened(triggered_action.id):
1791 logger.info(
1792 "Action %s has already been executed, skipping",
1793 triggered_action.id,
1794 )
1795 return
1797 try:
1798 await action.act(triggered_action)
1799 except ActionFailed as e:
1800 # ActionFailed errors are expected errors and will not be retried
1801 await action.fail(triggered_action, e.reason)
1802 else:
1803 await action.succeed(triggered_action)
1804 await record_action_happening(triggered_action.id)
1806 logger.info("Starting action message handler")
1807 yield message_handler
1810async def _load_block_from_block_document( 1a
1811 block_document: BlockDocument,
1812) -> Block:
1813 if block_document.block_schema is None:
1814 raise ValueError("Unable to determine block schema for provided block document")
1816 block_cls = Block.get_block_class_from_schema(block_document.block_schema)
1818 block = block_cls.model_validate(block_document.data)
1819 block._block_document_id = block_document.id
1820 block.__class__._block_schema_id = block_document.block_schema_id
1821 block.__class__._block_type_id = block_document.block_type_id
1822 block._block_document_name = block_document.name
1823 block._is_anonymous = block_document.is_anonymous
1824 block._define_metadata_on_nested_blocks(block_document.block_document_references)
1826 resources = block._event_method_called_resources()
1827 if resources:
1828 kind = block._event_kind()
1829 resource, related = resources
1830 async with PrefectServerEventsClient() as events_client:
1831 await events_client.emit(
1832 Event(
1833 id=uuid7(),
1834 occurred=now("UTC"),
1835 event=f"{kind}.loaded",
1836 resource=Resource.model_validate(resource),
1837 related=[RelatedResource.model_validate(r) for r in related],
1838 )
1839 )
1841 return block