Coverage for /usr/local/lib/python3.12/site-packages/prefect/events/actions.py: 78%

92 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1import abc 1a

2from datetime import timedelta 1a

3from typing import Any, Dict, Optional, Union 1a

4from uuid import UUID 1a

5 

6from pydantic import Field, model_validator 1a

7from typing_extensions import Literal, Self, TypeAlias 1a

8 

9from prefect._internal.schemas.bases import PrefectBaseModel 1a

10from prefect.client.schemas.objects import StateType 1a

11from prefect.types import NonNegativeTimeDelta 1a

12 

13 

14class Action(PrefectBaseModel, abc.ABC): 1a

15 """An Action that may be performed when an Automation is triggered""" 

16 

17 type: str 1a

18 

19 def describe_for_cli(self) -> str: 1a

20 """A human-readable description of the action""" 

21 return self.type.replace("-", " ").capitalize() 

22 

23 

24class DoNothing(Action): 1a

25 """Do nothing when an Automation is triggered""" 

26 

27 type: Literal["do-nothing"] = "do-nothing" 1a

28 

29 

30class DeploymentAction(Action): 1a

31 """Base class for Actions that operate on Deployments and need to infer them from 

32 events""" 

33 

34 source: Literal["selected", "inferred"] = Field( 1a

35 "selected", 

36 description=( 

37 "Whether this Action applies to a specific selected " 

38 "deployment (given by `deployment_id`), or to a deployment that is " 

39 "inferred from the triggering event. If the source is 'inferred', " 

40 "the `deployment_id` may not be set. If the source is 'selected', the " 

41 "`deployment_id` must be set." 

42 ), 

43 ) 

44 deployment_id: Optional[UUID] = Field( 1a

45 None, description="The identifier of the deployment" 

46 ) 

47 

48 @model_validator(mode="after") 1a

49 def selected_deployment_requires_id(self): 1a

50 wants_selected_deployment = self.source == "selected" 

51 has_deployment_id = bool(self.deployment_id) 

52 if wants_selected_deployment != has_deployment_id: 

53 raise ValueError( 

54 "deployment_id is " 

55 + ("not allowed" if has_deployment_id else "required") 

56 ) 

57 return self 

58 

59 

60class RunDeployment(DeploymentAction): 1a

61 """Runs the given deployment with the given parameters""" 

62 

63 type: Literal["run-deployment"] = "run-deployment" 1a

64 

65 parameters: Optional[Dict[str, Any]] = Field( 1a

66 None, 

67 description=( 

68 "The parameters to pass to the deployment, or None to use the " 

69 "deployment's default parameters" 

70 ), 

71 ) 

72 job_variables: Optional[Dict[str, Any]] = Field( 1a

73 None, 

74 description=( 

75 "The job variables to pass to the created flow run, or None " 

76 "to use the deployment's default job variables" 

77 ), 

78 ) 

79 schedule_after: NonNegativeTimeDelta = Field( 1a

80 default_factory=lambda: timedelta(0), 

81 description=( 

82 "The amount of time to wait before running the deployment. " 

83 "Defaults to running the deployment immediately." 

84 ), 

85 ) 

86 

87 

88class PauseDeployment(DeploymentAction): 1a

89 """Pauses the given Deployment""" 

90 

91 type: Literal["pause-deployment"] = "pause-deployment" 1a

92 

93 

94class ResumeDeployment(DeploymentAction): 1a

95 """Resumes the given Deployment""" 

96 

97 type: Literal["resume-deployment"] = "resume-deployment" 1a

98 

99 

100class ChangeFlowRunState(Action): 1a

101 """Changes the state of a flow run associated with the trigger""" 

102 

103 type: Literal["change-flow-run-state"] = "change-flow-run-state" 1a

104 

105 name: Optional[str] = Field( 1a

106 None, 

107 description="The name of the state to change the flow run to", 

108 ) 

109 state: StateType = Field( 1a

110 ..., 

111 description="The type of the state to change the flow run to", 

112 ) 

113 message: Optional[str] = Field( 1a

114 None, 

115 description="An optional message to associate with the state change", 

116 ) 

117 

118 

119class CancelFlowRun(Action): 1a

120 """Cancels a flow run associated with the trigger""" 

121 

122 type: Literal["cancel-flow-run"] = "cancel-flow-run" 1a

123 

124 

125class ResumeFlowRun(Action): 1a

126 """Resumes a flow run associated with the trigger""" 

127 

128 type: Literal["resume-flow-run"] = "resume-flow-run" 1a

129 

130 

131class SuspendFlowRun(Action): 1a

132 """Suspends a flow run associated with the trigger""" 

133 

134 type: Literal["suspend-flow-run"] = "suspend-flow-run" 1a

135 

136 

137class CallWebhook(Action): 1a

138 """Call a webhook when an Automation is triggered.""" 

139 

140 type: Literal["call-webhook"] = "call-webhook" 1a

141 block_document_id: UUID = Field( 1a

142 description="The identifier of the webhook block to use" 

143 ) 

144 payload: str = Field( 1a

145 default="", 

146 description="An optional templatable payload to send when calling the webhook.", 

147 ) 

148 

149 

150class SendNotification(Action): 1a

151 """Send a notification when an Automation is triggered""" 

152 

153 type: Literal["send-notification"] = "send-notification" 1a

154 block_document_id: UUID = Field( 1a

155 description="The identifier of the notification block to use" 

156 ) 

157 subject: str = Field("Prefect automated notification") 1a

158 body: str = Field(description="The text of the notification to send") 1a

159 

160 

161class WorkPoolAction(Action): 1a

162 """Base class for Actions that operate on Work Pools and need to infer them from 

163 events""" 

164 

165 source: Literal["selected", "inferred"] = Field( 1a

166 "selected", 

167 description=( 

168 "Whether this Action applies to a specific selected " 

169 "work pool (given by `work_pool_id`), or to a work pool that is " 

170 "inferred from the triggering event. If the source is 'inferred', " 

171 "the `work_pool_id` may not be set. If the source is 'selected', the " 

172 "`work_pool_id` must be set." 

173 ), 

174 ) 

175 work_pool_id: Optional[UUID] = Field( 1a

176 None, 

177 description="The identifier of the work pool to pause", 

178 ) 

179 

180 

181class PauseWorkPool(WorkPoolAction): 1a

182 """Pauses a Work Pool""" 

183 

184 type: Literal["pause-work-pool"] = "pause-work-pool" 1a

185 

186 

187class ResumeWorkPool(WorkPoolAction): 1a

188 """Resumes a Work Pool""" 

189 

190 type: Literal["resume-work-pool"] = "resume-work-pool" 1a

191 

192 

193class WorkQueueAction(Action): 1a

194 """Base class for Actions that operate on Work Queues and need to infer them from 

195 events""" 

196 

197 source: Literal["selected", "inferred"] = Field( 1a

198 "selected", 

199 description=( 

200 "Whether this Action applies to a specific selected " 

201 "work queue (given by `work_queue_id`), or to a work queue that is " 

202 "inferred from the triggering event. If the source is 'inferred', " 

203 "the `work_queue_id` may not be set. If the source is 'selected', the " 

204 "`work_queue_id` must be set." 

205 ), 

206 ) 

207 work_queue_id: Optional[UUID] = Field( 1a

208 None, description="The identifier of the work queue to pause" 

209 ) 

210 

211 @model_validator(mode="after") 1a

212 def selected_work_queue_requires_id(self) -> Self: 1a

213 wants_selected_work_queue = self.source == "selected" 

214 has_work_queue_id = bool(self.work_queue_id) 

215 if wants_selected_work_queue != has_work_queue_id: 

216 raise ValueError( 

217 "work_queue_id is " 

218 + ("not allowed" if has_work_queue_id else "required") 

219 ) 

220 return self 

221 

222 

223class PauseWorkQueue(WorkQueueAction): 1a

224 """Pauses a Work Queue""" 

225 

226 type: Literal["pause-work-queue"] = "pause-work-queue" 1a

227 

228 

229class ResumeWorkQueue(WorkQueueAction): 1a

230 """Resumes a Work Queue""" 

231 

232 type: Literal["resume-work-queue"] = "resume-work-queue" 1a

233 

234 

235class AutomationAction(Action): 1a

236 """Base class for Actions that operate on Automations and need to infer them from 

237 events""" 

238 

239 source: Literal["selected", "inferred"] = Field( 1a

240 "selected", 

241 description=( 

242 "Whether this Action applies to a specific selected " 

243 "automation (given by `automation_id`), or to an automation that is " 

244 "inferred from the triggering event. If the source is 'inferred', " 

245 "the `automation_id` may not be set. If the source is 'selected', the " 

246 "`automation_id` must be set." 

247 ), 

248 ) 

249 automation_id: Optional[UUID] = Field( 1a

250 None, description="The identifier of the automation to act on" 

251 ) 

252 

253 @model_validator(mode="after") 1a

254 def selected_automation_requires_id(self) -> Self: 1a

255 wants_selected_automation = self.source == "selected" 

256 has_automation_id = bool(self.automation_id) 

257 if wants_selected_automation != has_automation_id: 

258 raise ValueError( 

259 "automation_id is " 

260 + ("not allowed" if has_automation_id else "required") 

261 ) 

262 return self 

263 

264 

265class PauseAutomation(AutomationAction): 1a

266 """Pauses a Work Queue""" 

267 

268 type: Literal["pause-automation"] = "pause-automation" 1a

269 

270 

271class ResumeAutomation(AutomationAction): 1a

272 """Resumes a Work Queue""" 

273 

274 type: Literal["resume-automation"] = "resume-automation" 1a

275 

276 

277class DeclareIncident(Action): 1a

278 """Declares an incident for the triggering event. Only available on Prefect Cloud""" 

279 

280 type: Literal["declare-incident"] = "declare-incident" 1a

281 

282 

283# The actual action types that we support. It's important to update this 

284# Union when adding new subclasses of Action so that they are available for clients 

285# and in the OpenAPI docs 

286ActionTypes: TypeAlias = Union[ 1a

287 DoNothing, 

288 RunDeployment, 

289 PauseDeployment, 

290 ResumeDeployment, 

291 CancelFlowRun, 

292 ChangeFlowRunState, 

293 PauseWorkQueue, 

294 ResumeWorkQueue, 

295 SendNotification, 

296 CallWebhook, 

297 PauseAutomation, 

298 ResumeAutomation, 

299 SuspendFlowRun, 

300 PauseWorkPool, 

301 ResumeWorkPool, 

302 # Prefect Cloud only 

303 DeclareIncident, 

304]