Coverage for /usr/local/lib/python3.12/site-packages/prefect/events/schemas/events.py: 30%

152 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1import copy 1a

2from collections import defaultdict 1a

3from typing import ( 1a

4 TYPE_CHECKING, 

5 Any, 

6 ClassVar, 

7 Dict, 

8 Iterable, 

9 List, 

10 Mapping, 

11 Optional, 

12 Sequence, 

13 Tuple, 

14 Union, 

15) 

16from uuid import UUID 1a

17 

18from pydantic import ( 1a

19 AfterValidator, 

20 ConfigDict, 

21 Field, 

22 RootModel, 

23 model_validator, 

24) 

25from typing_extensions import Annotated, Self 1a

26 

27import prefect.types._datetime 1a

28from prefect._internal.schemas.bases import PrefectBaseModel 1a

29from prefect._internal.uuid7 import uuid7 1a

30from prefect.logging import get_logger 1a

31from prefect.settings import ( 1a

32 PREFECT_EVENTS_MAXIMUM_LABELS_PER_RESOURCE, 

33) 

34 

35from .labelling import Labelled 1a

36 

37if TYPE_CHECKING: 37 ↛ 38line 37 didn't jump to line 38 because the condition on line 37 was never true1a

38 import logging 

39 

40logger: "logging.Logger" = get_logger(__name__) 1a

41 

42 

class Resource(Labelled):
    """An observable business object of interest to the user"""

    # NOTE: the class docstring above is kept verbatim because pydantic uses a
    # model's docstring as its JSON-schema description; extended notes are
    # therefore written as comments.
    #
    # A Resource is a label mapping (held in `self.root`) that must carry a
    # non-empty "prefect.resource.id" label and may not exceed the configured
    # maximum number of labels.

    @model_validator(mode="after")
    def enforce_maximum_labels(self) -> Self:
        """Reject the resource if it carries more labels than the configured limit.

        Raises:
            ValueError: when the label count exceeds
                PREFECT_EVENTS_MAXIMUM_LABELS_PER_RESOURCE.
        """
        limit = PREFECT_EVENTS_MAXIMUM_LABELS_PER_RESOURCE.value()
        if len(self.root) > limit:
            raise ValueError(f"The maximum number of labels per resource is {limit}")

        return self

    @model_validator(mode="after")
    def requires_resource_id(self) -> Self:
        """Require a present, non-empty ``prefect.resource.id`` label.

        Raises:
            ValueError: when the label is missing, or present but empty.
        """
        labels = self.root

        # A missing key and an empty value are reported with different messages.
        if "prefect.resource.id" not in labels:
            raise ValueError("Resources must include the prefect.resource.id label")
        if not labels["prefect.resource.id"]:
            raise ValueError("The prefect.resource.id label must be non-empty")

        return self

    @property
    def id(self) -> str:
        """The value of the ``prefect.resource.id`` label."""
        return self["prefect.resource.id"]

    @property
    def name(self) -> Optional[str]:
        """The value of the ``prefect.resource.name`` label, if one is set."""
        return self.get("prefect.resource.name")

    def prefect_object_id(self, kind: str) -> UUID:
        """Parse the UUID that follows the ``kind`` prefix in this resource's ID.

        Args:
            kind: the expected resource-ID prefix; a trailing "." is appended
                when the caller did not include one.

        Returns:
            The UUID built from the remainder of the resource ID after the prefix.

        Raises:
            ValueError: when the resource ID does not start with the prefix
                (``UUID`` itself also raises ValueError on a malformed remainder).
        """
        prefix = kind if kind.endswith(".") else f"{kind}."
        resource_id = self.id

        if not resource_id.startswith(prefix):
            raise ValueError(f"Resource ID {resource_id} does not start with {prefix}")

        remainder = resource_id[len(prefix) :]
        return UUID(remainder)

82 

83 

class RelatedResource(Resource):
    """A Resource with a specific role in an Event"""

    # Adds one requirement on top of Resource: a non-empty
    # "prefect.resource.role" label. (The class docstring is kept verbatim
    # because pydantic uses it as the schema description.)

    @model_validator(mode="after")
    def requires_resource_role(self) -> Self:
        """Require a present, non-empty ``prefect.resource.role`` label.

        Raises:
            ValueError: when the label is missing, or present but empty.
        """
        labels = self.root

        # A missing key and an empty value are reported with different messages.
        if "prefect.resource.role" not in labels:
            raise ValueError(
                "Related Resources must include the prefect.resource.role label"
            )
        if not labels["prefect.resource.role"]:
            raise ValueError("The prefect.resource.role label must be non-empty")

        return self

    @property
    def role(self) -> str:
        """The value of the ``prefect.resource.role`` label."""
        return self["prefect.resource.role"]

101 

102 

def _validate_related_resources(value) -> List:
    """Return ``value`` unchanged unless it holds too many related resources.

    Args:
        value: the sized collection of related resources to check.

    Returns:
        The same ``value`` object that was passed in.

    Raises:
        ValueError: when ``len(value)`` exceeds
            PREFECT_EVENTS_MAXIMUM_RELATED_RESOURCES.
    """
    # Imported at call time rather than at module import time, as in the
    # original implementation.
    from prefect.settings import PREFECT_EVENTS_MAXIMUM_RELATED_RESOURCES

    limit = PREFECT_EVENTS_MAXIMUM_RELATED_RESOURCES.value()
    if len(value) <= limit:
        return value

    raise ValueError(f"The maximum number of related resources is {limit}")

112 

113 

class Event(PrefectBaseModel):
    """The client-side view of an event that has happened to a Resource"""

    # NOTE: the class docstring and every Field description are kept verbatim;
    # pydantic publishes them in the model's JSON schema.

    # Unknown keys in incoming data are dropped rather than rejected.
    model_config: ClassVar[ConfigDict] = ConfigDict(extra="ignore")

    # Defaults to the current UTC time when the model is instantiated.
    occurred: prefect.types._datetime.DateTime = Field(
        default_factory=lambda: prefect.types._datetime.now("UTC"),
        description="When the event happened from the sender's perspective",
    )
    event: str = Field(description="The name of the event that happened")
    resource: Resource = Field(
        description="The primary Resource this event concerns",
    )
    # The AfterValidator enforces the related-resource count limit once the
    # list items themselves have been validated.
    related: Annotated[
        List[RelatedResource],
        AfterValidator(_validate_related_resources),
    ] = Field(
        default_factory=list,
        description="A list of additional Resources involved in this event",
    )
    payload: Dict[str, Any] = Field(
        default_factory=dict,
        description="An open-ended set of data describing what happened",
    )
    # A fresh uuid7 is generated when the client does not supply an ID.
    id: UUID = Field(
        default_factory=uuid7,
        description="The client-provided identifier of this event",
    )
    follows: Optional[UUID] = Field(
        default=None,
        description=(
            "The ID of an event that is known to have occurred prior to this one. "
            "If set, this may be used to establish a more precise ordering of causally-"
            "related events when they occur close enough together in time that the "
            "system may receive them out-of-order."
        ),
    )

    @property
    def involved_resources(self) -> Sequence[Resource]:
        """The primary resource followed by every related resource, as a new list."""
        return [self.resource, *self.related]

    @property
    def resource_in_role(self) -> Mapping[str, RelatedResource]:
        """Returns a mapping of roles to the first related resource in that role"""
        by_role: Dict[str, RelatedResource] = {}
        # Walking the list backwards lets earlier entries overwrite later ones,
        # so the first resource listed for each role is the one that remains.
        for candidate in reversed(self.related):
            by_role[candidate.role] = candidate
        return by_role

    @property
    def resources_in_role(self) -> Mapping[str, Sequence[RelatedResource]]:
        """Returns a mapping of roles to related resources in that role"""
        grouped: Dict[str, List[RelatedResource]] = defaultdict(list)
        for candidate in self.related:
            grouped[candidate.role].append(candidate)
        return grouped

    def find_resource_label(self, label: str) -> Optional[str]:
        """Look up a label on the primary resource or on a related resource.

        A label of the form ``related:<role>:<name>`` is answered by the first
        related resource whose role is ``<role>``: that resource's value for
        ``<name>`` is returned, even when it is ``None``. In every other case,
        including when no related resource has the requested role, the full
        ``label`` is looked up on the primary resource.
        """
        # Everything after the last ":" is the label name; what precedes it is
        # "<directive>:<role>", split at its first ":".
        scope, _, related_label = label.rpartition(":")
        directive, _, role = scope.partition(":")

        if directive == "related":
            in_role = next(
                (candidate for candidate in self.related if candidate.role == role),
                None,
            )
            if in_role is not None:
                return in_role.get(related_label)

        return self.resource.get(label)

180 

181 

class ReceivedEvent(Event):
    """The server-side view of an event that has happened to a Resource after it has
    been received by the server"""

    # Allows instances to be validated from objects by reading their attributes.
    # NOTE(review): pydantic is expected to merge this with the config declared
    # on Event rather than replace it — confirm against the pydantic docs.
    model_config: ClassVar[ConfigDict] = ConfigDict(from_attributes=True)

    # Required field: the Ellipsis default means a value must always be supplied.
    received: prefect.types._datetime.DateTime = Field(
        ...,
        description="When the event was received by Prefect Cloud",
    )

192 

193 

def matches(expected: str, value: Optional[str]) -> bool:
    """Report whether ``value`` satisfies the ``expected`` pattern.

    The pattern may start with "!" to negate the result ("!this-value") and may
    end with "*" to match by prefix ("any-value-starting-with*"); both can be
    combined. Without a trailing "*", the comparison is exact equality.

    A ``value`` of ``None`` never matches, not even a negated pattern.
    """
    if value is None:
        return False

    negated = expected.startswith("!")
    pattern = expected[1:] if negated else expected

    if pattern.endswith("*"):
        hit = value.startswith(pattern[:-1])
    else:
        hit = value == pattern

    # Flip the outcome for negated patterns; leave it untouched otherwise.
    return hit != negated

212 

213 

class ResourceSpecification(RootModel[Dict[str, Union[str, List[str]]]]):
    # A mapping of label name -> expected pattern(s), used to test resources.
    # Each value is either one pattern string or a list of them; patterns are
    # evaluated by the module-level `matches` function.
    # (Written as comments on purpose: this model has no docstring, and adding
    # one would change the description pydantic emits in its JSON schema.)

    @staticmethod
    def _as_list(value: Optional[Union[str, List[str]]]) -> List[str]:
        """Normalise a stored value: falsy -> [], list -> itself, else -> [value]."""
        if not value:
            return []
        return value if isinstance(value, list) else [value]

    def matches_every_resource(self) -> bool:
        """True when the specification has no labels at all."""
        return not self.root

    def matches_every_resource_of_kind(self, prefix: str) -> bool:
        """True when this specification cannot exclude any ``prefix`` resource.

        That holds for an empty specification, or for one whose only label is
        ``prefect.resource.id`` and whose patterns include exactly ``<prefix>.*``.
        """
        if self.matches_every_resource():
            return True
        if len(self.root) != 1:
            return False

        wildcard = f"{prefix}.*"
        # `get` yields [] when the single label is not the resource ID, or when
        # its value is empty, so this is False in both of those cases.
        return wildcard in self.get("prefect.resource.id")

    def includes(self, candidates: Iterable[Resource]) -> bool:
        """True when the specification is empty or any candidate matches it."""
        return self.matches_every_resource() or any(
            self.matches(candidate) for candidate in candidates
        )

    def matches(self, resource: Resource) -> bool:
        """True when every label here has at least one pattern the resource meets."""
        for label, patterns in self.items():
            actual = resource.get(label)
            # `matches` below is the module-level function, not this method.
            if all(not matches(pattern, actual) for pattern in patterns):
                return False
        return True

    def items(self) -> Iterable[Tuple[str, List[str]]]:
        """Return ``(label, patterns)`` pairs, wrapping lone strings in a list.

        Unlike ``get``, an empty string is kept and wrapped as ``[""]``.
        """
        pairs = []
        for label, value in self.root.items():
            patterns = [value] if isinstance(value, str) else value
            pairs.append((label, patterns))
        return pairs

    def __contains__(self, key: str) -> bool:
        """True when ``key`` is one of the specification's labels."""
        return key in self.root

    def __getitem__(self, key: str) -> List[str]:
        """Return the patterns for ``key`` as a list; KeyError when absent."""
        return self._as_list(self.root[key])

    def pop(
        self, key: str, default: Optional[Union[str, List[str]]] = None
    ) -> Optional[List[str]]:
        """Remove ``key`` and return its patterns (or ``default``) as a list."""
        return self._as_list(self.root.pop(key, default))

    def get(
        self, key: str, default: Optional[Union[str, List[str]]] = None
    ) -> Optional[List[str]]:
        """Return the patterns for ``key`` (or ``default``) as a list."""
        return self._as_list(self.root.get(key, default))

    def __len__(self) -> int:
        """The number of labels in the specification."""
        return len(self.root)

    def deepcopy(self) -> "ResourceSpecification":
        """Return a new ResourceSpecification built from a deep copy of the labels."""
        cloned_root = copy.deepcopy(self.root)
        return ResourceSpecification(root=cloned_root)