Coverage for /usr/local/lib/python3.12/site-packages/prefect/events/schemas/events.py: 30%
152 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1import copy 1a
2from collections import defaultdict 1a
3from typing import ( 1a
4 TYPE_CHECKING,
5 Any,
6 ClassVar,
7 Dict,
8 Iterable,
9 List,
10 Mapping,
11 Optional,
12 Sequence,
13 Tuple,
14 Union,
15)
16from uuid import UUID 1a
18from pydantic import ( 1a
19 AfterValidator,
20 ConfigDict,
21 Field,
22 RootModel,
23 model_validator,
24)
25from typing_extensions import Annotated, Self 1a
27import prefect.types._datetime 1a
28from prefect._internal.schemas.bases import PrefectBaseModel 1a
29from prefect._internal.uuid7 import uuid7 1a
30from prefect.logging import get_logger 1a
31from prefect.settings import ( 1a
32 PREFECT_EVENTS_MAXIMUM_LABELS_PER_RESOURCE,
33)
35from .labelling import Labelled 1a
37if TYPE_CHECKING: 37 ↛ 38line 37 didn't jump to line 38 because the condition on line 37 was never true1a
38 import logging
40logger: "logging.Logger" = get_logger(__name__) 1a
43class Resource(Labelled): 1a
44 """An observable business object of interest to the user"""
46 @model_validator(mode="after") 1a
47 def enforce_maximum_labels(self) -> Self: 1a
48 if len(self.root) > PREFECT_EVENTS_MAXIMUM_LABELS_PER_RESOURCE.value():
49 raise ValueError(
50 "The maximum number of labels per resource "
51 f"is {PREFECT_EVENTS_MAXIMUM_LABELS_PER_RESOURCE.value()}"
52 )
54 return self
56 @model_validator(mode="after") 1a
57 def requires_resource_id(self) -> Self: 1a
58 if "prefect.resource.id" not in self.root:
59 raise ValueError("Resources must include the prefect.resource.id label")
60 if not self.root["prefect.resource.id"]:
61 raise ValueError("The prefect.resource.id label must be non-empty")
63 return self
65 @property 1a
66 def id(self) -> str: 1a
67 return self["prefect.resource.id"]
69 @property 1a
70 def name(self) -> Optional[str]: 1a
71 return self.get("prefect.resource.name")
73 def prefect_object_id(self, kind: str) -> UUID: 1a
74 """Extracts the UUID from an event's resource ID if it's the expected kind
75 of prefect resource"""
76 prefix = f"{kind}." if not kind.endswith(".") else kind
78 if not self.id.startswith(prefix):
79 raise ValueError(f"Resource ID {self.id} does not start with {prefix}")
81 return UUID(self.id[len(prefix) :])
84class RelatedResource(Resource): 1a
85 """A Resource with a specific role in an Event"""
87 @model_validator(mode="after") 1a
88 def requires_resource_role(self) -> Self: 1a
89 if "prefect.resource.role" not in self.root:
90 raise ValueError(
91 "Related Resources must include the prefect.resource.role label"
92 )
93 if not self.root["prefect.resource.role"]:
94 raise ValueError("The prefect.resource.role label must be non-empty")
96 return self
98 @property 1a
99 def role(self) -> str: 1a
100 return self["prefect.resource.role"]
103def _validate_related_resources(value) -> List: 1a
104 from prefect.settings import PREFECT_EVENTS_MAXIMUM_RELATED_RESOURCES
106 if len(value) > PREFECT_EVENTS_MAXIMUM_RELATED_RESOURCES.value():
107 raise ValueError(
108 "The maximum number of related resources "
109 f"is {PREFECT_EVENTS_MAXIMUM_RELATED_RESOURCES.value()}"
110 )
111 return value
114class Event(PrefectBaseModel): 1a
115 """The client-side view of an event that has happened to a Resource"""
117 model_config: ClassVar[ConfigDict] = ConfigDict(extra="ignore") 1a
119 occurred: prefect.types._datetime.DateTime = Field( 1a
120 default_factory=lambda: prefect.types._datetime.now("UTC"),
121 description="When the event happened from the sender's perspective",
122 )
123 event: str = Field(description="The name of the event that happened") 1a
124 resource: Resource = Field( 1a
125 description="The primary Resource this event concerns",
126 )
127 related: Annotated[ 1a
128 List[RelatedResource],
129 AfterValidator(_validate_related_resources),
130 ] = Field(
131 default_factory=list,
132 description="A list of additional Resources involved in this event",
133 )
134 payload: Dict[str, Any] = Field( 1a
135 default_factory=dict,
136 description="An open-ended set of data describing what happened",
137 )
138 id: UUID = Field( 1a
139 default_factory=uuid7,
140 description="The client-provided identifier of this event",
141 )
142 follows: Optional[UUID] = Field( 1a
143 default=None,
144 description=(
145 "The ID of an event that is known to have occurred prior to this one. "
146 "If set, this may be used to establish a more precise ordering of causally-"
147 "related events when they occur close enough together in time that the "
148 "system may receive them out-of-order."
149 ),
150 )
152 @property 1a
153 def involved_resources(self) -> Sequence[Resource]: 1a
154 return [self.resource] + list(self.related)
156 @property 1a
157 def resource_in_role(self) -> Mapping[str, RelatedResource]: 1a
158 """Returns a mapping of roles to the first related resource in that role"""
159 return {related.role: related for related in reversed(self.related)}
161 @property 1a
162 def resources_in_role(self) -> Mapping[str, Sequence[RelatedResource]]: 1a
163 """Returns a mapping of roles to related resources in that role"""
164 resources: Dict[str, List[RelatedResource]] = defaultdict(list)
165 for related in self.related:
166 resources[related.role].append(related)
167 return resources
169 def find_resource_label(self, label: str) -> Optional[str]: 1a
170 """Finds the value of the given label in this event's resource or one of its
171 related resources. If the label starts with `related:<role>:`, search for the
172 first matching label in a related resource with that role."""
173 directive, _, related_label = label.rpartition(":")
174 directive, _, role = directive.partition(":")
175 if directive == "related":
176 for related in self.related:
177 if related.role == role:
178 return related.get(related_label)
179 return self.resource.get(label)
182class ReceivedEvent(Event): 1a
183 """The server-side view of an event that has happened to a Resource after it has
184 been received by the server"""
186 model_config: ClassVar[ConfigDict] = ConfigDict(from_attributes=True) 1a
188 received: prefect.types._datetime.DateTime = Field( 1a
189 ...,
190 description="When the event was received by Prefect Cloud",
191 )
194def matches(expected: str, value: Optional[str]) -> bool: 1a
195 """Returns true if the given value matches the expected string, which may
196 include a a negation prefix ("!this-value") or a wildcard suffix
197 ("any-value-starting-with*")"""
198 if value is None:
199 return False
201 positive = True
202 if expected.startswith("!"):
203 expected = expected[1:]
204 positive = False
206 if expected.endswith("*"):
207 match = value.startswith(expected[:-1])
208 else:
209 match = value == expected
211 return match if positive else not match
214class ResourceSpecification(RootModel[Dict[str, Union[str, List[str]]]]): 1a
215 def matches_every_resource(self) -> bool: 1a
216 return len(self.root) == 0
218 def matches_every_resource_of_kind(self, prefix: str) -> bool: 1a
219 if self.matches_every_resource():
220 return True
221 if len(self.root) == 1:
222 resource_id = self.root.get("prefect.resource.id")
223 if resource_id:
224 values = [resource_id] if isinstance(resource_id, str) else resource_id
225 return any(value == f"{prefix}.*" for value in values)
226 return False
228 def includes(self, candidates: Iterable[Resource]) -> bool: 1a
229 if self.matches_every_resource():
230 return True
231 for candidate in candidates:
232 if self.matches(candidate):
233 return True
234 return False
236 def matches(self, resource: Resource) -> bool: 1a
237 for label, expected in self.items():
238 value = resource.get(label)
239 if not any(matches(candidate, value) for candidate in expected):
240 return False
241 return True
243 def items(self) -> Iterable[Tuple[str, List[str]]]: 1a
244 return [
245 (label, [value] if isinstance(value, str) else value)
246 for label, value in self.root.items()
247 ]
249 def __contains__(self, key: str) -> bool: 1a
250 return key in self.root
252 def __getitem__(self, key: str) -> List[str]: 1a
253 value = self.root[key]
254 if not value:
255 return []
256 if not isinstance(value, list):
257 value = [value]
258 return value
260 def pop( 1a
261 self, key: str, default: Optional[Union[str, List[str]]] = None
262 ) -> Optional[List[str]]:
263 value = self.root.pop(key, default)
264 if not value:
265 return []
266 if not isinstance(value, list):
267 value = [value]
268 return value
270 def get( 1a
271 self, key: str, default: Optional[Union[str, List[str]]] = None
272 ) -> Optional[List[str]]:
273 value = self.root.get(key, default)
274 if not value:
275 return []
276 if not isinstance(value, list):
277 value = [value]
278 return value
280 def __len__(self) -> int: 1a
281 return len(self.root)
283 def deepcopy(self) -> "ResourceSpecification": 1a
284 return ResourceSpecification(root=copy.deepcopy(self.root))