Coverage for /usr/local/lib/python3.12/site-packages/prefect/events/actions.py: 78%
92 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1import abc 1a
2from datetime import timedelta 1a
3from typing import Any, Dict, Optional, Union 1a
4from uuid import UUID 1a
6from pydantic import Field, model_validator 1a
7from typing_extensions import Literal, Self, TypeAlias 1a
9from prefect._internal.schemas.bases import PrefectBaseModel 1a
10from prefect.client.schemas.objects import StateType 1a
11from prefect.types import NonNegativeTimeDelta 1a
class Action(PrefectBaseModel, abc.ABC):
    """An Action that may be performed when an Automation is triggered"""

    # Identifier for the kind of action; the concrete subclasses in this module
    # each pin it to a single Literal value.
    type: str

    def describe_for_cli(self) -> str:
        """Build a human-readable label from the action's `type`.

        Returns:
            `type` with every hyphen replaced by a space, passed through
            `str.capitalize`.
        """
        spaced_out = self.type.replace("-", " ")
        return spaced_out.capitalize()
class DoNothing(Action):
    """Do nothing when an Automation is triggered"""

    # Fixed identifier for this action kind; also the default value.
    type: Literal["do-nothing"] = "do-nothing"
class DeploymentAction(Action):
    """Base class for Actions that operate on Deployments and need to infer them from
    events"""

    # How the target deployment is chosen: named explicitly ("selected") or
    # taken from the triggering event ("inferred").
    source: Literal["selected", "inferred"] = Field(
        "selected",
        description=(
            "Whether this Action applies to a specific selected "
            "deployment (given by `deployment_id`), or to a deployment that is "
            "inferred from the triggering event. If the source is 'inferred', "
            "the `deployment_id` may not be set. If the source is 'selected', the "
            "`deployment_id` must be set."
        ),
    )
    deployment_id: Optional[UUID] = Field(
        None, description="The identifier of the deployment"
    )

    @model_validator(mode="after")
    def selected_deployment_requires_id(self) -> Self:
        """Check that `deployment_id` is present exactly when `source` is "selected".

        Returns:
            This model instance, unchanged.

        Raises:
            ValueError: "deployment_id is required" when `source` is "selected"
                without a `deployment_id`; "deployment_id is not allowed" when
                `source` is "inferred" and a `deployment_id` was given.
        """
        wants_selected_deployment = self.source == "selected"
        has_deployment_id = bool(self.deployment_id)
        # The two flags must agree; any mismatch is one of the two error cases.
        if wants_selected_deployment != has_deployment_id:
            raise ValueError(
                "deployment_id is "
                + ("not allowed" if has_deployment_id else "required")
            )
        return self
class RunDeployment(DeploymentAction):
    """Runs the given deployment with the given parameters"""

    # Fixed identifier for this action kind.
    type: Literal["run-deployment"] = "run-deployment"

    # Optional overrides; None falls back to the deployment's own defaults
    # (see the field descriptions).
    parameters: Optional[Dict[str, Any]] = Field(
        default=None,
        description=(
            "The parameters to pass to the deployment, or None to use the "
            "deployment's default parameters"
        ),
    )
    job_variables: Optional[Dict[str, Any]] = Field(
        default=None,
        description=(
            "The job variables to pass to the created flow run, or None "
            "to use the deployment's default job variables"
        ),
    )
    # Default is a zero-length delay, produced per instance by the factory.
    schedule_after: NonNegativeTimeDelta = Field(
        default_factory=lambda: timedelta(0),
        description=(
            "The amount of time to wait before running the deployment. "
            "Defaults to running the deployment immediately."
        ),
    )
class PauseDeployment(DeploymentAction):
    """Pauses the given Deployment"""

    # Fixed identifier for this action kind.
    type: Literal["pause-deployment"] = "pause-deployment"
class ResumeDeployment(DeploymentAction):
    """Resumes the given Deployment"""

    # Fixed identifier for this action kind.
    type: Literal["resume-deployment"] = "resume-deployment"
class ChangeFlowRunState(Action):
    """Changes the state of a flow run associated with the trigger"""

    # Fixed identifier for this action kind.
    type: Literal["change-flow-run-state"] = "change-flow-run-state"

    name: Optional[str] = Field(
        default=None,
        description="The name of the state to change the flow run to",
    )
    # Ellipsis default: this is the only field here without a fallback value.
    state: StateType = Field(
        default=...,
        description="The type of the state to change the flow run to",
    )
    message: Optional[str] = Field(
        default=None,
        description="An optional message to associate with the state change",
    )
class CancelFlowRun(Action):
    """Cancels a flow run associated with the trigger"""

    # Fixed identifier for this action kind.
    type: Literal["cancel-flow-run"] = "cancel-flow-run"
class ResumeFlowRun(Action):
    """Resumes a flow run associated with the trigger"""

    # Fixed identifier for this action kind.
    type: Literal["resume-flow-run"] = "resume-flow-run"
class SuspendFlowRun(Action):
    """Suspends a flow run associated with the trigger"""

    # Fixed identifier for this action kind.
    type: Literal["suspend-flow-run"] = "suspend-flow-run"
class CallWebhook(Action):
    """Call a webhook when an Automation is triggered."""

    # Fixed identifier for this action kind.
    type: Literal["call-webhook"] = "call-webhook"

    # No default is supplied for the block identifier.
    block_document_id: UUID = Field(
        description="The identifier of the webhook block to use"
    )
    # The payload defaults to the empty string.
    payload: str = Field(
        "",
        description="An optional templatable payload to send when calling the webhook.",
    )
class SendNotification(Action):
    """Send a notification when an Automation is triggered"""

    # Fixed identifier for this action kind.
    type: Literal["send-notification"] = "send-notification"

    # No default is supplied for the block identifier or the body; only the
    # subject has a stock value.
    block_document_id: UUID = Field(
        description="The identifier of the notification block to use"
    )
    subject: str = Field(default="Prefect automated notification")
    body: str = Field(description="The text of the notification to send")
class WorkPoolAction(Action):
    """Base class for Actions that operate on Work Pools and need to infer them from
    events"""

    # How the target work pool is chosen: named explicitly ("selected") or
    # taken from the triggering event ("inferred").
    source: Literal["selected", "inferred"] = Field(
        "selected",
        description=(
            "Whether this Action applies to a specific selected "
            "work pool (given by `work_pool_id`), or to a work pool that is "
            "inferred from the triggering event. If the source is 'inferred', "
            "the `work_pool_id` may not be set. If the source is 'selected', the "
            "`work_pool_id` must be set."
        ),
    )
    # NOTE(review): the description says "to pause" although this base class is
    # also used by ResumeWorkPool; the text is left unchanged to keep the
    # published schema stable.
    work_pool_id: Optional[UUID] = Field(
        None,
        description="The identifier of the work pool to pause",
    )

    @model_validator(mode="after")
    def selected_work_pool_requires_id(self) -> Self:
        """Check that `work_pool_id` is present exactly when `source` is "selected".

        Enforces the rule stated in the `source` field description, mirroring
        the validators on the deployment, work queue and automation base
        classes in this module.

        Returns:
            This model instance, unchanged.

        Raises:
            ValueError: "work_pool_id is required" when `source` is "selected"
                without a `work_pool_id`; "work_pool_id is not allowed" when
                `source` is "inferred" and a `work_pool_id` was given.
        """
        wants_selected_work_pool = self.source == "selected"
        has_work_pool_id = bool(self.work_pool_id)
        # The two flags must agree; any mismatch is one of the two error cases.
        if wants_selected_work_pool != has_work_pool_id:
            raise ValueError(
                "work_pool_id is "
                + ("not allowed" if has_work_pool_id else "required")
            )
        return self
class PauseWorkPool(WorkPoolAction):
    """Pauses a Work Pool"""

    # Fixed identifier for this action kind.
    type: Literal["pause-work-pool"] = "pause-work-pool"
class ResumeWorkPool(WorkPoolAction):
    """Resumes a Work Pool"""

    # Fixed identifier for this action kind.
    type: Literal["resume-work-pool"] = "resume-work-pool"
class WorkQueueAction(Action):
    """Base class for Actions that operate on Work Queues and need to infer them from
    events"""

    # How the target work queue is chosen: named explicitly ("selected") or
    # taken from the triggering event ("inferred").
    source: Literal["selected", "inferred"] = Field(
        default="selected",
        description=(
            "Whether this Action applies to a specific selected "
            "work queue (given by `work_queue_id`), or to a work queue that is "
            "inferred from the triggering event. If the source is 'inferred', "
            "the `work_queue_id` may not be set. If the source is 'selected', the "
            "`work_queue_id` must be set."
        ),
    )
    work_queue_id: Optional[UUID] = Field(
        default=None, description="The identifier of the work queue to pause"
    )

    @model_validator(mode="after")
    def selected_work_queue_requires_id(self) -> Self:
        """Check that `work_queue_id` is present exactly when `source` is "selected".

        Returns:
            This model instance, unchanged.

        Raises:
            ValueError: when a selected source has no `work_queue_id`, or an
                inferred source was given one.
        """
        id_given = self.work_queue_id is not None
        if self.source == "selected":
            # An explicitly selected work queue must be identified.
            if not id_given:
                raise ValueError("work_queue_id is required")
        elif id_given:
            # An inferred work queue must not also be named explicitly.
            raise ValueError("work_queue_id is not allowed")
        return self
class PauseWorkQueue(WorkQueueAction):
    """Pauses a Work Queue"""

    # Fixed identifier for this action kind.
    type: Literal["pause-work-queue"] = "pause-work-queue"
class ResumeWorkQueue(WorkQueueAction):
    """Resumes a Work Queue"""

    # Fixed identifier for this action kind.
    type: Literal["resume-work-queue"] = "resume-work-queue"
class AutomationAction(Action):
    """Base class for Actions that operate on Automations and need to infer them from
    events"""

    # How the target automation is chosen: named explicitly ("selected") or
    # taken from the triggering event ("inferred").
    source: Literal["selected", "inferred"] = Field(
        default="selected",
        description=(
            "Whether this Action applies to a specific selected "
            "automation (given by `automation_id`), or to an automation that is "
            "inferred from the triggering event. If the source is 'inferred', "
            "the `automation_id` may not be set. If the source is 'selected', the "
            "`automation_id` must be set."
        ),
    )
    automation_id: Optional[UUID] = Field(
        default=None, description="The identifier of the automation to act on"
    )

    @model_validator(mode="after")
    def selected_automation_requires_id(self) -> Self:
        """Check that `automation_id` is present exactly when `source` is "selected".

        Returns:
            This model instance, unchanged.

        Raises:
            ValueError: when a selected source has no `automation_id`, or an
                inferred source was given one.
        """
        id_given = self.automation_id is not None
        if self.source == "selected":
            # An explicitly selected automation must be identified.
            if not id_given:
                raise ValueError("automation_id is required")
        elif id_given:
            # An inferred automation must not also be named explicitly.
            raise ValueError("automation_id is not allowed")
        return self
class PauseAutomation(AutomationAction):
    """Pauses an Automation"""

    # Fixed identifier for this action kind.
    type: Literal["pause-automation"] = "pause-automation"
class ResumeAutomation(AutomationAction):
    """Resumes an Automation"""

    # Fixed identifier for this action kind.
    type: Literal["resume-automation"] = "resume-automation"
class DeclareIncident(Action):
    """Declares an incident for the triggering event. Only available on Prefect Cloud"""

    # Fixed identifier for this action kind.
    type: Literal["declare-incident"] = "declare-incident"
# The actual action types that we support. It's important to update this
# Union when adding new subclasses of Action so that they are available for clients
# and in the OpenAPI docs
ActionTypes: TypeAlias = Union[
    DoNothing,
    RunDeployment,
    PauseDeployment,
    ResumeDeployment,
    CancelFlowRun,
    ChangeFlowRunState,
    PauseWorkQueue,
    ResumeWorkQueue,
    SendNotification,
    CallWebhook,
    PauseAutomation,
    ResumeAutomation,
    SuspendFlowRun,
    # ResumeFlowRun is a concrete Action defined in this module and was
    # previously missing from this Union.
    ResumeFlowRun,
    PauseWorkPool,
    ResumeWorkPool,
    # Prefect Cloud only
    DeclareIncident,
]