Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/deploy/_triggers.py: 21%

42 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1from __future__ import annotations 1a

2 

3import json 1a

4from typing import Any 1a

5from uuid import UUID 1a

6 

7from pydantic import TypeAdapter 1a

8 

9from prefect.client.orchestration import PrefectClient 1a

10from prefect.events import DeploymentTriggerTypes, TriggerTypes 1a

11from prefect.exceptions import PrefectHTTPStatusError 1a

12 

# Module-level pydantic adapter for DeploymentTriggerTypes; used below to
# validate raw trigger spec dicts into trigger objects.
DeploymentTriggerAdapter: TypeAdapter[DeploymentTriggerTypes] = TypeAdapter(
    DeploymentTriggerTypes
)

16 

17 

def _initialize_deployment_triggers(
    deployment_name: str, triggers_spec: list[dict[str, Any]]
) -> list[DeploymentTriggerTypes]:
    """Validate raw trigger specs into trigger objects.

    A spec without a ``"name"`` key is given the default
    ``"<deployment_name>__automation_<n>"``, where ``n`` is the spec's
    1-based position in ``triggers_spec``.

    Note:
        The default name is written into the spec dict itself, so the
        caller's ``triggers_spec`` entries are modified in place.

    Args:
        deployment_name: Prefix used when generating default trigger names.
        triggers_spec: Raw trigger definitions, one dict per trigger.

    Returns:
        The validated triggers, in the same order as ``triggers_spec``.
    """
    validated: list[DeploymentTriggerTypes] = []
    for position, trigger_spec in enumerate(triggers_spec, start=1):
        default_name = f"{deployment_name}__automation_{position}"
        # Only fill in a name when the spec does not already carry one.
        if "name" not in trigger_spec:
            trigger_spec["name"] = default_name
        trigger = DeploymentTriggerAdapter.validate_python(trigger_spec)
        validated.append(trigger)
    return validated

26 

27 

async def _create_deployment_triggers(
    client: PrefectClient,
    deployment_id: UUID,
    triggers: list[DeploymentTriggerTypes | TriggerTypes],
):
    """Replace the automations owned by a deployment with ``triggers``.

    First deletes every automation owned by the resource
    ``prefect.deployment.<deployment_id>``, then creates one automation per
    trigger, in order, after pointing each trigger at ``deployment_id``.

    If the delete call fails with HTTP 404 the function returns without
    creating anything (NOTE(review): presumably the server does not expose
    automations in that case -- confirm). Any other HTTP status error is
    re-raised.

    Args:
        client: Client used for the delete and create calls.
        deployment_id: ID of the deployment that owns the automations.
        triggers: Triggers to turn into automations.

    Raises:
        PrefectHTTPStatusError: If deleting the existing automations fails
            with a status other than 404.
    """
    owner_resource = f"prefect.deployment.{deployment_id}"
    try:
        await client.delete_resource_owned_automations(owner_resource)
    except PrefectHTTPStatusError as e:
        if e.response.status_code != 404:
            raise e
        # 404 on delete: skip trigger creation entirely.
        return

    for trigger in triggers:
        trigger.set_deployment_id(deployment_id)
        automation = trigger.as_automation()
        await client.create_automation(automation)

45 

46 

def _gather_deployment_trigger_definitions(
    trigger_flags: list[str], existing_triggers: list[dict[str, Any]]
) -> list[dict[str, Any]]:
    """Collect raw trigger specs, preferring explicit flags over existing ones.

    Each entry of ``trigger_flags`` is interpreted by its suffix:

    * ``.yaml`` / ``.yml`` -- path to a YAML file; the list under its
      top-level ``triggers`` key is added (nothing if the key is absent).
    * ``.json`` -- path to a JSON file, handled the same way.
    * anything else -- parsed as an inline JSON document and added as a
      single trigger spec.

    Args:
        trigger_flags: Flag values to parse. When empty (or otherwise
            falsy) nothing is parsed.
        existing_triggers: Specs returned unchanged when there are no flags.

    Returns:
        The specs parsed from ``trigger_flags``, in flag order, or
        ``existing_triggers`` itself when no flags were supplied.

    Raises:
        ValueError: If a flag cannot be read or parsed. The underlying
            exception is attached as ``__cause__``.
    """
    if not trigger_flags:
        return existing_triggers

    trigger_specs: list[dict[str, Any]] = []
    for flag in trigger_flags:
        try:
            if flag.endswith((".yaml", ".yml")):
                # Imported here so yaml is only loaded when a YAML file is
                # actually passed.
                import yaml

                with open(flag, "r") as f:
                    trigger_specs.extend(yaml.safe_load(f).get("triggers", []))
            elif flag.endswith(".json"):
                with open(flag, "r") as f:
                    trigger_specs.extend(json.load(f).get("triggers", []))
            else:
                trigger_specs.append(json.loads(flag))
        except Exception as e:
            # Chain explicitly so the original parse/IO error stays attached.
            raise ValueError(
                f"Failed to parse trigger: {flag}. Error: {str(e)}"
            ) from e
    return trigger_specs