Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/deploy/_triggers.py: 21%
42 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1from __future__ import annotations 1a
3import json 1a
4from typing import Any 1a
5from uuid import UUID 1a
7from pydantic import TypeAdapter 1a
9from prefect.client.orchestration import PrefectClient 1a
10from prefect.events import DeploymentTriggerTypes, TriggerTypes 1a
11from prefect.exceptions import PrefectHTTPStatusError 1a
# Pydantic adapter for the DeploymentTriggerTypes annotation. It is built once
# at import time so the helpers below can reuse it to validate raw trigger
# specs instead of constructing a new adapter per call.
DeploymentTriggerAdapter: TypeAdapter[DeploymentTriggerTypes] = TypeAdapter(
    DeploymentTriggerTypes
)
def _initialize_deployment_triggers(
    deployment_name: str, triggers_spec: list[dict[str, Any]]
) -> list[DeploymentTriggerTypes]:
    """Validate raw trigger specs into deployment trigger objects.

    Any spec that has no ``"name"`` key is given the generated name
    ``"<deployment_name>__automation_<n>"``, where ``n`` is the spec's
    1-based position in ``triggers_spec``.

    Note: the default name is written into the caller's dict, so
    ``triggers_spec`` is modified in place.

    Args:
        deployment_name: Prefix used when generating default trigger names.
        triggers_spec: Raw trigger definitions, one dict per trigger.

    Returns:
        The validated triggers, in the same order as ``triggers_spec``.
    """
    validated: list[DeploymentTriggerTypes] = []
    for position, raw_spec in enumerate(triggers_spec, start=1):
        # Only fill in a name when the spec did not provide one.
        if "name" not in raw_spec:
            raw_spec["name"] = f"{deployment_name}__automation_{position}"
        trigger = DeploymentTriggerAdapter.validate_python(raw_spec)
        validated.append(trigger)
    return validated
async def _create_deployment_triggers(
    client: PrefectClient,
    deployment_id: UUID,
    triggers: list[DeploymentTriggerTypes | TriggerTypes],
):
    """Replace the automations owned by a deployment with ``triggers``.

    First asks the client to delete every automation owned by the resource
    ``prefect.deployment.<deployment_id>``, then creates one automation per
    entry in ``triggers``.

    Args:
        client: Client used for the delete and create calls.
        deployment_id: ID of the deployment the triggers belong to.
        triggers: Triggers to attach; each one is bound to ``deployment_id``
            before being converted to an automation.

    Raises:
        PrefectHTTPStatusError: If the delete call fails with any status
            other than 404.
    """
    owner_resource = f"prefect.deployment.{deployment_id}"
    try:
        await client.delete_resource_owned_automations(owner_resource)
    except PrefectHTTPStatusError as err:
        if err.response.status_code != 404:
            raise err
        # A 404 on the delete ends the function without creating anything.
        # NOTE(review): presumably this means the server has no automations
        # endpoint at all -- confirm against the API before relying on it.
        return

    for item in triggers:
        item.set_deployment_id(deployment_id)
        automation = item.as_automation()
        await client.create_automation(automation)
def _triggers_from_document(document: Any) -> list[dict[str, Any]]:
    """Return the ``"triggers"`` entries of a parsed YAML/JSON document."""
    if document is None:
        # An empty file parses to None; treat it like a file with no triggers.
        return []
    if not isinstance(document, dict):
        raise TypeError(
            "expected a mapping with a top-level 'triggers' key, "
            f"got {type(document).__name__}"
        )
    # `triggers: null` is treated the same as a missing key.
    return document.get("triggers") or []


def _gather_deployment_trigger_definitions(
    trigger_flags: list[str], existing_triggers: list[dict[str, Any]]
) -> list[dict[str, Any]]:
    """Collect trigger specs from explicit flags, else fall back to existing.

    Each entry of ``trigger_flags`` is interpreted as one of:

    * a path ending in ``.yaml`` / ``.yml`` -- the file is parsed with
      ``yaml.safe_load`` and its top-level ``"triggers"`` list is used;
    * a path ending in ``.json`` -- the file is parsed as JSON and its
      top-level ``"triggers"`` list is used;
    * anything else -- the string itself is parsed as a single JSON spec.

    Extensions are matched case-insensitively and files are read as UTF-8.

    Args:
        trigger_flags: Trigger definitions given explicitly. When non-empty
            they replace ``existing_triggers`` entirely.
        existing_triggers: Specs returned unchanged when ``trigger_flags``
            is empty.

    Returns:
        The specs gathered from ``trigger_flags`` in flag order, or
        ``existing_triggers`` when no flags were given.

    Raises:
        ValueError: If a flag cannot be read or parsed. The underlying
            exception is attached as ``__cause__``.
    """
    if not trigger_flags:
        return existing_triggers

    trigger_specs: list[dict[str, Any]] = []
    for t in trigger_flags:
        try:
            lowered = t.lower()
            if lowered.endswith((".yaml", ".yml")):
                # Imported lazily so inline-JSON and .json flags never
                # require PyYAML.
                import yaml

                with open(t, "r", encoding="utf-8") as f:
                    document = yaml.safe_load(f)
                trigger_specs.extend(_triggers_from_document(document))
            elif lowered.endswith(".json"):
                with open(t, "r", encoding="utf-8") as f:
                    document = json.load(f)
                trigger_specs.extend(_triggers_from_document(document))
            else:
                trigger_specs.append(json.loads(t))
        except Exception as e:
            # Chain the original error so the real cause stays visible in
            # the traceback.
            raise ValueError(f"Failed to parse trigger: {t}. Error: {str(e)}") from e
    return trigger_specs