Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/schemas/filters.py: 28%
1032 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1"""
2Schemas that define Prefect REST API filtering operations.
4Each filter schema includes logic for transforming itself into a SQL `where` clause.
5"""
7from collections.abc import Iterable, Sequence 1a
8from typing import TYPE_CHECKING, ClassVar, Optional 1a
9from uuid import UUID 1a
11from pydantic import ConfigDict, Field 1a
12from sqlalchemy.sql.functions import coalesce 1a
14import prefect.server.schemas as schemas 1a
15from prefect.server.utilities.database import db_injector 1a
16from prefect.server.utilities.schemas.bases import PrefectBaseModel 1a
17from prefect.server.utilities.text_search_parser import ( 1a
18 parse_text_search_query,
19)
20from prefect.types import DateTime 1a
21from prefect.utilities.collections import AutoEnum 1a
22from prefect.utilities.importtools import lazy_import 1a
24if TYPE_CHECKING: 24 ↛ 25line 24 didn't jump to line 25 because the condition on line 24 was never true1a
25 import sqlalchemy as sa
26 from sqlalchemy.dialects import postgresql
28 from prefect.server.database import PrefectDBInterface
29 from prefect.server.schemas.core import Log
30else:
31 sa = lazy_import("sqlalchemy") 1a
32 postgresql = lazy_import("sqlalchemy.dialects.postgresql") 1a
34# TODO: Consider moving the `as_sql_filter` functions out of here since they are a
35# database model level function and do not properly separate concerns when
36# present in the schemas module
def _as_array(elems: Sequence[str]) -> sa.ColumnElement[Sequence[str]]:
    """Wrap `elems` in a PostgreSQL array literal cast to an `ARRAY` of strings."""
    array_literal = postgresql.array(elems)
    string_array_type = postgresql.ARRAY(sa.String())
    return sa.cast(array_literal, type_=string_array_type)
class Operator(AutoEnum):
    """Operators for combining filter criteria."""

    # Criteria are joined with a SQL AND.
    and_ = AutoEnum.auto()
    # Criteria are joined with a SQL OR.
    or_ = AutoEnum.auto()
class PrefectFilterBaseModel(PrefectBaseModel):
    """Base model for Prefect filters"""

    # Unknown fields are rejected by pydantic (`extra="forbid"`).
    model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")

    @db_injector
    def as_sql_filter(self, db: "PrefectDBInterface") -> sa.ColumnElement[bool]:
        """Generate a SQL filter from the provided filter parameters.

        All clauses returned by `_get_filter_list` are combined with AND. If no
        filter parameters are available, a TRUE filter is returned.
        """
        # `_get_filter_list` is only typed as returning an Iterable; materialize
        # it so the emptiness check below is also correct for generators.
        filters = list(self._get_filter_list(db))
        if not filters:
            return sa.true()
        return sa.and_(*filters)

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return a list of boolean filter statements based on filter parameters"""
        raise NotImplementedError("_get_filter_list must be implemented")
class PrefectOperatorFilterBaseModel(PrefectFilterBaseModel):
    """Base model for Prefect filters that combines criteria with a user-provided operator"""

    operator: Operator = Field(
        default=Operator.and_,
        description="Operator for combining filter criteria. Defaults to 'and_'.",
    )

    @db_injector
    def as_sql_filter(self, db: "PrefectDBInterface") -> sa.ColumnElement[bool]:
        """Generate a SQL filter, combining the criteria with `self.operator`.

        Returns a TRUE filter when no criteria are set; otherwise the criteria
        are joined with AND for `Operator.and_` and with OR for any other value.
        """
        # `_get_filter_list` is only typed as returning an Iterable; materialize
        # it so the emptiness check below is also correct for generators.
        filters = list(self._get_filter_list(db))
        if not filters:
            return sa.true()
        if self.operator == Operator.and_:
            return sa.and_(*filters)
        return sa.or_(*filters)
class FlowFilterId(PrefectFilterBaseModel):
    """Filter by `Flow.id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of flow ids to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` clause on `Flow.id` when `any_` is set, otherwise no clauses."""
        if self.any_ is None:
            return []
        return [db.Flow.id.in_(self.any_)]
class FlowFilterDeployment(PrefectOperatorFilterBaseModel):
    """Filter by flows by deployment"""

    is_null_: Optional[bool] = Field(
        default=None,
        description="If true, only include flows without deployments",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return a clause selecting flows with or without deployments.

        No clause is produced when `is_null_` is unset.
        """
        if self.is_null_ is None:
            return []

        # Distinct flow ids referenced by at least one deployment.
        flows_with_deployments = (
            sa.select(db.Deployment.flow_id).distinct().subquery()
        )
        deployed_flow_ids = sa.select(flows_with_deployments.c.flow_id)

        if self.is_null_:
            clause = db.Flow.id.not_in(deployed_flow_ids)
        else:
            clause = db.Flow.id.in_(deployed_flow_ids)
        return [clause]
class FlowFilterName(PrefectFilterBaseModel):
    """Filter by `Flow.name`."""

    any_: Optional[list[str]] = Field(
        default=None,
        description="A list of flow names to include",
        examples=[["my-flow-1", "my-flow-2"]],
    )

    like_: Optional[str] = Field(
        default=None,
        description=(
            "A case-insensitive partial match. For example, "
            " passing 'marvin' will match "
            "'marvin', 'sad-Marvin', and 'marvin-robot'."
        ),
        examples=["marvin"],
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return clauses on `Flow.name` for whichever of `any_` / `like_` are set."""
        name_column = db.Flow.name
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            clauses.append(name_column.in_(self.any_))
        if self.like_ is not None:
            # Substring match: wrap the term in wildcards on both sides.
            pattern = f"%{self.like_}%"
            clauses.append(name_column.ilike(pattern))
        return clauses
class FlowFilterTags(PrefectOperatorFilterBaseModel):
    """Filter by `Flow.tags`."""

    all_: Optional[list[str]] = Field(
        default=None,
        examples=[["tag-1", "tag-2"]],
        description=(
            "A list of tags. Flows will be returned only if their tags are a superset"
            " of the list"
        ),
    )
    is_null_: Optional[bool] = Field(
        default=None, description="If true, only include flows without tags"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return clauses on `Flow.tags` for whichever of `all_` / `is_null_` are set."""
        tags_column = db.Flow.tags
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        if self.all_ is not None:
            clauses.append(tags_column.has_all(_as_array(self.all_)))
        if self.is_null_ is not None:
            # "Without tags" is expressed as equality with an empty list.
            if self.is_null_:
                clauses.append(tags_column == [])
            else:
                clauses.append(tags_column != [])
        return clauses
class FlowFilter(PrefectOperatorFilterBaseModel):
    """Filter for flows. Only flows matching all criteria will be returned."""

    id: Optional[FlowFilterId] = Field(
        default=None, description="Filter criteria for `Flow.id`"
    )
    deployment: Optional[FlowFilterDeployment] = Field(
        default=None, description="Filter criteria for Flow deployments"
    )
    name: Optional[FlowFilterName] = Field(
        default=None, description="Filter criteria for `Flow.name`"
    )
    tags: Optional[FlowFilterTags] = Field(
        default=None, description="Filter criteria for `Flow.tags`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return one SQL filter per configured sub-filter, in declaration order."""
        criteria = (self.id, self.deployment, self.name, self.tags)
        return [
            criterion.as_sql_filter()
            for criterion in criteria
            if criterion is not None
        ]
class FlowRunFilterId(PrefectFilterBaseModel):
    """Filter by `FlowRun.id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of flow run ids to include"
    )
    not_any_: Optional[list[UUID]] = Field(
        default=None, description="A list of flow run ids to exclude"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return `IN` / `NOT IN` clauses on `FlowRun.id` for the criteria that are set."""
        id_column = db.FlowRun.id
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            clauses.append(id_column.in_(self.any_))
        if self.not_any_ is not None:
            clauses.append(id_column.not_in(self.not_any_))
        return clauses
class FlowRunFilterName(PrefectFilterBaseModel):
    """Filter by `FlowRun.name`."""

    any_: Optional[list[str]] = Field(
        default=None,
        description="A list of flow run names to include",
        examples=[["my-flow-run-1", "my-flow-run-2"]],
    )

    like_: Optional[str] = Field(
        default=None,
        description=(
            "A case-insensitive partial match. For example, "
            " passing 'marvin' will match "
            "'marvin', 'sad-Marvin', and 'marvin-robot'."
        ),
        examples=["marvin"],
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return clauses on `FlowRun.name` for whichever of `any_` / `like_` are set."""
        name_column = db.FlowRun.name
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            clauses.append(name_column.in_(self.any_))
        if self.like_ is not None:
            # Substring match: wrap the term in wildcards on both sides.
            pattern = f"%{self.like_}%"
            clauses.append(name_column.ilike(pattern))
        return clauses
class FlowRunFilterTags(PrefectOperatorFilterBaseModel):
    """Filter by `FlowRun.tags`."""

    all_: Optional[list[str]] = Field(
        default=None,
        examples=[["tag-1", "tag-2"]],
        description=(
            "A list of tags. Flow runs will be returned only if their tags are a"
            " superset of the list"
        ),
    )

    any_: Optional[list[str]] = Field(
        default=None,
        examples=[["tag-1", "tag-2"]],
        description="A list of tags to include",
    )

    is_null_: Optional[bool] = Field(
        default=None, description="If true, only include flow runs without tags"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return clauses on `FlowRun.tags` for the criteria that are set.

        `all_` maps to `has_all`, `any_` to `has_any`, and `is_null_` to an
        (in)equality comparison with an empty list.
        """
        # Uses the module-level `_as_array` helper, like the other tag filters,
        # instead of redefining an identical nested function on every call.
        filters: list[sa.ColumnElement[bool]] = []
        if self.all_ is not None:
            filters.append(db.FlowRun.tags.has_all(_as_array(self.all_)))
        if self.any_ is not None:
            filters.append(db.FlowRun.tags.has_any(_as_array(self.any_)))
        if self.is_null_ is not None:
            filters.append(
                db.FlowRun.tags == [] if self.is_null_ else db.FlowRun.tags != []
            )
        return filters
class FlowRunFilterDeploymentId(PrefectOperatorFilterBaseModel):
    """Filter by `FlowRun.deployment_id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of flow run deployment ids to include"
    )
    is_null_: Optional[bool] = Field(
        default=None,
        description="If true, only include flow runs without deployment ids",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return `IN` and/or NULL-check clauses on `FlowRun.deployment_id`."""
        deployment_id = db.FlowRun.deployment_id
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            clauses.append(deployment_id.in_(self.any_))
        if self.is_null_ is not None:
            if self.is_null_:
                clauses.append(deployment_id.is_(None))
            else:
                clauses.append(deployment_id.is_not(None))
        return clauses
class FlowRunFilterWorkQueueName(PrefectOperatorFilterBaseModel):
    """Filter by `FlowRun.work_queue_name`."""

    any_: Optional[list[str]] = Field(
        default=None,
        description="A list of work queue names to include",
        examples=[["work_queue_1", "work_queue_2"]],
    )
    is_null_: Optional[bool] = Field(
        default=None,
        description="If true, only include flow runs without work queue names",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return `IN` and/or NULL-check clauses on `FlowRun.work_queue_name`."""
        queue_name = db.FlowRun.work_queue_name
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            clauses.append(queue_name.in_(self.any_))
        if self.is_null_ is not None:
            if self.is_null_:
                clauses.append(queue_name.is_(None))
            else:
                clauses.append(queue_name.is_not(None))
        return clauses
class FlowRunFilterStateType(PrefectFilterBaseModel):
    """Filter by `FlowRun.state_type`."""

    any_: Optional[list[schemas.states.StateType]] = Field(
        default=None, description="A list of flow run state types to include"
    )
    not_any_: Optional[list[schemas.states.StateType]] = Field(
        default=None, description="A list of flow run state types to exclude"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return `IN` / `NOT IN` clauses on `FlowRun.state_type` for the criteria set."""
        state_type = db.FlowRun.state_type
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            clauses.append(state_type.in_(self.any_))
        if self.not_any_ is not None:
            clauses.append(state_type.not_in(self.not_any_))
        return clauses
class FlowRunFilterStateName(PrefectFilterBaseModel):
    """Filter by `FlowRun.state_name`."""

    any_: Optional[list[str]] = Field(
        default=None, description="A list of flow run state names to include"
    )
    not_any_: Optional[list[str]] = Field(
        default=None, description="A list of flow run state names to exclude"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return `IN` / `NOT IN` clauses on `FlowRun.state_name` for the criteria set."""
        state_name = db.FlowRun.state_name
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            clauses.append(state_name.in_(self.any_))
        if self.not_any_ is not None:
            clauses.append(state_name.not_in(self.not_any_))
        return clauses
class FlowRunFilterState(PrefectOperatorFilterBaseModel):
    """Filter by `FlowRun.state_type` and `FlowRun.state_name`."""

    type: Optional[FlowRunFilterStateType] = Field(
        default=None, description="Filter criteria for `FlowRun.state_type`"
    )
    name: Optional[FlowRunFilterStateName] = Field(
        default=None, description="Filter criteria for `FlowRun.state_name`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return one clause per sub-filter that has at least one criterion set.

        A sub-filter with nothing set is skipped rather than contributing a
        TRUE clause, which would make an `or_` combination match every row.
        """
        filters: list[sa.ColumnExpressionArgument[bool]] = []
        for criterion in (self.type, self.name):
            if criterion is None:
                continue
            # Inspect the sub-filter's own clause list instead of type-checking
            # the rendered expression: with both `any_` and `not_any_` set the
            # rendered AND is not a `BinaryExpression`, so the previous
            # isinstance guard silently dropped the whole sub-filter.
            criterion_clauses = list(criterion._get_filter_list(db))
            if criterion_clauses:
                filters.append(sa.and_(*criterion_clauses))
        return filters
class FlowRunFilterFlowVersion(PrefectFilterBaseModel):
    """Filter by `FlowRun.flow_version`."""

    any_: Optional[list[str]] = Field(
        default=None, description="A list of flow run flow_versions to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` clause on `FlowRun.flow_version` when `any_` is set."""
        if self.any_ is None:
            return []
        return [db.FlowRun.flow_version.in_(self.any_)]
class FlowRunFilterStartTime(PrefectFilterBaseModel):
    """Filter by `FlowRun.start_time`."""

    before_: Optional[DateTime] = Field(
        default=None,
        description="Only include flow runs starting at or before this time",
    )
    after_: Optional[DateTime] = Field(
        default=None,
        description="Only include flow runs starting at or after this time",
    )
    is_null_: Optional[bool] = Field(
        default=None, description="If true, only return flow runs without a start time"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return range and/or NULL-check clauses for the flow run start time.

        The range bounds compare against `start_time`, falling back to
        `expected_start_time` when it is NULL; the NULL check looks at
        `start_time` only.
        """

        def effective_start_time():
            # Build a fresh expression for each comparison.
            return coalesce(db.FlowRun.start_time, db.FlowRun.expected_start_time)

        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        if self.before_ is not None:
            clauses.append(effective_start_time() <= self.before_)
        if self.after_ is not None:
            clauses.append(effective_start_time() >= self.after_)
        if self.is_null_ is not None:
            if self.is_null_:
                clauses.append(db.FlowRun.start_time.is_(None))
            else:
                clauses.append(db.FlowRun.start_time.is_not(None))
        return clauses
class FlowRunFilterEndTime(PrefectFilterBaseModel):
    """Filter by `FlowRun.end_time`."""

    before_: Optional[DateTime] = Field(
        default=None,
        description="Only include flow runs ending at or before this time",
    )
    after_: Optional[DateTime] = Field(
        default=None,
        description="Only include flow runs ending at or after this time",
    )
    is_null_: Optional[bool] = Field(
        default=None, description="If true, only return flow runs without an end time"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return range and/or NULL-check clauses on `FlowRun.end_time`."""
        end_time = db.FlowRun.end_time
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        if self.before_ is not None:
            clauses.append(end_time <= self.before_)
        if self.after_ is not None:
            clauses.append(end_time >= self.after_)
        if self.is_null_ is not None:
            if self.is_null_:
                clauses.append(end_time.is_(None))
            else:
                clauses.append(end_time.is_not(None))
        return clauses
class FlowRunFilterExpectedStartTime(PrefectFilterBaseModel):
    """Filter by `FlowRun.expected_start_time`."""

    before_: Optional[DateTime] = Field(
        default=None,
        description="Only include flow runs scheduled to start at or before this time",
    )
    after_: Optional[DateTime] = Field(
        default=None,
        description="Only include flow runs scheduled to start at or after this time",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return inclusive range clauses on `FlowRun.expected_start_time`."""
        expected_start = db.FlowRun.expected_start_time
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        if self.before_ is not None:
            clauses.append(expected_start <= self.before_)
        if self.after_ is not None:
            clauses.append(expected_start >= self.after_)
        return clauses
class FlowRunFilterNextScheduledStartTime(PrefectFilterBaseModel):
    """Filter by `FlowRun.next_scheduled_start_time`."""

    # The description previously read "... next_scheduled_start_time or before
    # this time"; the missing "at" is restored to match the `<=` comparison
    # below and the wording of `after_`.
    before_: Optional[DateTime] = Field(
        default=None,
        description=(
            "Only include flow runs with a next_scheduled_start_time at or before"
            " this time"
        ),
    )
    after_: Optional[DateTime] = Field(
        default=None,
        description=(
            "Only include flow runs with a next_scheduled_start_time at or after this"
            " time"
        ),
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return inclusive range clauses on `FlowRun.next_scheduled_start_time`."""
        filters: list[sa.ColumnExpressionArgument[bool]] = []
        if self.before_ is not None:
            filters.append(db.FlowRun.next_scheduled_start_time <= self.before_)
        if self.after_ is not None:
            filters.append(db.FlowRun.next_scheduled_start_time >= self.after_)
        return filters
class FlowRunFilterParentFlowRunId(PrefectOperatorFilterBaseModel):
    """Filter for subflows of a given flow run"""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of parent flow run ids to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return a clause selecting flow runs whose parent task run belongs to `any_`.

        No clause is produced when `any_` is unset.
        """
        if self.any_ is None:
            return []

        # Link each flow run to the task run recorded as its parent.
        parent_task_join = sa.and_(
            db.TaskRun.id == db.FlowRun.parent_task_run_id,
        )
        # Flow runs whose parent task run is part of one of the given flow runs.
        subflow_run_ids = (
            sa.select(db.FlowRun.id)
            .join(db.TaskRun, parent_task_join)
            .where(db.TaskRun.flow_run_id.in_(self.any_))
        )
        return [db.FlowRun.id.in_(subflow_run_ids)]
class FlowRunFilterParentTaskRunId(PrefectOperatorFilterBaseModel):
    """Filter by `FlowRun.parent_task_run_id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of flow run parent_task_run_ids to include"
    )
    is_null_: Optional[bool] = Field(
        default=None,
        description="If true, only include flow runs without parent_task_run_id",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return `IN` and/or NULL-check clauses on `FlowRun.parent_task_run_id`."""
        parent_task_run_id = db.FlowRun.parent_task_run_id
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            clauses.append(parent_task_run_id.in_(self.any_))
        if self.is_null_ is not None:
            if self.is_null_:
                clauses.append(parent_task_run_id.is_(None))
            else:
                clauses.append(parent_task_run_id.is_not(None))
        return clauses
class FlowRunFilterIdempotencyKey(PrefectFilterBaseModel):
    """Filter by FlowRun.idempotency_key."""

    any_: Optional[list[str]] = Field(
        default=None, description="A list of flow run idempotency keys to include"
    )
    not_any_: Optional[list[str]] = Field(
        default=None, description="A list of flow run idempotency keys to exclude"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return `IN` / `NOT IN` clauses on `FlowRun.idempotency_key` for the criteria set."""
        idempotency_key = db.FlowRun.idempotency_key
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            clauses.append(idempotency_key.in_(self.any_))
        if self.not_any_ is not None:
            clauses.append(idempotency_key.not_in(self.not_any_))
        return clauses
class FlowRunFilter(PrefectOperatorFilterBaseModel):
    """Filter flow runs. Only flow runs matching all criteria will be returned"""

    id: Optional[FlowRunFilterId] = Field(
        default=None, description="Filter criteria for `FlowRun.id`"
    )
    name: Optional[FlowRunFilterName] = Field(
        default=None, description="Filter criteria for `FlowRun.name`"
    )
    tags: Optional[FlowRunFilterTags] = Field(
        default=None, description="Filter criteria for `FlowRun.tags`"
    )
    deployment_id: Optional[FlowRunFilterDeploymentId] = Field(
        default=None, description="Filter criteria for `FlowRun.deployment_id`"
    )
    # The closing backtick was missing from this description.
    work_queue_name: Optional[FlowRunFilterWorkQueueName] = Field(
        default=None, description="Filter criteria for `FlowRun.work_queue_name`"
    )
    state: Optional[FlowRunFilterState] = Field(
        default=None, description="Filter criteria for `FlowRun.state`"
    )
    flow_version: Optional[FlowRunFilterFlowVersion] = Field(
        default=None, description="Filter criteria for `FlowRun.flow_version`"
    )
    start_time: Optional[FlowRunFilterStartTime] = Field(
        default=None, description="Filter criteria for `FlowRun.start_time`"
    )
    end_time: Optional[FlowRunFilterEndTime] = Field(
        default=None, description="Filter criteria for `FlowRun.end_time`"
    )
    expected_start_time: Optional[FlowRunFilterExpectedStartTime] = Field(
        default=None, description="Filter criteria for `FlowRun.expected_start_time`"
    )
    next_scheduled_start_time: Optional[FlowRunFilterNextScheduledStartTime] = Field(
        default=None,
        description="Filter criteria for `FlowRun.next_scheduled_start_time`",
    )
    parent_flow_run_id: Optional[FlowRunFilterParentFlowRunId] = Field(
        default=None, description="Filter criteria for subflows of the given flow runs"
    )
    parent_task_run_id: Optional[FlowRunFilterParentTaskRunId] = Field(
        default=None, description="Filter criteria for `FlowRun.parent_task_run_id`"
    )
    idempotency_key: Optional[FlowRunFilterIdempotencyKey] = Field(
        default=None, description="Filter criteria for `FlowRun.idempotency_key`"
    )

    def only_filters_on_id(self) -> bool:
        """Return True when the only active criterion is a non-empty `id.any_`.

        That is: `id` is set with a truthy `any_` and a falsy `not_any_`, and
        every other sub-filter is `None`.
        """
        if self.id is None or not self.id.any_ or self.id.not_any_:
            return False
        other_criteria = (
            self.name,
            self.tags,
            self.deployment_id,
            self.work_queue_name,
            self.state,
            self.flow_version,
            self.start_time,
            self.end_time,
            self.expected_start_time,
            self.next_scheduled_start_time,
            self.parent_flow_run_id,
            self.parent_task_run_id,
            self.idempotency_key,
        )
        return all(criterion is None for criterion in other_criteria)

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return one SQL filter per configured sub-filter.

        The tuple below preserves the order in which the clauses were
        originally appended (note `flow_version` precedes `state`).
        """
        criteria = (
            self.id,
            self.name,
            self.tags,
            self.deployment_id,
            self.work_queue_name,
            self.flow_version,
            self.state,
            self.start_time,
            self.end_time,
            self.expected_start_time,
            self.next_scheduled_start_time,
            self.parent_flow_run_id,
            self.parent_task_run_id,
            self.idempotency_key,
        )
        return [
            criterion.as_sql_filter()
            for criterion in criteria
            if criterion is not None
        ]
class TaskRunFilterFlowRunId(PrefectOperatorFilterBaseModel):
    """Filter by `TaskRun.flow_run_id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of task run flow run ids to include"
    )

    is_null_: Optional[bool] = Field(
        default=False, description="Filter for task runs with None as their flow run id"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return at most one clause on `TaskRun.flow_run_id`.

        Precedence: `is_null_ is True` yields `IS NULL` (ignoring `any_`);
        otherwise a set `any_` yields `IN`; otherwise `is_null_ is False`
        yields `IS NOT NULL`; otherwise nothing.
        """
        flow_run_id = db.TaskRun.flow_run_id
        if self.is_null_ is True:
            return [flow_run_id.is_(None)]
        if self.any_ is not None:
            return [flow_run_id.in_(self.any_)]
        if self.is_null_ is False:
            return [flow_run_id.is_not(None)]
        return []
class TaskRunFilterId(PrefectFilterBaseModel):
    """Filter by `TaskRun.id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of task run ids to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` clause on `TaskRun.id` when `any_` is set, otherwise no clauses."""
        if self.any_ is None:
            return []
        return [db.TaskRun.id.in_(self.any_)]
class TaskRunFilterName(PrefectFilterBaseModel):
    """Filter by `TaskRun.name`."""

    any_: Optional[list[str]] = Field(
        default=None,
        description="A list of task run names to include",
        examples=[["my-task-run-1", "my-task-run-2"]],
    )

    like_: Optional[str] = Field(
        default=None,
        description=(
            "A case-insensitive partial match. For example, "
            " passing 'marvin' will match "
            "'marvin', 'sad-Marvin', and 'marvin-robot'."
        ),
        examples=["marvin"],
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return clauses on `TaskRun.name` for whichever of `any_` / `like_` are set."""
        name_column = db.TaskRun.name
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            clauses.append(name_column.in_(self.any_))
        if self.like_ is not None:
            # Substring match: wrap the term in wildcards on both sides.
            pattern = f"%{self.like_}%"
            clauses.append(name_column.ilike(pattern))
        return clauses
class TaskRunFilterTags(PrefectOperatorFilterBaseModel):
    """Filter by `TaskRun.tags`."""

    all_: Optional[list[str]] = Field(
        default=None,
        examples=[["tag-1", "tag-2"]],
        description=(
            "A list of tags. Task runs will be returned only if their tags are a"
            " superset of the list"
        ),
    )
    is_null_: Optional[bool] = Field(
        default=None, description="If true, only include task runs without tags"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return clauses on `TaskRun.tags` for whichever of `all_` / `is_null_` are set."""
        tags_column = db.TaskRun.tags
        clauses: list[sa.ColumnElement[bool]] = []
        if self.all_ is not None:
            clauses.append(tags_column.has_all(_as_array(self.all_)))
        if self.is_null_ is not None:
            # "Without tags" is expressed as equality with an empty list.
            if self.is_null_:
                clauses.append(tags_column == [])
            else:
                clauses.append(tags_column != [])
        return clauses
class TaskRunFilterStateType(PrefectFilterBaseModel):
    """Filter by `TaskRun.state_type`."""

    any_: Optional[list[schemas.states.StateType]] = Field(
        default=None, description="A list of task run state types to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` clause on `TaskRun.state_type` when `any_` is set."""
        if self.any_ is None:
            return []
        return [db.TaskRun.state_type.in_(self.any_)]
class TaskRunFilterStateName(PrefectFilterBaseModel):
    """Filter by `TaskRun.state_name`."""

    any_: Optional[list[str]] = Field(
        default=None, description="A list of task run state names to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` clause on `TaskRun.state_name` when `any_` is set."""
        if self.any_ is None:
            return []
        return [db.TaskRun.state_name.in_(self.any_)]
class TaskRunFilterState(PrefectOperatorFilterBaseModel):
    """Filter by `TaskRun.state_type` and `TaskRun.state_name`."""

    type: Optional[TaskRunFilterStateType] = Field(
        default=None, description="Filter criteria for `TaskRun.state_type`"
    )
    name: Optional[TaskRunFilterStateName] = Field(
        default=None, description="Filter criteria for `TaskRun.state_name`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return one clause per sub-filter that has at least one criterion set.

        A sub-filter with nothing set is skipped rather than contributing a
        TRUE clause, which would make an `or_` combination match every row.
        """
        filters: list[sa.ColumnExpressionArgument[bool]] = []
        for criterion in (self.type, self.name):
            if criterion is None:
                continue
            # Inspect the sub-filter's own clause list rather than type-checking
            # the rendered expression, so a sub-filter that yields several
            # clauses is still included. This also avoids shadowing the builtin
            # `filter`.
            criterion_clauses = list(criterion._get_filter_list(db))
            if criterion_clauses:
                filters.append(sa.and_(*criterion_clauses))
        return filters
class TaskRunFilterSubFlowRuns(PrefectFilterBaseModel):
    """Filter by `TaskRun.subflow_run`."""

    exists_: Optional[bool] = Field(
        default=None,
        description=(
            "If true, only include task runs that are subflow run parents; if false,"
            " exclude parent task runs"
        ),
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return a clause requiring (or forbidding) a related subflow run.

        No clause is produced when `exists_` is unset.
        """
        if self.exists_ is True:
            return [db.TaskRun.subflow_run.has()]
        if self.exists_ is False:
            return [sa.not_(db.TaskRun.subflow_run.has())]
        return []
class TaskRunFilterStartTime(PrefectFilterBaseModel):
    """Filter by `TaskRun.start_time`."""

    before_: Optional[DateTime] = Field(
        default=None,
        description="Only include task runs starting at or before this time",
    )
    after_: Optional[DateTime] = Field(
        default=None,
        description="Only include task runs starting at or after this time",
    )
    is_null_: Optional[bool] = Field(
        default=None, description="If true, only return task runs without a start time"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return range and/or NULL-check clauses on `TaskRun.start_time`."""
        start_time = db.TaskRun.start_time
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        if self.before_ is not None:
            clauses.append(start_time <= self.before_)
        if self.after_ is not None:
            clauses.append(start_time >= self.after_)
        if self.is_null_ is not None:
            if self.is_null_:
                clauses.append(start_time.is_(None))
            else:
                clauses.append(start_time.is_not(None))
        return clauses
class TaskRunFilterExpectedStartTime(PrefectFilterBaseModel):
    """Filter by `TaskRun.expected_start_time`."""

    before_: Optional[DateTime] = Field(
        default=None,
        description="Only include task runs expected to start at or before this time",
    )
    after_: Optional[DateTime] = Field(
        default=None,
        description="Only include task runs expected to start at or after this time",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return inclusive range clauses on `TaskRun.expected_start_time`."""
        expected_start = db.TaskRun.expected_start_time
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        if self.before_ is not None:
            clauses.append(expected_start <= self.before_)
        if self.after_ is not None:
            clauses.append(expected_start >= self.after_)
        return clauses
class TaskRunFilter(PrefectOperatorFilterBaseModel):
    """Filter task runs. Only task runs matching all criteria will be returned"""

    id: Optional[TaskRunFilterId] = Field(
        default=None, description="Filter criteria for `TaskRun.id`"
    )
    name: Optional[TaskRunFilterName] = Field(
        default=None, description="Filter criteria for `TaskRun.name`"
    )
    tags: Optional[TaskRunFilterTags] = Field(
        default=None, description="Filter criteria for `TaskRun.tags`"
    )
    state: Optional[TaskRunFilterState] = Field(
        default=None, description="Filter criteria for `TaskRun.state`"
    )
    start_time: Optional[TaskRunFilterStartTime] = Field(
        default=None, description="Filter criteria for `TaskRun.start_time`"
    )
    expected_start_time: Optional[TaskRunFilterExpectedStartTime] = Field(
        default=None, description="Filter criteria for `TaskRun.expected_start_time`"
    )
    subflow_runs: Optional[TaskRunFilterSubFlowRuns] = Field(
        default=None, description="Filter criteria for `TaskRun.subflow_run`"
    )
    flow_run_id: Optional[TaskRunFilterFlowRunId] = Field(
        default=None, description="Filter criteria for `TaskRun.flow_run_id`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Collect the SQL filter of every sub-filter that is set, in declaration order."""
        criteria = (
            self.id,
            self.name,
            self.tags,
            self.state,
            self.start_time,
            self.expected_start_time,
            self.subflow_runs,
            self.flow_run_id,
        )
        return [
            criterion.as_sql_filter() for criterion in criteria if criterion is not None
        ]
class DeploymentFilterId(PrefectFilterBaseModel):
    """Filter by `Deployment.id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of deployment ids to include"
    )
    not_any_: Optional[list[UUID]] = Field(
        default=None, description="A list of deployment ids to exclude"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an IN clause for `any_` and a NOT IN clause for `not_any_`, when set."""
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        included, excluded = self.any_, self.not_any_
        if included is not None:
            clauses.append(db.Deployment.id.in_(included))
        if excluded is not None:
            clauses.append(db.Deployment.id.not_in(excluded))
        return clauses
class DeploymentFilterName(PrefectFilterBaseModel):
    """Filter by `Deployment.name`."""

    any_: Optional[list[str]] = Field(
        default=None,
        description="A list of deployment names to include",
        examples=[["my-deployment-1", "my-deployment-2"]],
    )

    like_: Optional[str] = Field(
        default=None,
        description=(
            "A case-insensitive partial match. For example, "
            " passing 'marvin' will match "
            "'marvin', 'sad-Marvin', and 'marvin-robot'."
        ),
        examples=["marvin"],
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an IN clause for `any_` and an ILIKE clause for `like_`, when set."""
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            clauses.append(db.Deployment.name.in_(self.any_))
        if self.like_ is not None:
            # The term is wrapped in wildcards so it matches anywhere in the name.
            pattern = f"%{self.like_}%"
            clauses.append(db.Deployment.name.ilike(pattern))
        return clauses
class DeploymentOrFlowNameFilter(PrefectFilterBaseModel):
    """Filter by `Deployment.name` or `Flow.name` with a single input string for ilike filtering."""

    like_: Optional[str] = Field(
        default=None,
        description=(
            "A case-insensitive partial match on deployment or flow names. For example, "
            "passing 'example' might match deployments or flows with 'example' in their names."
        ),
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return a single OR clause over deployment name and related flow name."""
        if self.like_ is None:
            return []

        # The same wildcard-wrapped pattern is applied to both names.
        pattern = f"%{self.like_}%"
        matches_deployment = db.Deployment.name.ilike(pattern)
        matches_flow = db.Deployment.flow.has(db.Flow.name.ilike(pattern))
        return [sa.or_(matches_deployment, matches_flow)]
class DeploymentFilterPaused(PrefectFilterBaseModel):
    """Filter by `Deployment.paused`."""

    eq_: Optional[bool] = Field(
        default=None,
        description="Only returns where deployment is/is not paused",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IS <bool>` clause on the paused column, or nothing when unset."""
        if self.eq_ is None:
            return []
        return [db.Deployment.paused.is_(self.eq_)]
class DeploymentFilterWorkQueueName(PrefectFilterBaseModel):
    """Filter by `Deployment.work_queue_name`."""

    any_: Optional[list[str]] = Field(
        default=None,
        description="A list of work queue names to include",
        examples=[["work_queue_1", "work_queue_2"]],
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an IN clause over the given work queue names, or nothing when unset."""
        if self.any_ is None:
            return []
        return [db.Deployment.work_queue_name.in_(self.any_)]
class DeploymentFilterConcurrencyLimit(PrefectFilterBaseModel):
    """DEPRECATED: Prefer `Deployment.concurrency_limit_id` over `Deployment.concurrency_limit`."""

    ge_: Optional[int] = Field(
        default=None,
        description="Only include deployments with a concurrency limit greater than or equal to this value",
    )

    le_: Optional[int] = Field(
        default=None,
        description="Only include deployments with a concurrency limit less than or equal to this value",
    )
    is_null_: Optional[bool] = Field(
        default=None,
        description="If true, only include deployments without a concurrency limit",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> list[sa.ColumnElement[bool]]:
        """Always return no clauses; the fields above are accepted but not applied."""
        # The `int` column this filter targeted was replaced by a `ForeignKey`
        # relationship. Rather than filter on the new relationship, the filter
        # is deprecated and intentionally contributes nothing to the query.
        no_clauses: list[sa.ColumnElement[bool]] = []
        return no_clauses
class DeploymentFilterTags(PrefectOperatorFilterBaseModel):
    """Filter by `Deployment.tags`."""

    all_: Optional[list[str]] = Field(
        default=None,
        examples=[["tag-1", "tag-2"]],
        description=(
            "A list of tags. Deployments will be returned only if their tags are a"
            " superset of the list"
        ),
    )
    any_: Optional[list[str]] = Field(
        default=None,
        examples=[["tag-1", "tag-2"]],
        description="A list of tags to include",
    )

    is_null_: Optional[bool] = Field(
        default=None, description="If true, only include deployments without tags"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return one clause per tag criterion that is set.

        All three branches resolve the tags column through the injected `db`
        interface, so the method no longer mixes `orm_models.Deployment` with
        `db.Deployment` and needs no function-scope import.
        """
        filters: list[sa.ColumnElement[bool]] = []
        if self.all_ is not None:
            filters.append(db.Deployment.tags.has_all(_as_array(self.all_)))
        if self.any_ is not None:
            filters.append(db.Deployment.tags.has_any(_as_array(self.any_)))
        if self.is_null_ is not None:
            # "No tags" is represented by comparison with an empty list,
            # not by SQL NULL.
            filters.append(
                db.Deployment.tags == [] if self.is_null_ else db.Deployment.tags != []
            )
        return filters
class DeploymentFilter(PrefectOperatorFilterBaseModel):
    """Filter for deployments. Only deployments matching all criteria will be returned."""

    id: Optional[DeploymentFilterId] = Field(
        default=None, description="Filter criteria for `Deployment.id`"
    )
    name: Optional[DeploymentFilterName] = Field(
        default=None, description="Filter criteria for `Deployment.name`"
    )
    flow_or_deployment_name: Optional[DeploymentOrFlowNameFilter] = Field(
        default=None, description="Filter criteria for `Deployment.name` or `Flow.name`"
    )
    paused: Optional[DeploymentFilterPaused] = Field(
        default=None, description="Filter criteria for `Deployment.paused`"
    )
    tags: Optional[DeploymentFilterTags] = Field(
        default=None, description="Filter criteria for `Deployment.tags`"
    )
    work_queue_name: Optional[DeploymentFilterWorkQueueName] = Field(
        default=None, description="Filter criteria for `Deployment.work_queue_name`"
    )
    concurrency_limit: Optional[DeploymentFilterConcurrencyLimit] = Field(
        default=None,
        description="DEPRECATED: Prefer `Deployment.concurrency_limit_id` over `Deployment.concurrency_limit`. If provided, will be ignored for backwards-compatibility. Will be removed after December 2024.",
        deprecated=True,
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Collect the SQL filter of every sub-filter that is set, in declaration order."""
        # `concurrency_limit` is deliberately absent: it is deprecated and ignored.
        criteria = (
            self.id,
            self.name,
            self.flow_or_deployment_name,
            self.paused,
            self.tags,
            self.work_queue_name,
        )
        return [
            criterion.as_sql_filter() for criterion in criteria if criterion is not None
        ]
class DeploymentScheduleFilterActive(PrefectFilterBaseModel):
    """Filter by `DeploymentSchedule.active`."""

    eq_: Optional[bool] = Field(
        default=None,
        description="Only returns where deployment schedule is/is not active",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IS <bool>` clause on the active column, or nothing when unset."""
        if self.eq_ is None:
            return []
        return [db.DeploymentSchedule.active.is_(self.eq_)]
class DeploymentScheduleFilter(PrefectOperatorFilterBaseModel):
    """Filter for deployment schedules. Only deployment schedules matching all criteria will be returned."""

    active: Optional[DeploymentScheduleFilterActive] = Field(
        default=None, description="Filter criteria for `DeploymentSchedule.active`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return the SQL filter of each sub-filter that is set (currently only `active`)."""
        filters: list[sa.ColumnExpressionArgument[bool]] = []

        if self.active is not None:
            filters.append(self.active.as_sql_filter())

        return filters
class LogFilterName(PrefectFilterBaseModel):
    """Filter by `Log.name`."""

    any_: Optional[list[str]] = Field(
        default=None,
        description="A list of log names to include",
        examples=[["prefect.logger.flow_runs", "prefect.logger.task_runs"]],
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an IN clause over the given log names, or nothing when unset."""
        if self.any_ is None:
            return []
        return [db.Log.name.in_(self.any_)]
class LogFilterLevel(PrefectFilterBaseModel):
    """Filter by `Log.level`."""

    ge_: Optional[int] = Field(
        default=None,
        description="Include logs with a level greater than or equal to this level",
        examples=[20],
    )

    le_: Optional[int] = Field(
        default=None,
        description="Include logs with a level less than or equal to this level",
        examples=[50],
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return inclusive minimum/maximum level clauses for whichever bounds are set."""
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        minimum, maximum = self.ge_, self.le_
        if minimum is not None:
            clauses.append(db.Log.level >= minimum)
        if maximum is not None:
            clauses.append(db.Log.level <= maximum)
        return clauses
class LogFilterTimestamp(PrefectFilterBaseModel):
    """Filter by `Log.timestamp`."""

    before_: Optional[DateTime] = Field(
        default=None,
        description="Only include logs with a timestamp at or before this time",
    )
    after_: Optional[DateTime] = Field(
        default=None,
        description="Only include logs with a timestamp at or after this time",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return inclusive upper/lower timestamp clauses for whichever bounds are set."""
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        upper_bound, lower_bound = self.before_, self.after_
        if upper_bound is not None:
            clauses.append(db.Log.timestamp <= upper_bound)
        if lower_bound is not None:
            clauses.append(db.Log.timestamp >= lower_bound)
        return clauses
class LogFilterFlowRunId(PrefectFilterBaseModel):
    """Filter by `Log.flow_run_id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of flow run IDs to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an IN clause over the given flow run ids, or nothing when unset."""
        if self.any_ is None:
            return []
        return [db.Log.flow_run_id.in_(self.any_)]
class LogFilterTaskRunId(PrefectFilterBaseModel):
    """Filter by `Log.task_run_id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of task run IDs to include"
    )

    is_null_: Optional[bool] = Field(
        default=None,
        description="If true, only include logs without a task run id",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an IN clause for `any_` and a nullness clause for `is_null_`, when set."""
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            clauses.append(db.Log.task_run_id.in_(self.any_))
        if self.is_null_ is not None:
            # True -> IS NULL, False -> IS NOT NULL
            if self.is_null_:
                clauses.append(db.Log.task_run_id.is_(None))
            else:
                clauses.append(db.Log.task_run_id.is_not(None))
        return clauses
class LogFilterTextSearch(PrefectFilterBaseModel):
    """Filter by text search across log content."""

    query: str = Field(
        description="Text search query string",
        examples=[
            "error",
            "error -debug",
            '"connection timeout"',
            "+required -excluded",
        ],
        max_length=200,
    )

    def includes(self, log: "Log") -> bool:
        """Check in Python whether this text filter includes the given log.

        The log's message and logger name are joined and lower-cased, then
        each parsed term is tested as a literal, case-insensitive substring.

        Args:
            log: The log record to test.

        Returns:
            True when at least one include term matches (if any are given),
            no exclude term matches, and every required term matches.

        Raises:
            TypeError: If `log` is not a `Log` instance.
        """
        from prefect.server.schemas.core import Log

        if not isinstance(log, Log):
            raise TypeError(f"Expected Log object, got {type(log)}")

        parsed = parse_text_search_query(self.query)

        # Build searchable text from message and logger name
        searchable_text = f"{log.message} {log.name}".lower()

        def found(term: str) -> bool:
            return term.lower() in searchable_text

        # Include terms (OR logic)
        if parsed.include and not any(found(term) for term in parsed.include):
            return False

        # Exclude terms (NOT logic)
        if parsed.exclude and any(found(term) for term in parsed.exclude):
            return False

        # Required terms (AND logic - future feature)
        if parsed.required and not all(found(term) for term in parsed.required):
            return False

        return True

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Build SQLAlchemy WHERE clauses for text search.

        Mirrors `includes`: include terms are OR-ed, exclude terms are negated
        and AND-ed, required terms are AND-ed. A blank query yields no clauses.
        """
        filters: list[sa.ColumnExpressionArgument[bool]] = []

        if not self.query.strip():
            return filters

        parsed = parse_text_search_query(self.query)

        # Combined, lower-cased searchable text field (message + name)
        searchable_field = sa.func.lower(
            sa.func.concat(db.Log.message, " ", db.Log.name)
        )

        def matches(term: str) -> "sa.ColumnElement[bool]":
            # autoescape=True makes `%` and `_` in the user's term match
            # literally, as the substring test in `includes` does, instead of
            # acting as LIKE wildcards.
            return searchable_field.contains(term.lower(), autoescape=True)

        # Include terms (OR logic)
        if parsed.include:
            filters.append(sa.or_(*[matches(term) for term in parsed.include]))

        # Exclude terms (NOT logic)
        if parsed.exclude:
            filters.append(sa.and_(*[~matches(term) for term in parsed.exclude]))

        # Required terms (AND logic - future feature)
        if parsed.required:
            filters.append(sa.and_(*[matches(term) for term in parsed.required]))

        return filters
class LogFilter(PrefectOperatorFilterBaseModel):
    """Filter logs. Only logs matching all criteria will be returned"""

    level: Optional[LogFilterLevel] = Field(
        default=None, description="Filter criteria for `Log.level`"
    )
    timestamp: Optional[LogFilterTimestamp] = Field(
        default=None, description="Filter criteria for `Log.timestamp`"
    )
    flow_run_id: Optional[LogFilterFlowRunId] = Field(
        default=None, description="Filter criteria for `Log.flow_run_id`"
    )
    task_run_id: Optional[LogFilterTaskRunId] = Field(
        default=None, description="Filter criteria for `Log.task_run_id`"
    )
    text: Optional[LogFilterTextSearch] = Field(
        default=None, description="Filter criteria for text search across log content"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Collect clauses from every sub-filter that is set, in declaration order."""
        column_criteria = (
            self.level,
            self.timestamp,
            self.flow_run_id,
            self.task_run_id,
        )
        clauses: list[sa.ColumnExpressionArgument[bool]] = [
            criterion.as_sql_filter()
            for criterion in column_criteria
            if criterion is not None
        ]
        if self.text is not None:
            # The text filter's individual clauses are spliced in directly
            # rather than being combined through `as_sql_filter()`.
            clauses.extend(self.text._get_filter_list(db))
        return clauses
class FilterSet(PrefectBaseModel):
    """A collection of filters for common objects"""

    # Each field defaults to a freshly constructed filter of its type, built
    # with no arguments via `default_factory`.
    flows: FlowFilter = Field(
        description="Filters that apply to flows",
        default_factory=FlowFilter,
    )
    flow_runs: FlowRunFilter = Field(
        description="Filters that apply to flow runs",
        default_factory=FlowRunFilter,
    )
    task_runs: TaskRunFilter = Field(
        description="Filters that apply to task runs",
        default_factory=TaskRunFilter,
    )
    deployments: DeploymentFilter = Field(
        description="Filters that apply to deployments",
        default_factory=DeploymentFilter,
    )
class BlockTypeFilterName(PrefectFilterBaseModel):
    """Filter by `BlockType.name`"""

    like_: Optional[str] = Field(
        default=None,
        description=(
            "A case-insensitive partial match. For example, "
            " passing 'marvin' will match "
            "'marvin', 'sad-Marvin', and 'marvin-robot'."
        ),
        examples=["marvin"],
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an ILIKE clause matching `like_` anywhere in the name, or nothing."""
        if self.like_ is None:
            return []
        pattern = f"%{self.like_}%"
        return [db.BlockType.name.ilike(pattern)]
class BlockTypeFilterSlug(PrefectFilterBaseModel):
    """Filter by `BlockType.slug`"""

    any_: Optional[list[str]] = Field(
        default=None, description="A list of slugs to match"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an IN clause over the given slugs, or nothing when unset."""
        if self.any_ is None:
            return []
        return [db.BlockType.slug.in_(self.any_)]
class BlockTypeFilter(PrefectFilterBaseModel):
    """Filter BlockTypes"""

    name: Optional[BlockTypeFilterName] = Field(
        default=None, description="Filter criteria for `BlockType.name`"
    )

    slug: Optional[BlockTypeFilterSlug] = Field(
        default=None, description="Filter criteria for `BlockType.slug`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Collect the SQL filter of every sub-filter that is set, in declaration order."""
        criteria = (self.name, self.slug)
        return [
            criterion.as_sql_filter() for criterion in criteria if criterion is not None
        ]
class BlockSchemaFilterBlockTypeId(PrefectFilterBaseModel):
    """Filter by `BlockSchema.block_type_id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of block type ids to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an IN clause over the given block type ids, or nothing when unset."""
        if self.any_ is None:
            return []
        return [db.BlockSchema.block_type_id.in_(self.any_)]
class BlockSchemaFilterId(PrefectFilterBaseModel):
    """Filter by BlockSchema.id"""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of IDs to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an IN clause over the given block schema ids, or nothing when unset."""
        if self.any_ is None:
            return []
        return [db.BlockSchema.id.in_(self.any_)]
class BlockSchemaFilterCapabilities(PrefectFilterBaseModel):
    """Filter by `BlockSchema.capabilities`"""

    all_: Optional[list[str]] = Field(
        default=None,
        examples=[["write-storage", "read-storage"]],
        description=(
            "A list of block capabilities. Block entities will be returned only if an"
            " associated block schema has a superset of the defined capabilities."
        ),
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return a `has_all` clause over the requested capabilities, or nothing."""
        if self.all_ is None:
            return []
        wanted_capabilities = _as_array(self.all_)
        return [db.BlockSchema.capabilities.has_all(wanted_capabilities)]
class BlockSchemaFilterVersion(PrefectFilterBaseModel):
    """Filter by `BlockSchema.version`"""

    any_: Optional[list[str]] = Field(
        default=None,
        examples=[["2.0.0", "2.1.0"]],
        description="A list of block schema versions.",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an IN clause over the given versions, or nothing when unset."""
        filters: list[sa.ColumnElement[bool]] = []
        if self.any_ is not None:
            filters.append(db.BlockSchema.version.in_(self.any_))
        return filters
class BlockSchemaFilter(PrefectOperatorFilterBaseModel):
    """Filter BlockSchemas"""

    block_type_id: Optional[BlockSchemaFilterBlockTypeId] = Field(
        default=None, description="Filter criteria for `BlockSchema.block_type_id`"
    )
    block_capabilities: Optional[BlockSchemaFilterCapabilities] = Field(
        default=None, description="Filter criteria for `BlockSchema.capabilities`"
    )
    id: Optional[BlockSchemaFilterId] = Field(
        default=None, description="Filter criteria for `BlockSchema.id`"
    )
    version: Optional[BlockSchemaFilterVersion] = Field(
        default=None, description="Filter criteria for `BlockSchema.version`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Collect the SQL filter of every sub-filter that is set, in declaration order."""
        criteria = (
            self.block_type_id,
            self.block_capabilities,
            self.id,
            self.version,
        )
        return [
            criterion.as_sql_filter() for criterion in criteria if criterion is not None
        ]
class BlockDocumentFilterIsAnonymous(PrefectFilterBaseModel):
    """Filter by `BlockDocument.is_anonymous`."""

    eq_: Optional[bool] = Field(
        default=None,
        description=(
            "Filter block documents for only those that are or are not anonymous."
        ),
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IS <bool>` clause on the is_anonymous column, or nothing when unset."""
        if self.eq_ is None:
            return []
        return [db.BlockDocument.is_anonymous.is_(self.eq_)]
class BlockDocumentFilterBlockTypeId(PrefectFilterBaseModel):
    """Filter by `BlockDocument.block_type_id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of block type ids to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an IN clause over the given block type ids, or nothing when unset."""
        if self.any_ is None:
            return []
        return [db.BlockDocument.block_type_id.in_(self.any_)]
class BlockDocumentFilterId(PrefectFilterBaseModel):
    """Filter by `BlockDocument.id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of block ids to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an IN clause over the given block document ids, or nothing when unset."""
        if self.any_ is None:
            return []
        return [db.BlockDocument.id.in_(self.any_)]
class BlockDocumentFilterName(PrefectFilterBaseModel):
    """Filter by `BlockDocument.name`."""

    any_: Optional[list[str]] = Field(
        default=None, description="A list of block names to include"
    )
    like_: Optional[str] = Field(
        default=None,
        description=(
            "A string to match block names against. This can include "
            "SQL wildcard characters like `%` and `_`."
        ),
        examples=["my-block%"],
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an IN clause for `any_` and an ILIKE clause for `like_`, when set."""
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            clauses.append(db.BlockDocument.name.in_(self.any_))
        if self.like_ is not None:
            # The caller's string is wrapped in `%` on both sides; wildcards
            # inside it are passed through unescaped.
            pattern = f"%{self.like_}%"
            clauses.append(db.BlockDocument.name.ilike(pattern))
        return clauses
class BlockDocumentFilter(PrefectOperatorFilterBaseModel):
    """Filter BlockDocuments. Only BlockDocuments matching all criteria will be returned"""

    id: Optional[BlockDocumentFilterId] = Field(
        default=None, description="Filter criteria for `BlockDocument.id`"
    )
    is_anonymous: Optional[BlockDocumentFilterIsAnonymous] = Field(
        # default is to exclude anonymous blocks
        BlockDocumentFilterIsAnonymous(eq_=False),
        description=(
            "Filter criteria for `BlockDocument.is_anonymous`. "
            "Defaults to excluding anonymous blocks."
        ),
    )
    block_type_id: Optional[BlockDocumentFilterBlockTypeId] = Field(
        default=None, description="Filter criteria for `BlockDocument.block_type_id`"
    )
    name: Optional[BlockDocumentFilterName] = Field(
        default=None, description="Filter criteria for `BlockDocument.name`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Collect the SQL filter of every sub-filter that is set, in declaration order."""
        criteria = (self.id, self.is_anonymous, self.block_type_id, self.name)
        return [
            criterion.as_sql_filter() for criterion in criteria if criterion is not None
        ]
class WorkQueueFilterId(PrefectFilterBaseModel):
    """Filter by `WorkQueue.id`."""

    any_: Optional[list[UUID]] = Field(
        default=None,
        description="A list of work queue ids to include",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an IN clause over the given work queue ids, or nothing when unset."""
        if self.any_ is None:
            return []
        return [db.WorkQueue.id.in_(self.any_)]
class WorkQueueFilterName(PrefectFilterBaseModel):
    """Filter by `WorkQueue.name`."""

    any_: Optional[list[str]] = Field(
        default=None,
        description="A list of work queue names to include",
        examples=[["wq-1", "wq-2"]],
    )

    startswith_: Optional[list[str]] = Field(
        default=None,
        description=(
            "A list of case-insensitive starts-with matches. For example, "
            " passing 'marvin' will match "
            "'marvin', and 'Marvin-robot', but not 'sad-marvin'."
        ),
        examples=[["marvin", "Marvin-robot"]],
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an IN clause for `any_` and an OR of prefix matches for `startswith_`."""
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            clauses.append(db.WorkQueue.name.in_(self.any_))
        if self.startswith_ is not None:
            # One ILIKE per prefix, with a trailing wildcard only.
            prefix_matches = [
                db.WorkQueue.name.ilike(f"{prefix}%") for prefix in self.startswith_
            ]
            clauses.append(sa.or_(*prefix_matches))
        return clauses
class WorkQueueFilter(PrefectOperatorFilterBaseModel):
    """Filter work queues. Only work queues matching all criteria will be
    returned"""

    id: Optional[WorkQueueFilterId] = Field(
        default=None, description="Filter criteria for `WorkQueue.id`"
    )

    name: Optional[WorkQueueFilterName] = Field(
        default=None, description="Filter criteria for `WorkQueue.name`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Collect the SQL filter of every sub-filter that is set, in declaration order."""
        criteria = (self.id, self.name)
        return [
            criterion.as_sql_filter() for criterion in criteria if criterion is not None
        ]
class WorkPoolFilterId(PrefectFilterBaseModel):
    """Filter by `WorkPool.id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of work pool ids to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an IN clause over the given work pool ids, or nothing when unset."""
        if self.any_ is None:
            return []
        return [db.WorkPool.id.in_(self.any_)]
class WorkPoolFilterName(PrefectFilterBaseModel):
    """Filter work pools on `WorkPool.name`."""

    any_: Optional[list[str]] = Field(
        default=None, description="A list of work pool names to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` condition on the name when `any_` is set, else nothing."""
        if self.any_ is None:
            return []
        return [db.WorkPool.name.in_(self.any_)]
class WorkPoolFilterType(PrefectFilterBaseModel):
    """Filter work pools on `WorkPool.type`."""

    any_: Optional[list[str]] = Field(
        default=None, description="A list of work pool types to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` condition on the type when `any_` is set, else nothing."""
        if self.any_ is None:
            return []
        return [db.WorkPool.type.in_(self.any_)]
class WorkPoolFilter(PrefectOperatorFilterBaseModel):
    """Composite filter for work pools, built from per-column criteria.

    Every sub-filter that is set contributes one SQL expression to the list
    returned by `_get_filter_list`.
    """

    id: Optional[WorkPoolFilterId] = Field(
        default=None, description="Filter criteria for `WorkPool.id`"
    )
    name: Optional[WorkPoolFilterName] = Field(
        default=None, description="Filter criteria for `WorkPool.name`"
    )
    type: Optional[WorkPoolFilterType] = Field(
        default=None, description="Filter criteria for `WorkPool.type`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return the SQL expression of each populated sub-filter, in field order."""
        sub_filters = (self.id, self.name, self.type)
        return [
            criterion.as_sql_filter()
            for criterion in sub_filters
            if criterion is not None
        ]
class WorkerFilterWorkPoolId(PrefectFilterBaseModel):
    """Filter by `Worker.work_pool_id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of work pool ids to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` condition on the worker's work pool id.

        The list is empty when `any_` is not set.
        """
        filters: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            # The column consulted is `work_pool_id`; the previous docstring
            # referred to `worker_config_id`, which this code never touches.
            filters.append(db.Worker.work_pool_id.in_(self.any_))
        return filters
class WorkerFilterStatus(PrefectFilterBaseModel):
    """Filter by `Worker.status`.

    `any_` keeps workers whose status is in the list; `not_any_` drops workers
    whose status is in the list. Each criterion that is set contributes one
    expression to the list returned by `_get_filter_list`.
    """

    any_: Optional[list[schemas.statuses.WorkerStatus]] = Field(
        default=None, description="A list of worker statuses to include"
    )
    not_any_: Optional[list[schemas.statuses.WorkerStatus]] = Field(
        default=None, description="A list of worker statuses to exclude"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Build the inclusion/exclusion conditions on the worker status."""
        filters: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            filters.append(db.Worker.status.in_(self.any_))
        if self.not_any_ is not None:
            # `not_in` is the current SQLAlchemy name; `notin_` is only kept
            # as a backwards-compatible alias and emits the same SQL.
            filters.append(db.Worker.status.not_in(self.not_any_))
        return filters
class WorkerFilterLastHeartbeatTime(PrefectFilterBaseModel):
    """Filter workers on `Worker.last_heartbeat_time`.

    Both bounds are inclusive; either one may be omitted.
    """

    before_: Optional[DateTime] = Field(
        default=None,
        description=(
            "Only include processes whose last heartbeat was at or before this time"
        ),
    )
    after_: Optional[DateTime] = Field(
        default=None,
        description=(
            "Only include processes whose last heartbeat was at or after this time"
        ),
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return one comparison per bound that is set (upper bound first)."""
        upper_bound, lower_bound = self.before_, self.after_
        bounds: list[sa.ColumnExpressionArgument[bool]] = []

        if upper_bound is not None:
            bounds.append(db.Worker.last_heartbeat_time <= upper_bound)
        if lower_bound is not None:
            bounds.append(db.Worker.last_heartbeat_time >= lower_bound)

        return bounds
class WorkerFilter(PrefectOperatorFilterBaseModel):
    """Filter workers by `Worker.last_heartbeat_time` and `Worker.status`.

    Every sub-filter that is set contributes one SQL expression to the list
    returned by `_get_filter_list`.
    """

    # NOTE(review): a work-pool criterion is sketched below but disabled, so
    # this filter currently exposes only heartbeat-time and status criteria.
    # worker_config_id: Optional[WorkerFilterWorkPoolId] = Field(
    #     default=None, description="Filter criteria for `Worker.worker_config_id`"
    # )

    last_heartbeat_time: Optional[WorkerFilterLastHeartbeatTime] = Field(
        default=None,
        description="Filter criteria for `Worker.last_heartbeat_time`",
    )

    status: Optional[WorkerFilterStatus] = Field(
        default=None, description="Filter criteria for `Worker.status`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return the SQL expression of each populated sub-filter, in field order."""
        filters: list[sa.ColumnExpressionArgument[bool]] = []

        if self.last_heartbeat_time is not None:
            filters.append(self.last_heartbeat_time.as_sql_filter())

        if self.status is not None:
            filters.append(self.status.as_sql_filter())

        return filters
class ArtifactFilterId(PrefectFilterBaseModel):
    """Filter artifacts on `Artifact.id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of artifact ids to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` condition on the id when `any_` is set, else nothing."""
        if self.any_ is None:
            return []
        return [db.Artifact.id.in_(self.any_)]
class ArtifactFilterKey(PrefectFilterBaseModel):
    """Filter by `Artifact.key`.

    Supports exact membership (`any_`), case-insensitive pattern matching
    (`like_`) and a null / non-null check (`exists_`). Each criterion that is
    set contributes one expression to the list returned by `_get_filter_list`.
    """

    any_: Optional[list[str]] = Field(
        default=None, description="A list of artifact keys to include"
    )

    like_: Optional[str] = Field(
        default=None,
        description=(
            "A string to match artifact keys against. This can include "
            "SQL wildcard characters like `%` and `_`."
        ),
        examples=["my-artifact-%"],
    )

    exists_: Optional[bool] = Field(
        default=None,
        description=(
            "If `true`, only include artifacts with a non-null key. If `false`, "
            "only include artifacts with a null key."
        ),
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Build the key conditions for the criteria that are set."""
        filters: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            filters.append(db.Artifact.key.in_(self.any_))
        if self.like_ is not None:
            # The pattern is wrapped in `%` on both sides, so it is matched
            # anywhere inside the key (case-insensitively).
            filters.append(db.Artifact.key.ilike(f"%{self.like_}%"))
        if self.exists_ is not None:
            # `is_not` is the current SQLAlchemy name; `isnot` is only kept as
            # a backwards-compatible alias and emits the same SQL.
            filters.append(
                db.Artifact.key.is_not(None)
                if self.exists_
                else db.Artifact.key.is_(None)
            )
        return filters
class ArtifactFilterFlowRunId(PrefectFilterBaseModel):
    """Filter artifacts on `Artifact.flow_run_id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of flow run IDs to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` condition on the flow run id when `any_` is set."""
        if self.any_ is None:
            return []
        return [db.Artifact.flow_run_id.in_(self.any_)]
class ArtifactFilterTaskRunId(PrefectFilterBaseModel):
    """Filter artifacts on `Artifact.task_run_id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of task run IDs to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` condition on the task run id when `any_` is set."""
        if self.any_ is None:
            return []
        return [db.Artifact.task_run_id.in_(self.any_)]
class ArtifactFilterType(PrefectFilterBaseModel):
    """Filter by `Artifact.type`.

    `any_` keeps artifacts whose type is in the list; `not_any_` drops
    artifacts whose type is in the list. Each criterion that is set
    contributes one expression to the list returned by `_get_filter_list`.
    """

    any_: Optional[list[str]] = Field(
        default=None, description="A list of artifact types to include"
    )
    not_any_: Optional[list[str]] = Field(
        default=None, description="A list of artifact types to exclude"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Build the inclusion/exclusion conditions on the artifact type."""
        filters: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            filters.append(db.Artifact.type.in_(self.any_))
        if self.not_any_ is not None:
            # `not_in` is the current SQLAlchemy name; `notin_` is only kept
            # as a backwards-compatible alias and emits the same SQL.
            filters.append(db.Artifact.type.not_in(self.not_any_))
        return filters
class ArtifactFilter(PrefectOperatorFilterBaseModel):
    """Composite filter for artifacts, built from per-column criteria.

    Every sub-filter that is set contributes one SQL expression to the list
    returned by `_get_filter_list`.
    """

    id: Optional[ArtifactFilterId] = Field(
        default=None, description="Filter criteria for `Artifact.id`"
    )
    key: Optional[ArtifactFilterKey] = Field(
        default=None, description="Filter criteria for `Artifact.key`"
    )
    flow_run_id: Optional[ArtifactFilterFlowRunId] = Field(
        default=None, description="Filter criteria for `Artifact.flow_run_id`"
    )
    task_run_id: Optional[ArtifactFilterTaskRunId] = Field(
        default=None, description="Filter criteria for `Artifact.task_run_id`"
    )
    type: Optional[ArtifactFilterType] = Field(
        default=None, description="Filter criteria for `Artifact.type`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return the SQL expression of each populated sub-filter, in field order."""
        sub_filters = (
            self.id,
            self.key,
            self.flow_run_id,
            self.task_run_id,
            self.type,
        )
        return [
            criterion.as_sql_filter()
            for criterion in sub_filters
            if criterion is not None
        ]
class ArtifactCollectionFilterLatestId(PrefectFilterBaseModel):
    """Filter artifact collections on `ArtifactCollection.latest_id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of artifact ids to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` condition on `latest_id` when `any_` is set."""
        if self.any_ is None:
            return []
        return [db.ArtifactCollection.latest_id.in_(self.any_)]
class ArtifactCollectionFilterKey(PrefectFilterBaseModel):
    """Filter by `ArtifactCollection.key`.

    Supports exact membership (`any_`), case-insensitive pattern matching
    (`like_`) and a null / non-null check (`exists_`). Each criterion that is
    set contributes one expression to the list returned by `_get_filter_list`.
    """

    any_: Optional[list[str]] = Field(
        default=None, description="A list of artifact keys to include"
    )

    like_: Optional[str] = Field(
        default=None,
        description=(
            "A string to match artifact keys against. This can include "
            "SQL wildcard characters like `%` and `_`."
        ),
        examples=["my-artifact-%"],
    )

    exists_: Optional[bool] = Field(
        default=None,
        description=(
            "If `true`, only include artifacts with a non-null key. If `false`, "
            "only include artifacts with a null key. Should return all rows in "
            "the ArtifactCollection table if specified."
        ),
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Build the key conditions for the criteria that are set."""
        filters: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            filters.append(db.ArtifactCollection.key.in_(self.any_))
        if self.like_ is not None:
            # The pattern is wrapped in `%` on both sides, so it is matched
            # anywhere inside the key (case-insensitively).
            filters.append(db.ArtifactCollection.key.ilike(f"%{self.like_}%"))
        if self.exists_ is not None:
            # `is_not` is the current SQLAlchemy name; `isnot` is only kept as
            # a backwards-compatible alias and emits the same SQL.
            filters.append(
                db.ArtifactCollection.key.is_not(None)
                if self.exists_
                else db.ArtifactCollection.key.is_(None)
            )
        return filters
class ArtifactCollectionFilterFlowRunId(PrefectFilterBaseModel):
    """Filter artifact collections on `ArtifactCollection.flow_run_id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of flow run IDs to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` condition on the flow run id when `any_` is set."""
        if self.any_ is None:
            return []
        return [db.ArtifactCollection.flow_run_id.in_(self.any_)]
class ArtifactCollectionFilterTaskRunId(PrefectFilterBaseModel):
    """Filter artifact collections on `ArtifactCollection.task_run_id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of task run IDs to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` condition on the task run id when `any_` is set."""
        if self.any_ is None:
            return []
        return [db.ArtifactCollection.task_run_id.in_(self.any_)]
class ArtifactCollectionFilterType(PrefectFilterBaseModel):
    """Filter by `ArtifactCollection.type`.

    `any_` keeps rows whose type is in the list; `not_any_` drops rows whose
    type is in the list. Each criterion that is set contributes one expression
    to the list returned by `_get_filter_list`.
    """

    any_: Optional[list[str]] = Field(
        default=None, description="A list of artifact types to include"
    )
    not_any_: Optional[list[str]] = Field(
        default=None, description="A list of artifact types to exclude"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Build the inclusion/exclusion conditions on the collection type."""
        filters: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            filters.append(db.ArtifactCollection.type.in_(self.any_))
        if self.not_any_ is not None:
            # `not_in` is the current SQLAlchemy name; `notin_` is only kept
            # as a backwards-compatible alias and emits the same SQL.
            filters.append(db.ArtifactCollection.type.not_in(self.not_any_))
        return filters
class ArtifactCollectionFilter(PrefectOperatorFilterBaseModel):
    """Composite filter for artifact collections, built from per-column criteria.

    Every sub-filter that is set contributes one SQL expression to the list
    returned by `_get_filter_list`.
    """

    latest_id: Optional[ArtifactCollectionFilterLatestId] = Field(
        default=None, description="Filter criteria for `Artifact.id`"
    )
    key: Optional[ArtifactCollectionFilterKey] = Field(
        default=None, description="Filter criteria for `Artifact.key`"
    )
    flow_run_id: Optional[ArtifactCollectionFilterFlowRunId] = Field(
        default=None, description="Filter criteria for `Artifact.flow_run_id`"
    )
    task_run_id: Optional[ArtifactCollectionFilterTaskRunId] = Field(
        default=None, description="Filter criteria for `Artifact.task_run_id`"
    )
    type: Optional[ArtifactCollectionFilterType] = Field(
        default=None, description="Filter criteria for `Artifact.type`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return the SQL expression of each populated sub-filter, in field order."""
        sub_filters = (
            self.latest_id,
            self.key,
            self.flow_run_id,
            self.task_run_id,
            self.type,
        )
        return [
            criterion.as_sql_filter()
            for criterion in sub_filters
            if criterion is not None
        ]
class VariableFilterId(PrefectFilterBaseModel):
    """Filter variables on `Variable.id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of variable ids to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` condition on the id when `any_` is set, else nothing."""
        if self.any_ is None:
            return []
        return [db.Variable.id.in_(self.any_)]
class VariableFilterName(PrefectFilterBaseModel):
    """Filter variables on `Variable.name`.

    Supports exact membership (`any_`) and case-insensitive pattern matching
    (`like_`). Each criterion that is set contributes one expression to the
    list returned by `_get_filter_list`.
    """

    any_: Optional[list[str]] = Field(
        default=None, description="A list of variables names to include"
    )
    like_: Optional[str] = Field(
        default=None,
        description=(
            "A string to match variable names against. This can include "
            "SQL wildcard characters like `%` and `_`."
        ),
        examples=["my_variable_%"],
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Build the name conditions for the criteria that are set."""
        conditions: list[sa.ColumnExpressionArgument[bool]] = []

        if self.any_ is not None:
            conditions.append(db.Variable.name.in_(self.any_))

        if self.like_ is not None:
            # The pattern is wrapped in `%` on both sides, so it is matched
            # anywhere inside the name (case-insensitively).
            pattern = f"%{self.like_}%"
            conditions.append(db.Variable.name.ilike(pattern))

        return conditions
class VariableFilterTags(PrefectOperatorFilterBaseModel):
    """Filter variables on `Variable.tags`.

    `all_` requires the variable's tags to contain every listed tag;
    `is_null_` compares the tags against the empty list. Each criterion that
    is set contributes one expression to the list returned by
    `_get_filter_list`.
    """

    all_: Optional[list[str]] = Field(
        default=None,
        examples=[["tag-1", "tag-2"]],
        description=(
            "A list of tags. Variables will be returned only if their tags are a"
            " superset of the list"
        ),
    )
    is_null_: Optional[bool] = Field(
        default=None, description="If true, only include Variables without tags"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Build the tag conditions for the criteria that are set."""
        conditions: list[sa.ColumnElement[bool]] = []

        if self.all_ is not None:
            required_tags = _as_array(self.all_)
            conditions.append(db.Variable.tags.has_all(required_tags))

        if self.is_null_ is not None:
            # "No tags" is expressed as equality with the empty list rather
            # than as a SQL NULL check.
            if self.is_null_:
                conditions.append(db.Variable.tags == [])
            else:
                conditions.append(db.Variable.tags != [])

        return conditions
class VariableFilter(PrefectOperatorFilterBaseModel):
    """Composite filter for variables, built from per-column criteria.

    Every sub-filter that is set contributes one SQL expression to the list
    returned by `_get_filter_list`.
    """

    id: Optional[VariableFilterId] = Field(
        default=None, description="Filter criteria for `Variable.id`"
    )
    name: Optional[VariableFilterName] = Field(
        default=None, description="Filter criteria for `Variable.name`"
    )
    tags: Optional[VariableFilterTags] = Field(
        default=None, description="Filter criteria for `Variable.tags`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return the SQL expression of each populated sub-filter, in field order."""
        sub_filters = (self.id, self.name, self.tags)
        return [
            criterion.as_sql_filter()
            for criterion in sub_filters
            if criterion is not None
        ]