Coverage for /usr/local/lib/python3.12/site-packages/prefect/events/schemas/deployment_triggers.py: 53%

37 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1""" 

2Schemas for defining triggers within a Prefect deployment YAML. This is a separate 

3parallel hierarchy for representing triggers so that they can also include the 

4information necessary to create an automation. 

5 

6These triggers should follow the validation rules of the main Trigger class hierarchy as 

7closely as possible (because otherwise users will get validation errors creating 

8triggers), but we can be more liberal with the defaults here to make it simpler to 

9create them from YAML. 

10""" 

11 

12import abc 1a

13from datetime import timedelta 1a

14from typing import ( 1a

15 Annotated, 

16 Any, 

17 ClassVar, 

18 Dict, 

19 Optional, 

20 Type, 

21 Union, 

22) 

23 

24from pydantic import Discriminator, Field, Tag 1a

25from typing_extensions import TypeAlias 1a

26 

27from prefect._internal.schemas.bases import PrefectBaseModel 1a

28from prefect.types import NonNegativeTimeDelta 1a

29 

30from .automations import ( 1a

31 CompoundTrigger, 

32 EventTrigger, 

33 MetricTrigger, 

34 SequenceTrigger, 

35 TriggerTypes, 

36) 

37 

38 

39class BaseDeploymentTrigger(PrefectBaseModel, abc.ABC, extra="ignore"): # type: ignore[call-arg] 1a

40 """ 

41 Base class describing a set of criteria that must be satisfied in order to trigger 

42 an automation. 

43 """ 

44 

45 # Fields from Automation 

46 

47 name: Optional[str] = Field( 1a

48 default=None, 

49 description="The name to give to the automation created for this trigger.", 

50 ) 

51 description: str = Field( 1a

52 default="", description="A longer description of this automation" 

53 ) 

54 enabled: bool = Field( 1a

55 default=True, description="Whether this automation will be evaluated" 

56 ) 

57 

58 # Fields from the RunDeployment action 

59 

60 parameters: Optional[Dict[str, Any]] = Field( 1a

61 default=None, 

62 description=( 

63 "The parameters to pass to the deployment, or None to use the " 

64 "deployment's default parameters" 

65 ), 

66 ) 

67 job_variables: Optional[Dict[str, Any]] = Field( 1a

68 default=None, 

69 description=( 

70 "Job variables to pass to the deployment, or None to use the " 

71 "deployment's default job variables" 

72 ), 

73 ) 

74 schedule_after: NonNegativeTimeDelta = Field( 1a

75 default_factory=lambda: timedelta(0), 

76 description=( 

77 "The amount of time to wait before running the deployment. " 

78 "Defaults to running the deployment immediately." 

79 ), 

80 ) 

81 

82 

83class DeploymentEventTrigger(BaseDeploymentTrigger, EventTrigger): 1a

84 """ 

85 A trigger that fires based on the presence or absence of events within a given 

86 period of time. 

87 """ 

88 

89 trigger_type: ClassVar[Type[TriggerTypes]] = EventTrigger 1a

90 

91 

92class DeploymentMetricTrigger(BaseDeploymentTrigger, MetricTrigger): 1a

93 """ 

94 A trigger that fires based on the results of a metric query. 

95 """ 

96 

97 trigger_type: ClassVar[Type[TriggerTypes]] = MetricTrigger 1a

98 

99 

100class DeploymentCompoundTrigger(BaseDeploymentTrigger, CompoundTrigger): 1a

101 """A composite trigger that requires some number of triggers to have 

102 fired within the given time period""" 

103 

104 trigger_type: ClassVar[Type[TriggerTypes]] = CompoundTrigger 1a

105 

106 

107class DeploymentSequenceTrigger(BaseDeploymentTrigger, SequenceTrigger): 1a

108 """A composite trigger that requires some number of triggers to have fired 

109 within the given time period in a specific order""" 

110 

111 trigger_type: ClassVar[Type[TriggerTypes]] = SequenceTrigger 1a

112 

113 

114def deployment_trigger_discriminator(value: Any) -> str: 1a

115 """Custom discriminator for deployment triggers that defaults to 'event' if no type is specified.""" 

116 if isinstance(value, dict): 

117 # Check for explicit type first 

118 if "type" in value: 

119 return value["type"] 

120 # Infer from posture for backward compatibility 

121 posture = value.get("posture") 

122 if posture == "Metric": 

123 return "metric" 

124 # Check for compound/sequence specific fields 

125 if "triggers" in value and "require" in value: 

126 return "compound" 

127 if "triggers" in value and "require" not in value: 

128 return "sequence" 

129 # Default to event 

130 return "event" 

131 return getattr(value, "type", "event") 

132 

133 

134# Concrete deployment trigger types 

135DeploymentTriggerTypes: TypeAlias = Annotated[ 1a

136 Union[ 

137 Annotated[DeploymentEventTrigger, Tag("event")], 

138 Annotated[DeploymentMetricTrigger, Tag("metric")], 

139 Annotated[DeploymentCompoundTrigger, Tag("compound")], 

140 Annotated[DeploymentSequenceTrigger, Tag("sequence")], 

141 ], 

142 Discriminator(deployment_trigger_discriminator), 

143]