Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/schemas/filters.py: 88%
1032 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1"""
2Schemas that define Prefect REST API filtering operations.
4Each filter schema includes logic for transforming itself into a SQL `where` clause.
5"""
7from collections.abc import Iterable, Sequence 1c
8from typing import TYPE_CHECKING, ClassVar, Optional 1c
9from uuid import UUID 1c
11from pydantic import ConfigDict, Field 1c
12from sqlalchemy.sql.functions import coalesce 1c
14import prefect.server.schemas as schemas 1c
15from prefect.server.utilities.database import db_injector 1c
16from prefect.server.utilities.schemas.bases import PrefectBaseModel 1c
17from prefect.server.utilities.text_search_parser import ( 1c
18 parse_text_search_query,
19)
20from prefect.types import DateTime 1c
21from prefect.utilities.collections import AutoEnum 1c
22from prefect.utilities.importtools import lazy_import 1c
24if TYPE_CHECKING: 24 ↛ 25line 24 didn't jump to line 25 because the condition on line 24 was never true1c
25 import sqlalchemy as sa
26 from sqlalchemy.dialects import postgresql
28 from prefect.server.database import PrefectDBInterface
29 from prefect.server.schemas.core import Log
30else:
31 sa = lazy_import("sqlalchemy") 1c
32 postgresql = lazy_import("sqlalchemy.dialects.postgresql") 1c
# TODO: Consider moving the `as_sql_filter` functions out of here since they are
# database-model-level functions and do not properly separate concerns when
# present in the schemas module
def _as_array(elems: Sequence[str]) -> sa.ColumnElement[Sequence[str]]:
    """Wrap `elems` in a PostgreSQL array expression cast to an array of strings."""
    array_expression = postgresql.array(elems)
    string_array_type = postgresql.ARRAY(sa.String())
    return sa.cast(array_expression, type_=string_array_type)
class Operator(AutoEnum):
    """Operators for combining filter criteria."""

    # Criteria are joined with SQL AND.
    and_ = AutoEnum.auto()
    # Criteria are joined with SQL OR.
    or_ = AutoEnum.auto()
class PrefectFilterBaseModel(PrefectBaseModel):
    """Base model for Prefect filters.

    Subclasses implement `_get_filter_list`; `as_sql_filter` joins whatever
    criteria it returns with AND.
    """

    # Unknown fields are rejected rather than silently ignored.
    model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")

    @db_injector
    def as_sql_filter(self, db: "PrefectDBInterface") -> sa.ColumnElement[bool]:
        """Generate a SQL filter from the provided filter parameters.

        Returns:
            The AND of every criterion from `_get_filter_list`, or a TRUE
            filter when no criteria are available.
        """
        # `_get_filter_list` only promises an Iterable. Materialize it so the
        # emptiness check is reliable (a generator is always truthy) and the
        # criteria are not consumed before being combined.
        filters = list(self._get_filter_list(db))
        if not filters:
            return sa.true()
        return sa.and_(*filters)

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return a list of boolean filter statements based on filter parameters.

        Raises:
            NotImplementedError: always; subclasses must override this method.
        """
        raise NotImplementedError("_get_filter_list must be implemented")
class PrefectOperatorFilterBaseModel(PrefectFilterBaseModel):
    """Base model for Prefect filters that combines criteria with a user-provided operator"""

    operator: Operator = Field(
        default=Operator.and_,
        description="Operator for combining filter criteria. Defaults to 'and_'.",
    )

    @db_injector
    def as_sql_filter(self, db: "PrefectDBInterface") -> sa.ColumnElement[bool]:
        """Generate a SQL filter, joining the criteria with `self.operator`.

        Returns:
            A TRUE filter when there are no criteria; otherwise the criteria
            joined with AND for `Operator.and_`, and with OR for any other
            operator value.
        """
        # `_get_filter_list` only promises an Iterable. Materialize it so the
        # emptiness check is reliable (a generator is always truthy).
        filters = list(self._get_filter_list(db))
        if not filters:
            return sa.true()
        if self.operator == Operator.and_:
            return sa.and_(*filters)
        return sa.or_(*filters)
class FlowFilterId(PrefectFilterBaseModel):
    """Filter by `Flow.id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of flow ids to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` criterion on `Flow.id` when `any_` is set, else nothing."""
        if self.any_ is None:
            return []
        return [db.Flow.id.in_(self.any_)]
class FlowFilterDeployment(PrefectOperatorFilterBaseModel):
    """Filter by flows by deployment"""

    is_null_: Optional[bool] = Field(
        default=None,
        description="If true, only include flows without deployments",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return a criterion on whether `Flow.id` appears among deployment flow ids.

        Nothing is returned when `is_null_` is unset.
        """
        if self.is_null_ is None:
            return []

        # Distinct flow ids referenced by any deployment.
        flows_with_deployments = (
            sa.select(db.Deployment.flow_id).distinct().subquery()
        )
        deployed_flow_ids = sa.select(flows_with_deployments.c.flow_id)

        if self.is_null_:
            criterion = db.Flow.id.not_in(deployed_flow_ids)
        else:
            criterion = db.Flow.id.in_(deployed_flow_ids)
        return [criterion]
class FlowFilterName(PrefectFilterBaseModel):
    """Filter by `Flow.name`."""

    any_: Optional[list[str]] = Field(
        default=None,
        description="A list of flow names to include",
        examples=[["my-flow-1", "my-flow-2"]],
    )

    like_: Optional[str] = Field(
        default=None,
        description=(
            "A case-insensitive partial match. For example, "
            " passing 'marvin' will match "
            "'marvin', 'sad-Marvin', and 'marvin-robot'."
        ),
        examples=["marvin"],
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Collect the `IN` and `ILIKE` criteria on `Flow.name` that are set."""
        criteria: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            criteria += [db.Flow.name.in_(self.any_)]
        if self.like_ is not None:
            # The value is wrapped in `%` on both sides for a substring match.
            pattern = f"%{self.like_}%"
            criteria += [db.Flow.name.ilike(pattern)]
        return criteria
class FlowFilterTags(PrefectOperatorFilterBaseModel):
    """Filter by `Flow.tags`."""

    all_: Optional[list[str]] = Field(
        default=None,
        examples=[["tag-1", "tag-2"]],
        description=(
            "A list of tags. Flows will be returned only if their tags are a superset"
            " of the list"
        ),
    )
    is_null_: Optional[bool] = Field(
        default=None, description="If true, only include flows without tags"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Collect the tag criteria on `Flow.tags` that are set."""
        criteria: list[sa.ColumnExpressionArgument[bool]] = []
        if self.all_ is not None:
            required_tags = _as_array(self.all_)
            criteria.append(db.Flow.tags.has_all(required_tags))
        if self.is_null_ is not None:
            # "Without tags" is expressed as equality with an empty list.
            if self.is_null_:
                criteria.append(db.Flow.tags == [])
            else:
                criteria.append(db.Flow.tags != [])
        return criteria
class FlowFilter(PrefectOperatorFilterBaseModel):
    """Filter for flows. Only flows matching all criteria will be returned."""

    id: Optional[FlowFilterId] = Field(
        default=None, description="Filter criteria for `Flow.id`"
    )
    deployment: Optional[FlowFilterDeployment] = Field(
        default=None, description="Filter criteria for Flow deployments"
    )
    name: Optional[FlowFilterName] = Field(
        default=None, description="Filter criteria for `Flow.name`"
    )
    tags: Optional[FlowFilterTags] = Field(
        default=None, description="Filter criteria for `Flow.tags`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return the SQL filter of every sub-filter that is set, in field order."""
        sub_filters = (self.id, self.deployment, self.name, self.tags)
        return [
            sub_filter.as_sql_filter()
            for sub_filter in sub_filters
            if sub_filter is not None
        ]
class FlowRunFilterId(PrefectFilterBaseModel):
    """Filter by `FlowRun.id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of flow run ids to include"
    )
    not_any_: Optional[list[UUID]] = Field(
        default=None, description="A list of flow run ids to exclude"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Collect the include/exclude criteria on `FlowRun.id` that are set."""
        criteria: list[sa.ColumnExpressionArgument[bool]] = []
        included, excluded = self.any_, self.not_any_
        if included is not None:
            criteria += [db.FlowRun.id.in_(included)]
        if excluded is not None:
            criteria += [db.FlowRun.id.not_in(excluded)]
        return criteria
class FlowRunFilterName(PrefectFilterBaseModel):
    """Filter by `FlowRun.name`."""

    any_: Optional[list[str]] = Field(
        default=None,
        description="A list of flow run names to include",
        examples=[["my-flow-run-1", "my-flow-run-2"]],
    )

    like_: Optional[str] = Field(
        default=None,
        description=(
            "A case-insensitive partial match. For example, "
            " passing 'marvin' will match "
            "'marvin', 'sad-Marvin', and 'marvin-robot'."
        ),
        examples=["marvin"],
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Collect the `IN` and `ILIKE` criteria on `FlowRun.name` that are set."""
        criteria: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            criteria += [db.FlowRun.name.in_(self.any_)]
        if self.like_ is not None:
            # The value is wrapped in `%` on both sides for a substring match.
            pattern = f"%{self.like_}%"
            criteria += [db.FlowRun.name.ilike(pattern)]
        return criteria
class FlowRunFilterTags(PrefectOperatorFilterBaseModel):
    """Filter by `FlowRun.tags`."""

    all_: Optional[list[str]] = Field(
        default=None,
        examples=[["tag-1", "tag-2"]],
        description=(
            "A list of tags. Flow runs will be returned only if their tags are a"
            " superset of the list"
        ),
    )

    any_: Optional[list[str]] = Field(
        default=None,
        examples=[["tag-1", "tag-2"]],
        description="A list of tags to include",
    )

    is_null_: Optional[bool] = Field(
        default=None, description="If true, only include flow runs without tags"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Collect the tag criteria on `FlowRun.tags` that are set.

        Uses the module-level `_as_array` helper, as the other tag filters in
        this module do, instead of a private copy of the same cast.
        """
        filters: list[sa.ColumnExpressionArgument[bool]] = []
        if self.all_ is not None:
            filters.append(db.FlowRun.tags.has_all(_as_array(self.all_)))
        if self.any_ is not None:
            filters.append(db.FlowRun.tags.has_any(_as_array(self.any_)))
        if self.is_null_ is not None:
            # "Without tags" is expressed as equality with an empty list.
            filters.append(
                db.FlowRun.tags == [] if self.is_null_ else db.FlowRun.tags != []
            )
        return filters
class FlowRunFilterDeploymentId(PrefectOperatorFilterBaseModel):
    """Filter by `FlowRun.deployment_id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of flow run deployment ids to include"
    )
    is_null_: Optional[bool] = Field(
        default=None,
        description="If true, only include flow runs without deployment ids",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Collect the `IN` and null-check criteria on `FlowRun.deployment_id`."""
        criteria: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            criteria.append(db.FlowRun.deployment_id.in_(self.any_))
        if self.is_null_ is not None:
            if self.is_null_:
                null_check = db.FlowRun.deployment_id.is_(None)
            else:
                null_check = db.FlowRun.deployment_id.is_not(None)
            criteria.append(null_check)
        return criteria
class FlowRunFilterWorkQueueName(PrefectOperatorFilterBaseModel):
    """Filter by `FlowRun.work_queue_name`."""

    any_: Optional[list[str]] = Field(
        default=None,
        description="A list of work queue names to include",
        examples=[["work_queue_1", "work_queue_2"]],
    )
    is_null_: Optional[bool] = Field(
        default=None,
        description="If true, only include flow runs without work queue names",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Collect the `IN` and null-check criteria on `FlowRun.work_queue_name`."""
        criteria: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            criteria.append(db.FlowRun.work_queue_name.in_(self.any_))
        if self.is_null_ is not None:
            if self.is_null_:
                null_check = db.FlowRun.work_queue_name.is_(None)
            else:
                null_check = db.FlowRun.work_queue_name.is_not(None)
            criteria.append(null_check)
        return criteria
class FlowRunFilterStateType(PrefectFilterBaseModel):
    """Filter by `FlowRun.state_type`."""

    any_: Optional[list[schemas.states.StateType]] = Field(
        default=None, description="A list of flow run state types to include"
    )
    not_any_: Optional[list[schemas.states.StateType]] = Field(
        default=None, description="A list of flow run state types to exclude"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Collect the include/exclude criteria on `FlowRun.state_type` that are set."""
        criteria: list[sa.ColumnExpressionArgument[bool]] = []
        included, excluded = self.any_, self.not_any_
        if included is not None:
            criteria += [db.FlowRun.state_type.in_(included)]
        if excluded is not None:
            criteria += [db.FlowRun.state_type.not_in(excluded)]
        return criteria
class FlowRunFilterStateName(PrefectFilterBaseModel):
    """Filter by `FlowRun.state_name`."""

    any_: Optional[list[str]] = Field(
        default=None, description="A list of flow run state names to include"
    )
    not_any_: Optional[list[str]] = Field(
        default=None, description="A list of flow run state names to exclude"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Collect the include/exclude criteria on `FlowRun.state_name` that are set."""
        criteria: list[sa.ColumnExpressionArgument[bool]] = []
        included, excluded = self.any_, self.not_any_
        if included is not None:
            criteria += [db.FlowRun.state_name.in_(included)]
        if excluded is not None:
            criteria += [db.FlowRun.state_name.not_in(excluded)]
        return criteria
class FlowRunFilterState(PrefectOperatorFilterBaseModel):
    """Filter by `FlowRun.state_type` and `FlowRun.state_name`."""

    type: Optional[FlowRunFilterStateType] = Field(
        default=None, description="Filter criteria for `FlowRun.state_type`"
    )
    name: Optional[FlowRunFilterStateName] = Field(
        default=None, description="Filter criteria for `FlowRun.state_name`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return one criterion per sub-filter that has any criteria set.

        A sub-filter with no criteria is skipped so that a constant TRUE is
        never combined by this filter's `operator` (under `or_` it would match
        every row). A sub-filter with both `any_` and `not_any_` set yields the
        AND of the two; the previous `isinstance(..., sa.BinaryExpression)`
        guard silently discarded that case because the conjunction is not a
        `BinaryExpression`.
        """
        filters: list[sa.ColumnExpressionArgument[bool]] = []
        for sub_filter in (self.type, self.name):
            if sub_filter is None:
                continue
            criteria = list(sub_filter._get_filter_list(db))
            if criteria:
                # Mirrors PrefectFilterBaseModel.as_sql_filter: AND the
                # sub-filter's own criteria together.
                filters.append(sa.and_(*criteria))
        return filters
class FlowRunFilterFlowVersion(PrefectFilterBaseModel):
    """Filter by `FlowRun.flow_version`."""

    any_: Optional[list[str]] = Field(
        default=None, description="A list of flow run flow_versions to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` criterion on `FlowRun.flow_version` when `any_` is set."""
        if self.any_ is None:
            return []
        return [db.FlowRun.flow_version.in_(self.any_)]
class FlowRunFilterStartTime(PrefectFilterBaseModel):
    """Filter by `FlowRun.start_time`."""

    before_: Optional[DateTime] = Field(
        default=None,
        description="Only include flow runs starting at or before this time",
    )
    after_: Optional[DateTime] = Field(
        default=None,
        description="Only include flow runs starting at or after this time",
    )
    is_null_: Optional[bool] = Field(
        default=None, description="If true, only return flow runs without a start time"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Collect the time-bound and null-check criteria that are set.

        The `before_`/`after_` bounds compare against `start_time`, falling back
        to `expected_start_time` via COALESCE; `is_null_` checks `start_time`
        alone.
        """
        criteria: list[sa.ColumnExpressionArgument[bool]] = []
        if self.before_ is not None:
            effective_start = coalesce(
                db.FlowRun.start_time, db.FlowRun.expected_start_time
            )
            criteria.append(effective_start <= self.before_)
        if self.after_ is not None:
            effective_start = coalesce(
                db.FlowRun.start_time, db.FlowRun.expected_start_time
            )
            criteria.append(effective_start >= self.after_)
        if self.is_null_ is not None:
            if self.is_null_:
                null_check = db.FlowRun.start_time.is_(None)
            else:
                null_check = db.FlowRun.start_time.is_not(None)
            criteria.append(null_check)
        return criteria
class FlowRunFilterEndTime(PrefectFilterBaseModel):
    """Filter by `FlowRun.end_time`."""

    before_: Optional[DateTime] = Field(
        default=None,
        description="Only include flow runs ending at or before this time",
    )
    after_: Optional[DateTime] = Field(
        default=None,
        description="Only include flow runs ending at or after this time",
    )
    is_null_: Optional[bool] = Field(
        default=None, description="If true, only return flow runs without an end time"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Collect the time-bound and null-check criteria on `FlowRun.end_time`."""
        criteria: list[sa.ColumnExpressionArgument[bool]] = []
        if self.before_ is not None:
            criteria += [db.FlowRun.end_time <= self.before_]
        if self.after_ is not None:
            criteria += [db.FlowRun.end_time >= self.after_]
        if self.is_null_ is not None:
            if self.is_null_:
                null_check = db.FlowRun.end_time.is_(None)
            else:
                null_check = db.FlowRun.end_time.is_not(None)
            criteria += [null_check]
        return criteria
class FlowRunFilterExpectedStartTime(PrefectFilterBaseModel):
    """Filter by `FlowRun.expected_start_time`."""

    before_: Optional[DateTime] = Field(
        default=None,
        description="Only include flow runs scheduled to start at or before this time",
    )
    after_: Optional[DateTime] = Field(
        default=None,
        description="Only include flow runs scheduled to start at or after this time",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Collect the inclusive bounds on `FlowRun.expected_start_time` that are set."""
        criteria: list[sa.ColumnExpressionArgument[bool]] = []
        upper_bound, lower_bound = self.before_, self.after_
        if upper_bound is not None:
            criteria += [db.FlowRun.expected_start_time <= upper_bound]
        if lower_bound is not None:
            criteria += [db.FlowRun.expected_start_time >= lower_bound]
        return criteria
class FlowRunFilterNextScheduledStartTime(PrefectFilterBaseModel):
    """Filter by `FlowRun.next_scheduled_start_time`."""

    # Description previously read "... next_scheduled_start_time or before this
    # time"; the missing "at" is restored to match the inclusive `<=` below and
    # the wording of `after_`.
    before_: Optional[DateTime] = Field(
        default=None,
        description=(
            "Only include flow runs with a next_scheduled_start_time at or before"
            " this time"
        ),
    )
    after_: Optional[DateTime] = Field(
        default=None,
        description=(
            "Only include flow runs with a next_scheduled_start_time at or after this"
            " time"
        ),
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Collect the inclusive bounds on `FlowRun.next_scheduled_start_time` that are set."""
        filters: list[sa.ColumnExpressionArgument[bool]] = []
        if self.before_ is not None:
            filters.append(db.FlowRun.next_scheduled_start_time <= self.before_)
        if self.after_ is not None:
            filters.append(db.FlowRun.next_scheduled_start_time >= self.after_)
        return filters
class FlowRunFilterParentFlowRunId(PrefectOperatorFilterBaseModel):
    """Filter for subflows of a given flow run"""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of parent flow run ids to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return a criterion selecting flow runs whose parent task run belongs
        to one of the flow runs in `any_`; nothing when `any_` is unset.
        """
        if self.any_ is None:
            return []

        # Join each flow run to the task run recorded as its parent.
        parent_task_join = sa.and_(
            db.TaskRun.id == db.FlowRun.parent_task_run_id,
        )
        subflow_run_ids = (
            sa.select(db.FlowRun.id)
            .join(db.TaskRun, parent_task_join)
            .where(db.TaskRun.flow_run_id.in_(self.any_))
        )
        return [db.FlowRun.id.in_(subflow_run_ids)]
class FlowRunFilterParentTaskRunId(PrefectOperatorFilterBaseModel):
    """Filter by `FlowRun.parent_task_run_id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of flow run parent_task_run_ids to include"
    )
    is_null_: Optional[bool] = Field(
        default=None,
        description="If true, only include flow runs without parent_task_run_id",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Collect the `IN` and null-check criteria on `FlowRun.parent_task_run_id`."""
        criteria: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            criteria.append(db.FlowRun.parent_task_run_id.in_(self.any_))
        if self.is_null_ is not None:
            if self.is_null_:
                null_check = db.FlowRun.parent_task_run_id.is_(None)
            else:
                null_check = db.FlowRun.parent_task_run_id.is_not(None)
            criteria.append(null_check)
        return criteria
class FlowRunFilterIdempotencyKey(PrefectFilterBaseModel):
    """Filter by FlowRun.idempotency_key."""

    any_: Optional[list[str]] = Field(
        default=None, description="A list of flow run idempotency keys to include"
    )
    not_any_: Optional[list[str]] = Field(
        default=None, description="A list of flow run idempotency keys to exclude"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Collect the include/exclude criteria on `FlowRun.idempotency_key` that are set."""
        criteria: list[sa.ColumnExpressionArgument[bool]] = []
        included, excluded = self.any_, self.not_any_
        if included is not None:
            criteria += [db.FlowRun.idempotency_key.in_(included)]
        if excluded is not None:
            criteria += [db.FlowRun.idempotency_key.not_in(excluded)]
        return criteria
class FlowRunFilter(PrefectOperatorFilterBaseModel):
    """Filter flow runs.

    The criteria of every sub-filter that is set are combined with `operator`,
    which defaults to `and_`, so by default only flow runs matching all
    criteria are returned.
    """

    id: Optional[FlowRunFilterId] = Field(
        default=None, description="Filter criteria for `FlowRun.id`"
    )
    name: Optional[FlowRunFilterName] = Field(
        default=None, description="Filter criteria for `FlowRun.name`"
    )
    tags: Optional[FlowRunFilterTags] = Field(
        default=None, description="Filter criteria for `FlowRun.tags`"
    )
    deployment_id: Optional[FlowRunFilterDeploymentId] = Field(
        default=None, description="Filter criteria for `FlowRun.deployment_id`"
    )
    # The closing backtick was missing from this description.
    work_queue_name: Optional[FlowRunFilterWorkQueueName] = Field(
        default=None, description="Filter criteria for `FlowRun.work_queue_name`"
    )
    state: Optional[FlowRunFilterState] = Field(
        default=None, description="Filter criteria for `FlowRun.state`"
    )
    flow_version: Optional[FlowRunFilterFlowVersion] = Field(
        default=None, description="Filter criteria for `FlowRun.flow_version`"
    )
    start_time: Optional[FlowRunFilterStartTime] = Field(
        default=None, description="Filter criteria for `FlowRun.start_time`"
    )
    end_time: Optional[FlowRunFilterEndTime] = Field(
        default=None, description="Filter criteria for `FlowRun.end_time`"
    )
    expected_start_time: Optional[FlowRunFilterExpectedStartTime] = Field(
        default=None, description="Filter criteria for `FlowRun.expected_start_time`"
    )
    next_scheduled_start_time: Optional[FlowRunFilterNextScheduledStartTime] = Field(
        default=None,
        description="Filter criteria for `FlowRun.next_scheduled_start_time`",
    )
    parent_flow_run_id: Optional[FlowRunFilterParentFlowRunId] = Field(
        default=None, description="Filter criteria for subflows of the given flow runs"
    )
    parent_task_run_id: Optional[FlowRunFilterParentTaskRunId] = Field(
        default=None, description="Filter criteria for `FlowRun.parent_task_run_id`"
    )
    idempotency_key: Optional[FlowRunFilterIdempotencyKey] = Field(
        default=None, description="Filter criteria for `FlowRun.idempotency_key`"
    )

    def only_filters_on_id(self) -> bool:
        """Return True when the sole criterion is a non-empty `id.any_` list.

        That is: `id` is set with a non-empty `any_` and an unset or empty
        `not_any_`, and every other sub-filter is `None`.
        """
        if self.id is None or not self.id.any_ or self.id.not_any_:
            return False
        other_sub_filters = (
            self.name,
            self.tags,
            self.deployment_id,
            self.work_queue_name,
            self.state,
            self.flow_version,
            self.start_time,
            self.end_time,
            self.expected_start_time,
            self.next_scheduled_start_time,
            self.parent_flow_run_id,
            self.parent_task_run_id,
            self.idempotency_key,
        )
        return all(sub_filter is None for sub_filter in other_sub_filters)

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return the SQL filter of every sub-filter that is set.

        The order below matches the original sequence of checks
        (`flow_version` before `state`) so the generated SQL is unchanged.
        """
        sub_filters = (
            self.id,
            self.name,
            self.tags,
            self.deployment_id,
            self.work_queue_name,
            self.flow_version,
            self.state,
            self.start_time,
            self.end_time,
            self.expected_start_time,
            self.next_scheduled_start_time,
            self.parent_flow_run_id,
            self.parent_task_run_id,
            self.idempotency_key,
        )
        return [
            sub_filter.as_sql_filter()
            for sub_filter in sub_filters
            if sub_filter is not None
        ]
class TaskRunFilterFlowRunId(PrefectOperatorFilterBaseModel):
    """Filter by `TaskRun.flow_run_id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of task run flow run ids to include"
    )

    is_null_: Optional[bool] = Field(
        default=False, description="Filter for task runs with None as their flow run id"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return at most one criterion on `TaskRun.flow_run_id`.

        Precedence: `is_null_ is True` yields `IS NULL` (and `any_` is
        ignored); otherwise a set `any_` yields `IN`; otherwise
        `is_null_ is False` yields `IS NOT NULL`; otherwise nothing.
        """
        if self.is_null_ is True:
            return [db.TaskRun.flow_run_id.is_(None)]
        if self.any_ is not None:
            return [db.TaskRun.flow_run_id.in_(self.any_)]
        if self.is_null_ is False:
            return [db.TaskRun.flow_run_id.is_not(None)]
        return []
class TaskRunFilterId(PrefectFilterBaseModel):
    """Filter by `TaskRun.id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of task run ids to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` criterion on `TaskRun.id` when `any_` is set, else nothing."""
        if self.any_ is None:
            return []
        return [db.TaskRun.id.in_(self.any_)]
class TaskRunFilterName(PrefectFilterBaseModel):
    """Filter by `TaskRun.name`."""

    any_: Optional[list[str]] = Field(
        default=None,
        description="A list of task run names to include",
        examples=[["my-task-run-1", "my-task-run-2"]],
    )

    like_: Optional[str] = Field(
        default=None,
        description=(
            "A case-insensitive partial match. For example, "
            " passing 'marvin' will match "
            "'marvin', 'sad-Marvin', and 'marvin-robot'."
        ),
        examples=["marvin"],
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Collect the `IN` and `ILIKE` criteria on `TaskRun.name` that are set."""
        criteria: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            criteria += [db.TaskRun.name.in_(self.any_)]
        if self.like_ is not None:
            # The value is wrapped in `%` on both sides for a substring match.
            pattern = f"%{self.like_}%"
            criteria += [db.TaskRun.name.ilike(pattern)]
        return criteria
class TaskRunFilterTags(PrefectOperatorFilterBaseModel):
    """Filter by `TaskRun.tags`."""

    all_: Optional[list[str]] = Field(
        default=None,
        examples=[["tag-1", "tag-2"]],
        description=(
            "A list of tags. Task runs will be returned only if their tags are a"
            " superset of the list"
        ),
    )
    is_null_: Optional[bool] = Field(
        default=None, description="If true, only include task runs without tags"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Collect the tag criteria on `TaskRun.tags` that are set."""
        criteria: list[sa.ColumnElement[bool]] = []
        if self.all_ is not None:
            required_tags = _as_array(self.all_)
            criteria.append(db.TaskRun.tags.has_all(required_tags))
        if self.is_null_ is not None:
            # "Without tags" is expressed as equality with an empty list.
            if self.is_null_:
                criteria.append(db.TaskRun.tags == [])
            else:
                criteria.append(db.TaskRun.tags != [])
        return criteria
class TaskRunFilterStateType(PrefectFilterBaseModel):
    """Filter by `TaskRun.state_type`."""

    any_: Optional[list[schemas.states.StateType]] = Field(
        default=None, description="A list of task run state types to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` criterion on `TaskRun.state_type` when `any_` is set."""
        if self.any_ is None:
            return []
        return [db.TaskRun.state_type.in_(self.any_)]
class TaskRunFilterStateName(PrefectFilterBaseModel):
    """Filter by `TaskRun.state_name`."""

    any_: Optional[list[str]] = Field(
        default=None, description="A list of task run state names to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` criterion on `TaskRun.state_name` when `any_` is set."""
        if self.any_ is None:
            return []
        return [db.TaskRun.state_name.in_(self.any_)]
class TaskRunFilterState(PrefectOperatorFilterBaseModel):
    """Filter by `TaskRun.state_type` and `TaskRun.state_name`."""

    type: Optional[TaskRunFilterStateType] = Field(
        default=None, description="Filter criteria for `TaskRun.state_type`"
    )
    name: Optional[TaskRunFilterStateName] = Field(
        default=None, description="Filter criteria for `TaskRun.state_name`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return one criterion per sub-filter that has any criteria set.

        A sub-filter with no criteria is skipped so that a constant TRUE is
        never combined by this filter's `operator` (under `or_` it would match
        every row). Emptiness is tested directly instead of via
        `isinstance(..., sa.BinaryExpression)`, which would also discard a
        sub-filter that produced more than one criterion, and the local no
        longer shadows the builtin `filter`.
        """
        filters: list[sa.ColumnExpressionArgument[bool]] = []
        for sub_filter in (self.type, self.name):
            if sub_filter is None:
                continue
            criteria = list(sub_filter._get_filter_list(db))
            if criteria:
                # Mirrors PrefectFilterBaseModel.as_sql_filter: AND the
                # sub-filter's own criteria together.
                filters.append(sa.and_(*criteria))
        return filters
class TaskRunFilterSubFlowRuns(PrefectFilterBaseModel):
    """Filter by `TaskRun.subflow_run`."""

    exists_: Optional[bool] = Field(
        default=None,
        description=(
            "If true, only include task runs that are subflow run parents; if false,"
            " exclude parent task runs"
        ),
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return a criterion on whether `TaskRun.subflow_run` has a related row.

        Nothing is returned unless `exists_` is exactly `True` or `False`.
        """
        if self.exists_ is True:
            return [db.TaskRun.subflow_run.has()]
        if self.exists_ is False:
            return [sa.not_(db.TaskRun.subflow_run.has())]
        return []
class TaskRunFilterStartTime(PrefectFilterBaseModel):
    """Filter by `TaskRun.start_time`."""

    before_: Optional[DateTime] = Field(
        default=None,
        description="Only include task runs starting at or before this time",
    )
    after_: Optional[DateTime] = Field(
        default=None,
        description="Only include task runs starting at or after this time",
    )
    is_null_: Optional[bool] = Field(
        default=None, description="If true, only return task runs without a start time"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Build the upper-bound, lower-bound and nullness clauses that are set."""
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        if (latest := self.before_) is not None:
            clauses.append(db.TaskRun.start_time <= latest)
        if (earliest := self.after_) is not None:
            clauses.append(db.TaskRun.start_time >= earliest)
        if self.is_null_ is not None:
            # True selects rows without a start time, False rows that have one.
            if self.is_null_:
                clauses.append(db.TaskRun.start_time.is_(None))
            else:
                clauses.append(db.TaskRun.start_time.is_not(None))
        return clauses
class TaskRunFilterExpectedStartTime(PrefectFilterBaseModel):
    """Filter by `TaskRun.expected_start_time`."""

    before_: Optional[DateTime] = Field(
        default=None,
        description="Only include task runs expected to start at or before this time",
    )
    after_: Optional[DateTime] = Field(
        default=None,
        description="Only include task runs expected to start at or after this time",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Build the inclusive time-window clauses that are set."""
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        if (latest := self.before_) is not None:
            clauses.append(db.TaskRun.expected_start_time <= latest)
        if (earliest := self.after_) is not None:
            clauses.append(db.TaskRun.expected_start_time >= earliest)
        return clauses
class TaskRunFilter(PrefectOperatorFilterBaseModel):
    """Filter task runs. Only task runs matching all criteria will be returned"""

    id: Optional[TaskRunFilterId] = Field(
        default=None, description="Filter criteria for `TaskRun.id`"
    )
    name: Optional[TaskRunFilterName] = Field(
        default=None, description="Filter criteria for `TaskRun.name`"
    )
    tags: Optional[TaskRunFilterTags] = Field(
        default=None, description="Filter criteria for `TaskRun.tags`"
    )
    state: Optional[TaskRunFilterState] = Field(
        default=None, description="Filter criteria for `TaskRun.state`"
    )
    start_time: Optional[TaskRunFilterStartTime] = Field(
        default=None, description="Filter criteria for `TaskRun.start_time`"
    )
    expected_start_time: Optional[TaskRunFilterExpectedStartTime] = Field(
        default=None, description="Filter criteria for `TaskRun.expected_start_time`"
    )
    subflow_runs: Optional[TaskRunFilterSubFlowRuns] = Field(
        default=None, description="Filter criteria for `TaskRun.subflow_run`"
    )
    flow_run_id: Optional[TaskRunFilterFlowRunId] = Field(
        default=None, description="Filter criteria for `TaskRun.flow_run_id`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Gather one SQL filter per configured sub-filter, in declaration order."""
        criteria = (
            self.id,
            self.name,
            self.tags,
            self.state,
            self.start_time,
            self.expected_start_time,
            self.subflow_runs,
            self.flow_run_id,
        )
        # Unset (None) sub-filters contribute nothing.
        return [
            criterion.as_sql_filter() for criterion in criteria if criterion is not None
        ]
class DeploymentFilterId(PrefectFilterBaseModel):
    """Filter by `Deployment.id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of deployment ids to include"
    )
    not_any_: Optional[list[UUID]] = Field(
        default=None, description="A list of deployment ids to exclude"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Build the inclusion and exclusion clauses on `Deployment.id`."""
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        if (included := self.any_) is not None:
            clauses.append(db.Deployment.id.in_(included))
        if (excluded := self.not_any_) is not None:
            clauses.append(db.Deployment.id.not_in(excluded))
        return clauses
class DeploymentFilterName(PrefectFilterBaseModel):
    """Filter by `Deployment.name`."""

    any_: Optional[list[str]] = Field(
        default=None,
        description="A list of deployment names to include",
        examples=[["my-deployment-1", "my-deployment-2"]],
    )

    like_: Optional[str] = Field(
        default=None,
        description=(
            "A case-insensitive partial match. For example, "
            " passing 'marvin' will match "
            "'marvin', 'sad-Marvin', and 'marvin-robot'."
        ),
        examples=["marvin"],
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Build the exact-name and partial-match clauses on `Deployment.name`."""
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        if (names := self.any_) is not None:
            clauses.append(db.Deployment.name.in_(names))
        if (fragment := self.like_) is not None:
            # The fragment is wrapped in `%` so it may match anywhere in the name.
            clauses.append(db.Deployment.name.ilike(f"%{fragment}%"))
        return clauses
class DeploymentOrFlowNameFilter(PrefectFilterBaseModel):
    """Filter by `Deployment.name` or `Flow.name` with a single input string for ilike filtering."""

    like_: Optional[str] = Field(
        default=None,
        description=(
            "A case-insensitive partial match on deployment or flow names. For example, "
            "passing 'example' might match deployments or flows with 'example' in their names."
        ),
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Match the pattern against the deployment name OR its flow's name."""
        if self.like_ is None:
            return []

        pattern = f"%{self.like_}%"
        on_deployment = db.Deployment.name.ilike(pattern)
        # `has()` applies the name test to the flow related to the deployment.
        on_flow = db.Deployment.flow.has(db.Flow.name.ilike(pattern))
        return [sa.or_(on_deployment, on_flow)]
class DeploymentFilterPaused(PrefectFilterBaseModel):
    """Filter by `Deployment.paused`."""

    eq_: Optional[bool] = Field(
        default=None,
        description="Only returns where deployment is/is not paused",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IS <bool>` clause on `Deployment.paused` when `eq_` is set."""
        if self.eq_ is None:
            return []
        return [db.Deployment.paused.is_(self.eq_)]
class DeploymentFilterWorkQueueName(PrefectFilterBaseModel):
    """Filter by `Deployment.work_queue_name`."""

    any_: Optional[list[str]] = Field(
        default=None,
        description="A list of work queue names to include",
        examples=[["work_queue_1", "work_queue_2"]],
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` clause on `Deployment.work_queue_name` when `any_` is set."""
        if self.any_ is None:
            return []
        return [db.Deployment.work_queue_name.in_(self.any_)]
class DeploymentFilterConcurrencyLimit(PrefectFilterBaseModel):
    """DEPRECATED: Prefer `Deployment.concurrency_limit_id` over `Deployment.concurrency_limit`."""

    ge_: Optional[int] = Field(
        default=None,
        description="Only include deployments with a concurrency limit greater than or equal to this value",
    )

    le_: Optional[int] = Field(
        default=None,
        description="Only include deployments with a concurrency limit less than or equal to this value",
    )
    is_null_: Optional[bool] = Field(
        default=None,
        description="If true, only include deployments without a concurrency limit",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> list[sa.ColumnElement[bool]]:
        """Return no clauses: this deprecated filter is a deliberate no-op.

        It used to filter on an `int` column that was moved to a `ForeignKey`
        relationship. The filter was deprecated rather than extended to support
        filtering on the new relationship.
        """
        no_clauses: list[sa.ColumnElement[bool]] = []
        return no_clauses
class DeploymentFilterTags(PrefectOperatorFilterBaseModel):
    """Filter by `Deployment.tags`."""

    all_: Optional[list[str]] = Field(
        default=None,
        examples=[["tag-1", "tag-2"]],
        description=(
            "A list of tags. Deployments will be returned only if their tags are a"
            " superset of the list"
        ),
    )
    any_: Optional[list[str]] = Field(
        default=None,
        examples=[["tag-1", "tag-2"]],
        description="A list of tags to include",
    )

    is_null_: Optional[bool] = Field(
        default=None, description="If true, only include deployments without tags"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Build the superset, overlap and emptiness clauses on `Deployment.tags`."""
        from prefect.server.database import orm_models

        clauses: list[sa.ColumnElement[bool]] = []
        if (required_tags := self.all_) is not None:
            clauses.append(
                orm_models.Deployment.tags.has_all(_as_array(required_tags))
            )
        if (candidate_tags := self.any_) is not None:
            clauses.append(
                orm_models.Deployment.tags.has_any(_as_array(candidate_tags))
            )
        if self.is_null_ is not None:
            # "Without tags" is expressed as equality with an empty list.
            if self.is_null_:
                clauses.append(db.Deployment.tags == [])
            else:
                clauses.append(db.Deployment.tags != [])
        return clauses
class DeploymentFilter(PrefectOperatorFilterBaseModel):
    """Filter for deployments. Only deployments matching all criteria will be returned."""

    id: Optional[DeploymentFilterId] = Field(
        default=None, description="Filter criteria for `Deployment.id`"
    )
    name: Optional[DeploymentFilterName] = Field(
        default=None, description="Filter criteria for `Deployment.name`"
    )
    flow_or_deployment_name: Optional[DeploymentOrFlowNameFilter] = Field(
        default=None, description="Filter criteria for `Deployment.name` or `Flow.name`"
    )
    paused: Optional[DeploymentFilterPaused] = Field(
        default=None, description="Filter criteria for `Deployment.paused`"
    )
    tags: Optional[DeploymentFilterTags] = Field(
        default=None, description="Filter criteria for `Deployment.tags`"
    )
    work_queue_name: Optional[DeploymentFilterWorkQueueName] = Field(
        default=None, description="Filter criteria for `Deployment.work_queue_name`"
    )
    concurrency_limit: Optional[DeploymentFilterConcurrencyLimit] = Field(
        default=None,
        description="DEPRECATED: Prefer `Deployment.concurrency_limit_id` over `Deployment.concurrency_limit`. If provided, will be ignored for backwards-compatibility. Will be removed after December 2024.",
        deprecated=True,
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Gather one SQL filter per configured sub-filter, in declaration order.

        The deprecated `concurrency_limit` field is not consulted here.
        """
        criteria = (
            self.id,
            self.name,
            self.flow_or_deployment_name,
            self.paused,
            self.tags,
            self.work_queue_name,
        )
        return [
            criterion.as_sql_filter() for criterion in criteria if criterion is not None
        ]
class DeploymentScheduleFilterActive(PrefectFilterBaseModel):
    """Filter by `DeploymentSchedule.active`."""

    eq_: Optional[bool] = Field(
        default=None,
        description="Only returns where deployment schedule is/is not active",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IS <bool>` clause on `DeploymentSchedule.active` when set."""
        if self.eq_ is None:
            return []
        return [db.DeploymentSchedule.active.is_(self.eq_)]
class DeploymentScheduleFilter(PrefectOperatorFilterBaseModel):
    """Filter for deployment schedules. Only deployment schedules matching all criteria will be returned."""

    active: Optional[DeploymentScheduleFilterActive] = Field(
        default=None, description="Filter criteria for `DeploymentSchedule.active`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return the SQL filter of the `active` sub-filter, if one is configured."""
        filters: list[sa.ColumnExpressionArgument[bool]] = []

        if self.active is not None:
            filters.append(self.active.as_sql_filter())

        return filters
class LogFilterName(PrefectFilterBaseModel):
    """Filter by `Log.name`."""

    any_: Optional[list[str]] = Field(
        default=None,
        description="A list of log names to include",
        examples=[["prefect.logger.flow_runs", "prefect.logger.task_runs"]],
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` clause on `Log.name` when `any_` is set."""
        if self.any_ is None:
            return []
        return [db.Log.name.in_(self.any_)]
class LogFilterLevel(PrefectFilterBaseModel):
    """Filter by `Log.level`."""

    ge_: Optional[int] = Field(
        default=None,
        description="Include logs with a level greater than or equal to this level",
        examples=[20],
    )

    le_: Optional[int] = Field(
        default=None,
        description="Include logs with a level less than or equal to this level",
        examples=[50],
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Build the inclusive lower- and upper-bound clauses on `Log.level`."""
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        if (minimum := self.ge_) is not None:
            clauses.append(db.Log.level >= minimum)
        if (maximum := self.le_) is not None:
            clauses.append(db.Log.level <= maximum)
        return clauses
class LogFilterTimestamp(PrefectFilterBaseModel):
    """Filter by `Log.timestamp`."""

    before_: Optional[DateTime] = Field(
        default=None,
        description="Only include logs with a timestamp at or before this time",
    )
    after_: Optional[DateTime] = Field(
        default=None,
        description="Only include logs with a timestamp at or after this time",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Build the inclusive time-window clauses on `Log.timestamp`."""
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        if (latest := self.before_) is not None:
            clauses.append(db.Log.timestamp <= latest)
        if (earliest := self.after_) is not None:
            clauses.append(db.Log.timestamp >= earliest)
        return clauses
class LogFilterFlowRunId(PrefectFilterBaseModel):
    """Filter by `Log.flow_run_id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of flow run IDs to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` clause on `Log.flow_run_id` when `any_` is set."""
        if self.any_ is None:
            return []
        return [db.Log.flow_run_id.in_(self.any_)]
class LogFilterTaskRunId(PrefectFilterBaseModel):
    """Filter by `Log.task_run_id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of task run IDs to include"
    )

    is_null_: Optional[bool] = Field(
        default=None,
        description="If true, only include logs without a task run id",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Build the membership and nullness clauses on `Log.task_run_id`."""
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        if (task_run_ids := self.any_) is not None:
            clauses.append(db.Log.task_run_id.in_(task_run_ids))
        if self.is_null_ is not None:
            # True selects logs without a task run id, False logs that have one.
            if self.is_null_:
                clauses.append(db.Log.task_run_id.is_(None))
            else:
                clauses.append(db.Log.task_run_id.is_not(None))
        return clauses
class LogFilterTextSearch(PrefectFilterBaseModel):
    """Filter by text search across log content."""

    query: str = Field(
        description="Text search query string",
        examples=[
            "error",
            "error -debug",
            '"connection timeout"',
            "+required -excluded",
        ],
        max_length=200,
    )

    def includes(self, log: "Log") -> bool:
        """Check if this text filter includes the given log.

        The log's message and logger name are joined with a space and compared
        case-insensitively, as plain substrings, against the parsed query terms.

        Args:
            log: the log to test.

        Returns:
            True when the log matches at least one include term (if any are
            given), no exclude term, and every required term.

        Raises:
            TypeError: if `log` is not a `Log` instance.
        """
        from prefect.server.schemas.core import Log

        if not isinstance(log, Log):
            raise TypeError(f"Expected Log object, got {type(log)}")

        # Parse query into components
        parsed = parse_text_search_query(self.query)

        # Build searchable text from message and logger name
        searchable_text = f"{log.message} {log.name}".lower()

        def mentions(term: str) -> bool:
            return term.lower() in searchable_text

        # Include terms (OR logic): at least one must be present
        if parsed.include and not any(mentions(term) for term in parsed.include):
            return False

        # Exclude terms (NOT logic): none may be present
        if parsed.exclude and any(mentions(term) for term in parsed.exclude):
            return False

        # Required terms (AND logic - future feature): all must be present
        if parsed.required and not all(mentions(term) for term in parsed.required):
            return False

        return True

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Build SQLAlchemy WHERE clauses for text search.

        Mirrors `includes`: terms are matched case-insensitively as literal
        substrings of the log message and logger name joined by a space. A
        blank query produces no clauses.
        """
        filters: list[sa.ColumnExpressionArgument[bool]] = []

        if not self.query.strip():
            return filters

        parsed = parse_text_search_query(self.query)

        # Combined, lower-cased searchable text (message + name), built once
        # and shared by every term condition.
        searchable_field = sa.func.lower(
            sa.func.concat(db.Log.message, " ", db.Log.name)
        )

        def mentions(term: str) -> "sa.ColumnElement[bool]":
            # `autoescape=True` makes `%` and `_` inside a term match literally
            # instead of acting as LIKE wildcards, so the SQL result agrees
            # with the plain substring test done by `includes`.
            return searchable_field.contains(term.lower(), autoescape=True)

        # Handle include terms (OR logic)
        if parsed.include:
            filters.append(sa.or_(*(mentions(term) for term in parsed.include)))

        # Handle exclude terms (NOT logic)
        if parsed.exclude:
            filters.append(sa.and_(*(~mentions(term) for term in parsed.exclude)))

        # Handle required terms (AND logic - future feature)
        if parsed.required:
            filters.append(sa.and_(*(mentions(term) for term in parsed.required)))

        return filters
class LogFilter(PrefectOperatorFilterBaseModel):
    """Filter logs. Only logs matching all criteria will be returned"""

    level: Optional[LogFilterLevel] = Field(
        default=None, description="Filter criteria for `Log.level`"
    )
    timestamp: Optional[LogFilterTimestamp] = Field(
        default=None, description="Filter criteria for `Log.timestamp`"
    )
    flow_run_id: Optional[LogFilterFlowRunId] = Field(
        default=None, description="Filter criteria for `Log.flow_run_id`"
    )
    task_run_id: Optional[LogFilterTaskRunId] = Field(
        default=None, description="Filter criteria for `Log.task_run_id`"
    )
    text: Optional[LogFilterTextSearch] = Field(
        default=None, description="Filter criteria for text search across log content"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Gather the clauses of every configured sub-filter, in declaration order."""
        criteria = (self.level, self.timestamp, self.flow_run_id, self.task_run_id)
        clauses: list[sa.ColumnExpressionArgument[bool]] = [
            criterion.as_sql_filter() for criterion in criteria if criterion is not None
        ]
        if self.text is not None:
            # The text filter's individual clauses are spliced in directly
            # rather than being combined through `as_sql_filter()`.
            clauses.extend(self.text._get_filter_list(db))
        return clauses
class FilterSet(PrefectBaseModel):
    """A collection of filters for common objects"""

    # Each field defaults to a freshly constructed filter of its type
    # (`default_factory`), so an unspecified entry is a filter with no
    # criteria set rather than None.
    flows: FlowFilter = Field(
        description="Filters that apply to flows",
        default_factory=FlowFilter,
    )
    flow_runs: FlowRunFilter = Field(
        description="Filters that apply to flow runs",
        default_factory=FlowRunFilter,
    )
    task_runs: TaskRunFilter = Field(
        description="Filters that apply to task runs",
        default_factory=TaskRunFilter,
    )
    deployments: DeploymentFilter = Field(
        description="Filters that apply to deployments",
        default_factory=DeploymentFilter,
    )
class BlockTypeFilterName(PrefectFilterBaseModel):
    """Filter by `BlockType.name`"""

    like_: Optional[str] = Field(
        default=None,
        description=(
            "A case-insensitive partial match. For example, "
            " passing 'marvin' will match "
            "'marvin', 'sad-Marvin', and 'marvin-robot'."
        ),
        examples=["marvin"],
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return a case-insensitive partial-match clause on `BlockType.name`."""
        if self.like_ is None:
            return []
        # The fragment is wrapped in `%` so it may match anywhere in the name.
        return [db.BlockType.name.ilike(f"%{self.like_}%")]
class BlockTypeFilterSlug(PrefectFilterBaseModel):
    """Filter by `BlockType.slug`"""

    any_: Optional[list[str]] = Field(
        default=None, description="A list of slugs to match"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` clause on `BlockType.slug` when `any_` is set."""
        if self.any_ is None:
            return []
        return [db.BlockType.slug.in_(self.any_)]
class BlockTypeFilter(PrefectFilterBaseModel):
    """Filter BlockTypes"""

    name: Optional[BlockTypeFilterName] = Field(
        default=None, description="Filter criteria for `BlockType.name`"
    )

    slug: Optional[BlockTypeFilterSlug] = Field(
        default=None, description="Filter criteria for `BlockType.slug`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Gather one SQL filter per configured sub-filter (`name`, then `slug`)."""
        criteria = (self.name, self.slug)
        return [
            criterion.as_sql_filter() for criterion in criteria if criterion is not None
        ]
class BlockSchemaFilterBlockTypeId(PrefectFilterBaseModel):
    """Filter by `BlockSchema.block_type_id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of block type ids to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` clause on `BlockSchema.block_type_id` when `any_` is set."""
        if self.any_ is None:
            return []
        return [db.BlockSchema.block_type_id.in_(self.any_)]
class BlockSchemaFilterId(PrefectFilterBaseModel):
    """Filter by BlockSchema.id"""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of IDs to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` clause on `BlockSchema.id` when `any_` is set."""
        if self.any_ is None:
            return []
        return [db.BlockSchema.id.in_(self.any_)]
class BlockSchemaFilterCapabilities(PrefectFilterBaseModel):
    """Filter by `BlockSchema.capabilities`"""

    all_: Optional[list[str]] = Field(
        default=None,
        examples=[["write-storage", "read-storage"]],
        description=(
            "A list of block capabilities. Block entities will be returned only if an"
            " associated block schema has a superset of the defined capabilities."
        ),
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return a `has_all` clause on `BlockSchema.capabilities` when `all_` is set."""
        if self.all_ is None:
            return []
        required_capabilities = _as_array(self.all_)
        return [db.BlockSchema.capabilities.has_all(required_capabilities)]
class BlockSchemaFilterVersion(PrefectFilterBaseModel):
    """Filter by `BlockSchema.version`"""

    any_: Optional[list[str]] = Field(
        default=None,
        examples=[["2.0.0", "2.1.0"]],
        description="A list of block schema versions.",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` clause on `BlockSchema.version` when `any_` is set."""
        filters: list[sa.ColumnElement[bool]] = []
        if self.any_ is not None:
            filters.append(db.BlockSchema.version.in_(self.any_))
        return filters
class BlockSchemaFilter(PrefectOperatorFilterBaseModel):
    """Filter BlockSchemas"""

    block_type_id: Optional[BlockSchemaFilterBlockTypeId] = Field(
        default=None, description="Filter criteria for `BlockSchema.block_type_id`"
    )
    block_capabilities: Optional[BlockSchemaFilterCapabilities] = Field(
        default=None, description="Filter criteria for `BlockSchema.capabilities`"
    )
    id: Optional[BlockSchemaFilterId] = Field(
        default=None, description="Filter criteria for `BlockSchema.id`"
    )
    version: Optional[BlockSchemaFilterVersion] = Field(
        default=None, description="Filter criteria for `BlockSchema.version`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Gather one SQL filter per configured sub-filter, in declaration order."""
        criteria = (
            self.block_type_id,
            self.block_capabilities,
            self.id,
            self.version,
        )
        return [
            criterion.as_sql_filter() for criterion in criteria if criterion is not None
        ]
class BlockDocumentFilterIsAnonymous(PrefectFilterBaseModel):
    """Filter by `BlockDocument.is_anonymous`."""

    eq_: Optional[bool] = Field(
        default=None,
        description=(
            "Filter block documents for only those that are or are not anonymous."
        ),
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IS <bool>` clause on `BlockDocument.is_anonymous` when set."""
        if self.eq_ is None:
            return []
        return [db.BlockDocument.is_anonymous.is_(self.eq_)]
class BlockDocumentFilterBlockTypeId(PrefectFilterBaseModel):
    """Filter by `BlockDocument.block_type_id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of block type ids to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` clause on `BlockDocument.block_type_id` when `any_` is set."""
        if self.any_ is None:
            return []
        return [db.BlockDocument.block_type_id.in_(self.any_)]
class BlockDocumentFilterId(PrefectFilterBaseModel):
    """Filter by `BlockDocument.id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of block ids to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` clause on `BlockDocument.id` when `any_` is set."""
        if self.any_ is None:
            return []
        return [db.BlockDocument.id.in_(self.any_)]
class BlockDocumentFilterName(PrefectFilterBaseModel):
    """Filter by `BlockDocument.name`."""

    any_: Optional[list[str]] = Field(
        default=None, description="A list of block names to include"
    )
    like_: Optional[str] = Field(
        default=None,
        description=(
            "A string to match block names against. This can include "
            "SQL wildcard characters like `%` and `_`."
        ),
        examples=["my-block%"],
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Build the exact-name and pattern-match clauses on `BlockDocument.name`."""
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        if (names := self.any_) is not None:
            clauses.append(db.BlockDocument.name.in_(names))
        if (pattern := self.like_) is not None:
            # The caller's pattern is additionally wrapped in `%` on both sides.
            clauses.append(db.BlockDocument.name.ilike(f"%{pattern}%"))
        return clauses
class BlockDocumentFilter(PrefectOperatorFilterBaseModel):
    """Filter BlockDocuments. Only BlockDocuments matching all criteria will be returned"""

    id: Optional[BlockDocumentFilterId] = Field(
        default=None, description="Filter criteria for `BlockDocument.id`"
    )
    is_anonymous: Optional[BlockDocumentFilterIsAnonymous] = Field(
        # default is to exclude anonymous blocks
        BlockDocumentFilterIsAnonymous(eq_=False),
        description=(
            "Filter criteria for `BlockDocument.is_anonymous`. "
            "Defaults to excluding anonymous blocks."
        ),
    )
    block_type_id: Optional[BlockDocumentFilterBlockTypeId] = Field(
        default=None, description="Filter criteria for `BlockDocument.block_type_id`"
    )
    name: Optional[BlockDocumentFilterName] = Field(
        default=None, description="Filter criteria for `BlockDocument.name`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Gather one SQL filter per configured sub-filter, in declaration order.

        `is_anonymous` takes part unless it was explicitly set to None, because
        its default is a filter with `eq_=False`.
        """
        criteria = (self.id, self.is_anonymous, self.block_type_id, self.name)
        return [
            criterion.as_sql_filter() for criterion in criteria if criterion is not None
        ]
class WorkQueueFilterId(PrefectFilterBaseModel):
    """Filter by `WorkQueue.id`."""

    any_: Optional[list[UUID]] = Field(
        default=None,
        description="A list of work queue ids to include",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` criterion on `WorkQueue.id` when `any_` is set."""
        if self.any_ is None:
            return []
        return [db.WorkQueue.id.in_(self.any_)]
class WorkQueueFilterName(PrefectFilterBaseModel):
    """Filter by `WorkQueue.name`."""

    any_: Optional[list[str]] = Field(
        default=None,
        description="A list of work queue names to include",
        examples=[["wq-1", "wq-2"]],
    )

    startswith_: Optional[list[str]] = Field(
        default=None,
        description=(
            "A list of case-insensitive starts-with matches. For example, "
            " passing 'marvin' will match "
            "'marvin', and 'Marvin-robot', but not 'sad-marvin'."
        ),
        examples=[["marvin", "Marvin-robot"]],
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return one SQL criterion per populated field.

        `any_` becomes an `IN` criterion on `WorkQueue.name`; `startswith_`
        becomes an `OR` of case-insensitive prefix matches.
        """
        filters: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            filters.append(db.WorkQueue.name.in_(self.any_))
        if self.startswith_ is not None:
            prefix_matches = [
                db.WorkQueue.name.ilike(f"{prefix}%") for prefix in self.startswith_
            ]
            # Seed the disjunction with `false()`: calling `or_()` with no
            # arguments is deprecated in SQLAlchemy, and an empty prefix list
            # should match nothing (as `any_=[]` does) instead of silently
            # dropping the criterion.
            filters.append(sa.or_(sa.false(), *prefix_matches))
        return filters
class WorkQueueFilter(PrefectOperatorFilterBaseModel):
    """Filter work queues. Only work queues matching all criteria will be
    returned"""

    id: Optional[WorkQueueFilterId] = Field(
        default=None, description="Filter criteria for `WorkQueue.id`"
    )

    name: Optional[WorkQueueFilterName] = Field(
        default=None, description="Filter criteria for `WorkQueue.name`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Collect the SQL filters of the `id` and `name` sub-filters that are set."""
        collected: list[sa.ColumnExpressionArgument[bool]] = []
        for sub_filter in (self.id, self.name):
            if sub_filter is None:
                continue
            collected.append(sub_filter.as_sql_filter())
        return collected
class WorkPoolFilterId(PrefectFilterBaseModel):
    """Filter by `WorkPool.id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of work pool ids to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` criterion on `WorkPool.id` when `any_` is set."""
        pool_ids = self.any_
        if pool_ids is None:
            return []
        return [db.WorkPool.id.in_(pool_ids)]
class WorkPoolFilterName(PrefectFilterBaseModel):
    """Filter by `WorkPool.name`."""

    any_: Optional[list[str]] = Field(
        default=None, description="A list of work pool names to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` criterion on `WorkPool.name` when `any_` is set."""
        pool_names = self.any_
        if pool_names is None:
            return []
        return [db.WorkPool.name.in_(pool_names)]
class WorkPoolFilterType(PrefectFilterBaseModel):
    """Filter by `WorkPool.type`."""

    any_: Optional[list[str]] = Field(
        default=None, description="A list of work pool types to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` criterion on `WorkPool.type` when `any_` is set."""
        pool_types = self.any_
        if pool_types is None:
            return []
        return [db.WorkPool.type.in_(pool_types)]
class WorkPoolFilter(PrefectOperatorFilterBaseModel):
    """Filter work pools. Only work pools matching all criteria will be returned"""

    id: Optional[WorkPoolFilterId] = Field(
        default=None, description="Filter criteria for `WorkPool.id`"
    )
    name: Optional[WorkPoolFilterName] = Field(
        default=None, description="Filter criteria for `WorkPool.name`"
    )
    type: Optional[WorkPoolFilterType] = Field(
        default=None, description="Filter criteria for `WorkPool.type`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Collect the SQL filter of every sub-filter that is set.

        Sub-filters are visited in declaration order: `id`, `name`, `type`.
        """
        sub_filters = (self.id, self.name, self.type)
        return [
            sub_filter.as_sql_filter()
            for sub_filter in sub_filters
            if sub_filter is not None
        ]
class WorkerFilterWorkPoolId(PrefectFilterBaseModel):
    """Filter by `Worker.work_pool_id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of work pool ids to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` criterion on `Worker.work_pool_id` when `any_` is set."""
        filters: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            filters.append(db.Worker.work_pool_id.in_(self.any_))
        return filters
class WorkerFilterStatus(PrefectFilterBaseModel):
    """Filter by `Worker.status`."""

    any_: Optional[list[schemas.statuses.WorkerStatus]] = Field(
        default=None, description="A list of worker statuses to include"
    )
    not_any_: Optional[list[schemas.statuses.WorkerStatus]] = Field(
        default=None, description="A list of worker statuses to exclude"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return one SQL criterion per populated field.

        `any_` becomes an `IN` criterion and `not_any_` a `NOT IN` criterion
        on `Worker.status`.
        """
        filters: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            filters.append(db.Worker.status.in_(self.any_))
        if self.not_any_ is not None:
            # `not_in` is the current SQLAlchemy name; `notin_` is only kept
            # as a backwards-compatibility alias.
            filters.append(db.Worker.status.not_in(self.not_any_))
        return filters
class WorkerFilterLastHeartbeatTime(PrefectFilterBaseModel):
    """Filter by `Worker.last_heartbeat_time`."""

    before_: Optional[DateTime] = Field(
        default=None,
        description=(
            "Only include processes whose last heartbeat was at or before this time"
        ),
    )
    after_: Optional[DateTime] = Field(
        default=None,
        description=(
            "Only include processes whose last heartbeat was at or after this time"
        ),
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return the inclusive time-bound criteria for the populated fields."""
        bounds: list[sa.ColumnExpressionArgument[bool]] = []
        if (upper := self.before_) is not None:
            bounds.append(db.Worker.last_heartbeat_time <= upper)
        if (lower := self.after_) is not None:
            bounds.append(db.Worker.last_heartbeat_time >= lower)
        return bounds
class WorkerFilter(PrefectOperatorFilterBaseModel):
    """Filter workers by `Worker.last_heartbeat_time` and `Worker.status`."""

    # worker_config_id: Optional[WorkerFilterWorkPoolId] = Field(
    #     default=None, description="Filter criteria for `Worker.worker_config_id`"
    # )

    last_heartbeat_time: Optional[WorkerFilterLastHeartbeatTime] = Field(
        default=None,
        description="Filter criteria for `Worker.last_heartbeat_time`",
    )

    status: Optional[WorkerFilterStatus] = Field(
        default=None, description="Filter criteria for `Worker.status`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Collect the SQL filter of every sub-filter that is set.

        Sub-filters are visited in declaration order: `last_heartbeat_time`,
        then `status`.
        """
        filters: list[sa.ColumnExpressionArgument[bool]] = []

        if self.last_heartbeat_time is not None:
            filters.append(self.last_heartbeat_time.as_sql_filter())

        if self.status is not None:
            filters.append(self.status.as_sql_filter())

        return filters
class ArtifactFilterId(PrefectFilterBaseModel):
    """Filter by `Artifact.id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of artifact ids to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` criterion on `Artifact.id` when `any_` is set."""
        artifact_ids = self.any_
        if artifact_ids is None:
            return []
        return [db.Artifact.id.in_(artifact_ids)]
class ArtifactFilterKey(PrefectFilterBaseModel):
    """Filter by `Artifact.key`."""

    any_: Optional[list[str]] = Field(
        default=None, description="A list of artifact keys to include"
    )

    like_: Optional[str] = Field(
        default=None,
        description=(
            "A string to match artifact keys against. This can include "
            "SQL wildcard characters like `%` and `_`."
        ),
        examples=["my-artifact-%"],
    )

    exists_: Optional[bool] = Field(
        default=None,
        description=(
            "If `true`, only include artifacts with a non-null key. If `false`, "
            "only include artifacts with a null key."
        ),
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return one SQL criterion per populated field.

        Criteria are emitted in the order `any_`, `like_`, `exists_`.
        """
        filters: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            filters.append(db.Artifact.key.in_(self.any_))
        if self.like_ is not None:
            # The pattern is wrapped in `%` on both sides, so it is matched
            # case-insensitively anywhere inside the key.
            filters.append(db.Artifact.key.ilike(f"%{self.like_}%"))
        if self.exists_ is not None:
            # `is_not` is the current SQLAlchemy name; `isnot` is only kept
            # as a backwards-compatibility alias.
            filters.append(
                db.Artifact.key.is_not(None)
                if self.exists_
                else db.Artifact.key.is_(None)
            )
        return filters
class ArtifactFilterFlowRunId(PrefectFilterBaseModel):
    """Filter by `Artifact.flow_run_id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of flow run IDs to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` criterion on `Artifact.flow_run_id` when `any_` is set."""
        flow_run_ids = self.any_
        if flow_run_ids is None:
            return []
        return [db.Artifact.flow_run_id.in_(flow_run_ids)]
class ArtifactFilterTaskRunId(PrefectFilterBaseModel):
    """Filter by `Artifact.task_run_id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of task run IDs to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` criterion on `Artifact.task_run_id` when `any_` is set."""
        task_run_ids = self.any_
        if task_run_ids is None:
            return []
        return [db.Artifact.task_run_id.in_(task_run_ids)]
class ArtifactFilterType(PrefectFilterBaseModel):
    """Filter by `Artifact.type`."""

    any_: Optional[list[str]] = Field(
        default=None, description="A list of artifact types to include"
    )
    not_any_: Optional[list[str]] = Field(
        default=None, description="A list of artifact types to exclude"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return one SQL criterion per populated field.

        `any_` becomes an `IN` criterion and `not_any_` a `NOT IN` criterion
        on `Artifact.type`.
        """
        filters: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            filters.append(db.Artifact.type.in_(self.any_))
        if self.not_any_ is not None:
            # `not_in` is the current SQLAlchemy name; `notin_` is only kept
            # as a backwards-compatibility alias.
            filters.append(db.Artifact.type.not_in(self.not_any_))
        return filters
class ArtifactFilter(PrefectOperatorFilterBaseModel):
    """Filter artifacts. Only artifacts matching all criteria will be returned"""

    id: Optional[ArtifactFilterId] = Field(
        default=None, description="Filter criteria for `Artifact.id`"
    )
    key: Optional[ArtifactFilterKey] = Field(
        default=None, description="Filter criteria for `Artifact.key`"
    )
    flow_run_id: Optional[ArtifactFilterFlowRunId] = Field(
        default=None, description="Filter criteria for `Artifact.flow_run_id`"
    )
    task_run_id: Optional[ArtifactFilterTaskRunId] = Field(
        default=None, description="Filter criteria for `Artifact.task_run_id`"
    )
    type: Optional[ArtifactFilterType] = Field(
        default=None, description="Filter criteria for `Artifact.type`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Collect the SQL filter of every sub-filter that is set.

        Sub-filters are visited in declaration order: `id`, `key`,
        `flow_run_id`, `task_run_id`, `type`.
        """
        sub_filters = (
            self.id,
            self.key,
            self.flow_run_id,
            self.task_run_id,
            self.type,
        )
        return [
            sub_filter.as_sql_filter()
            for sub_filter in sub_filters
            if sub_filter is not None
        ]
class ArtifactCollectionFilterLatestId(PrefectFilterBaseModel):
    """Filter by `ArtifactCollection.latest_id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of artifact ids to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` criterion on `ArtifactCollection.latest_id` when `any_` is set."""
        latest_ids = self.any_
        if latest_ids is None:
            return []
        return [db.ArtifactCollection.latest_id.in_(latest_ids)]
class ArtifactCollectionFilterKey(PrefectFilterBaseModel):
    """Filter by `ArtifactCollection.key`."""

    any_: Optional[list[str]] = Field(
        default=None, description="A list of artifact keys to include"
    )

    like_: Optional[str] = Field(
        default=None,
        description=(
            "A string to match artifact keys against. This can include "
            "SQL wildcard characters like `%` and `_`."
        ),
        examples=["my-artifact-%"],
    )

    exists_: Optional[bool] = Field(
        default=None,
        description=(
            "If `true`, only include artifacts with a non-null key. If `false`, "
            "only include artifacts with a null key. Should return all rows in "
            "the ArtifactCollection table if specified."
        ),
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return one SQL criterion per populated field.

        Criteria are emitted in the order `any_`, `like_`, `exists_`.
        """
        filters: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            filters.append(db.ArtifactCollection.key.in_(self.any_))
        if self.like_ is not None:
            # The pattern is wrapped in `%` on both sides, so it is matched
            # case-insensitively anywhere inside the key.
            filters.append(db.ArtifactCollection.key.ilike(f"%{self.like_}%"))
        if self.exists_ is not None:
            # `is_not` is the current SQLAlchemy name; `isnot` is only kept
            # as a backwards-compatibility alias.
            filters.append(
                db.ArtifactCollection.key.is_not(None)
                if self.exists_
                else db.ArtifactCollection.key.is_(None)
            )
        return filters
class ArtifactCollectionFilterFlowRunId(PrefectFilterBaseModel):
    """Filter by `ArtifactCollection.flow_run_id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of flow run IDs to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` criterion on `ArtifactCollection.flow_run_id` when `any_` is set."""
        flow_run_ids = self.any_
        if flow_run_ids is None:
            return []
        return [db.ArtifactCollection.flow_run_id.in_(flow_run_ids)]
class ArtifactCollectionFilterTaskRunId(PrefectFilterBaseModel):
    """Filter by `ArtifactCollection.task_run_id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of task run IDs to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` criterion on `ArtifactCollection.task_run_id` when `any_` is set."""
        task_run_ids = self.any_
        if task_run_ids is None:
            return []
        return [db.ArtifactCollection.task_run_id.in_(task_run_ids)]
class ArtifactCollectionFilterType(PrefectFilterBaseModel):
    """Filter by `ArtifactCollection.type`."""

    any_: Optional[list[str]] = Field(
        default=None, description="A list of artifact types to include"
    )
    not_any_: Optional[list[str]] = Field(
        default=None, description="A list of artifact types to exclude"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return one SQL criterion per populated field.

        `any_` becomes an `IN` criterion and `not_any_` a `NOT IN` criterion
        on `ArtifactCollection.type`.
        """
        filters: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            filters.append(db.ArtifactCollection.type.in_(self.any_))
        if self.not_any_ is not None:
            # `not_in` is the current SQLAlchemy name; `notin_` is only kept
            # as a backwards-compatibility alias.
            filters.append(db.ArtifactCollection.type.not_in(self.not_any_))
        return filters
class ArtifactCollectionFilter(PrefectOperatorFilterBaseModel):
    """Filter artifact collections. Only artifact collections matching all criteria will be returned"""

    latest_id: Optional[ArtifactCollectionFilterLatestId] = Field(
        default=None, description="Filter criteria for `Artifact.id`"
    )
    key: Optional[ArtifactCollectionFilterKey] = Field(
        default=None, description="Filter criteria for `Artifact.key`"
    )
    flow_run_id: Optional[ArtifactCollectionFilterFlowRunId] = Field(
        default=None, description="Filter criteria for `Artifact.flow_run_id`"
    )
    task_run_id: Optional[ArtifactCollectionFilterTaskRunId] = Field(
        default=None, description="Filter criteria for `Artifact.task_run_id`"
    )
    type: Optional[ArtifactCollectionFilterType] = Field(
        default=None, description="Filter criteria for `Artifact.type`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Collect the SQL filter of every sub-filter that is set.

        Sub-filters are visited in declaration order: `latest_id`, `key`,
        `flow_run_id`, `task_run_id`, `type`.
        """
        sub_filters = (
            self.latest_id,
            self.key,
            self.flow_run_id,
            self.task_run_id,
            self.type,
        )
        return [
            sub_filter.as_sql_filter()
            for sub_filter in sub_filters
            if sub_filter is not None
        ]
class VariableFilterId(PrefectFilterBaseModel):
    """Filter by `Variable.id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of variable ids to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` criterion on `Variable.id` when `any_` is set."""
        variable_ids = self.any_
        if variable_ids is None:
            return []
        return [db.Variable.id.in_(variable_ids)]
class VariableFilterName(PrefectFilterBaseModel):
    """Filter by `Variable.name`."""

    any_: Optional[list[str]] = Field(
        default=None, description="A list of variables names to include"
    )
    like_: Optional[str] = Field(
        default=None,
        description=(
            "A string to match variable names against. This can include "
            "SQL wildcard characters like `%` and `_`."
        ),
        examples=["my_variable_%"],
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return one SQL criterion per populated field (`any_`, then `like_`)."""
        criteria: list[sa.ColumnExpressionArgument[bool]] = []
        if (names := self.any_) is not None:
            criteria.append(db.Variable.name.in_(names))
        if (pattern := self.like_) is not None:
            # The pattern is wrapped in `%` on both sides, so it is matched
            # case-insensitively anywhere inside the name.
            criteria.append(db.Variable.name.ilike(f"%{pattern}%"))
        return criteria
class VariableFilterTags(PrefectOperatorFilterBaseModel):
    """Filter by `Variable.tags`."""

    all_: Optional[list[str]] = Field(
        default=None,
        examples=[["tag-1", "tag-2"]],
        description=(
            "A list of tags. Variables will be returned only if their tags are a"
            " superset of the list"
        ),
    )
    is_null_: Optional[bool] = Field(
        default=None, description="If true, only include Variables without tags"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return one SQL criterion per populated field (`all_`, then `is_null_`)."""
        criteria: list[sa.ColumnElement[bool]] = []
        if (required_tags := self.all_) is not None:
            criteria.append(db.Variable.tags.has_all(_as_array(required_tags)))
        if self.is_null_ is not None:
            # "Null" here is expressed as a comparison against the empty list.
            if self.is_null_:
                tags_criterion = db.Variable.tags == []
            else:
                tags_criterion = db.Variable.tags != []
            criteria.append(tags_criterion)
        return criteria
class VariableFilter(PrefectOperatorFilterBaseModel):
    """Filter variables. Only variables matching all criteria will be returned"""

    id: Optional[VariableFilterId] = Field(
        default=None, description="Filter criteria for `Variable.id`"
    )
    name: Optional[VariableFilterName] = Field(
        default=None, description="Filter criteria for `Variable.name`"
    )
    tags: Optional[VariableFilterTags] = Field(
        default=None, description="Filter criteria for `Variable.tags`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Collect the SQL filter of every sub-filter that is set.

        Sub-filters are visited in declaration order: `id`, `name`, `tags`.
        """
        sub_filters = (self.id, self.name, self.tags)
        return [
            sub_filter.as_sql_filter()
            for sub_filter in sub_filters
            if sub_filter is not None
        ]