Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/schemas/events.py: 31%

184 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1import copy 1a

2from collections import defaultdict 1a

3from typing import ( 1a

4 TYPE_CHECKING, 

5 Any, 

6 ClassVar, 

7 Dict, 

8 Iterable, 

9 List, 

10 Mapping, 

11 Optional, 

12 Sequence, 

13 Tuple, 

14 Union, 

15) 

16from uuid import UUID 1a

17 

18from pydantic import ( 1a

19 AfterValidator, 

20 AnyHttpUrl, 

21 ConfigDict, 

22 Field, 

23 RootModel, 

24 field_validator, 

25 model_validator, 

26) 

27from typing_extensions import Annotated, Self 1a

28 

29import prefect.types._datetime 1a

30from prefect.logging import get_logger 1a

31from prefect.server.events.schemas.labelling import Labelled 1a

32from prefect.server.utilities.schemas import PrefectBaseModel 1a

33from prefect.settings import ( 1a

34 PREFECT_EVENTS_MAXIMUM_LABELS_PER_RESOURCE, 

35 PREFECT_EVENTS_MAXIMUM_RELATED_RESOURCES, 

36) 

37 

38if TYPE_CHECKING: 38 ↛ 39line 38 didn't jump to line 39 because the condition on line 38 was never true1a

39 import logging 

40 

41 logger: "logging.Logger" = get_logger(__name__) 

42 

43 

44class Resource(Labelled): 1a

45 """An observable business object of interest to the user""" 

46 

47 @model_validator(mode="after") 1a

48 def enforce_maximum_labels(self) -> Self: 1a

49 if len(self.root) > PREFECT_EVENTS_MAXIMUM_LABELS_PER_RESOURCE.value(): 

50 raise ValueError( 

51 "The maximum number of labels per resource " 

52 f"is {PREFECT_EVENTS_MAXIMUM_LABELS_PER_RESOURCE.value()}" 

53 ) 

54 

55 return self 

56 

57 @model_validator(mode="after") 1a

58 def requires_resource_id(self) -> Self: 1a

59 if "prefect.resource.id" not in self.root: 

60 raise ValueError("Resources must include the prefect.resource.id label") 

61 if not self.root["prefect.resource.id"]: 

62 raise ValueError("The prefect.resource.id label must be non-empty") 

63 

64 return self 

65 

66 @property 1a

67 def id(self) -> str: 1a

68 return self["prefect.resource.id"] 

69 

70 @property 1a

71 def name(self) -> Optional[str]: 1a

72 return self.get("prefect.resource.name") 

73 

74 def prefect_object_id(self, kind: str) -> UUID: 1a

75 """Extracts the UUID from an event's resource ID if it's the expected kind 

76 of prefect resource""" 

77 prefix = f"{kind}." if not kind.endswith(".") else kind 

78 

79 if not self.id.startswith(prefix): 

80 raise ValueError(f"Resource ID {self.id} does not start with {prefix}") 

81 

82 return UUID(self.id[len(prefix) :]) 

83 

84 

85class RelatedResource(Resource): 1a

86 """A Resource with a specific role in an Event""" 

87 

88 @model_validator(mode="after") 1a

89 def requires_resource_role(self) -> Self: 1a

90 if "prefect.resource.role" not in self.root: 

91 raise ValueError( 

92 "Related Resources must include the prefect.resource.role label" 

93 ) 

94 if not self.root["prefect.resource.role"]: 

95 raise ValueError("The prefect.resource.role label must be non-empty") 

96 

97 return self 

98 

99 @property 1a

100 def role(self) -> str: 1a

101 return self["prefect.resource.role"] 

102 

103 

104def _validate_event_name_length(value: str) -> str: 1a

105 from prefect.settings import PREFECT_SERVER_EVENTS_MAXIMUM_EVENT_NAME_LENGTH 

106 

107 if len(value) > PREFECT_SERVER_EVENTS_MAXIMUM_EVENT_NAME_LENGTH.value(): 

108 raise ValueError( 

109 f"Event name must be at most {PREFECT_SERVER_EVENTS_MAXIMUM_EVENT_NAME_LENGTH.value()} characters" 

110 ) 

111 return value 

112 

113 

114class Event(PrefectBaseModel): 1a

115 """The client-side view of an event that has happened to a Resource""" 

116 

117 occurred: prefect.types._datetime.DateTime = Field( 1a

118 description="When the event happened from the sender's perspective", 

119 ) 

120 event: Annotated[str, AfterValidator(_validate_event_name_length)] = Field( 1a

121 description="The name of the event that happened", 

122 ) 

123 resource: Resource = Field( 1a

124 description="The primary Resource this event concerns", 

125 ) 

126 related: list[RelatedResource] = Field( 1a

127 default_factory=list, 

128 description="A list of additional Resources involved in this event", 

129 ) 

130 payload: dict[str, Any] = Field( 1a

131 default_factory=dict, 

132 description="An open-ended set of data describing what happened", 

133 ) 

134 id: UUID = Field( 1a

135 description="The client-provided identifier of this event", 

136 ) 

137 follows: Optional[UUID] = Field( 1a

138 default=None, 

139 description=( 

140 "The ID of an event that is known to have occurred prior to this one. " 

141 "If set, this may be used to establish a more precise ordering of causally-" 

142 "related events when they occur close enough together in time that the " 

143 "system may receive them out-of-order." 

144 ), 

145 ) 

146 

147 @property 1a

148 def involved_resources(self) -> Sequence[Resource]: 1a

149 return [self.resource] + list(self.related) 

150 

151 @property 1a

152 def resource_in_role(self) -> Mapping[str, RelatedResource]: 1a

153 """Returns a mapping of roles to the first related resource in that role""" 

154 return {related.role: related for related in reversed(self.related)} 

155 

156 @property 1a

157 def resources_in_role(self) -> Mapping[str, Sequence[RelatedResource]]: 1a

158 """Returns a mapping of roles to related resources in that role""" 

159 resources: Dict[str, List[RelatedResource]] = defaultdict(list) 

160 for related in self.related: 

161 resources[related.role].append(related) 

162 return resources 

163 

164 @field_validator("related") 1a

165 @classmethod 1a

166 def enforce_maximum_related_resources( 1a

167 cls, value: List[RelatedResource] 

168 ) -> List[RelatedResource]: 

169 if len(value) > PREFECT_EVENTS_MAXIMUM_RELATED_RESOURCES.value(): 

170 raise ValueError( 

171 "The maximum number of related resources " 

172 f"is {PREFECT_EVENTS_MAXIMUM_RELATED_RESOURCES.value()}" 

173 ) 

174 

175 return value 

176 

177 def receive( 1a

178 self, received: Optional[prefect.types._datetime.DateTime] = None 

179 ) -> "ReceivedEvent": 

180 kwargs = self.model_dump() 

181 if received is not None: 

182 kwargs["received"] = received 

183 return ReceivedEvent(**kwargs) 

184 

185 def find_resource_label(self, label: str) -> Optional[str]: 1a

186 """Finds the value of the given label in this event's resource or one of its 

187 related resources. If the label starts with `related:<role>:`, search for the 

188 first matching label in a related resource with that role.""" 

189 directive, _, related_label = label.rpartition(":") 

190 directive, _, role = directive.partition(":") 

191 if directive == "related": 

192 for related in self.related: 

193 if related.role == role: 

194 return related.get(related_label) 

195 return self.resource.get(label) 

196 

197 

198class ReceivedEvent(Event): 1a

199 """The server-side view of an event that has happened to a Resource after it has 

200 been received by the server""" 

201 

202 model_config: ClassVar[ConfigDict] = ConfigDict( 1a

203 extra="ignore", from_attributes=True 

204 ) 

205 

206 received: prefect.types._datetime.DateTime = Field( 1a

207 default_factory=lambda: prefect.types._datetime.now("UTC"), 

208 description="When the event was received by Prefect Cloud", 

209 ) 

210 

211 def as_database_row(self) -> dict[str, Any]: 1a

212 row = self.model_dump() 

213 row["resource_id"] = self.resource.id 

214 row["recorded"] = prefect.types._datetime.now("UTC") 

215 row["related_resource_ids"] = [related.id for related in self.related] 

216 return row 

217 

218 def as_database_resource_rows(self) -> List[Dict[str, Any]]: 1a

219 def without_id_and_role(resource: Resource) -> Dict[str, str]: 

220 d: Dict[str, str] = resource.root.copy() 

221 d.pop("prefect.resource.id", None) 

222 d.pop("prefect.resource.role", None) 

223 return d 

224 

225 return [ 

226 { 

227 "occurred": self.occurred, 

228 "resource_id": resource.id, 

229 "resource_role": ( 

230 resource.role if isinstance(resource, RelatedResource) else "" 

231 ), 

232 "resource": without_id_and_role(resource), 

233 "event_id": self.id, 

234 } 

235 for resource in [self.resource, *self.related] 

236 ] 

237 

238 

239def matches(expected: str, value: Optional[str]) -> bool: 1a

240 """Returns true if the given value matches the expected string, which may 

241 include a a negation prefix ("!this-value") or a wildcard suffix 

242 ("any-value-starting-with*")""" 

243 if value is None: 

244 return False 

245 

246 positive = True 

247 if expected.startswith("!"): 

248 expected = expected[1:] 

249 positive = False 

250 

251 if expected.endswith("*"): 

252 match = value.startswith(expected[:-1]) 

253 else: 

254 match = value == expected 

255 

256 return match if positive else not match 

257 

258 

259class ResourceSpecification(RootModel[Dict[str, Union[str, List[str]]]]): 1a

260 def matches_every_resource(self) -> bool: 1a

261 return len(self.root) == 0 

262 

263 def matches_every_resource_of_kind(self, prefix: str) -> bool: 1a

264 if self.matches_every_resource(): 

265 return True 

266 if len(self.root) == 1: 

267 resource_id = self.root.get("prefect.resource.id") 

268 if resource_id: 

269 values = [resource_id] if isinstance(resource_id, str) else resource_id 

270 return any(value == f"{prefix}.*" for value in values) 

271 return False 

272 

273 def includes(self, candidates: Iterable[Resource]) -> bool: 1a

274 if self.matches_every_resource(): 

275 return True 

276 for candidate in candidates: 

277 if self.matches(candidate): 

278 return True 

279 return False 

280 

281 def matches(self, resource: Resource) -> bool: 1a

282 for label, expected in self.items(): 

283 value = resource.get(label) 

284 if not any(matches(candidate, value) for candidate in expected): 

285 return False 

286 return True 

287 

288 def items(self) -> Iterable[Tuple[str, List[str]]]: 1a

289 return [ 

290 (label, [value] if isinstance(value, str) else value) 

291 for label, value in self.root.items() 

292 ] 

293 

294 def __contains__(self, key: str) -> bool: 1a

295 return key in self.root 

296 

297 def __getitem__(self, key: str) -> List[str]: 1a

298 value = self.root[key] 

299 if not value: 

300 return [] 

301 if not isinstance(value, list): 

302 value = [value] 

303 return value 

304 

305 def pop( 1a

306 self, key: str, default: Optional[Union[str, List[str]]] = None 

307 ) -> Optional[List[str]]: 

308 value = self.root.pop(key, default) 

309 if not value: 

310 return [] 

311 if not isinstance(value, list): 

312 value = [value] 

313 return value 

314 

315 def get( 1a

316 self, key: str, default: Optional[Union[str, List[str]]] = None 

317 ) -> Optional[List[str]]: 

318 value = self.root.get(key, default) 

319 if not value: 

320 return [] 

321 if not isinstance(value, list): 

322 value = [value] 

323 return value 

324 

325 def __len__(self) -> int: 1a

326 return len(self.root) 

327 

328 def deepcopy(self) -> "ResourceSpecification": 1a

329 return ResourceSpecification(root=copy.deepcopy(self.root)) 

330 

331 

332class EventPage(PrefectBaseModel): 1a

333 """A single page of events returned from the API, with an optional link to the 

334 next page of results""" 

335 

336 events: List[ReceivedEvent] = Field( 1a

337 ..., description="The Events matching the query" 

338 ) 

339 total: int = Field(..., description="The total number of matching Events") 1a

340 next_page: Optional[AnyHttpUrl] = Field( 1a

341 ..., description="The URL for the next page of results, if there are more" 

342 ) 

343 

344 

345class EventCount(PrefectBaseModel): 1a

346 """The count of events with the given filter value""" 

347 

348 value: str = Field(..., description="The value to use for filtering") 1a

349 label: str = Field(..., description="The value to display for this count") 1a

350 count: int = Field(..., description="The count of matching events") 1a

351 start_time: prefect.types._datetime.DateTime = Field( 1a

352 ..., description="The start time of this group of events" 

353 ) 

354 end_time: prefect.types._datetime.DateTime = Field( 1a

355 ..., description="The end time of this group of events" 

356 )