Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/schemas/events.py: 31%
184 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1import copy 1a
2from collections import defaultdict 1a
3from typing import ( 1a
4 TYPE_CHECKING,
5 Any,
6 ClassVar,
7 Dict,
8 Iterable,
9 List,
10 Mapping,
11 Optional,
12 Sequence,
13 Tuple,
14 Union,
15)
16from uuid import UUID 1a
18from pydantic import ( 1a
19 AfterValidator,
20 AnyHttpUrl,
21 ConfigDict,
22 Field,
23 RootModel,
24 field_validator,
25 model_validator,
26)
27from typing_extensions import Annotated, Self 1a
29import prefect.types._datetime 1a
30from prefect.logging import get_logger 1a
31from prefect.server.events.schemas.labelling import Labelled 1a
32from prefect.server.utilities.schemas import PrefectBaseModel 1a
33from prefect.settings import ( 1a
34 PREFECT_EVENTS_MAXIMUM_LABELS_PER_RESOURCE,
35 PREFECT_EVENTS_MAXIMUM_RELATED_RESOURCES,
36)
38if TYPE_CHECKING: 38 ↛ 39line 38 didn't jump to line 39 because the condition on line 38 was never true1a
39 import logging
41 logger: "logging.Logger" = get_logger(__name__)
44class Resource(Labelled): 1a
45 """An observable business object of interest to the user"""
47 @model_validator(mode="after") 1a
48 def enforce_maximum_labels(self) -> Self: 1a
49 if len(self.root) > PREFECT_EVENTS_MAXIMUM_LABELS_PER_RESOURCE.value():
50 raise ValueError(
51 "The maximum number of labels per resource "
52 f"is {PREFECT_EVENTS_MAXIMUM_LABELS_PER_RESOURCE.value()}"
53 )
55 return self
57 @model_validator(mode="after") 1a
58 def requires_resource_id(self) -> Self: 1a
59 if "prefect.resource.id" not in self.root:
60 raise ValueError("Resources must include the prefect.resource.id label")
61 if not self.root["prefect.resource.id"]:
62 raise ValueError("The prefect.resource.id label must be non-empty")
64 return self
66 @property 1a
67 def id(self) -> str: 1a
68 return self["prefect.resource.id"]
70 @property 1a
71 def name(self) -> Optional[str]: 1a
72 return self.get("prefect.resource.name")
74 def prefect_object_id(self, kind: str) -> UUID: 1a
75 """Extracts the UUID from an event's resource ID if it's the expected kind
76 of prefect resource"""
77 prefix = f"{kind}." if not kind.endswith(".") else kind
79 if not self.id.startswith(prefix):
80 raise ValueError(f"Resource ID {self.id} does not start with {prefix}")
82 return UUID(self.id[len(prefix) :])
85class RelatedResource(Resource): 1a
86 """A Resource with a specific role in an Event"""
88 @model_validator(mode="after") 1a
89 def requires_resource_role(self) -> Self: 1a
90 if "prefect.resource.role" not in self.root:
91 raise ValueError(
92 "Related Resources must include the prefect.resource.role label"
93 )
94 if not self.root["prefect.resource.role"]:
95 raise ValueError("The prefect.resource.role label must be non-empty")
97 return self
99 @property 1a
100 def role(self) -> str: 1a
101 return self["prefect.resource.role"]
104def _validate_event_name_length(value: str) -> str: 1a
105 from prefect.settings import PREFECT_SERVER_EVENTS_MAXIMUM_EVENT_NAME_LENGTH
107 if len(value) > PREFECT_SERVER_EVENTS_MAXIMUM_EVENT_NAME_LENGTH.value():
108 raise ValueError(
109 f"Event name must be at most {PREFECT_SERVER_EVENTS_MAXIMUM_EVENT_NAME_LENGTH.value()} characters"
110 )
111 return value
114class Event(PrefectBaseModel): 1a
115 """The client-side view of an event that has happened to a Resource"""
117 occurred: prefect.types._datetime.DateTime = Field( 1a
118 description="When the event happened from the sender's perspective",
119 )
120 event: Annotated[str, AfterValidator(_validate_event_name_length)] = Field( 1a
121 description="The name of the event that happened",
122 )
123 resource: Resource = Field( 1a
124 description="The primary Resource this event concerns",
125 )
126 related: list[RelatedResource] = Field( 1a
127 default_factory=list,
128 description="A list of additional Resources involved in this event",
129 )
130 payload: dict[str, Any] = Field( 1a
131 default_factory=dict,
132 description="An open-ended set of data describing what happened",
133 )
134 id: UUID = Field( 1a
135 description="The client-provided identifier of this event",
136 )
137 follows: Optional[UUID] = Field( 1a
138 default=None,
139 description=(
140 "The ID of an event that is known to have occurred prior to this one. "
141 "If set, this may be used to establish a more precise ordering of causally-"
142 "related events when they occur close enough together in time that the "
143 "system may receive them out-of-order."
144 ),
145 )
147 @property 1a
148 def involved_resources(self) -> Sequence[Resource]: 1a
149 return [self.resource] + list(self.related)
151 @property 1a
152 def resource_in_role(self) -> Mapping[str, RelatedResource]: 1a
153 """Returns a mapping of roles to the first related resource in that role"""
154 return {related.role: related for related in reversed(self.related)}
156 @property 1a
157 def resources_in_role(self) -> Mapping[str, Sequence[RelatedResource]]: 1a
158 """Returns a mapping of roles to related resources in that role"""
159 resources: Dict[str, List[RelatedResource]] = defaultdict(list)
160 for related in self.related:
161 resources[related.role].append(related)
162 return resources
164 @field_validator("related") 1a
165 @classmethod 1a
166 def enforce_maximum_related_resources( 1a
167 cls, value: List[RelatedResource]
168 ) -> List[RelatedResource]:
169 if len(value) > PREFECT_EVENTS_MAXIMUM_RELATED_RESOURCES.value():
170 raise ValueError(
171 "The maximum number of related resources "
172 f"is {PREFECT_EVENTS_MAXIMUM_RELATED_RESOURCES.value()}"
173 )
175 return value
177 def receive( 1a
178 self, received: Optional[prefect.types._datetime.DateTime] = None
179 ) -> "ReceivedEvent":
180 kwargs = self.model_dump()
181 if received is not None:
182 kwargs["received"] = received
183 return ReceivedEvent(**kwargs)
185 def find_resource_label(self, label: str) -> Optional[str]: 1a
186 """Finds the value of the given label in this event's resource or one of its
187 related resources. If the label starts with `related:<role>:`, search for the
188 first matching label in a related resource with that role."""
189 directive, _, related_label = label.rpartition(":")
190 directive, _, role = directive.partition(":")
191 if directive == "related":
192 for related in self.related:
193 if related.role == role:
194 return related.get(related_label)
195 return self.resource.get(label)
198class ReceivedEvent(Event): 1a
199 """The server-side view of an event that has happened to a Resource after it has
200 been received by the server"""
202 model_config: ClassVar[ConfigDict] = ConfigDict( 1a
203 extra="ignore", from_attributes=True
204 )
206 received: prefect.types._datetime.DateTime = Field( 1a
207 default_factory=lambda: prefect.types._datetime.now("UTC"),
208 description="When the event was received by Prefect Cloud",
209 )
211 def as_database_row(self) -> dict[str, Any]: 1a
212 row = self.model_dump()
213 row["resource_id"] = self.resource.id
214 row["recorded"] = prefect.types._datetime.now("UTC")
215 row["related_resource_ids"] = [related.id for related in self.related]
216 return row
218 def as_database_resource_rows(self) -> List[Dict[str, Any]]: 1a
219 def without_id_and_role(resource: Resource) -> Dict[str, str]:
220 d: Dict[str, str] = resource.root.copy()
221 d.pop("prefect.resource.id", None)
222 d.pop("prefect.resource.role", None)
223 return d
225 return [
226 {
227 "occurred": self.occurred,
228 "resource_id": resource.id,
229 "resource_role": (
230 resource.role if isinstance(resource, RelatedResource) else ""
231 ),
232 "resource": without_id_and_role(resource),
233 "event_id": self.id,
234 }
235 for resource in [self.resource, *self.related]
236 ]
239def matches(expected: str, value: Optional[str]) -> bool: 1a
240 """Returns true if the given value matches the expected string, which may
241 include a a negation prefix ("!this-value") or a wildcard suffix
242 ("any-value-starting-with*")"""
243 if value is None:
244 return False
246 positive = True
247 if expected.startswith("!"):
248 expected = expected[1:]
249 positive = False
251 if expected.endswith("*"):
252 match = value.startswith(expected[:-1])
253 else:
254 match = value == expected
256 return match if positive else not match
259class ResourceSpecification(RootModel[Dict[str, Union[str, List[str]]]]): 1a
260 def matches_every_resource(self) -> bool: 1a
261 return len(self.root) == 0
263 def matches_every_resource_of_kind(self, prefix: str) -> bool: 1a
264 if self.matches_every_resource():
265 return True
266 if len(self.root) == 1:
267 resource_id = self.root.get("prefect.resource.id")
268 if resource_id:
269 values = [resource_id] if isinstance(resource_id, str) else resource_id
270 return any(value == f"{prefix}.*" for value in values)
271 return False
273 def includes(self, candidates: Iterable[Resource]) -> bool: 1a
274 if self.matches_every_resource():
275 return True
276 for candidate in candidates:
277 if self.matches(candidate):
278 return True
279 return False
281 def matches(self, resource: Resource) -> bool: 1a
282 for label, expected in self.items():
283 value = resource.get(label)
284 if not any(matches(candidate, value) for candidate in expected):
285 return False
286 return True
288 def items(self) -> Iterable[Tuple[str, List[str]]]: 1a
289 return [
290 (label, [value] if isinstance(value, str) else value)
291 for label, value in self.root.items()
292 ]
294 def __contains__(self, key: str) -> bool: 1a
295 return key in self.root
297 def __getitem__(self, key: str) -> List[str]: 1a
298 value = self.root[key]
299 if not value:
300 return []
301 if not isinstance(value, list):
302 value = [value]
303 return value
305 def pop( 1a
306 self, key: str, default: Optional[Union[str, List[str]]] = None
307 ) -> Optional[List[str]]:
308 value = self.root.pop(key, default)
309 if not value:
310 return []
311 if not isinstance(value, list):
312 value = [value]
313 return value
315 def get( 1a
316 self, key: str, default: Optional[Union[str, List[str]]] = None
317 ) -> Optional[List[str]]:
318 value = self.root.get(key, default)
319 if not value:
320 return []
321 if not isinstance(value, list):
322 value = [value]
323 return value
325 def __len__(self) -> int: 1a
326 return len(self.root)
328 def deepcopy(self) -> "ResourceSpecification": 1a
329 return ResourceSpecification(root=copy.deepcopy(self.root))
332class EventPage(PrefectBaseModel): 1a
333 """A single page of events returned from the API, with an optional link to the
334 next page of results"""
336 events: List[ReceivedEvent] = Field( 1a
337 ..., description="The Events matching the query"
338 )
339 total: int = Field(..., description="The total number of matching Events") 1a
340 next_page: Optional[AnyHttpUrl] = Field( 1a
341 ..., description="The URL for the next page of results, if there are more"
342 )
345class EventCount(PrefectBaseModel): 1a
346 """The count of events with the given filter value"""
348 value: str = Field(..., description="The value to use for filtering") 1a
349 label: str = Field(..., description="The value to display for this count") 1a
350 count: int = Field(..., description="The count of matching events") 1a
351 start_time: prefect.types._datetime.DateTime = Field( 1a
352 ..., description="The start time of this group of events"
353 )
354 end_time: prefect.types._datetime.DateTime = Field( 1a
355 ..., description="The end time of this group of events"
356 )