Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/actions.py: 24%

704 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1""" 

2The actions consumer watches for actions that have been triggered by Automations 

3and carries them out. Also includes the various concrete subtypes of Actions 

4""" 

5 

6from __future__ import annotations 1a

7 

8import abc 1a

9import asyncio 1a

10import copy 1a

11from base64 import b64encode 1a

12from contextlib import asynccontextmanager 1a

13from datetime import datetime, timedelta, timezone 1a

14from typing import ( 1a

15 TYPE_CHECKING, 

16 Any, 

17 AsyncGenerator, 

18 Awaitable, 

19 Callable, 

20 ClassVar, 

21 Coroutine, 

22 Dict, 

23 List, 

24 Literal, 

25 MutableMapping, 

26 Optional, 

27 Tuple, 

28 Type, 

29 Union, 

30 cast, 

31) 

32from uuid import UUID 1a

33 

34import jinja2 1a

35import orjson 1a

36from cachetools import TTLCache 1a

37from httpx import Response 1a

38from pydantic import ( 1a

39 Field, 

40 PrivateAttr, 

41 ValidationInfo, 

42 field_validator, 

43 model_validator, 

44) 

45from typing_extensions import Self, TypeAlias 1a

46 

47from prefect._internal.uuid7 import uuid7 1a

48from prefect.blocks.abstract import NotificationBlock, NotificationError 1a

49from prefect.blocks.core import Block 1a

50from prefect.blocks.webhook import Webhook 1a

51from prefect.logging import get_logger 1a

52from prefect.server.events.clients import ( 1a

53 PrefectServerEventsAPIClient, 

54 PrefectServerEventsClient, 

55) 

56from prefect.server.events.schemas.events import Event, RelatedResource, Resource 1a

57from prefect.server.events.schemas.labelling import LabelDiver 1a

58from prefect.server.schemas.actions import DeploymentFlowRunCreate, StateCreate 1a

59from prefect.server.schemas.core import ( 1a

60 BlockDocument, 

61 ConcurrencyLimitV2, 

62 Flow, 

63 TaskRun, 

64 WorkPool, 

65) 

66from prefect.server.schemas.responses import ( 1a

67 DeploymentResponse, 

68 FlowRunResponse, 

69 OrchestrationResult, 

70 StateAcceptDetails, 

71 WorkQueueWithStatus, 

72) 

73from prefect.server.schemas.states import Scheduled, State, StateType, Suspended 1a

74from prefect.server.utilities.http import should_redact_header 1a

75from prefect.server.utilities.messaging import Message, MessageHandler 1a

76from prefect.server.utilities.schemas import PrefectBaseModel 1a

77from prefect.server.utilities.user_templates import ( 1a

78 TemplateSecurityError, 

79 matching_types_in_templates, 

80 maybe_template, 

81 register_user_template_filters, 

82 render_user_template, 

83 validate_user_template, 

84) 

85from prefect.types import DateTime, NonNegativeTimeDelta, StrictVariableValue 1a

86from prefect.types._datetime import now, parse_datetime 1a

87from prefect.utilities.schema_tools.hydration import ( 1a

88 HydrationContext, 

89 HydrationError, 

90 Placeholder, 

91 ValidJinja, 

92 WorkspaceVariable, 

93 hydrate, 

94) 

95from prefect.utilities.text import truncated_to 1a

96 

97if TYPE_CHECKING: # pragma: no cover 97 ↛ 98line 97 didn't jump to line 98 because the condition on line 97 was never true1a

98 import logging 

99 

100 from prefect.server.api.clients import OrchestrationClient 

101 from prefect.server.events.schemas.automations import TriggeredAction 

102 

103 Parameters: TypeAlias = dict[str, Any | dict[str, Any] | list[Any | dict[str, Any]]] 

104 

105logger: "logging.Logger" = get_logger(__name__) 1a

106 

107 

108class ActionFailed(Exception): 1a

109 def __init__(self, reason: str): 1a

110 self.reason = reason 

111 

112 

113class Action(PrefectBaseModel, abc.ABC): 1a

114 """An Action that may be performed when an Automation is triggered""" 

115 

116 type: str 1a

117 

118 # Captures any additional information about the result of the action we'd like to 

119 # make available in the payload of the executed or failed events 

120 _result_details: Dict[str, Any] = PrivateAttr(default_factory=dict) 1a

121 _resulting_related_resources: List[RelatedResource] = PrivateAttr( 1a

122 default_factory=list 

123 ) 

124 

125 @abc.abstractmethod 1a

126 async def act(self, triggered_action: "TriggeredAction") -> None: 1a

127 """Perform the requested Action""" 

128 

129 async def fail(self, triggered_action: "TriggeredAction", reason: str) -> None: 1a

130 from prefect.server.events.schemas.automations import EventTrigger 

131 

132 automation = triggered_action.automation 

133 action = triggered_action.action 

134 action_index = triggered_action.action_index 

135 

136 automation_resource_id = f"prefect.automation.{automation.id}" 

137 

138 action_details = { 

139 "action_index": action_index, 

140 "action_type": action.type, 

141 "invocation": str(triggered_action.id), 

142 } 

143 resource = Resource( 

144 { 

145 "prefect.resource.id": automation_resource_id, 

146 "prefect.resource.name": automation.name, 

147 "prefect.trigger-type": automation.trigger.type, 

148 } 

149 ) 

150 if isinstance(automation.trigger, EventTrigger): 

151 resource["prefect.posture"] = automation.trigger.posture 

152 

153 logger.warning( 

154 "Action failed: %r", 

155 reason, 

156 extra={**self.logging_context(triggered_action)}, 

157 ) 

158 

159 async with PrefectServerEventsClient() as events: 

160 triggered_event_id = uuid7() 

161 # Link to the triggering event if available and recent to establish causal chain. 

162 # Only set follows if timing is tight (within 5 minutes) to avoid unnecessary 

163 # waiting at CausalOrdering when events arrive >15 min after their follows event. 

164 follows_id = None 

165 if ( 

166 triggered_action.triggering_event 

167 and triggered_action.triggering_event.occurred 

168 ): 

169 time_since_trigger = ( 

170 triggered_action.triggered 

171 - triggered_action.triggering_event.occurred 

172 ) 

173 TIGHT_TIMING = timedelta(minutes=5) 

174 if abs(time_since_trigger) < TIGHT_TIMING: 

175 follows_id = triggered_action.triggering_event.id 

176 

177 # Build related resources including triggering event reference 

178 related_resources = list(self._resulting_related_resources) 

179 if triggered_action.triggering_event: 

180 related_resources.append( 

181 RelatedResource( 

182 { 

183 "prefect.resource.id": f"prefect.event.{triggered_action.triggering_event.id}", 

184 "prefect.resource.role": "triggering-event", 

185 } 

186 ) 

187 ) 

188 await events.emit( 

189 Event( 

190 occurred=triggered_action.triggered, 

191 event="prefect.automation.action.triggered", 

192 resource=resource, 

193 related=related_resources, 

194 payload=action_details, 

195 id=triggered_event_id, 

196 follows=follows_id, 

197 ) 

198 ) 

199 await events.emit( 

200 Event( 

201 occurred=now("UTC"), 

202 event="prefect.automation.action.failed", 

203 resource=resource, 

204 related=self._resulting_related_resources, 

205 payload={ 

206 **action_details, 

207 "reason": reason, 

208 **self._result_details, 

209 }, 

210 follows=triggered_event_id, 

211 id=uuid7(), 

212 ) 

213 ) 

214 

215 async def succeed(self, triggered_action: "TriggeredAction") -> None: 1a

216 from prefect.server.events.schemas.automations import EventTrigger 

217 

218 automation = triggered_action.automation 

219 action = triggered_action.action 

220 action_index = triggered_action.action_index 

221 

222 automation_resource_id = f"prefect.automation.{automation.id}" 

223 

224 action_details = { 

225 "action_index": action_index, 

226 "action_type": action.type, 

227 "invocation": str(triggered_action.id), 

228 } 

229 resource = Resource( 

230 { 

231 "prefect.resource.id": automation_resource_id, 

232 "prefect.resource.name": automation.name, 

233 "prefect.trigger-type": automation.trigger.type, 

234 } 

235 ) 

236 if isinstance(automation.trigger, EventTrigger): 

237 resource["prefect.posture"] = automation.trigger.posture 

238 

239 async with PrefectServerEventsClient() as events: 

240 triggered_event_id = uuid7() 

241 # Link to the triggering event if available and recent to establish causal chain. 

242 # Only set follows if timing is tight (within 5 minutes) to avoid unnecessary 

243 # waiting at CausalOrdering when events arrive >15 min after their follows event. 

244 follows_id = None 

245 if ( 

246 triggered_action.triggering_event 

247 and triggered_action.triggering_event.occurred 

248 ): 

249 time_since_trigger = ( 

250 triggered_action.triggered 

251 - triggered_action.triggering_event.occurred 

252 ) 

253 TIGHT_TIMING = timedelta(minutes=5) 

254 if abs(time_since_trigger) < TIGHT_TIMING: 

255 follows_id = triggered_action.triggering_event.id 

256 

257 # Build related resources including triggering event reference 

258 related_resources = list(self._resulting_related_resources) 

259 if triggered_action.triggering_event: 

260 related_resources.append( 

261 RelatedResource( 

262 { 

263 "prefect.resource.id": f"prefect.event.{triggered_action.triggering_event.id}", 

264 "prefect.resource.role": "triggering-event", 

265 } 

266 ) 

267 ) 

268 await events.emit( 

269 Event( 

270 occurred=triggered_action.triggered, 

271 event="prefect.automation.action.triggered", 

272 resource=Resource( 

273 { 

274 "prefect.resource.id": automation_resource_id, 

275 "prefect.resource.name": automation.name, 

276 "prefect.trigger-type": automation.trigger.type, 

277 } 

278 ), 

279 related=related_resources, 

280 payload=action_details, 

281 id=triggered_event_id, 

282 follows=follows_id, 

283 ) 

284 ) 

285 await events.emit( 

286 Event( 

287 occurred=now("UTC"), 

288 event="prefect.automation.action.executed", 

289 resource=Resource( 

290 { 

291 "prefect.resource.id": automation_resource_id, 

292 "prefect.resource.name": automation.name, 

293 "prefect.trigger-type": automation.trigger.type, 

294 } 

295 ), 

296 related=self._resulting_related_resources, 

297 payload={ 

298 **action_details, 

299 **self._result_details, 

300 }, 

301 id=uuid7(), 

302 follows=triggered_event_id, 

303 ) 

304 ) 

305 

306 def logging_context(self, triggered_action: "TriggeredAction") -> Dict[str, Any]: 1a

307 """Common logging context for all actions""" 

308 return { 

309 "automation": str(triggered_action.automation.id), 

310 "action": self.model_dump(mode="json"), 

311 "triggering_event": ( 

312 { 

313 "id": triggered_action.triggering_event.id, 

314 "event": triggered_action.triggering_event.event, 

315 } 

316 if triggered_action.triggering_event 

317 else None 

318 ), 

319 "triggering_labels": triggered_action.triggering_labels, 

320 } 

321 

322 

323class DoNothing(Action): 1a

324 """Do nothing when an Automation is triggered""" 

325 

326 type: Literal["do-nothing"] = "do-nothing" 1a

327 

328 async def act(self, triggered_action: "TriggeredAction") -> None: 1a

329 logger.info( 

330 "Doing nothing", 

331 extra={**self.logging_context(triggered_action)}, 

332 ) 

333 

334 

335class EmitEventAction(Action): 1a

336 async def act(self, triggered_action: "TriggeredAction") -> None: 1a

337 event = await self.create_event(triggered_action) 

338 

339 self._result_details["emitted_event"] = str(event.id) 

340 

341 async with PrefectServerEventsClient() as events: 

342 await events.emit(event) 

343 

344 @abc.abstractmethod 1a

345 async def create_event(self, triggered_action: "TriggeredAction") -> "Event": 1a

346 """Create an event from the TriggeredAction""" 

347 

348 

349class ExternalDataAction(Action): 1a

350 """Base class for Actions that require data from an external source such as 

351 the Orchestration API""" 

352 

353 async def orchestration_client( 1a

354 self, triggered_action: "TriggeredAction" 

355 ) -> "OrchestrationClient": 

356 from prefect.server.api.clients import OrchestrationClient 

357 

358 return OrchestrationClient( 

359 additional_headers={ 

360 "Prefect-Automation-ID": str(triggered_action.automation.id), 

361 "Prefect-Automation-Name": ( 

362 b64encode(triggered_action.automation.name.encode()).decode() 

363 ), 

364 }, 

365 ) 

366 

367 async def events_api_client( 1a

368 self, triggered_action: "TriggeredAction" 

369 ) -> PrefectServerEventsAPIClient: 

370 return PrefectServerEventsAPIClient( 

371 additional_headers={ 

372 "Prefect-Automation-ID": str(triggered_action.automation.id), 

373 "Prefect-Automation-Name": ( 

374 b64encode(triggered_action.automation.name.encode()).decode() 

375 ), 

376 }, 

377 ) 

378 

379 def reason_from_response(self, response: Response) -> str: 1a

380 error_detail = None 

381 if response.status_code in {409, 422}: 

382 try: 

383 error_detail = response.json().get("detail") 

384 except Exception: 

385 pass 

386 

387 if response.status_code == 422 or error_detail: 

388 return f"Validation error occurred for {self.type!r}" + ( 

389 f" - {error_detail}" if error_detail else "" 

390 ) 

391 else: 

392 return f"Conflict (409) occurred for {self.type!r} - {error_detail or response.text!r}" 

393 else: 

394 return ( 

395 f"Unexpected status from {self.type!r} action: {response.status_code}" 

396 ) 

397 

398 

399def _first_resource_of_kind(event: "Event", expected_kind: str) -> Optional["Resource"]: 1a

400 for resource in event.involved_resources: 

401 kind, _, _ = resource.id.rpartition(".") 

402 if kind == expected_kind: 

403 return resource 

404 

405 return None 

406 

407 

408def _kind_and_id_from_resource( 1a

409 resource: Resource, 

410) -> tuple[str, UUID] | tuple[None, None]: 

411 kind, _, id = resource.id.rpartition(".") 

412 

413 try: 

414 return kind, UUID(id) 

415 except ValueError: 

416 pass 

417 

418 return None, None 

419 

420 

421def _id_from_resource_id(resource_id: str, expected_kind: str) -> Optional[UUID]: 1a

422 kind, _, id = resource_id.rpartition(".") 

423 if kind == expected_kind: 

424 try: 

425 return UUID(id) 

426 except ValueError: 

427 pass 

428 return None 

429 

430 

431def _id_of_first_resource_of_kind(event: "Event", expected_kind: str) -> Optional[UUID]: 1a

432 resource = _first_resource_of_kind(event, expected_kind) 

433 if resource: 

434 if id := _id_from_resource_id(resource.id, expected_kind): 

435 return id 

436 return None 

437 

438 

439WorkspaceVariables: TypeAlias = Dict[str, StrictVariableValue] 1a

440TemplateContextObject: TypeAlias = Union[PrefectBaseModel, WorkspaceVariables, None] 1a

441 

442 

443class JinjaTemplateAction(ExternalDataAction): 1a

444 """Base class for Actions that use Jinja templates supplied by the user and 

445 are rendered with a context containing data from the triggered action, 

446 and the orchestration API.""" 

447 

448 _object_cache: Dict[str, TemplateContextObject] = PrivateAttr(default_factory=dict) 1a

449 

450 _registered_filters: ClassVar[bool] = False 1a

451 

452 @classmethod 1a

453 def _register_filters_if_needed(cls) -> None: 1a

454 if not cls._registered_filters: 

455 # Register our event-related filters 

456 from prefect.server.events.jinja_filters import all_filters 

457 

458 register_user_template_filters(all_filters) 

459 cls._registered_filters = True 

460 

461 @classmethod 1a

462 def validate_template(cls, template: str, field_name: str) -> str: 1a

463 cls._register_filters_if_needed() 

464 

465 try: 

466 validate_user_template(template) 

467 except (jinja2.exceptions.TemplateSyntaxError, TemplateSecurityError) as exc: 

468 raise ValueError(f"{field_name!r} is not a valid template: {exc}") 

469 

470 return template 

471 

472 @classmethod 1a

473 def templates_in_dictionary( 1a

474 cls, dict_: dict[Any, Any | dict[Any, Any]] 

475 ) -> list[tuple[dict[Any, Any], dict[Any, str]]]: 

476 to_traverse: list[dict[Any, Any]] = [] 

477 templates_at_layer: dict[Any, str] = {} 

478 for key, value in dict_.items(): 

479 if isinstance(value, str) and maybe_template(value): 

480 templates_at_layer[key] = value 

481 elif isinstance(value, dict): 

482 to_traverse.append(value) 

483 

484 templates: list[tuple[dict[Any, Any], dict[Any, str]]] = [] 

485 

486 if templates_at_layer: 

487 templates.append((dict_, templates_at_layer)) 

488 

489 for item in to_traverse: 

490 templates += cls.templates_in_dictionary(item) 

491 

492 return templates 

493 

494 def instantiate_object( 1a

495 self, 

496 model: Type[PrefectBaseModel], 

497 data: Dict[str, Any], 

498 triggered_action: "TriggeredAction", 

499 resource: Optional["Resource"] = None, 

500 ) -> PrefectBaseModel: 

501 object = model.model_validate(data) 

502 

503 if isinstance(object, FlowRunResponse) or isinstance(object, TaskRun): 

504 # The flow/task run was fetched from the API, but between when its 

505 # state changed and now it's possible that the state in the API has 

506 # changed again from what's contained in the event. Use the event's 

507 # data to rebuild the state object and attach it to the object 

508 # received from the API. 

509 # https://github.com/PrefectHQ/nebula/issues/3310 

510 state_fields = [ 

511 "prefect.state-message", 

512 "prefect.state-name", 

513 "prefect.state-timestamp", 

514 "prefect.state-type", 

515 ] 

516 

517 if resource and all(field in resource for field in state_fields): 

518 try: 

519 timestamp = parse_datetime(resource["prefect.state-timestamp"]) 

520 if TYPE_CHECKING: 

521 assert isinstance(timestamp, DateTime) 

522 object.state = State( 

523 message=resource["prefect.state-message"], 

524 name=resource["prefect.state-name"], 

525 timestamp=timestamp, 

526 type=StateType(resource["prefect.state-type"]), 

527 ) 

528 except Exception: 

529 logger.exception( 

530 "Failed to parse state from event resource", 

531 extra={ 

532 **self.logging_context(triggered_action), 

533 }, 

534 ) 

535 

536 return object 

537 

538 async def _get_object_from_prefect_api( 1a

539 self, 

540 orchestration_client: "OrchestrationClient", 

541 triggered_action: "TriggeredAction", 

542 resource: Optional["Resource"], 

543 ) -> Optional[PrefectBaseModel]: 

544 if not resource: 

545 return None 

546 

547 kind, obj_id = _kind_and_id_from_resource(resource) 

548 

549 if not obj_id: 

550 return None 

551 

552 kind_to_model_and_methods: Dict[ 

553 str, 

554 Tuple[ 

555 Type[PrefectBaseModel], 

556 List[Callable[..., Coroutine[Any, Any, Response]]], 

557 ], 

558 ] = { 

559 "prefect.deployment": ( 

560 DeploymentResponse, 

561 [orchestration_client.read_deployment_raw], 

562 ), 

563 "prefect.flow": (Flow, [orchestration_client.read_flow_raw]), 

564 "prefect.flow-run": ( 

565 FlowRunResponse, 

566 [orchestration_client.read_flow_run_raw], 

567 ), 

568 "prefect.task-run": (TaskRun, [orchestration_client.read_task_run_raw]), 

569 "prefect.work-pool": ( 

570 WorkPool, 

571 [orchestration_client.read_work_pool_raw], 

572 ), 

573 "prefect.work-queue": ( 

574 WorkQueueWithStatus, 

575 [ 

576 orchestration_client.read_work_queue_raw, 

577 orchestration_client.read_work_queue_status_raw, 

578 ], 

579 ), 

580 "prefect.concurrency-limit": ( 

581 ConcurrencyLimitV2, 

582 [orchestration_client.read_concurrency_limit_v2_raw], 

583 ), 

584 } 

585 

586 if kind not in kind_to_model_and_methods: 

587 return None 

588 

589 model, client_methods = kind_to_model_and_methods[kind] 

590 

591 responses = await asyncio.gather( 

592 *[client_method(obj_id) for client_method in client_methods] 

593 ) 

594 

595 if any(response.status_code >= 300 for response in responses): 

596 return None 

597 

598 combined_response: dict[Any, Any] = {} 

599 for response in responses: 

600 data: Any | list[Any] = response.json() 

601 

602 # Sometimes we have to call filter endpoints that return a list of 0..1 

603 if isinstance(data, list): 

604 if len(data) == 0: 

605 return None 

606 data = data[0] 

607 

608 combined_response.update(data) 

609 

610 return self.instantiate_object( 

611 model, combined_response, triggered_action, resource=resource 

612 ) 

613 

614 async def _relevant_native_objects( 1a

615 self, templates: List[str], triggered_action: "TriggeredAction" 

616 ) -> Dict[str, TemplateContextObject]: 

617 if not triggered_action.triggering_event: 

618 return {} 

619 

620 orchestration_types = { 

621 "deployment", 

622 "flow", 

623 "flow_run", 

624 "task_run", 

625 "work_pool", 

626 "work_queue", 

627 "concurrency_limit", 

628 } 

629 special_types = {"variables"} 

630 

631 types = matching_types_in_templates( 

632 templates, types=orchestration_types | special_types 

633 ) 

634 if not types: 

635 return {} 

636 

637 needed_types = list(set(types) - set(self._object_cache.keys())) 

638 

639 async with await self.orchestration_client(triggered_action) as orchestration: 

640 calls: List[Awaitable[TemplateContextObject]] = [] 

641 for type_ in needed_types: 

642 if type_ in orchestration_types: 

643 calls.append( 

644 self._get_object_from_prefect_api( 

645 orchestration, 

646 triggered_action, 

647 _first_resource_of_kind( 

648 triggered_action.triggering_event, 

649 f"prefect.{type_.replace('_', '-')}", 

650 ), 

651 ) 

652 ) 

653 elif type_ == "variables": 

654 calls.append(orchestration.read_workspace_variables()) 

655 

656 objects = await asyncio.gather(*calls) 

657 

658 self._object_cache.update(dict(zip(needed_types, objects))) 

659 

660 return self._object_cache 

661 

662 async def _template_context( 1a

663 self, templates: List[str], triggered_action: "TriggeredAction" 

664 ) -> dict[str, Any]: 

665 context: dict[str, Any] = { 

666 "automation": triggered_action.automation, 

667 "event": triggered_action.triggering_event, 

668 "labels": LabelDiver(triggered_action.triggering_labels), 

669 "firing": triggered_action.firing, 

670 "firings": triggered_action.all_firings(), 

671 "events": triggered_action.all_events(), 

672 } 

673 context.update(await self._relevant_native_objects(templates, triggered_action)) 

674 return context 

675 

676 async def _render( 1a

677 self, templates: List[str], triggered_action: "TriggeredAction" 

678 ) -> List[str]: 

679 self._register_filters_if_needed() 

680 

681 context = await self._template_context(templates, triggered_action) 

682 

683 return await asyncio.gather( 

684 *[render_user_template(template, context) for template in templates] 

685 ) 

686 

687 

688class DeploymentAction(Action): 1a

689 """Base class for Actions that operate on Deployments and need to infer them from 

690 events""" 

691 

692 source: Literal["selected", "inferred"] = Field( 1a

693 "selected", 

694 description=( 

695 "Whether this Action applies to a specific selected " 

696 "deployment (given by `deployment_id`), or to a deployment that is " 

697 "inferred from the triggering event. If the source is 'inferred', " 

698 "the `deployment_id` may not be set. If the source is 'selected', the " 

699 "`deployment_id` must be set." 

700 ), 

701 ) 

702 deployment_id: Optional[UUID] = Field( 1a

703 None, description="The identifier of the deployment" 

704 ) 

705 

706 @model_validator(mode="after") 1a

707 def selected_deployment_requires_id(self) -> Self: 1a

708 wants_selected_deployment = self.source == "selected" 

709 has_deployment_id = bool(self.deployment_id) 

710 if wants_selected_deployment != has_deployment_id: 

711 raise ValueError( 

712 "deployment_id is " 

713 + ("not allowed" if has_deployment_id else "required") 

714 ) 

715 return self 

716 

717 async def deployment_id_to_use(self, triggered_action: "TriggeredAction") -> UUID: 1a

718 if self.source == "selected": 

719 assert self.deployment_id 

720 return self.deployment_id 

721 

722 event = triggered_action.triggering_event 

723 if not event: 

724 raise ActionFailed("No event to infer the deployment") 

725 

726 assert event 

727 if id := _id_of_first_resource_of_kind(event, "prefect.deployment"): 

728 return id 

729 

730 raise ActionFailed("No deployment could be inferred") 

731 

732 

733class DeploymentCommandAction(DeploymentAction, ExternalDataAction): 1a

734 """Executes a command against a matching deployment""" 

735 

736 _action_description: ClassVar[str] 1a

737 

738 async def act(self, triggered_action: "TriggeredAction") -> None: 1a

739 deployment_id = await self.deployment_id_to_use(triggered_action) 

740 

741 self._resulting_related_resources.append( 

742 RelatedResource.model_validate( 

743 { 

744 "prefect.resource.id": f"prefect.deployment.{deployment_id}", 

745 "prefect.resource.role": "target", 

746 } 

747 ) 

748 ) 

749 

750 logger.info( 

751 self._action_description, 

752 extra={ 

753 "deployment_id": deployment_id, 

754 **self.logging_context(triggered_action), 

755 }, 

756 ) 

757 

758 async with await self.orchestration_client(triggered_action) as orchestration: 

759 response = await self.command( 

760 orchestration, deployment_id, triggered_action 

761 ) 

762 

763 self._result_details["status_code"] = response.status_code 

764 if response.status_code >= 300: 

765 raise ActionFailed(self.reason_from_response(response)) 

766 

767 @abc.abstractmethod 1a

768 async def command( 1a

769 self, 

770 orchestration: "OrchestrationClient", 

771 deployment_id: UUID, 

772 triggered_action: "TriggeredAction", 

773 ) -> Response: 

774 """Execute the deployment command""" 

775 

776 

777class RunDeployment(JinjaTemplateAction, DeploymentCommandAction): 1a

778 """Runs the given deployment with the given parameters""" 

779 

780 type: Literal["run-deployment"] = "run-deployment" 1a

781 

782 parameters: Optional[Dict[str, Any]] = Field( 1a

783 None, 

784 description=( 

785 "The parameters to pass to the deployment, or None to use the " 

786 "deployment's default parameters" 

787 ), 

788 ) 

789 job_variables: Optional[Dict[str, Any]] = Field( 1a

790 None, 

791 description=( 

792 "The job variables to pass to the created flow run, or None " 

793 "to use the deployment's default job variables" 

794 ), 

795 ) 

796 schedule_after: NonNegativeTimeDelta = Field( 1a

797 default_factory=lambda: timedelta(0), 

798 description=( 

799 "The amount of time to wait before running the deployment. " 

800 "Defaults to running the deployment immediately." 

801 ), 

802 ) 

803 

804 _action_description: ClassVar[str] = "Running deployment" 1a

805 

806 async def command( 1a

807 self, 

808 orchestration: "OrchestrationClient", 

809 deployment_id: UUID, 

810 triggered_action: "TriggeredAction", 

811 ) -> Response: 

812 # Calculate when to schedule the deployment 

813 scheduled_time = datetime.now(timezone.utc) + self.schedule_after 

814 state = Scheduled(scheduled_time=scheduled_time) 

815 

816 try: 

817 flow_run_create = DeploymentFlowRunCreate( # type: ignore 

818 state=StateCreate( 

819 type=state.type, 

820 name=state.name, 

821 message=state.message, 

822 state_details=state.state_details, 

823 ), 

824 parameters=await self.render_parameters(triggered_action), 

825 idempotency_key=triggered_action.idempotency_key(), 

826 job_variables=self.job_variables, 

827 ) 

828 except Exception as exc: 

829 raise ActionFailed(f"Unable to create flow run from deployment: {exc!r}") 

830 

831 response = await orchestration.create_flow_run(deployment_id, flow_run_create) 

832 

833 if response.status_code < 300: 

834 flow_run = FlowRunResponse.model_validate(response.json()) 

835 

836 self._resulting_related_resources.append( 

837 RelatedResource.model_validate( 

838 { 

839 "prefect.resource.id": f"prefect.flow-run.{flow_run.id}", 

840 "prefect.resource.role": "flow-run", 

841 "prefect.resource.name": flow_run.name, 

842 } 

843 ) 

844 ) 

845 

846 logger.info( 

847 "Started flow run", 

848 extra={ 

849 "flow_run": { 

850 "id": str(flow_run.id), 

851 "name": flow_run.name, 

852 }, 

853 **self.logging_context(triggered_action), 

854 }, 

855 ) 

856 

857 if response.status_code == 409: 

858 self._result_details["validation_error"] = response.json().get("detail") 

859 

860 return response 

861 

862 @field_validator("parameters") 1a

863 def validate_parameters(cls, value: dict[str, Any] | None) -> dict[str, Any] | None: 1a

864 if not value: 

865 return value 

866 

867 for_testing = copy.deepcopy(value) or {} 

868 cls._upgrade_v1_templates(for_testing) 

869 

870 problems = cls._collect_errors( 

871 hydrate( 

872 for_testing, 

873 HydrationContext( 

874 raise_on_error=False, 

875 render_workspace_variables=False, 

876 render_jinja=False, 

877 ), 

878 ) 

879 ) 

880 if not problems: 

881 return value 

882 

883 raise ValueError( 

884 "Invalid parameters: \n" 

885 + "\n ".join( 

886 f"{k + ':' if k else ''} {e.message}" for k, e in problems.items() 

887 ) 

888 ) 

889 

890 @classmethod 1a

891 def _collect_errors( 1a

892 cls, 

893 hydrated: Union[dict[str, Any | dict[str, Any] | list[Any]], Placeholder], 

894 prefix: str = "", 

895 ) -> dict[str, HydrationError]: 

896 problems: dict[str, HydrationError] = {} 

897 

898 if isinstance(hydrated, HydrationError): 

899 problems[prefix] = hydrated 

900 

901 if isinstance(hydrated, Placeholder): 

902 return problems 

903 

904 for key, value in hydrated.items(): 

905 if isinstance(value, dict): 

906 problems.update(cls._collect_errors(value, f"{prefix}{key}.")) 

907 elif isinstance(value, list): 

908 for item, index in enumerate(value): 

909 if isinstance(item, dict): 

910 problems.update( 

911 cls._collect_errors(item, f"{prefix}{key}[{index}].") 

912 ) 

913 elif isinstance(item, HydrationError): 

914 problems[f"{prefix}{key}[{index}]"] = item 

915 elif isinstance(value, HydrationError): 

916 problems[f"{prefix}{key}"] = value 

917 

918 return problems 

919 

920 async def render_parameters( 1a

921 self, triggered_action: "TriggeredAction" 

922 ) -> Dict[str, Any]: 

923 parameters = copy.deepcopy(self.parameters) or {} 

924 

925 # pre-process the parameters to upgrade any v1-style template values to v2 

926 self._upgrade_v1_templates(parameters) 

927 

928 # first-pass, hydrate parameters without rendering in order to collect all of 

929 # the embedded Jinja templates, workspace variables, etc 

930 placeholders = self._collect_placeholders( 

931 hydrate( 

932 parameters, 

933 HydrationContext( 

934 raise_on_error=False, 

935 render_jinja=False, 

936 render_workspace_variables=False, 

937 ), 

938 ) 

939 ) 

940 

941 # collect all templates so we can build up the context variables they need 

942 templates = [p.template for p in placeholders if isinstance(p, ValidJinja)] 

943 template_context = await self._template_context(templates, triggered_action) 

944 

945 # collect any referenced workspace variables so we can fetch them 

946 variable_names = [ 

947 p.variable_name for p in placeholders if isinstance(p, WorkspaceVariable) 

948 ] 

949 workspace_variables: Dict[str, StrictVariableValue] = {} 

950 if variable_names: 

951 async with await self.orchestration_client(triggered_action) as client: 

952 workspace_variables = await client.read_workspace_variables( 

953 variable_names 

954 ) 

955 

956 # second-pass, render the parameters with the full context 

957 parameters = hydrate( 

958 parameters, 

959 HydrationContext( 

960 raise_on_error=True, 

961 render_jinja=True, 

962 jinja_context=template_context, 

963 render_workspace_variables=True, 

964 workspace_variables=workspace_variables, 

965 ), 

966 ) 

967 

968 return parameters 

969 

970 @classmethod 1a

971 def _upgrade_v1_templates(cls, parameters: Parameters): 1a

972 """ 

973 Upgrades all v1-style template values from the parameters dictionary, changing 

974 the values in the given dictionary. v1-style templates are any plain strings 

975 that include Jinja2 template syntax. 

976 """ 

977 for key, value in parameters.items(): 

978 if isinstance(value, dict): 

979 # if we already have a __prefect_kind, don't upgrade or recurse 

980 if "__prefect_kind" in value: 

981 continue 

982 cls._upgrade_v1_templates(value) 

983 elif isinstance(value, list): 

984 for i, item in enumerate(value): 

985 if isinstance(item, dict): 

986 cls._upgrade_v1_templates(item) 

987 elif isinstance(item, str) and maybe_template(item): 

988 value[i] = {"__prefect_kind": "jinja", "template": item} 

989 elif isinstance(value, str) and maybe_template(value): # pyright: ignore[reportUnnecessaryIsInstance] 

990 parameters[key] = {"__prefect_kind": "jinja", "template": value} 

991 

992 def _collect_placeholders( 1a

993 self, parameters: Parameters | Placeholder 

994 ) -> list[Placeholder]: 

995 """ 

996 Recursively collects all placeholder values embedded within the parameters 

997 dictionary, including templates and workspace variables 

998 """ 

999 placeholders: list[Placeholder] = [] 

1000 

1001 if isinstance(parameters, Placeholder): 

1002 return [parameters] 

1003 

1004 for _, value in parameters.items(): 

1005 if isinstance(value, dict): 

1006 placeholders += self._collect_placeholders(value) 

1007 elif isinstance(value, list): 

1008 for item in value: 

1009 if isinstance(item, dict): 

1010 placeholders += self._collect_placeholders(item) 

1011 elif isinstance(item, Placeholder): 

1012 placeholders.append(item) 

1013 elif isinstance(value, Placeholder): 

1014 placeholders.append(value) 

1015 return placeholders 

1016 

1017 

1018class PauseDeployment(DeploymentCommandAction): 1a

1019 """Pauses the given Deployment""" 

1020 

1021 type: Literal["pause-deployment"] = "pause-deployment" 1a

1022 

1023 _action_description: ClassVar[str] = "Pausing deployment" 1a

1024 

1025 async def command( 1a

1026 self, 

1027 orchestration: "OrchestrationClient", 

1028 deployment_id: UUID, 

1029 triggered_action: "TriggeredAction", 

1030 ) -> Response: 

1031 return await orchestration.pause_deployment(deployment_id) 

1032 

1033 

1034class ResumeDeployment(DeploymentCommandAction): 1a

1035 """Resumes the given Deployment""" 

1036 

1037 type: Literal["resume-deployment"] = "resume-deployment" 1a

1038 

1039 _action_description: ClassVar[str] = "Resuming deployment" 1a

1040 

1041 async def command( 1a

1042 self, 

1043 orchestration: "OrchestrationClient", 

1044 deployment_id: UUID, 

1045 triggered_action: "TriggeredAction", 

1046 ) -> Response: 

1047 return await orchestration.resume_deployment(deployment_id) 

1048 

1049 

1050class FlowRunAction(ExternalDataAction): 1a

1051 """An action that operates on a flow run""" 

1052 

1053 async def flow_run(self, triggered_action: "TriggeredAction") -> UUID: 1a

1054 # Proactive triggers won't have an event, but they might be tracking 

1055 # buckets per-resource, so check for that first 

1056 labels = triggered_action.triggering_labels 

1057 if triggering_resource_id := labels.get("prefect.resource.id"): 

1058 if id := _id_from_resource_id(triggering_resource_id, "prefect.flow-run"): 

1059 return id 

1060 

1061 event = triggered_action.triggering_event 

1062 if event: 

1063 if id := _id_of_first_resource_of_kind(event, "prefect.flow-run"): 

1064 return id 

1065 

1066 raise ActionFailed("No flow run could be inferred") 

1067 

1068 

1069class FlowRunStateChangeAction(FlowRunAction): 1a

1070 """Changes the state of a flow run associated with the trigger""" 

1071 

1072 @abc.abstractmethod 1a

1073 async def new_state(self, triggered_action: "TriggeredAction") -> StateCreate: 1a

1074 """Return the new state for the flow run""" 

1075 

1076 async def act(self, triggered_action: "TriggeredAction") -> None: 1a

1077 flow_run_id = await self.flow_run(triggered_action) 

1078 

1079 self._resulting_related_resources.append( 

1080 RelatedResource.model_validate( 

1081 { 

1082 "prefect.resource.id": f"prefect.flow-run.{flow_run_id}", 

1083 "prefect.resource.role": "target", 

1084 } 

1085 ) 

1086 ) 

1087 

1088 logger.info( 

1089 "Changing flow run state", 

1090 extra={ 

1091 "flow_run_id": str(flow_run_id), 

1092 **self.logging_context(triggered_action), 

1093 }, 

1094 ) 

1095 

1096 async with await self.orchestration_client(triggered_action) as orchestration: 

1097 response = await orchestration.set_flow_run_state( 

1098 flow_run_id, await self.new_state(triggered_action=triggered_action) 

1099 ) 

1100 

1101 self._result_details["status_code"] = response.status_code 

1102 if response.status_code >= 300: 

1103 raise ActionFailed(self.reason_from_response(response)) 

1104 

1105 result = OrchestrationResult.model_validate(response.json()) 

1106 if not isinstance(result.details, StateAcceptDetails): 

1107 raise ActionFailed(f"Failed to set state: {result.details.reason}") 

1108 

1109 

1110class ChangeFlowRunState(FlowRunStateChangeAction): 1a

1111 """Changes the state of a flow run associated with the trigger""" 

1112 

1113 type: Literal["change-flow-run-state"] = "change-flow-run-state" 1a

1114 

1115 name: Optional[str] = Field( 1a

1116 None, 

1117 description="The name of the state to change the flow run to", 

1118 ) 

1119 state: StateType = Field( 1a

1120 ..., 

1121 description="The type of the state to change the flow run to", 

1122 ) 

1123 message: Optional[str] = Field( 1a

1124 None, 

1125 description="An optional message to associate with the state change", 

1126 ) 

1127 

1128 async def new_state(self, triggered_action: "TriggeredAction") -> StateCreate: 1a

1129 message = ( 

1130 self.message 

1131 or f"State changed by Automation {triggered_action.automation.id}" 

1132 ) 

1133 

1134 return StateCreate( 

1135 name=self.name, 

1136 type=self.state, 

1137 message=message, 

1138 ) 

1139 

1140 

1141class CancelFlowRun(FlowRunStateChangeAction): 1a

1142 """Cancels a flow run associated with the trigger""" 

1143 

1144 type: Literal["cancel-flow-run"] = "cancel-flow-run" 1a

1145 

1146 async def new_state(self, triggered_action: "TriggeredAction") -> StateCreate: 1a

1147 return StateCreate( 

1148 type=StateType.CANCELLING, 

1149 message=f"Cancelled by Automation {triggered_action.automation.id}", 

1150 ) 

1151 

1152 

1153class SuspendFlowRun(FlowRunStateChangeAction): 1a

1154 """Suspends a flow run associated with the trigger""" 

1155 

1156 type: Literal["suspend-flow-run"] = "suspend-flow-run" 1a

1157 

1158 async def new_state(self, triggered_action: "TriggeredAction") -> StateCreate: 1a

1159 state = Suspended( 

1160 timeout_seconds=3600, 

1161 message=f"Suspended by Automation {triggered_action.automation.id}", 

1162 ) 

1163 

1164 return StateCreate( 

1165 type=state.type, 

1166 name=state.name, 

1167 message=state.message, 

1168 state_details=state.state_details, 

1169 ) 

1170 

1171 

1172class ResumeFlowRun(FlowRunAction): 1a

1173 """Resumes a paused or suspended flow run associated with the trigger""" 

1174 

1175 type: Literal["resume-flow-run"] = "resume-flow-run" 1a

1176 

1177 async def act(self, triggered_action: "TriggeredAction") -> None: 1a

1178 flow_run_id = await self.flow_run(triggered_action) 

1179 

1180 self._resulting_related_resources.append( 

1181 RelatedResource.model_validate( 

1182 { 

1183 "prefect.resource.id": f"prefect.flow-run.{flow_run_id}", 

1184 "prefect.resource.role": "target", 

1185 } 

1186 ) 

1187 ) 

1188 

1189 logger.debug( 

1190 "Resuming flow run", 

1191 extra={ 

1192 "flow_run_id": str(flow_run_id), 

1193 **self.logging_context(triggered_action), 

1194 }, 

1195 ) 

1196 

1197 async with await self.orchestration_client(triggered_action) as orchestration: 

1198 result = await orchestration.resume_flow_run(flow_run_id) 

1199 

1200 if not isinstance(result.details, StateAcceptDetails): 

1201 raise ActionFailed( 

1202 f"Failed to resume flow run: {result.details.reason}" 

1203 ) 

1204 

1205 

1206class CallWebhook(JinjaTemplateAction): 1a

1207 """Call a webhook when an Automation is triggered.""" 

1208 

1209 type: Literal["call-webhook"] = "call-webhook" 1a

1210 block_document_id: UUID = Field( 1a

1211 description="The identifier of the webhook block to use" 

1212 ) 

1213 payload: str = Field( 1a

1214 default="", 

1215 description="An optional templatable payload to send when calling the webhook.", 

1216 ) 

1217 

1218 @field_validator("payload", mode="before") 1a

1219 @classmethod 1a

1220 def ensure_payload_is_a_string( 1a

1221 cls, value: Union[str, Dict[str, Any], None] 

1222 ) -> Optional[str]: 

1223 """Temporary measure while we migrate payloads from being a dictionary to 

1224 a string template. This covers both reading from the database where values 

1225 may currently be a dictionary, as well as the API, where older versions of the 

1226 frontend may be sending a JSON object with the single `"message"` key.""" 

1227 if value is None: 

1228 return value 

1229 

1230 if isinstance(value, str): 

1231 return value 

1232 

1233 return orjson.dumps(value, option=orjson.OPT_INDENT_2).decode() 

1234 

1235 @field_validator("payload") 1a

1236 @classmethod 1a

1237 def validate_payload_templates(cls, value: Optional[str]) -> Optional[str]: 1a

1238 """ 

1239 Validate user-provided payload template. 

1240 """ 

1241 if not value: 

1242 return value 

1243 

1244 cls.validate_template(value, "payload") 

1245 

1246 return value 

1247 

1248 async def _get_webhook_block(self, triggered_action: "TriggeredAction") -> Webhook: 1a

1249 async with await self.orchestration_client(triggered_action) as orchestration: 

1250 response = await orchestration.read_block_document_raw( 

1251 self.block_document_id 

1252 ) 

1253 if response.status_code >= 300: 

1254 raise ActionFailed(self.reason_from_response(response)) 

1255 

1256 try: 

1257 block_document = BlockDocument.model_validate(response.json()) 

1258 block = await _load_block_from_block_document(block_document) 

1259 except Exception as e: 

1260 raise ActionFailed(f"The webhook block was invalid: {e!r}") 

1261 

1262 if not isinstance(block, Webhook): 

1263 raise ActionFailed("The referenced block was not a webhook block") 

1264 

1265 self._resulting_related_resources += [ 

1266 RelatedResource.model_validate( 

1267 { 

1268 "prefect.resource.id": f"prefect.block-document.{self.block_document_id}", 

1269 "prefect.resource.role": "block", 

1270 "prefect.resource.name": block_document.name, 

1271 } 

1272 ), 

1273 RelatedResource.model_validate( 

1274 { 

1275 "prefect.resource.id": f"prefect.block-type.{block.get_block_type_slug()}", 

1276 "prefect.resource.role": "block-type", 

1277 } 

1278 ), 

1279 ] 

1280 

1281 return block 

1282 

1283 async def act(self, triggered_action: "TriggeredAction") -> None: 1a

1284 block = await self._get_webhook_block(triggered_action=triggered_action) 

1285 

1286 (payload,) = await self._render([self.payload], triggered_action) 

1287 

1288 try: 

1289 response = await block.call(payload=payload) 

1290 

1291 ok_headers = { 

1292 k: v for k, v in response.headers.items() if not should_redact_header(k) 

1293 } 

1294 

1295 self._result_details.update( 

1296 { 

1297 "status_code": response.status_code, 

1298 "response_body": truncated_to(1000, response.text), 

1299 "response_headers": {**(ok_headers or {})}, 

1300 } 

1301 ) 

1302 except Exception as e: 

1303 raise ActionFailed(f"Webhook call failed: {e!r}") 

1304 

1305 

1306class SendNotification(JinjaTemplateAction): 1a

1307 """Send a notification when an Automation is triggered""" 

1308 

1309 type: Literal["send-notification"] = "send-notification" 1a

1310 block_document_id: UUID = Field( 1a

1311 description="The identifier of the notification block to use" 

1312 ) 

1313 subject: str = Field("Prefect automated notification") 1a

1314 body: str = Field(description="The text of the notification to send") 1a

1315 

1316 @field_validator("subject", "body") 1a

1317 def is_valid_template(cls, value: str, info: ValidationInfo) -> str: 1a

1318 if TYPE_CHECKING: 

1319 assert isinstance(info.field_name, str) 

1320 return cls.validate_template(value, info.field_name) 

1321 

1322 async def _get_notification_block( 1a

1323 self, triggered_action: "TriggeredAction" 

1324 ) -> NotificationBlock: 

1325 async with await self.orchestration_client(triggered_action) as orion: 

1326 response = await orion.read_block_document_raw(self.block_document_id) 

1327 if response.status_code >= 300: 

1328 raise ActionFailed(self.reason_from_response(response)) 

1329 

1330 try: 

1331 block_document = BlockDocument.model_validate(response.json()) 

1332 block = await _load_block_from_block_document(block_document) 

1333 except Exception as e: 

1334 raise ActionFailed(f"The notification block was invalid: {e!r}") 

1335 

1336 if "notify" not in block.get_block_capabilities(): 

1337 raise ActionFailed("The referenced block was not a notification block") 

1338 

1339 self._resulting_related_resources += [ 

1340 RelatedResource.model_validate( 

1341 { 

1342 "prefect.resource.id": f"prefect.block-document.{self.block_document_id}", 

1343 "prefect.resource.role": "block", 

1344 "prefect.resource.name": block_document.name, 

1345 } 

1346 ), 

1347 RelatedResource.model_validate( 

1348 { 

1349 "prefect.resource.id": f"prefect.block-type.{block.get_block_type_slug()}", 

1350 "prefect.resource.role": "block-type", 

1351 } 

1352 ), 

1353 ] 

1354 

1355 return cast(NotificationBlock, block) 

1356 

1357 async def act(self, triggered_action: "TriggeredAction") -> None: 1a

1358 block = await self._get_notification_block(triggered_action=triggered_action) 

1359 

1360 subject, body = await self.render(triggered_action) 

1361 

1362 with block.raise_on_failure(): 

1363 try: 

1364 await block.notify(subject=subject, body=body) 

1365 except NotificationError as e: 

1366 self._result_details["notification_log"] = e.log 

1367 raise ActionFailed("Notification failed") 

1368 

1369 async def render(self, triggered_action: "TriggeredAction") -> List[str]: 1a

1370 return await self._render([self.subject, self.body], triggered_action) 

1371 

1372 

1373class WorkPoolAction(Action): 1a

1374 """Base class for Actions that operate on Work Pools and need to infer them from 

1375 events""" 

1376 

1377 source: Literal["selected", "inferred"] = Field( 1a

1378 "selected", 

1379 description=( 

1380 "Whether this Action applies to a specific selected " 

1381 "work pool (given by `work_pool_id`), or to a work pool that is " 

1382 "inferred from the triggering event. If the source is 'inferred', " 

1383 "the `work_pool_id` may not be set. If the source is 'selected', the " 

1384 "`work_pool_id` must be set." 

1385 ), 

1386 ) 

1387 work_pool_id: Optional[UUID] = Field( 1a

1388 None, 

1389 description="The identifier of the work pool to pause", 

1390 ) 

1391 

1392 @model_validator(mode="after") 1a

1393 def selected_work_pool_requires_id(self) -> Self: 1a

1394 wants_selected_work_pool = self.source == "selected" 

1395 has_work_pool_id = bool(self.work_pool_id) 

1396 if wants_selected_work_pool != has_work_pool_id: 

1397 raise ValueError( 

1398 "work_pool_id is " + ("not allowed" if has_work_pool_id else "required") 

1399 ) 

1400 return self 

1401 

1402 async def work_pool_id_to_use(self, triggered_action: "TriggeredAction") -> UUID: 1a

1403 if self.source == "selected": 

1404 assert self.work_pool_id 

1405 return self.work_pool_id 

1406 

1407 event = triggered_action.triggering_event 

1408 if not event: 

1409 raise ActionFailed("No event to infer the work pool") 

1410 

1411 assert event 

1412 if id := _id_of_first_resource_of_kind(event, "prefect.work-pool"): 

1413 return id 

1414 

1415 raise ActionFailed("No work pool could be inferred") 

1416 

1417 

1418class WorkPoolCommandAction(WorkPoolAction, ExternalDataAction): 1a

1419 _action_description: ClassVar[str] 1a

1420 

1421 _target_work_pool: Optional[WorkPool] = PrivateAttr(default=None) 1a

1422 

1423 async def target_work_pool(self, triggered_action: "TriggeredAction") -> WorkPool: 1a

1424 if not self._target_work_pool: 

1425 work_pool_id = await self.work_pool_id_to_use(triggered_action) 

1426 

1427 async with await self.orchestration_client( 

1428 triggered_action 

1429 ) as orchestration: 

1430 work_pool = await orchestration.read_work_pool(work_pool_id) 

1431 

1432 if not work_pool: 

1433 raise ActionFailed(f"Work pool {work_pool_id} not found") 

1434 self._target_work_pool = work_pool 

1435 return self._target_work_pool 

1436 

1437 async def act(self, triggered_action: "TriggeredAction") -> None: 1a

1438 work_pool = await self.target_work_pool(triggered_action) 

1439 

1440 self._resulting_related_resources += [ 

1441 RelatedResource.model_validate( 

1442 { 

1443 "prefect.resource.id": f"prefect.work-pool.{work_pool.id}", 

1444 "prefect.resource.name": work_pool.name, 

1445 "prefect.resource.role": "target", 

1446 } 

1447 ) 

1448 ] 

1449 

1450 logger.info( 

1451 self._action_description, 

1452 extra={ 

1453 "work_pool_id": work_pool.id, 

1454 **self.logging_context(triggered_action), 

1455 }, 

1456 ) 

1457 

1458 async with await self.orchestration_client(triggered_action) as orchestration: 

1459 response = await self.command(orchestration, work_pool, triggered_action) 

1460 

1461 self._result_details["status_code"] = response.status_code 

1462 if response.status_code >= 300: 

1463 raise ActionFailed(self.reason_from_response(response)) 

1464 

1465 @abc.abstractmethod 1a

1466 async def command( 1a

1467 self, 

1468 orchestration: "OrchestrationClient", 

1469 work_pool: WorkPool, 

1470 triggered_action: "TriggeredAction", 

1471 ) -> Response: 

1472 """Issue the command to the Work Pool""" 

1473 

1474 

1475class PauseWorkPool(WorkPoolCommandAction): 1a

1476 """Pauses a Work Pool""" 

1477 

1478 type: Literal["pause-work-pool"] = "pause-work-pool" 1a

1479 

1480 _action_description: ClassVar[str] = "Pausing work pool" 1a

1481 

1482 async def command( 1a

1483 self, 

1484 orchestration: "OrchestrationClient", 

1485 work_pool: WorkPool, 

1486 triggered_action: "TriggeredAction", 

1487 ) -> Response: 

1488 return await orchestration.pause_work_pool(work_pool.name) 

1489 

1490 

1491class ResumeWorkPool(WorkPoolCommandAction): 1a

1492 """Resumes a Work Pool""" 

1493 

1494 type: Literal["resume-work-pool"] = "resume-work-pool" 1a

1495 

1496 _action_description: ClassVar[str] = "Resuming work pool" 1a

1497 

1498 async def command( 1a

1499 self, 

1500 orchestration: "OrchestrationClient", 

1501 work_pool: WorkPool, 

1502 triggered_action: "TriggeredAction", 

1503 ) -> Response: 

1504 return await orchestration.resume_work_pool(work_pool.name) 

1505 

1506 

1507class WorkQueueAction(Action): 1a

1508 """Base class for Actions that operate on Work Queues and need to infer them from 

1509 events""" 

1510 

1511 source: Literal["selected", "inferred"] = Field( 1a

1512 "selected", 

1513 description=( 

1514 "Whether this Action applies to a specific selected " 

1515 "work queue (given by `work_queue_id`), or to a work queue that is " 

1516 "inferred from the triggering event. If the source is 'inferred', " 

1517 "the `work_queue_id` may not be set. If the source is 'selected', the " 

1518 "`work_queue_id` must be set." 

1519 ), 

1520 ) 

1521 work_queue_id: Optional[UUID] = Field( 1a

1522 None, description="The identifier of the work queue to pause" 

1523 ) 

1524 

1525 @model_validator(mode="after") 1a

1526 def selected_work_queue_requires_id(self) -> Self: 1a

1527 wants_selected_work_queue = self.source == "selected" 

1528 has_work_queue_id = bool(self.work_queue_id) 

1529 if wants_selected_work_queue != has_work_queue_id: 

1530 raise ValueError( 

1531 "work_queue_id is " 

1532 + ("not allowed" if has_work_queue_id else "required") 

1533 ) 

1534 return self 

1535 

1536 async def work_queue_id_to_use(self, triggered_action: "TriggeredAction") -> UUID: 1a

1537 if self.source == "selected": 

1538 assert self.work_queue_id 

1539 return self.work_queue_id 

1540 

1541 event = triggered_action.triggering_event 

1542 if not event: 

1543 raise ActionFailed("No event to infer the work queue") 

1544 

1545 assert event 

1546 if id := _id_of_first_resource_of_kind(event, "prefect.work-queue"): 

1547 return id 

1548 

1549 raise ActionFailed("No work queue could be inferred") 

1550 

1551 

1552class WorkQueueCommandAction(WorkQueueAction, ExternalDataAction): 1a

1553 _action_description: ClassVar[str] 1a

1554 

1555 async def act(self, triggered_action: "TriggeredAction") -> None: 1a

1556 work_queue_id = await self.work_queue_id_to_use(triggered_action) 

1557 

1558 self._resulting_related_resources += [ 

1559 RelatedResource.model_validate( 

1560 { 

1561 "prefect.resource.id": f"prefect.work-queue.{work_queue_id}", 

1562 "prefect.resource.role": "target", 

1563 } 

1564 ) 

1565 ] 

1566 

1567 logger.info( 

1568 self._action_description, 

1569 extra={ 

1570 "work_queue_id": work_queue_id, 

1571 **self.logging_context(triggered_action), 

1572 }, 

1573 ) 

1574 

1575 async with await self.orchestration_client(triggered_action) as orchestration: 

1576 response = await self.command( 

1577 orchestration, work_queue_id, triggered_action 

1578 ) 

1579 

1580 self._result_details["status_code"] = response.status_code 

1581 if response.status_code >= 300: 

1582 raise ActionFailed(self.reason_from_response(response)) 

1583 

1584 @abc.abstractmethod 1a

1585 async def command( 1a

1586 self, 

1587 orchestration: "OrchestrationClient", 

1588 work_queue_id: UUID, 

1589 triggered_action: "TriggeredAction", 

1590 ) -> Response: 

1591 """Issue the command to the Work Queue""" 

1592 

1593 

1594class PauseWorkQueue(WorkQueueCommandAction): 1a

1595 """Pauses a Work Queue""" 

1596 

1597 type: Literal["pause-work-queue"] = "pause-work-queue" 1a

1598 

1599 _action_description: ClassVar[str] = "Pausing work queue" 1a

1600 

1601 async def command( 1a

1602 self, 

1603 orchestration: "OrchestrationClient", 

1604 work_queue_id: UUID, 

1605 triggered_action: "TriggeredAction", 

1606 ) -> Response: 

1607 return await orchestration.pause_work_queue(work_queue_id) 

1608 

1609 

1610class ResumeWorkQueue(WorkQueueCommandAction): 1a

1611 """Resumes a Work Queue""" 

1612 

1613 type: Literal["resume-work-queue"] = "resume-work-queue" 1a

1614 

1615 _action_description: ClassVar[str] = "Resuming work queue" 1a

1616 

1617 async def command( 1a

1618 self, 

1619 orchestration: "OrchestrationClient", 

1620 work_queue_id: UUID, 

1621 triggered_action: "TriggeredAction", 

1622 ) -> Response: 

1623 return await orchestration.resume_work_queue(work_queue_id) 

1624 

1625 

1626class AutomationAction(Action): 1a

1627 """Base class for Actions that operate on Automations and need to infer them from 

1628 events""" 

1629 

1630 source: Literal["selected", "inferred"] = Field( 1a

1631 "selected", 

1632 description=( 

1633 "Whether this Action applies to a specific selected " 

1634 "automation (given by `automation_id`), or to an automation that is " 

1635 "inferred from the triggering event. If the source is 'inferred', " 

1636 "the `automation_id` may not be set. If the source is 'selected', the " 

1637 "`automation_id` must be set." 

1638 ), 

1639 ) 

1640 automation_id: Optional[UUID] = Field( 1a

1641 None, description="The identifier of the automation to act on" 

1642 ) 

1643 

1644 @model_validator(mode="after") 1a

1645 def selected_automation_requires_id(self) -> Self: 1a

1646 wants_selected_automation = self.source == "selected" 

1647 has_automation_id = bool(self.automation_id) 

1648 if wants_selected_automation != has_automation_id: 

1649 raise ValueError( 

1650 "automation_id is " 

1651 + ("not allowed" if has_automation_id else "required") 

1652 ) 

1653 return self 

1654 

1655 async def automation_id_to_use(self, triggered_action: "TriggeredAction") -> UUID: 1a

1656 if self.source == "selected": 

1657 assert self.automation_id 

1658 return self.automation_id 

1659 

1660 event = triggered_action.triggering_event 

1661 if not event: 

1662 raise ActionFailed("No event to infer the automation") 

1663 

1664 assert event 

1665 if id := _id_of_first_resource_of_kind(event, "prefect.automation"): 

1666 return id 

1667 

1668 raise ActionFailed("No automation could be inferred") 

1669 

1670 

1671class AutomationCommandAction(AutomationAction, ExternalDataAction): 1a

1672 _action_description: ClassVar[str] 1a

1673 

1674 async def act(self, triggered_action: "TriggeredAction") -> None: 1a

1675 automation_id = await self.automation_id_to_use(triggered_action) 

1676 

1677 self._resulting_related_resources += [ 

1678 RelatedResource.model_validate( 

1679 { 

1680 "prefect.resource.id": f"prefect.automation.{automation_id}", 

1681 "prefect.resource.role": "target", 

1682 } 

1683 ) 

1684 ] 

1685 

1686 logger.info( 

1687 self._action_description, 

1688 extra={ 

1689 "automation_id": automation_id, 

1690 **self.logging_context(triggered_action), 

1691 }, 

1692 ) 

1693 

1694 async with await self.events_api_client(triggered_action) as events: 

1695 response = await self.command(events, automation_id, triggered_action) 

1696 

1697 self._result_details["status_code"] = response.status_code 

1698 if response.status_code >= 300: 

1699 raise ActionFailed(self.reason_from_response(response)) 

1700 

1701 @abc.abstractmethod 1a

1702 async def command( 1a

1703 self, 

1704 events: PrefectServerEventsAPIClient, 

1705 automation_id: UUID, 

1706 triggered_action: "TriggeredAction", 

1707 ) -> Response: 

1708 """Issue the command to the Work Queue""" 

1709 

1710 

1711class PauseAutomation(AutomationCommandAction): 1a

1712 """Pauses a Work Queue""" 

1713 

1714 type: Literal["pause-automation"] = "pause-automation" 1a

1715 

1716 _action_description: ClassVar[str] = "Pausing automation" 1a

1717 

1718 async def command( 1a

1719 self, 

1720 events: PrefectServerEventsAPIClient, 

1721 automation_id: UUID, 

1722 triggered_action: "TriggeredAction", 

1723 ) -> Response: 

1724 return await events.pause_automation(automation_id) 

1725 

1726 

1727class ResumeAutomation(AutomationCommandAction): 1a

1728 """Resumes a Work Queue""" 

1729 

1730 type: Literal["resume-automation"] = "resume-automation" 1a

1731 

1732 _action_description: ClassVar[str] = "Resuming auitomation" 1a

1733 

1734 async def command( 1a

1735 self, 

1736 events: PrefectServerEventsAPIClient, 

1737 automation_id: UUID, 

1738 triggered_action: "TriggeredAction", 

1739 ) -> Response: 

1740 return await events.resume_automation(automation_id) 

1741 

1742 

1743# The actual action types that we support. It's important to update this 

1744# Union when adding new subclasses of Action so that they are available for clients 

1745# and in the OpenAPI docs 

1746ServerActionTypes: TypeAlias = Union[ 1a

1747 DoNothing, 

1748 RunDeployment, 

1749 PauseDeployment, 

1750 ResumeDeployment, 

1751 CancelFlowRun, 

1752 ChangeFlowRunState, 

1753 PauseWorkQueue, 

1754 ResumeWorkQueue, 

1755 SendNotification, 

1756 CallWebhook, 

1757 PauseAutomation, 

1758 ResumeAutomation, 

1759 SuspendFlowRun, 

1760 ResumeFlowRun, 

1761 PauseWorkPool, 

1762 ResumeWorkPool, 

1763] 

1764 

1765 

1766_recent_actions: MutableMapping[UUID, bool] = TTLCache(maxsize=10000, ttl=3600) 1a

1767 

1768 

1769async def record_action_happening(id: UUID) -> None: 1a

1770 """Record that an action has happened, with an expiration of an hour.""" 

1771 _recent_actions[id] = True 

1772 

1773 

1774async def action_has_already_happened(id: UUID) -> bool: 1a

1775 """Check if the action has already happened""" 

1776 return _recent_actions.get(id, False) 

1777 

1778 

1779@asynccontextmanager 1a

1780async def consumer() -> AsyncGenerator[MessageHandler, None]: 1a

1781 from prefect.server.events.schemas.automations import TriggeredAction 

1782 

1783 async def message_handler(message: Message): 

1784 if not message.data: 

1785 return 

1786 

1787 triggered_action = TriggeredAction.model_validate_json(message.data) 

1788 action = triggered_action.action 

1789 

1790 if await action_has_already_happened(triggered_action.id): 

1791 logger.info( 

1792 "Action %s has already been executed, skipping", 

1793 triggered_action.id, 

1794 ) 

1795 return 

1796 

1797 try: 

1798 await action.act(triggered_action) 

1799 except ActionFailed as e: 

1800 # ActionFailed errors are expected errors and will not be retried 

1801 await action.fail(triggered_action, e.reason) 

1802 else: 

1803 await action.succeed(triggered_action) 

1804 await record_action_happening(triggered_action.id) 

1805 

1806 logger.info("Starting action message handler") 

1807 yield message_handler 

1808 

1809 

1810async def _load_block_from_block_document( 1a

1811 block_document: BlockDocument, 

1812) -> Block: 

1813 if block_document.block_schema is None: 

1814 raise ValueError("Unable to determine block schema for provided block document") 

1815 

1816 block_cls = Block.get_block_class_from_schema(block_document.block_schema) 

1817 

1818 block = block_cls.model_validate(block_document.data) 

1819 block._block_document_id = block_document.id 

1820 block.__class__._block_schema_id = block_document.block_schema_id 

1821 block.__class__._block_type_id = block_document.block_type_id 

1822 block._block_document_name = block_document.name 

1823 block._is_anonymous = block_document.is_anonymous 

1824 block._define_metadata_on_nested_blocks(block_document.block_document_references) 

1825 

1826 resources = block._event_method_called_resources() 

1827 if resources: 

1828 kind = block._event_kind() 

1829 resource, related = resources 

1830 async with PrefectServerEventsClient() as events_client: 

1831 await events_client.emit( 

1832 Event( 

1833 id=uuid7(), 

1834 occurred=now("UTC"), 

1835 event=f"{kind}.loaded", 

1836 resource=Resource.model_validate(resource), 

1837 related=[RelatedResource.model_validate(r) for r in related], 

1838 ) 

1839 ) 

1840 

1841 return block