Coverage for /usr/local/lib/python3.12/site-packages/prefect/events/filters.py: 38%
117 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1from __future__ import annotations 1a
3import datetime 1a
4from typing import Optional, Union 1a
5from uuid import UUID 1a
7from pydantic import Field 1a
9import prefect.types._datetime 1a
10from prefect._internal.schemas.bases import PrefectBaseModel 1a
11from prefect.types import DateTime 1a
12from prefect.utilities.collections import AutoEnum 1a
14from .schemas.events import Event, Resource, ResourceSpecification 1a
17class AutomationFilterCreated(PrefectBaseModel): 1a
18 """Filter by `Automation.created`."""
20 before_: Optional[DateTime] = Field( 1a
21 default=None,
22 description="Only include automations created before this datetime",
23 )
26class AutomationFilterName(PrefectBaseModel): 1a
27 """Filter by `Automation.created`."""
29 any_: Optional[list[str]] = Field( 1a
30 default=None,
31 description="Only include automations with names that match any of these strings",
32 )
35class AutomationFilter(PrefectBaseModel): 1a
36 name: Optional[AutomationFilterName] = Field( 1a
37 default=None, description="Filter criteria for `Automation.name`"
38 )
39 created: Optional[AutomationFilterCreated] = Field( 1a
40 default=None, description="Filter criteria for `Automation.created`"
41 )
44class EventDataFilter(PrefectBaseModel, extra="forbid"): # type: ignore[call-arg] 1a
45 """A base class for filtering event data."""
47 def get_filters(self) -> list["EventDataFilter"]: 1a
48 filters: list[EventDataFilter] = []
49 for filter in [
50 getattr(self, name) for name in self.__class__.model_fields.keys()
51 ]:
52 # Any embedded list of filters are flattened and thus ANDed together
53 subfilters: list[EventDataFilter] = (
54 filter if isinstance(filter, list) else [filter]
55 )
57 for subfilter in subfilters:
58 if not isinstance(subfilter, EventDataFilter):
59 continue
61 filters.append(subfilter)
63 return filters
65 def includes(self, event: Event) -> bool: 1a
66 """Does the given event match the criteria of this filter?"""
67 return all(filter.includes(event) for filter in self.get_filters())
69 def excludes(self, event: Event) -> bool: 1a
70 """Would the given filter exclude this event?"""
71 return not self.includes(event)
74class EventOccurredFilter(EventDataFilter): 1a
75 since: DateTime = Field( 1a
76 default_factory=lambda: prefect.types._datetime.start_of_day(
77 prefect.types._datetime.now("UTC")
78 )
79 - datetime.timedelta(days=180),
80 description="Only include events after this time (inclusive)",
81 )
82 until: DateTime = Field( 1a
83 default_factory=lambda: prefect.types._datetime.now("UTC"),
84 description="Only include events prior to this time (inclusive)",
85 )
87 def includes(self, event: Event) -> bool: 1a
88 return self.since <= event.occurred <= self.until
91class EventNameFilter(EventDataFilter): 1a
92 prefix: Optional[list[str]] = Field( 1a
93 default=None, description="Only include events matching one of these prefixes"
94 )
95 exclude_prefix: Optional[list[str]] = Field( 1a
96 default=None, description="Exclude events matching one of these prefixes"
97 )
99 name: Optional[list[str]] = Field( 1a
100 default=None,
101 description="Only include events matching one of these names exactly",
102 )
103 exclude_name: Optional[list[str]] = Field( 1a
104 default=None, description="Exclude events matching one of these names exactly"
105 )
107 def includes(self, event: Event) -> bool: 1a
108 if self.prefix:
109 if not any(event.event.startswith(prefix) for prefix in self.prefix):
110 return False
112 if self.exclude_prefix:
113 if any(event.event.startswith(prefix) for prefix in self.exclude_prefix):
114 return False
116 if self.name:
117 if not any(event.event == name for name in self.name):
118 return False
120 if self.exclude_name:
121 if any(event.event == name for name in self.exclude_name):
122 return False
124 return True
127class EventResourceFilter(EventDataFilter): 1a
128 id: Optional[list[str]] = Field( 1a
129 default=None, description="Only include events for resources with these IDs"
130 )
131 id_prefix: Optional[list[str]] = Field( 1a
132 default=None,
133 description=(
134 "Only include events for resources with IDs starting with these prefixes."
135 ),
136 )
137 labels: Optional[ResourceSpecification] = Field( 1a
138 default=None, description="Only include events for resources with these labels"
139 )
140 distinct: bool = Field( 1a
141 default=False,
142 description="Only include events for distinct resources",
143 )
145 def includes(self, event: Event) -> bool: 1a
146 if self.id:
147 if not any(event.resource.id == resource_id for resource_id in self.id):
148 return False
150 if self.id_prefix:
151 if not any(
152 event.resource.id.startswith(prefix) for prefix in self.id_prefix
153 ):
154 return False
156 if self.labels:
157 if not self.labels.matches(event.resource):
158 return False
160 return True
163class EventRelatedFilter(EventDataFilter): 1a
164 id: Optional[list[str]] = Field( 1a
165 default=None,
166 description="Only include events for related resources with these IDs",
167 )
168 role: Optional[list[str]] = Field( 1a
169 default=None,
170 description="Only include events for related resources in these roles",
171 )
172 resources_in_roles: Optional[list[tuple[str, str]]] = Field( 1a
173 default=None,
174 description=(
175 "Only include events with specific related resources in specific roles"
176 ),
177 )
178 labels: Optional[ResourceSpecification] = Field( 1a
179 default=None,
180 description="Only include events for related resources with these labels",
181 )
184class EventAnyResourceFilter(EventDataFilter): 1a
185 id: Optional[list[str]] = Field( 1a
186 default=None, description="Only include events for resources with these IDs"
187 )
188 id_prefix: Optional[list[str]] = Field( 1a
189 default=None,
190 description=(
191 "Only include events for resources with IDs starting with these prefixes"
192 ),
193 )
194 labels: Optional[ResourceSpecification] = Field( 1a
195 default=None,
196 description="Only include events for related resources with these labels",
197 )
199 def includes(self, event: Event) -> bool: 1a
200 resources = [event.resource] + event.related
201 if not any(self._includes(resource) for resource in resources):
202 return False
203 return True
205 def _includes(self, resource: Resource) -> bool: 1a
206 if self.id:
207 if not any(resource.id == resource_id for resource_id in self.id):
208 return False
210 if self.id_prefix:
211 if not any(resource.id.startswith(prefix) for prefix in self.id_prefix):
212 return False
214 if self.labels:
215 if not self.labels.matches(resource):
216 return False
218 return True
221class EventIDFilter(EventDataFilter): 1a
222 id: Optional[list[UUID]] = Field( 1a
223 default=None, description="Only include events with one of these IDs"
224 )
226 def includes(self, event: Event) -> bool: 1a
227 if self.id:
228 if not any(event.id == id for id in self.id):
229 return False
231 return True
234class EventTextFilter(EventDataFilter): 1a
235 """Filter by text search across event content."""
237 query: str = Field( 1a
238 description="Text search query string",
239 examples=[
240 "error",
241 "error -debug",
242 '"connection timeout"',
243 "+required -excluded",
244 ],
245 max_length=200,
246 )
249class EventOrder(AutoEnum): 1a
250 ASC = "ASC" 1a
251 DESC = "DESC" 1a
254class EventFilter(EventDataFilter): 1a
255 occurred: EventOccurredFilter = Field( 1a
256 default_factory=lambda: EventOccurredFilter(),
257 description="Filter criteria for when the events occurred",
258 )
259 event: Optional[EventNameFilter] = Field( 1a
260 default=None,
261 description="Filter criteria for the event name",
262 )
263 resource: Optional[EventResourceFilter] = Field( 1a
264 default=None,
265 description="Filter criteria for the resource of the event",
266 )
267 related: Optional[Union[EventRelatedFilter, list[EventRelatedFilter]]] = Field( 1a
268 default=None,
269 description="Filter criteria for the related resources of the event",
270 )
271 any_resource: Optional[ 1a
272 Union[EventAnyResourceFilter, list[EventAnyResourceFilter]]
273 ] = Field(
274 default=None,
275 description="Filter criteria for any resource involved in the event",
276 )
277 id: EventIDFilter = Field( 1a
278 default_factory=lambda: EventIDFilter(id=[]),
279 description="Filter criteria for the events' ID",
280 )
281 text: Optional[EventTextFilter] = Field( 1a
282 default=None,
283 description="Filter criteria for text search across event content",
284 )
286 order: EventOrder = Field( 1a
287 default=EventOrder.DESC,
288 description="The order to return filtered events",
289 )