Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/filters.py: 19%
411 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1from __future__ import annotations 1a
3import sys 1a
4from collections.abc import Iterable 1a
5from dataclasses import dataclass, field 1a
6from datetime import datetime, timedelta 1a
7from typing import TYPE_CHECKING, Optional, Sequence, Union 1a
8from uuid import UUID 1a
9from zoneinfo import ZoneInfo 1a
11import sqlalchemy as sa 1a
12from pydantic import Field, PrivateAttr 1a
13from sqlalchemy.sql import Select 1a
15import prefect.types._datetime 1a
16from prefect.server.database import PrefectDBInterface, db_injector 1a
17from prefect.server.schemas.filters import ( 1a
18 PrefectFilterBaseModel,
19 PrefectOperatorFilterBaseModel,
20)
21from prefect.server.utilities.schemas.bases import PrefectBaseModel 1a
22from prefect.server.utilities.text_search_parser import ( 1a
23 parse_text_search_query,
24)
25from prefect.types import DateTime 1a
26from prefect.utilities.collections import AutoEnum 1a
28from .schemas.events import Event, Resource, ResourceSpecification 1a
30if TYPE_CHECKING: 30 ↛ 31line 30 didn't jump to line 31 because the condition on line 30 was never true1a
31 from sqlalchemy.sql.expression import ColumnElement, ColumnExpressionArgument
34class AutomationFilterCreated(PrefectFilterBaseModel): 1a
35 """Filter by `Automation.created`."""
37 before_: Optional[DateTime] = Field( 1a
38 default=None,
39 description="Only include automations created before this datetime",
40 )
42 def _get_filter_list( 1a
43 self, db: PrefectDBInterface
44 ) -> Iterable[sa.ColumnElement[bool]]:
45 if self.before_ is not None:
46 return [db.Automation.created <= self.before_]
47 return ()
50class AutomationFilterName(PrefectFilterBaseModel): 1a
51 """Filter by `Automation.created`."""
53 any_: Optional[list[str]] = Field( 1a
54 default=None,
55 description="Only include automations with names that match any of these strings",
56 )
58 def _get_filter_list(self, db: PrefectDBInterface) -> list[sa.ColumnElement[bool]]: 1a
59 if self.any_ is not None:
60 return [db.Automation.name.in_(self.any_)]
61 return []
64class AutomationFilterTags(PrefectOperatorFilterBaseModel): 1a
65 """Filter by `Automation.tags`."""
67 all_: Optional[list[str]] = Field( 1a
68 default=None,
69 examples=[["tag-1", "tag-2"]],
70 description="A list of tags. Automations will be returned only if their tags are a superset of the list",
71 )
72 any_: Optional[list[str]] = Field( 1a
73 default=None,
74 examples=[["tag-1", "tag-2"]],
75 description="A list of tags. Automations will be returned if their tags contain any of the tags in the list",
76 )
77 is_null_: Optional[bool] = Field( 1a
78 default=None, description="If true, only include automations without tags"
79 )
81 def _get_filter_list( 1a
82 self, db: PrefectDBInterface
83 ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
84 from prefect.server.schemas.filters import _as_array
86 filters: list[sa.ColumnElement[bool]] = []
87 if self.all_ is not None:
88 filters.append(db.Automation.tags.has_all(_as_array(self.all_)))
89 if self.any_ is not None:
90 filters.append(db.Automation.tags.has_any(_as_array(self.any_)))
91 if self.is_null_ is not None:
92 filters.append(
93 db.Automation.tags == [] if self.is_null_ else db.Automation.tags != []
94 )
95 return filters
98class AutomationFilter(PrefectOperatorFilterBaseModel): 1a
99 name: Optional[AutomationFilterName] = Field( 1a
100 default=None, description="Filter criteria for `Automation.name`"
101 )
102 created: Optional[AutomationFilterCreated] = Field( 1a
103 default=None, description="Filter criteria for `Automation.created`"
104 )
105 tags: Optional[AutomationFilterTags] = Field( 1a
106 default=None, description="Filter criteria for `Automation.tags`"
107 )
109 def _get_filter_list( 1a
110 self, db: PrefectDBInterface
111 ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
112 filters: list[sa.ColumnExpressionArgument[bool]] = []
114 if self.name is not None:
115 filters.append(self.name.as_sql_filter())
116 if self.created is not None:
117 filters.append(self.created.as_sql_filter())
118 if self.tags is not None:
119 filters.append(self.tags.as_sql_filter())
121 return filters
124class EventDataFilter(PrefectBaseModel, extra="forbid"): 1a
125 """A base class for filtering event data."""
127 _top_level_filter: Optional[Select[tuple[UUID]]] = PrivateAttr(None) 1a
129 def get_filters(self) -> list["EventDataFilter"]: 1a
130 filters: list[EventDataFilter] = []
131 for filter in [
132 getattr(self, name) for name in self.__class__.model_fields.keys()
133 ]:
134 # Any embedded list of filters are flattened and thus ANDed together
135 subfilters: list[EventDataFilter] = (
136 filter if isinstance(filter, list) else [filter]
137 )
139 for subfilter in subfilters:
140 if not isinstance(subfilter, EventDataFilter):
141 continue
143 subfilter._top_level_filter = self._top_level_filter
144 filters.append(subfilter)
146 return filters
148 def includes(self, event: Event) -> bool: 1a
149 """Does the given event match the criteria of this filter?"""
150 return all(filter.includes(event) for filter in self.get_filters())
152 def excludes(self, event: Event) -> bool: 1a
153 """Would the given filter exclude this event?"""
154 return not self.includes(event)
156 def build_where_clauses(self) -> Sequence["ColumnExpressionArgument[bool]"]: 1a
157 """Convert the criteria to a WHERE clause."""
158 clauses: list["ColumnExpressionArgument[bool]"] = []
159 for filter in self.get_filters():
160 clauses.extend(filter.build_where_clauses())
161 return clauses
164class EventOccurredFilter(EventDataFilter): 1a
165 since: DateTime = Field( 1a
166 default_factory=lambda: prefect.types._datetime.start_of_day(
167 prefect.types._datetime.now("UTC")
168 )
169 - timedelta(days=180),
170 description="Only include events after this time (inclusive)",
171 )
172 until: DateTime = Field( 1a
173 default_factory=lambda: prefect.types._datetime.now("UTC"),
174 description="Only include events prior to this time (inclusive)",
175 )
177 def clamp(self, max_duration: timedelta) -> None: 1a
178 """Limit how far the query can look back based on the given duration"""
179 # Using datetime.now() instead of prefect.types._datetime.now() to avoid
180 # dropping timezone information which happens if pendulum is used
181 earliest = datetime.now(ZoneInfo("UTC")) - max_duration
182 self.since = max(earliest, self.since)
184 def includes(self, event: Event) -> bool: 1a
185 return self.since <= event.occurred <= self.until
187 @db_injector 1a
188 def build_where_clauses( 1a
189 self, db: PrefectDBInterface
190 ) -> Sequence["ColumnExpressionArgument[bool]"]:
191 return [
192 db.Event.occurred >= self.since,
193 db.Event.occurred <= self.until,
194 ]
197class EventNameFilter(EventDataFilter): 1a
198 prefix: Optional[list[str]] = Field( 1a
199 default=None, description="Only include events matching one of these prefixes"
200 )
201 exclude_prefix: Optional[list[str]] = Field( 1a
202 default=None, description="Exclude events matching one of these prefixes"
203 )
205 name: Optional[list[str]] = Field( 1a
206 default=None,
207 description="Only include events matching one of these names exactly",
208 )
209 exclude_name: Optional[list[str]] = Field( 1a
210 default=None, description="Exclude events matching one of these names exactly"
211 )
213 def includes(self, event: Event) -> bool: 1a
214 if self.prefix:
215 if not any(event.event.startswith(prefix) for prefix in self.prefix):
216 return False
218 if self.exclude_prefix:
219 if any(event.event.startswith(prefix) for prefix in self.exclude_prefix):
220 return False
222 if self.name:
223 if not any(event.event == name for name in self.name):
224 return False
226 if self.exclude_name:
227 if any(event.event == name for name in self.exclude_name):
228 return False
230 return True
232 @db_injector 1a
233 def build_where_clauses( 1a
234 self, db: PrefectDBInterface
235 ) -> Sequence["ColumnExpressionArgument[bool]"]:
236 filters: list["ColumnExpressionArgument[bool]"] = []
238 if self.prefix:
239 filters.append(
240 sa.or_(*(db.Event.event.startswith(prefix) for prefix in self.prefix))
241 )
243 if self.exclude_prefix:
244 filters.extend(
245 sa.not_(db.Event.event.startswith(prefix))
246 for prefix in self.exclude_prefix
247 )
249 if self.name:
250 filters.append(db.Event.event.in_(self.name))
252 if self.exclude_name:
253 filters.append(db.Event.event.not_in(self.exclude_name))
255 return filters
258@dataclass 1a
259class LabelSet: 1a
260 simple: list[str] = field(default_factory=list) 1a
261 prefixes: list[str] = field(default_factory=list) 1a
264@dataclass 1a
265class LabelOperations: 1a
266 values: list[str] 1a
267 positive: LabelSet = field(default_factory=LabelSet) 1a
268 negative: LabelSet = field(default_factory=LabelSet) 1a
270 def __post_init__(self) -> None: 1a
271 for value in self.values:
272 label_set = self.positive
273 if value.startswith("!"):
274 label_set = self.negative
275 value = value[1:]
277 if value.endswith("*"):
278 label_set.prefixes.append(value.rstrip("*"))
279 else:
280 label_set.simple.append(value)
283class EventResourceFilter(EventDataFilter): 1a
284 id: Optional[list[str]] = Field( 1a
285 default=None, description="Only include events for resources with these IDs"
286 )
287 id_prefix: Optional[list[str]] = Field( 1a
288 default=None,
289 description=(
290 "Only include events for resources with IDs starting with these prefixes."
291 ),
292 )
293 labels: Optional[ResourceSpecification] = Field( 1a
294 default=None, description="Only include events for resources with these labels"
295 )
296 distinct: bool = Field( 1a
297 default=False,
298 description="Only include events for distinct resources",
299 )
301 def includes(self, event: Event) -> bool: 1a
302 if self.id:
303 if not any(event.resource.id == resource_id for resource_id in self.id):
304 return False
306 if self.id_prefix:
307 if not any(
308 event.resource.id.startswith(prefix) for prefix in self.id_prefix
309 ):
310 return False
312 if self.labels:
313 if not self.labels.matches(event.resource):
314 return False
316 return True
318 @db_injector 1a
319 def build_where_clauses( 1a
320 self, db: PrefectDBInterface
321 ) -> Sequence["ColumnExpressionArgument[bool]"]:
322 filters: list["ColumnExpressionArgument[bool]"] = []
324 # If we're doing an exact or prefix search on resource_id, this is efficient
325 # enough to do on the events table without going to the event_resources table
327 if self.id:
328 filters.append(db.Event.resource_id.in_(self.id))
330 if self.id_prefix:
331 filters.append(
332 sa.or_(
333 *(
334 db.Event.resource_id.startswith(prefix)
335 for prefix in self.id_prefix
336 )
337 )
338 )
340 if self.labels:
341 labels = self.labels.deepcopy()
343 # We are explicitly searching for the primary resource here so the
344 # resource_role must be ''
345 label_filters = [db.EventResource.resource_role == ""]
347 # On the event_resources table, resource_id is unpacked
348 # into a column, so we should search for it there
349 if resource_ids := labels.pop("prefect.resource.id", None):
350 label_ops = LabelOperations(resource_ids)
352 resource_id_column = db.EventResource.resource_id
354 if values := label_ops.positive.simple:
355 label_filters.append(resource_id_column.in_(values))
356 if values := label_ops.negative.simple:
357 label_filters.append(resource_id_column.not_in(values))
358 for prefix in label_ops.positive.prefixes:
359 label_filters.append(resource_id_column.startswith(prefix))
360 for prefix in label_ops.negative.prefixes:
361 label_filters.append(sa.not_(resource_id_column.startswith(prefix)))
363 if labels:
364 for _, (label, values) in enumerate(labels.items()):
365 # Empty label value arrays should match nothing
366 if not values:
367 label_filters.append(sa.false())
368 continue
370 label_ops = LabelOperations(values)
372 label_column = db.EventResource.resource[label].astext
374 # With negative labels, the resource _must_ have the label
375 if label_ops.negative.simple or label_ops.negative.prefixes:
376 label_filters.append(label_column.is_not(None))
378 if values := label_ops.positive.simple:
379 label_filters.append(label_column.in_(values))
380 if values := label_ops.negative.simple:
381 label_filters.append(label_column.notin_(values))
382 for prefix in label_ops.positive.prefixes:
383 label_filters.append(label_column.startswith(prefix))
384 for prefix in label_ops.negative.prefixes:
385 label_filters.append(sa.not_(label_column.startswith(prefix)))
387 assert self._top_level_filter is not None
388 filters.append(
389 db.Event.id.in_(self._top_level_filter.where(*label_filters))
390 )
392 return filters
395class EventRelatedFilter(EventDataFilter): 1a
396 id: Optional[list[str]] = Field( 1a
397 None, description="Only include events for related resources with these IDs"
398 )
399 role: Optional[list[str]] = Field( 1a
400 None, description="Only include events for related resources in these roles"
401 )
402 resources_in_roles: Optional[list[tuple[str, str]]] = Field( 1a
403 None,
404 description=(
405 "Only include events with specific related resources in specific roles"
406 ),
407 )
408 labels: Optional[ResourceSpecification] = Field( 1a
409 None, description="Only include events for related resources with these labels"
410 )
412 @db_injector 1a
413 def build_where_clauses( 1a
414 self, db: PrefectDBInterface
415 ) -> Sequence["ColumnExpressionArgument[bool]"]:
416 filters: list["ColumnExpressionArgument[bool]"] = []
418 if self.id:
419 filters.append(db.EventResource.resource_id.in_(self.id))
421 if self.role:
422 filters.append(db.EventResource.resource_role.in_(self.role))
424 if self.resources_in_roles:
425 filters.append(
426 sa.or_(
427 *(
428 sa.and_(
429 db.EventResource.resource_id == resource_id,
430 db.EventResource.resource_role == role,
431 )
432 for resource_id, role in self.resources_in_roles
433 )
434 )
435 )
437 if self.labels:
438 label_filters: list[ColumnElement[bool]] = []
439 labels = self.labels.deepcopy()
441 # On the event_resources table, resource_id and resource_role are unpacked
442 # into columns, so we should search there for them
443 if resource_ids := labels.pop("prefect.resource.id", None):
444 label_ops = LabelOperations(resource_ids)
446 resource_id_column = db.EventResource.resource_id
448 if values := label_ops.positive.simple:
449 label_filters.append(resource_id_column.in_(values))
450 if values := label_ops.negative.simple:
451 label_filters.append(resource_id_column.notin_(values))
452 for prefix in label_ops.positive.prefixes:
453 label_filters.append(resource_id_column.startswith(prefix))
454 for prefix in label_ops.negative.prefixes:
455 label_filters.append(sa.not_(resource_id_column.startswith(prefix)))
457 if roles := labels.pop("prefect.resource.role", None):
458 label_filters.append(db.EventResource.resource_role.in_(roles))
460 if labels:
461 for _, (label, values) in enumerate(labels.items()):
462 # Empty label value arrays should match nothing
463 if not values:
464 label_filters.append(sa.false())
465 continue
467 label_ops = LabelOperations(values)
469 label_column = db.EventResource.resource[label].astext
471 if label_ops.negative.simple or label_ops.negative.prefixes:
472 label_filters.append(label_column.is_not(None))
474 if values := label_ops.positive.simple:
475 label_filters.append(label_column.in_(values))
476 if values := label_ops.negative.simple:
477 label_filters.append(label_column.notin_(values))
478 for prefix in label_ops.positive.prefixes:
479 label_filters.append(label_column.startswith(prefix))
480 for prefix in label_ops.negative.prefixes:
481 label_filters.append(sa.not_(label_column.startswith(prefix)))
483 filters.append(sa.and_(*label_filters))
485 if filters:
486 # This filter is explicitly searching for related resources, so if no other
487 # role is specified, and we're doing any kind of filtering with this filter,
488 # also filter out primary resources (those with an empty role) for any of
489 # these queries
490 if not self.role:
491 filters.append(db.EventResource.resource_role != "")
493 assert self._top_level_filter is not None
494 filters = [db.Event.id.in_(self._top_level_filter.where(*filters))]
496 return filters
499class EventAnyResourceFilter(EventDataFilter): 1a
500 id: Optional[list[str]] = Field( 1a
501 default=None, description="Only include events for resources with these IDs"
502 )
503 id_prefix: Optional[list[str]] = Field( 1a
504 default=None,
505 description=(
506 "Only include events for resources with IDs starting with these prefixes"
507 ),
508 )
509 labels: Optional[ResourceSpecification] = Field( 1a
510 default=None,
511 description="Only include events for related resources with these labels",
512 )
514 def includes(self, event: Event) -> bool: 1a
515 resources = [event.resource] + event.related
516 if not any(self._includes(resource) for resource in resources):
517 return False
518 return True
520 def _includes(self, resource: Resource) -> bool: 1a
521 if self.id:
522 if not any(resource.id == resource_id for resource_id in self.id):
523 return False
525 if self.id_prefix:
526 if not any(resource.id.startswith(prefix) for prefix in self.id_prefix):
527 return False
529 if self.labels:
530 if not self.labels.matches(resource):
531 return False
533 return True
535 @db_injector 1a
536 def build_where_clauses( 1a
537 self, db: PrefectDBInterface
538 ) -> Sequence["ColumnExpressionArgument[bool]"]:
539 filters: list["ColumnExpressionArgument[bool]"] = []
541 if self.id:
542 filters.append(db.EventResource.resource_id.in_(self.id))
544 if self.id_prefix:
545 filters.append(
546 sa.or_(
547 *[
548 db.EventResource.resource_id.startswith(prefix)
549 for prefix in self.id_prefix
550 ]
551 )
552 )
554 if self.labels:
555 label_filters: list[ColumnElement[bool]] = []
556 labels = self.labels.deepcopy()
558 # On the event_resources table, resource_id and resource_role are unpacked
559 # into columns, so we should search there for them
560 if resource_ids := labels.pop("prefect.resource.id", None):
561 label_ops = LabelOperations(resource_ids)
563 resource_id_column = db.EventResource.resource_id
565 if values := label_ops.positive.simple:
566 label_filters.append(resource_id_column.in_(values))
567 if values := label_ops.negative.simple:
568 label_filters.append(resource_id_column.notin_(values))
569 for prefix in label_ops.positive.prefixes:
570 label_filters.append(resource_id_column.startswith(prefix))
571 for prefix in label_ops.negative.prefixes:
572 label_filters.append(sa.not_(resource_id_column.startswith(prefix)))
574 if roles := labels.pop("prefect.resource.role", None):
575 label_filters.append(db.EventResource.resource_role.in_(roles))
577 if labels:
578 for _, (label, values) in enumerate(labels.items()):
579 # Empty label value arrays should match nothing
580 if not values:
581 label_filters.append(sa.false())
582 continue
584 label_ops = LabelOperations(values)
586 label_column = db.EventResource.resource[label].astext
588 if label_ops.negative.simple or label_ops.negative.prefixes:
589 label_filters.append(label_column.is_not(None))
591 if values := label_ops.positive.simple:
592 label_filters.append(label_column.in_(values))
593 if values := label_ops.negative.simple:
594 label_filters.append(label_column.notin_(values))
595 for prefix in label_ops.positive.prefixes:
596 label_filters.append(label_column.startswith(prefix))
597 for prefix in label_ops.negative.prefixes:
598 label_filters.append(sa.not_(label_column.startswith(prefix)))
600 filters.append(sa.and_(*label_filters))
602 if filters:
603 assert self._top_level_filter is not None
604 filters = [db.Event.id.in_(self._top_level_filter.where(*filters))]
606 return filters
609class EventIDFilter(EventDataFilter): 1a
610 id: Optional[list[UUID]] = Field( 1a
611 default=None, description="Only include events with one of these IDs"
612 )
614 def includes(self, event: Event) -> bool: 1a
615 if self.id:
616 if not any(event.id == id for id in self.id):
617 return False
619 return True
621 @db_injector 1a
622 def build_where_clauses( 1a
623 self, db: PrefectDBInterface
624 ) -> Sequence["ColumnExpressionArgument[bool]"]:
625 filters: list["ColumnExpressionArgument[bool]"] = []
627 if self.id:
628 filters.append(db.Event.id.in_(self.id))
630 return filters
633class EventTextFilter(EventDataFilter): 1a
634 """Filter by text search across event content."""
636 query: str = Field( 1a
637 description="Text search query string",
638 examples=[
639 "error",
640 "error -debug",
641 '"connection timeout"',
642 "+required -excluded",
643 ],
644 max_length=200,
645 )
647 def includes(self, event: Event) -> bool: 1a
648 """Check if this text filter includes the given event."""
649 # Parse query into components
650 parsed = parse_text_search_query(self.query)
652 # Build searchable text from all event string fields
653 searchable_text = self._build_searchable_text(event).lower()
655 # Check include terms (OR logic)
656 if parsed.include:
657 include_match = any(
658 term.lower() in searchable_text for term in parsed.include
659 )
660 if not include_match:
661 return False
663 # Check exclude terms (NOT logic)
664 if parsed.exclude:
665 exclude_match = any(
666 term.lower() in searchable_text for term in parsed.exclude
667 )
668 if exclude_match:
669 return False
671 # Check required terms (AND logic - future feature)
672 if parsed.required:
673 required_match = all(
674 term.lower() in searchable_text for term in parsed.required
675 )
676 if not required_match:
677 return False
679 return True
681 def _build_searchable_text(self, event: Event) -> str: 1a
682 """Build searchable text from all relevant event fields"""
683 text_parts = [
684 event.event, # Event type/name
685 ]
687 # Add resource VALUES only (not keys)
688 text_parts.extend(event.resource.root.values())
690 # Add related resource VALUES only (not keys)
691 for related in event.related:
692 text_parts.extend(related.root.values())
694 # Add full payload as string (can include keys and values)
695 text_parts.append(str(event.payload))
697 return " ".join(text_parts)
699 @db_injector 1a
700 def build_where_clauses( 1a
701 self, db: PrefectDBInterface
702 ) -> Sequence["ColumnExpressionArgument[bool]"]:
703 """Build SQLAlchemy WHERE clauses for text search"""
704 filters: list["ColumnExpressionArgument[bool]"] = []
706 if not self.query.strip():
707 return filters
709 parsed = parse_text_search_query(self.query)
711 # Build combined searchable text field
712 # This concatenates event, resource VALUES only, related resource VALUES only, and full payload
713 event_field = db.Event.event
715 payload_field = sa.cast(db.Event.payload, sa.Text)
717 # For now, include resource and related as JSON strings - this will find the data
718 # even though it also includes keys. We can optimize later to extract only values.
719 resource_field = sa.cast(db.Event.resource, sa.Text)
720 related_field = sa.cast(db.Event.related, sa.Text)
722 # Combine all searchable fields
723 searchable_field = sa.func.concat(
724 event_field, " ", resource_field, " ", related_field, " ", payload_field
725 )
727 # Handle include terms (OR logic)
728 if parsed.include:
729 include_conditions = []
730 for term in parsed.include:
731 include_conditions.append(
732 sa.func.lower(searchable_field).contains(term.lower())
733 )
735 if include_conditions:
736 filters.append(sa.or_(*include_conditions))
738 # Handle exclude terms (NOT logic)
739 if parsed.exclude:
740 exclude_conditions = []
741 for term in parsed.exclude:
742 exclude_conditions.append(
743 ~sa.func.lower(searchable_field).contains(term.lower())
744 )
746 if exclude_conditions:
747 filters.append(sa.and_(*exclude_conditions))
749 # Handle required terms (AND logic - future feature)
750 if parsed.required:
751 required_conditions = []
752 for term in parsed.required:
753 required_conditions.append(
754 sa.func.lower(searchable_field).contains(term.lower())
755 )
757 if required_conditions:
758 filters.append(sa.and_(*required_conditions))
760 return filters
763class EventOrder(AutoEnum): 1a
764 ASC = "ASC" 1a
765 DESC = "DESC" 1a
768class EventFilter(EventDataFilter): 1a
769 occurred: EventOccurredFilter = Field( 1a
770 default_factory=lambda: EventOccurredFilter(),
771 description="Filter criteria for when the events occurred",
772 )
773 event: Optional[EventNameFilter] = Field( 1a
774 default=None,
775 description="Filter criteria for the event name",
776 )
777 resource: Optional[EventResourceFilter] = Field( 1a
778 default=None, description="Filter criteria for the resource of the event"
779 )
780 related: Optional[Union[EventRelatedFilter, list[EventRelatedFilter]]] = Field( 1a
781 default=None,
782 description="Filter criteria for the related resources of the event",
783 )
784 any_resource: Optional[ 1a
785 Union[EventAnyResourceFilter, list[EventAnyResourceFilter]]
786 ] = Field(
787 default=None,
788 description="Filter criteria for any resource involved in the event",
789 )
790 id: EventIDFilter = Field( 1a
791 default_factory=lambda: EventIDFilter(),
792 description="Filter criteria for the events' ID",
793 )
794 text: Optional[EventTextFilter] = Field( 1a
795 default=None,
796 description="Filter criteria for text search across event content",
797 )
799 order: EventOrder = Field( 1a
800 default=EventOrder.DESC,
801 description="The order to return filtered events",
802 )
804 @db_injector 1a
805 def build_where_clauses( 1a
806 self, db: PrefectDBInterface
807 ) -> Sequence["ColumnExpressionArgument[bool]"]:
808 self._top_level_filter = self._scoped_event_resources(db)
809 result = super().build_where_clauses()
810 self._top_level_filter = None
811 return result
813 def _scoped_event_resources(self, db: PrefectDBInterface) -> Select[tuple[UUID]]: 1a
814 """Returns an event_resources query that is scoped to this filter's scope by occurred."""
815 query = sa.select(db.EventResource.event_id).where(
816 db.EventResource.occurred >= self.occurred.since,
817 db.EventResource.occurred <= self.occurred.until,
818 )
819 return query
821 @property 1a
822 def logical_limit(self) -> int: 1a
823 """The logical limit for this query, which is a maximum number of rows that it
824 _could_ return (regardless of what the caller has requested). May be used as
825 an optimization for DB queries"""
826 if self.id and self.id.id:
827 # If we're asking for a specific set of IDs, the most we could get back is
828 # that number of rows
829 return len(self.id.id)
831 return sys.maxsize