Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/schemas/filters.py: 28%

1032 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1""" 

2Schemas that define Prefect REST API filtering operations. 

3 

4Each filter schema includes logic for transforming itself into a SQL `where` clause. 

5""" 

6 

7from collections.abc import Iterable, Sequence 1a

8from typing import TYPE_CHECKING, ClassVar, Optional 1a

9from uuid import UUID 1a

10 

11from pydantic import ConfigDict, Field 1a

12from sqlalchemy.sql.functions import coalesce 1a

13 

14import prefect.server.schemas as schemas 1a

15from prefect.server.utilities.database import db_injector 1a

16from prefect.server.utilities.schemas.bases import PrefectBaseModel 1a

17from prefect.server.utilities.text_search_parser import ( 1a

18 parse_text_search_query, 

19) 

20from prefect.types import DateTime 1a

21from prefect.utilities.collections import AutoEnum 1a

22from prefect.utilities.importtools import lazy_import 1a

23 

24if TYPE_CHECKING: 24 ↛ 25line 24 didn't jump to line 25 because the condition on line 24 was never true1a

25 import sqlalchemy as sa 

26 from sqlalchemy.dialects import postgresql 

27 

28 from prefect.server.database import PrefectDBInterface 

29 from prefect.server.schemas.core import Log 

30else: 

31 sa = lazy_import("sqlalchemy") 1a

32 postgresql = lazy_import("sqlalchemy.dialects.postgresql") 1a

33 

34# TODO: Consider moving the `as_sql_filter` functions out of here since they are a 

35# database model level function and do not properly separate concerns when 

36# present in the schemas module 

37 

38 

def _as_array(elems: Sequence[str]) -> sa.ColumnElement[Sequence[str]]:
    """Wrap `elems` in a SQL array expression cast to an array of strings."""
    array_literal = postgresql.array(elems)
    return sa.cast(array_literal, type_=postgresql.ARRAY(sa.String()))

41 

42 

class Operator(AutoEnum):
    """Operators for combining filter criteria."""

    # Combine criteria with `sa.and_` (see PrefectOperatorFilterBaseModel).
    and_ = AutoEnum.auto()
    # Combine criteria with `sa.or_` (see PrefectOperatorFilterBaseModel).
    or_ = AutoEnum.auto()

48 

49 

class PrefectFilterBaseModel(PrefectBaseModel):
    """Base model for Prefect filters"""

    # `extra="forbid"`: fields that are not declared on the filter are rejected.
    model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")

    @db_injector
    def as_sql_filter(self, db: "PrefectDBInterface") -> sa.ColumnElement[bool]:
        """Build a single SQL boolean expression from this filter.

        All criteria returned by `_get_filter_list` are AND-ed together. When
        there are no criteria, a SQL TRUE expression is returned instead.
        """
        clauses = self._get_filter_list(db)
        return sa.and_(*clauses) if clauses else sa.true()

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return the boolean SQL criteria for the fields that are set.

        Subclasses must override this; the base implementation always raises
        `NotImplementedError`.
        """
        raise NotImplementedError("_get_filter_list must be implemented")

68 

69 

class PrefectOperatorFilterBaseModel(PrefectFilterBaseModel):
    """Base model for Prefect filters that combines criteria with a user-provided operator"""

    operator: Operator = Field(
        default=Operator.and_,
        description="Operator for combining filter criteria. Defaults to 'and_'.",
    )

    @db_injector
    def as_sql_filter(self, db: "PrefectDBInterface") -> sa.ColumnElement[bool]:
        """Build a single SQL boolean expression from this filter.

        Criteria are joined with AND when `operator` is `Operator.and_` and
        with OR otherwise. With no criteria, a SQL TRUE expression is returned.
        """
        clauses = self._get_filter_list(db)
        if not clauses:
            return sa.true()
        if self.operator == Operator.and_:
            return sa.and_(*clauses)
        return sa.or_(*clauses)

84 

85 

class FlowFilterId(PrefectFilterBaseModel):
    """Filter by `Flow.id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of flow ids to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` criterion on `Flow.id` when `any_` is set."""
        if self.any_ is None:
            return []
        return [db.Flow.id.in_(self.any_)]

100 

101 

class FlowFilterDeployment(PrefectOperatorFilterBaseModel):
    """Filter by flows by deployment"""

    is_null_: Optional[bool] = Field(
        default=None,
        description="If true, only include flows without deployments",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return a criterion on whether `Flow.id` appears in `Deployment.flow_id`.

        A truthy `is_null_` keeps flows whose id is NOT among the deployment
        flow ids; a falsy (but not None) value keeps flows whose id is.
        """
        if self.is_null_ is None:
            return []

        # Distinct set of flow ids referenced by any deployment.
        deployments_subquery = sa.select(db.Deployment.flow_id).distinct().subquery()
        flow_ids_with_deployments = sa.select(deployments_subquery.c.flow_id)

        if self.is_null_:
            return [db.Flow.id.not_in(flow_ids_with_deployments)]
        return [db.Flow.id.in_(flow_ids_with_deployments)]

130 

131 

class FlowFilterName(PrefectFilterBaseModel):
    """Filter by `Flow.name`."""

    any_: Optional[list[str]] = Field(
        default=None,
        description="A list of flow names to include",
        examples=[["my-flow-1", "my-flow-2"]],
    )

    like_: Optional[str] = Field(
        default=None,
        description=(
            "A case-insensitive partial match. For example, "
            " passing 'marvin' will match "
            "'marvin', 'sad-Marvin', and 'marvin-robot'."
        ),
        examples=["marvin"],
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return `IN` and/or `ILIKE` criteria on `Flow.name` for the set fields."""
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            clauses.append(db.Flow.name.in_(self.any_))
        if self.like_ is not None:
            # Surround the value with wildcards to match it anywhere in the name.
            pattern = f"%{self.like_}%"
            clauses.append(db.Flow.name.ilike(pattern))
        return clauses

160 

161 

class FlowFilterTags(PrefectOperatorFilterBaseModel):
    """Filter by `Flow.tags`."""

    all_: Optional[list[str]] = Field(
        default=None,
        examples=[["tag-1", "tag-2"]],
        description=(
            "A list of tags. Flows will be returned only if their tags are a superset"
            " of the list"
        ),
    )
    is_null_: Optional[bool] = Field(
        default=None, description="If true, only include flows without tags"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return criteria on `Flow.tags` for the fields that are set."""
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        if self.all_ is not None:
            clauses.append(db.Flow.tags.has_all(_as_array(self.all_)))
        if self.is_null_ is not None:
            # "Null" here is expressed as a comparison against the empty list.
            if self.is_null_:
                clauses.append(db.Flow.tags == [])
            else:
                clauses.append(db.Flow.tags != [])
        return clauses

186 

187 

class FlowFilter(PrefectOperatorFilterBaseModel):
    """Filter for flows. Only flows matching all criteria will be returned."""

    id: Optional[FlowFilterId] = Field(
        default=None, description="Filter criteria for `Flow.id`"
    )
    deployment: Optional[FlowFilterDeployment] = Field(
        default=None, description="Filter criteria for Flow deployments"
    )
    name: Optional[FlowFilterName] = Field(
        default=None, description="Filter criteria for `Flow.name`"
    )
    tags: Optional[FlowFilterTags] = Field(
        default=None, description="Filter criteria for `Flow.tags`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return the SQL filter of every sub-filter that is set, in field order."""
        sub_filters = (self.id, self.deployment, self.name, self.tags)
        return [
            sub_filter.as_sql_filter()
            for sub_filter in sub_filters
            if sub_filter is not None
        ]

219 

220 

class FlowRunFilterId(PrefectFilterBaseModel):
    """Filter by `FlowRun.id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of flow run ids to include"
    )
    not_any_: Optional[list[UUID]] = Field(
        default=None, description="A list of flow run ids to exclude"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return `IN` / `NOT IN` criteria on `FlowRun.id` for the set fields."""
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        column = db.FlowRun.id
        if self.any_ is not None:
            clauses.append(column.in_(self.any_))
        if self.not_any_ is not None:
            clauses.append(column.not_in(self.not_any_))
        return clauses

240 

241 

class FlowRunFilterName(PrefectFilterBaseModel):
    """Filter by `FlowRun.name`."""

    any_: Optional[list[str]] = Field(
        default=None,
        description="A list of flow run names to include",
        examples=[["my-flow-run-1", "my-flow-run-2"]],
    )

    like_: Optional[str] = Field(
        default=None,
        description=(
            "A case-insensitive partial match. For example, "
            " passing 'marvin' will match "
            "'marvin', 'sad-Marvin', and 'marvin-robot'."
        ),
        examples=["marvin"],
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return `IN` and/or `ILIKE` criteria on `FlowRun.name` for the set fields."""
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            clauses.append(db.FlowRun.name.in_(self.any_))
        if self.like_ is not None:
            # Surround the value with wildcards to match it anywhere in the name.
            pattern = f"%{self.like_}%"
            clauses.append(db.FlowRun.name.ilike(pattern))
        return clauses

270 

271 

class FlowRunFilterTags(PrefectOperatorFilterBaseModel):
    """Filter by `FlowRun.tags`."""

    all_: Optional[list[str]] = Field(
        default=None,
        examples=[["tag-1", "tag-2"]],
        description=(
            "A list of tags. Flow runs will be returned only if their tags are a"
            " superset of the list"
        ),
    )

    any_: Optional[list[str]] = Field(
        default=None,
        examples=[["tag-1", "tag-2"]],
        description="A list of tags to include",
    )

    is_null_: Optional[bool] = Field(
        default=None, description="If true, only include flow runs without tags"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return criteria on `FlowRun.tags` for the fields that are set.

        Uses the module-level `_as_array` helper (shared with the other tag
        filters in this module) rather than a private copy of the same cast.
        """
        filters: list[sa.ColumnElement[bool]] = []
        if self.all_ is not None:
            filters.append(db.FlowRun.tags.has_all(_as_array(self.all_)))
        if self.any_ is not None:
            filters.append(db.FlowRun.tags.has_any(_as_array(self.any_)))
        if self.is_null_ is not None:
            # "Null" here is expressed as a comparison against the empty list.
            filters.append(
                db.FlowRun.tags == [] if self.is_null_ else db.FlowRun.tags != []
            )
        return filters

310 

311 

class FlowRunFilterDeploymentId(PrefectOperatorFilterBaseModel):
    """Filter by `FlowRun.deployment_id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of flow run deployment ids to include"
    )
    is_null_: Optional[bool] = Field(
        default=None,
        description="If true, only include flow runs without deployment ids",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return `IN` and/or NULL-ness criteria on `FlowRun.deployment_id`."""
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            clauses.append(db.FlowRun.deployment_id.in_(self.any_))
        if self.is_null_ is not None:
            if self.is_null_:
                clauses.append(db.FlowRun.deployment_id.is_(None))
            else:
                clauses.append(db.FlowRun.deployment_id.is_not(None))
        return clauses

336 

337 

class FlowRunFilterWorkQueueName(PrefectOperatorFilterBaseModel):
    """Filter by `FlowRun.work_queue_name`."""

    any_: Optional[list[str]] = Field(
        default=None,
        description="A list of work queue names to include",
        examples=[["work_queue_1", "work_queue_2"]],
    )
    is_null_: Optional[bool] = Field(
        default=None,
        description="If true, only include flow runs without work queue names",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return `IN` and/or NULL-ness criteria on `FlowRun.work_queue_name`."""
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            clauses.append(db.FlowRun.work_queue_name.in_(self.any_))
        if self.is_null_ is not None:
            if self.is_null_:
                clauses.append(db.FlowRun.work_queue_name.is_(None))
            else:
                clauses.append(db.FlowRun.work_queue_name.is_not(None))
        return clauses

364 

365 

class FlowRunFilterStateType(PrefectFilterBaseModel):
    """Filter by `FlowRun.state_type`."""

    any_: Optional[list[schemas.states.StateType]] = Field(
        default=None, description="A list of flow run state types to include"
    )
    not_any_: Optional[list[schemas.states.StateType]] = Field(
        default=None, description="A list of flow run state types to exclude"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return `IN` / `NOT IN` criteria on `FlowRun.state_type` for the set fields."""
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        column = db.FlowRun.state_type
        if self.any_ is not None:
            clauses.append(column.in_(self.any_))
        if self.not_any_ is not None:
            clauses.append(column.not_in(self.not_any_))
        return clauses

385 

386 

class FlowRunFilterStateName(PrefectFilterBaseModel):
    """Filter by `FlowRun.state_name`."""

    any_: Optional[list[str]] = Field(
        default=None, description="A list of flow run state names to include"
    )
    not_any_: Optional[list[str]] = Field(
        default=None, description="A list of flow run state names to exclude"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return `IN` / `NOT IN` criteria on `FlowRun.state_name` for the set fields."""
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        column = db.FlowRun.state_name
        if self.any_ is not None:
            clauses.append(column.in_(self.any_))
        if self.not_any_ is not None:
            clauses.append(column.not_in(self.not_any_))
        return clauses

406 

407 

class FlowRunFilterState(PrefectOperatorFilterBaseModel):
    """Filter by `FlowRun.state_type` and `FlowRun.state_name`."""

    type: Optional[FlowRunFilterStateType] = Field(
        default=None, description="Filter criteria for `FlowRun.state_type`"
    )
    name: Optional[FlowRunFilterStateName] = Field(
        default=None, description="Filter criteria for `FlowRun.state_name`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return one combined criterion per sub-filter that has criteria set.

        A sub-filter with no criteria is skipped instead of contributing a
        constant TRUE, which would make an `or_` combination match every row.
        A sub-filter with several criteria (e.g. both `any_` and `not_any_`)
        contributes the AND of all of them; the earlier
        `isinstance(..., sa.BinaryExpression)` check silently dropped that case
        because the combined expression is not a binary expression.
        """
        filters: list[sa.ColumnExpressionArgument[bool]] = []
        for sub_filter in (self.type, self.name):
            if sub_filter is None:
                continue
            criteria = list(sub_filter._get_filter_list(db))
            if criteria:
                # Mirrors PrefectFilterBaseModel.as_sql_filter for the
                # non-empty case.
                filters.append(sa.and_(*criteria))
        return filters

431 

432 

class FlowRunFilterFlowVersion(PrefectFilterBaseModel):
    """Filter by `FlowRun.flow_version`."""

    any_: Optional[list[str]] = Field(
        default=None, description="A list of flow run flow_versions to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` criterion on `FlowRun.flow_version` when `any_` is set."""
        if self.any_ is None:
            return []
        return [db.FlowRun.flow_version.in_(self.any_)]

447 

448 

class FlowRunFilterStartTime(PrefectFilterBaseModel):
    """Filter by `FlowRun.start_time`."""

    before_: Optional[DateTime] = Field(
        default=None,
        description="Only include flow runs starting at or before this time",
    )
    after_: Optional[DateTime] = Field(
        default=None,
        description="Only include flow runs starting at or after this time",
    )
    is_null_: Optional[bool] = Field(
        default=None, description="If true, only return flow runs without a start time"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return time-bound and/or NULL-ness criteria for the set fields.

        The `before_` / `after_` bounds compare against `start_time`, falling
        back to `expected_start_time` when `start_time` is NULL (via COALESCE).
        The `is_null_` check looks at `start_time` alone.
        """
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        if self.before_ is not None:
            effective_start = coalesce(
                db.FlowRun.start_time, db.FlowRun.expected_start_time
            )
            clauses.append(effective_start <= self.before_)
        if self.after_ is not None:
            effective_start = coalesce(
                db.FlowRun.start_time, db.FlowRun.expected_start_time
            )
            clauses.append(effective_start >= self.after_)
        if self.is_null_ is not None:
            if self.is_null_:
                clauses.append(db.FlowRun.start_time.is_(None))
            else:
                clauses.append(db.FlowRun.start_time.is_not(None))
        return clauses

485 

486 

class FlowRunFilterEndTime(PrefectFilterBaseModel):
    """Filter by `FlowRun.end_time`."""

    before_: Optional[DateTime] = Field(
        default=None,
        description="Only include flow runs ending at or before this time",
    )
    after_: Optional[DateTime] = Field(
        default=None,
        description="Only include flow runs ending at or after this time",
    )
    is_null_: Optional[bool] = Field(
        default=None, description="If true, only return flow runs without an end time"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return time-bound and/or NULL-ness criteria on `FlowRun.end_time`."""
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        if self.before_ is not None:
            clauses.append(db.FlowRun.end_time <= self.before_)
        if self.after_ is not None:
            clauses.append(db.FlowRun.end_time >= self.after_)
        if self.is_null_ is not None:
            if self.is_null_:
                clauses.append(db.FlowRun.end_time.is_(None))
            else:
                clauses.append(db.FlowRun.end_time.is_not(None))
        return clauses

517 

518 

class FlowRunFilterExpectedStartTime(PrefectFilterBaseModel):
    """Filter by `FlowRun.expected_start_time`."""

    before_: Optional[DateTime] = Field(
        default=None,
        description="Only include flow runs scheduled to start at or before this time",
    )
    after_: Optional[DateTime] = Field(
        default=None,
        description="Only include flow runs scheduled to start at or after this time",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return inclusive bounds on `FlowRun.expected_start_time` for the set fields."""
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        column = db.FlowRun.expected_start_time
        if self.before_ is not None:
            clauses.append(column <= self.before_)
        if self.after_ is not None:
            clauses.append(column >= self.after_)
        return clauses

540 

541 

class FlowRunFilterNextScheduledStartTime(PrefectFilterBaseModel):
    """Filter by `FlowRun.next_scheduled_start_time`."""

    before_: Optional[DateTime] = Field(
        default=None,
        # Wording fixed: the bound is inclusive (`<=`), matching `after_` below.
        description=(
            "Only include flow runs with a next_scheduled_start_time at or before"
            " this time"
        ),
    )
    after_: Optional[DateTime] = Field(
        default=None,
        description=(
            "Only include flow runs with a next_scheduled_start_time at or after this"
            " time"
        ),
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return inclusive bounds on `FlowRun.next_scheduled_start_time`.

        Only the bounds whose field is set contribute a criterion.
        """
        filters: list[sa.ColumnExpressionArgument[bool]] = []
        if self.before_ is not None:
            filters.append(db.FlowRun.next_scheduled_start_time <= self.before_)
        if self.after_ is not None:
            filters.append(db.FlowRun.next_scheduled_start_time >= self.after_)
        return filters

569 

570 

class FlowRunFilterParentFlowRunId(PrefectOperatorFilterBaseModel):
    """Filter for subflows of a given flow run"""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of parent flow run ids to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` criterion selecting flow runs spawned by the given parents.

        A flow run matches when its `parent_task_run_id` points at a task run
        whose `flow_run_id` is one of the ids in `any_`.
        """
        if self.any_ is None:
            return []

        # Join each flow run to its parent task run, then keep those whose
        # parent task run belongs to one of the requested flow runs.
        parent_task_join = sa.and_(
            db.TaskRun.id == db.FlowRun.parent_task_run_id,
        )
        subflow_run_ids = (
            sa.select(db.FlowRun.id)
            .join(db.TaskRun, parent_task_join)
            .where(db.TaskRun.flow_run_id.in_(self.any_))
        )
        return [db.FlowRun.id.in_(subflow_run_ids)]

596 

597 

class FlowRunFilterParentTaskRunId(PrefectOperatorFilterBaseModel):
    """Filter by `FlowRun.parent_task_run_id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of flow run parent_task_run_ids to include"
    )
    is_null_: Optional[bool] = Field(
        default=None,
        description="If true, only include flow runs without parent_task_run_id",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return `IN` and/or NULL-ness criteria on `FlowRun.parent_task_run_id`."""
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            clauses.append(db.FlowRun.parent_task_run_id.in_(self.any_))
        if self.is_null_ is not None:
            if self.is_null_:
                clauses.append(db.FlowRun.parent_task_run_id.is_(None))
            else:
                clauses.append(db.FlowRun.parent_task_run_id.is_not(None))
        return clauses

622 

623 

class FlowRunFilterIdempotencyKey(PrefectFilterBaseModel):
    """Filter by FlowRun.idempotency_key."""

    any_: Optional[list[str]] = Field(
        default=None, description="A list of flow run idempotency keys to include"
    )
    not_any_: Optional[list[str]] = Field(
        default=None, description="A list of flow run idempotency keys to exclude"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return `IN` / `NOT IN` criteria on `FlowRun.idempotency_key`."""
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        column = db.FlowRun.idempotency_key
        if self.any_ is not None:
            clauses.append(column.in_(self.any_))
        if self.not_any_ is not None:
            clauses.append(column.not_in(self.not_any_))
        return clauses

643 

644 

class FlowRunFilter(PrefectOperatorFilterBaseModel):
    """Filter flow runs. Only flow runs matching all criteria will be returned"""

    id: Optional[FlowRunFilterId] = Field(
        default=None, description="Filter criteria for `FlowRun.id`"
    )
    name: Optional[FlowRunFilterName] = Field(
        default=None, description="Filter criteria for `FlowRun.name`"
    )
    tags: Optional[FlowRunFilterTags] = Field(
        default=None, description="Filter criteria for `FlowRun.tags`"
    )
    deployment_id: Optional[FlowRunFilterDeploymentId] = Field(
        default=None, description="Filter criteria for `FlowRun.deployment_id`"
    )
    work_queue_name: Optional[FlowRunFilterWorkQueueName] = Field(
        # Closing backtick added; the description was previously unterminated.
        default=None, description="Filter criteria for `FlowRun.work_queue_name`"
    )
    state: Optional[FlowRunFilterState] = Field(
        default=None, description="Filter criteria for `FlowRun.state`"
    )
    flow_version: Optional[FlowRunFilterFlowVersion] = Field(
        default=None, description="Filter criteria for `FlowRun.flow_version`"
    )
    start_time: Optional[FlowRunFilterStartTime] = Field(
        default=None, description="Filter criteria for `FlowRun.start_time`"
    )
    end_time: Optional[FlowRunFilterEndTime] = Field(
        default=None, description="Filter criteria for `FlowRun.end_time`"
    )
    expected_start_time: Optional[FlowRunFilterExpectedStartTime] = Field(
        default=None, description="Filter criteria for `FlowRun.expected_start_time`"
    )
    next_scheduled_start_time: Optional[FlowRunFilterNextScheduledStartTime] = Field(
        default=None,
        description="Filter criteria for `FlowRun.next_scheduled_start_time`",
    )
    parent_flow_run_id: Optional[FlowRunFilterParentFlowRunId] = Field(
        default=None, description="Filter criteria for subflows of the given flow runs"
    )
    parent_task_run_id: Optional[FlowRunFilterParentTaskRunId] = Field(
        default=None, description="Filter criteria for `FlowRun.parent_task_run_id`"
    )
    idempotency_key: Optional[FlowRunFilterIdempotencyKey] = Field(
        default=None, description="Filter criteria for `FlowRun.idempotency_key`"
    )

    def only_filters_on_id(self) -> bool:
        """Return True when the only active criterion is an `id.any_` inclusion list.

        That requires `id` to be set with a non-empty `any_` and an empty or
        unset `not_any_`, and every other sub-filter to be None.
        """
        if self.id is None or not self.id.any_ or self.id.not_any_:
            return False
        other_sub_filters = (
            self.name,
            self.tags,
            self.deployment_id,
            self.work_queue_name,
            self.state,
            self.flow_version,
            self.start_time,
            self.end_time,
            self.expected_start_time,
            self.next_scheduled_start_time,
            self.parent_flow_run_id,
            self.parent_task_run_id,
            self.idempotency_key,
        )
        return all(sub_filter is None for sub_filter in other_sub_filters)

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return the SQL filter of every sub-filter that is set.

        The order below is the order in which criteria are emitted.
        """
        sub_filters = (
            self.id,
            self.name,
            self.tags,
            self.deployment_id,
            self.work_queue_name,
            self.flow_version,
            self.state,
            self.start_time,
            self.end_time,
            self.expected_start_time,
            self.next_scheduled_start_time,
            self.parent_flow_run_id,
            self.parent_task_run_id,
            self.idempotency_key,
        )
        return [
            sub_filter.as_sql_filter()
            for sub_filter in sub_filters
            if sub_filter is not None
        ]

746 

747 

class TaskRunFilterFlowRunId(PrefectOperatorFilterBaseModel):
    """Filter by `TaskRun.flow_run_id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of task run flow run ids to include"
    )

    is_null_: Optional[bool] = Field(
        default=False, description="Filter for task runs with None as their flow run id"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return at most one criterion on `TaskRun.flow_run_id`.

        Precedence: `is_null_ is True` yields `IS NULL`; otherwise a set `any_`
        yields `IN`; otherwise `is_null_ is False` yields `IS NOT NULL`;
        otherwise no criterion is produced.
        """
        column = db.TaskRun.flow_run_id
        if self.is_null_ is True:
            return [column.is_(None)]
        if self.any_ is not None:
            return [column.in_(self.any_)]
        if self.is_null_ is False:
            return [column.is_not(None)]
        return []

771 

772 

class TaskRunFilterId(PrefectFilterBaseModel):
    """Filter by `TaskRun.id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of task run ids to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` criterion on `TaskRun.id` when `any_` is set."""
        if self.any_ is None:
            return []
        return [db.TaskRun.id.in_(self.any_)]

787 

788 

class TaskRunFilterName(PrefectFilterBaseModel):
    """Filter by `TaskRun.name`."""

    any_: Optional[list[str]] = Field(
        default=None,
        description="A list of task run names to include",
        examples=[["my-task-run-1", "my-task-run-2"]],
    )

    like_: Optional[str] = Field(
        default=None,
        description=(
            "A case-insensitive partial match. For example, "
            " passing 'marvin' will match "
            "'marvin', 'sad-Marvin', and 'marvin-robot'."
        ),
        examples=["marvin"],
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return `IN` and/or `ILIKE` criteria on `TaskRun.name` for the set fields."""
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            clauses.append(db.TaskRun.name.in_(self.any_))
        if self.like_ is not None:
            # Surround the value with wildcards to match it anywhere in the name.
            pattern = f"%{self.like_}%"
            clauses.append(db.TaskRun.name.ilike(pattern))
        return clauses

817 

818 

class TaskRunFilterTags(PrefectOperatorFilterBaseModel):
    """Filter by `TaskRun.tags`."""

    all_: Optional[list[str]] = Field(
        default=None,
        examples=[["tag-1", "tag-2"]],
        description=(
            "A list of tags. Task runs will be returned only if their tags are a"
            " superset of the list"
        ),
    )
    is_null_: Optional[bool] = Field(
        default=None, description="If true, only include task runs without tags"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return criteria on `TaskRun.tags` for the fields that are set."""
        clauses: list[sa.ColumnElement[bool]] = []
        if self.all_ is not None:
            clauses.append(db.TaskRun.tags.has_all(_as_array(self.all_)))
        if self.is_null_ is not None:
            # "Null" here is expressed as a comparison against the empty list.
            if self.is_null_:
                clauses.append(db.TaskRun.tags == [])
            else:
                clauses.append(db.TaskRun.tags != [])
        return clauses

845 

846 

class TaskRunFilterStateType(PrefectFilterBaseModel):
    """Filter by `TaskRun.state_type`."""

    any_: Optional[list[schemas.states.StateType]] = Field(
        default=None, description="A list of task run state types to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` criterion on `TaskRun.state_type` when `any_` is set."""
        if self.any_ is None:
            return []
        return [db.TaskRun.state_type.in_(self.any_)]

861 

862 

class TaskRunFilterStateName(PrefectFilterBaseModel):
    """Filter by `TaskRun.state_name`."""

    any_: Optional[list[str]] = Field(
        default=None, description="A list of task run state names to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` criterion on `TaskRun.state_name` when `any_` is set."""
        if self.any_ is None:
            return []
        return [db.TaskRun.state_name.in_(self.any_)]

877 

878 

class TaskRunFilterState(PrefectOperatorFilterBaseModel):
    """Filter by `TaskRun.state_type` and `TaskRun.state_name`."""

    type: Optional[TaskRunFilterStateType] = Field(
        default=None, description="Filter criteria for `TaskRun.state_type`"
    )
    name: Optional[TaskRunFilterStateName] = Field(
        default=None, description="Filter criteria for `TaskRun.state_name`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return one combined criterion per sub-filter that has criteria set.

        A sub-filter with no criteria is skipped instead of contributing a
        constant TRUE, which would make an `or_` combination match every row.
        Emptiness is checked on the sub-filter's own criteria list rather than
        on the type of the resulting SQL expression, so a sub-filter that
        produces more than one criterion is not dropped.
        """
        filters: list[sa.ColumnExpressionArgument[bool]] = []
        for sub_filter in (self.type, self.name):
            if sub_filter is None:
                continue
            criteria = list(sub_filter._get_filter_list(db))
            if criteria:
                # Mirrors PrefectFilterBaseModel.as_sql_filter for the
                # non-empty case.
                filters.append(sa.and_(*criteria))
        return filters

902 

903 

class TaskRunFilterSubFlowRuns(PrefectFilterBaseModel):
    """Filter by `TaskRun.subflow_run`."""

    # Tri-state switch: True keeps parents, False drops them, None is a no-op.
    exists_: Optional[bool] = Field(
        default=None,
        description=(
            "If true, only include task runs that are subflow run parents; if false,"
            " exclude parent task runs"
        ),
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return a `subflow_run.has()` clause, its negation, or nothing."""
        if self.exists_ is True:
            return [db.TaskRun.subflow_run.has()]
        if self.exists_ is False:
            return [sa.not_(db.TaskRun.subflow_run.has())]
        return []

924 

925 

class TaskRunFilterStartTime(PrefectFilterBaseModel):
    """Filter by `TaskRun.start_time`."""

    # Inclusive upper bound.
    before_: Optional[DateTime] = Field(
        default=None,
        description="Only include task runs starting at or before this time",
    )
    # Inclusive lower bound.
    after_: Optional[DateTime] = Field(
        default=None,
        description="Only include task runs starting at or after this time",
    )
    # True selects rows with no start time, False selects rows that have one.
    is_null_: Optional[bool] = Field(
        default=None, description="If true, only return task runs without a start time"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Build range and nullness clauses on `TaskRun.start_time`."""
        start_time = db.TaskRun.start_time
        clauses: list[sa.ColumnExpressionArgument[bool]] = []

        if self.before_ is not None:
            clauses.append(start_time <= self.before_)
        if self.after_ is not None:
            clauses.append(start_time >= self.after_)
        if self.is_null_ is not None:
            if self.is_null_:
                clauses.append(start_time.is_(None))
            else:
                clauses.append(start_time.is_not(None))

        return clauses

956 

957 

class TaskRunFilterExpectedStartTime(PrefectFilterBaseModel):
    """Filter by `TaskRun.expected_start_time`."""

    # Inclusive upper bound.
    before_: Optional[DateTime] = Field(
        default=None,
        description="Only include task runs expected to start at or before this time",
    )
    # Inclusive lower bound.
    after_: Optional[DateTime] = Field(
        default=None,
        description="Only include task runs expected to start at or after this time",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Build inclusive range clauses on `TaskRun.expected_start_time`."""
        expected = db.TaskRun.expected_start_time
        clauses: list[sa.ColumnExpressionArgument[bool]] = []

        if self.before_ is not None:
            clauses.append(expected <= self.before_)
        if self.after_ is not None:
            clauses.append(expected >= self.after_)

        return clauses

979 

980 

class TaskRunFilter(PrefectOperatorFilterBaseModel):
    """Filter task runs. Only task runs matching all criteria will be returned"""

    id: Optional[TaskRunFilterId] = Field(
        default=None, description="Filter criteria for `TaskRun.id`"
    )
    name: Optional[TaskRunFilterName] = Field(
        default=None, description="Filter criteria for `TaskRun.name`"
    )
    tags: Optional[TaskRunFilterTags] = Field(
        default=None, description="Filter criteria for `TaskRun.tags`"
    )
    state: Optional[TaskRunFilterState] = Field(
        default=None, description="Filter criteria for `TaskRun.state`"
    )
    start_time: Optional[TaskRunFilterStartTime] = Field(
        default=None, description="Filter criteria for `TaskRun.start_time`"
    )
    expected_start_time: Optional[TaskRunFilterExpectedStartTime] = Field(
        default=None, description="Filter criteria for `TaskRun.expected_start_time`"
    )
    subflow_runs: Optional[TaskRunFilterSubFlowRuns] = Field(
        default=None, description="Filter criteria for `TaskRun.subflow_run`"
    )
    flow_run_id: Optional[TaskRunFilterFlowRunId] = Field(
        default=None, description="Filter criteria for `TaskRun.flow_run_id`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Compile every sub-filter that is set, in field declaration order."""
        criteria = (
            self.id,
            self.name,
            self.tags,
            self.state,
            self.start_time,
            self.expected_start_time,
            self.subflow_runs,
            self.flow_run_id,
        )
        return [
            criterion.as_sql_filter()
            for criterion in criteria
            if criterion is not None
        ]

1032 

1033 

class DeploymentFilterId(PrefectFilterBaseModel):
    """Filter by `Deployment.id`."""

    # Allow-list of ids.
    any_: Optional[list[UUID]] = Field(
        default=None,
        description="A list of deployment ids to include",
    )
    # Deny-list of ids.
    not_any_: Optional[list[UUID]] = Field(
        default=None,
        description="A list of deployment ids to exclude",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Build `IN` / `NOT IN` clauses on `Deployment.id` for the set fields."""
        deployment_id = db.Deployment.id
        clauses: list[sa.ColumnExpressionArgument[bool]] = []

        if self.any_ is not None:
            clauses.append(deployment_id.in_(self.any_))
        if self.not_any_ is not None:
            clauses.append(deployment_id.not_in(self.not_any_))

        return clauses

1053 

1054 

class DeploymentFilterName(PrefectFilterBaseModel):
    """Filter by `Deployment.name`."""

    # Exact-match allow-list of names.
    any_: Optional[list[str]] = Field(
        default=None,
        description="A list of deployment names to include",
        examples=[["my-deployment-1", "my-deployment-2"]],
    )

    # Substring to look for; it is wrapped in `%` wildcards before matching.
    like_: Optional[str] = Field(
        default=None,
        description=(
            "A case-insensitive partial match. For example, "
            " passing 'marvin' will match "
            "'marvin', 'sad-Marvin', and 'marvin-robot'."
        ),
        examples=["marvin"],
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Build `IN` and `ILIKE` clauses on `Deployment.name` for the set fields."""
        deployment_name = db.Deployment.name
        clauses: list[sa.ColumnExpressionArgument[bool]] = []

        if self.any_ is not None:
            clauses.append(deployment_name.in_(self.any_))
        if self.like_ is not None:
            pattern = f"%{self.like_}%"
            clauses.append(deployment_name.ilike(pattern))

        return clauses

1083 

1084 

class DeploymentOrFlowNameFilter(PrefectFilterBaseModel):
    """Filter by `Deployment.name` or `Flow.name` with a single input string for ilike filtering."""

    # One substring that is tried against both the deployment and its flow.
    like_: Optional[str] = Field(
        default=None,
        description=(
            "A case-insensitive partial match on deployment or flow names. For example, "
            "passing 'example' might match deployments or flows with 'example' in their names."
        ),
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return one OR clause matching the deployment name or its flow's name."""
        if self.like_ is None:
            return []

        pattern = f"%{self.like_}%"
        matches_deployment = db.Deployment.name.ilike(pattern)
        matches_flow = db.Deployment.flow.has(db.Flow.name.ilike(pattern))
        return [sa.or_(matches_deployment, matches_flow)]

1108 

1109 

class DeploymentFilterPaused(PrefectFilterBaseModel):
    """Filter by `Deployment.paused`."""

    # Desired value of the paused flag; `None` leaves the criterion off.
    eq_: Optional[bool] = Field(
        default=None,
        description="Only returns where deployment is/is not paused",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IS` comparison on `Deployment.paused`, or nothing if unset."""
        if self.eq_ is None:
            return []
        return [db.Deployment.paused.is_(self.eq_)]

1125 

1126 

class DeploymentFilterWorkQueueName(PrefectFilterBaseModel):
    """Filter by `Deployment.work_queue_name`."""

    # Allow-list of work queue names.
    any_: Optional[list[str]] = Field(
        default=None,
        description="A list of work queue names to include",
        examples=[["work_queue_1", "work_queue_2"]],
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` clause on `Deployment.work_queue_name`, or nothing."""
        if self.any_ is None:
            return []
        return [db.Deployment.work_queue_name.in_(self.any_)]

1143 

1144 

class DeploymentFilterConcurrencyLimit(PrefectFilterBaseModel):
    """DEPRECATED: Prefer `Deployment.concurrency_limit_id` over `Deployment.concurrency_limit`."""

    # The three fields below are still accepted as input, but
    # `_get_filter_list` never reads them.
    ge_: Optional[int] = Field(
        default=None,
        description="Only include deployments with a concurrency limit greater than or equal to this value",
    )

    le_: Optional[int] = Field(
        default=None,
        description="Only include deployments with a concurrency limit less than or equal to this value",
    )
    is_null_: Optional[bool] = Field(
        default=None,
        description="If true, only include deployments without a concurrency limit",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> list[sa.ColumnElement[bool]]:
        """Return no clauses, whatever the field values are.

        The filtered `int` column was replaced by a `ForeignKey` relationship.
        Instead of supporting the new relationship, this filter was deprecated
        and turned into a no-op.
        """
        no_clauses: list[sa.ColumnElement[bool]] = []
        return no_clauses

1168 

1169 

class DeploymentFilterTags(PrefectOperatorFilterBaseModel):
    """Filter by `Deployment.tags`."""

    # Every listed tag must be present on the deployment.
    all_: Optional[list[str]] = Field(
        default=None,
        examples=[["tag-1", "tag-2"]],
        description=(
            "A list of tags. Deployments will be returned only if their tags are a"
            " superset of the list"
        ),
    )
    # Passed to the column's `has_any` comparator.
    any_: Optional[list[str]] = Field(
        default=None,
        examples=[["tag-1", "tag-2"]],
        description="A list of tags to include",
    )

    # True selects deployments whose tag list equals `[]`, False the opposite.
    is_null_: Optional[bool] = Field(
        default=None, description="If true, only include deployments without tags"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Build tag clauses on `Deployment.tags` for every field that is set.

        All three clauses go through the injected `db` interface, so they
        target the same ORM model.
        """
        tags = db.Deployment.tags
        filters: list[sa.ColumnElement[bool]] = []

        if self.all_ is not None:
            filters.append(tags.has_all(_as_array(self.all_)))
        if self.any_ is not None:
            filters.append(tags.has_any(_as_array(self.any_)))
        if self.is_null_ is not None:
            # "Without tags" is expressed as equality with an empty list.
            filters.append(tags == [] if self.is_null_ else tags != [])

        return filters

1206 

1207 

class DeploymentFilter(PrefectOperatorFilterBaseModel):
    """Filter for deployments. Only deployments matching all criteria will be returned."""

    id: Optional[DeploymentFilterId] = Field(
        default=None, description="Filter criteria for `Deployment.id`"
    )
    name: Optional[DeploymentFilterName] = Field(
        default=None, description="Filter criteria for `Deployment.name`"
    )
    flow_or_deployment_name: Optional[DeploymentOrFlowNameFilter] = Field(
        default=None, description="Filter criteria for `Deployment.name` or `Flow.name`"
    )
    paused: Optional[DeploymentFilterPaused] = Field(
        default=None, description="Filter criteria for `Deployment.paused`"
    )
    tags: Optional[DeploymentFilterTags] = Field(
        default=None, description="Filter criteria for `Deployment.tags`"
    )
    work_queue_name: Optional[DeploymentFilterWorkQueueName] = Field(
        default=None, description="Filter criteria for `Deployment.work_queue_name`"
    )
    # Accepted as input only; `_get_filter_list` does not consult this field.
    concurrency_limit: Optional[DeploymentFilterConcurrencyLimit] = Field(
        default=None,
        description="DEPRECATED: Prefer `Deployment.concurrency_limit_id` over `Deployment.concurrency_limit`. If provided, will be ignored for backwards-compatibility. Will be removed after December 2024.",
        deprecated=True,
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Compile every active sub-filter, leaving out `concurrency_limit`."""
        criteria = (
            self.id,
            self.name,
            self.flow_or_deployment_name,
            self.paused,
            self.tags,
            self.work_queue_name,
        )
        return [
            criterion.as_sql_filter()
            for criterion in criteria
            if criterion is not None
        ]

1253 

1254 

class DeploymentScheduleFilterActive(PrefectFilterBaseModel):
    """Filter by `DeploymentSchedule.active`."""

    # Desired value of the active flag; `None` leaves the criterion off.
    eq_: Optional[bool] = Field(
        default=None,
        description="Only returns where deployment schedule is/is not active",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IS` comparison on `DeploymentSchedule.active`, or nothing."""
        if self.eq_ is None:
            return []
        return [db.DeploymentSchedule.active.is_(self.eq_)]

1270 

1271 

class DeploymentScheduleFilter(PrefectOperatorFilterBaseModel):
    """Filter for deployment schedules. Only deployment schedules matching all criteria will be returned."""

    active: Optional[DeploymentScheduleFilterActive] = Field(
        default=None, description="Filter criteria for `DeploymentSchedule.active`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Compile the `active` sub-filter when it is set."""
        filters: list[sa.ColumnExpressionArgument[bool]] = []

        if self.active is not None:
            filters.append(self.active.as_sql_filter())

        return filters

1288 

1289 

class LogFilterName(PrefectFilterBaseModel):
    """Filter by `Log.name`."""

    # Allow-list of logger names.
    any_: Optional[list[str]] = Field(
        default=None,
        description="A list of log names to include",
        examples=[["prefect.logger.flow_runs", "prefect.logger.task_runs"]],
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` clause on `Log.name`, or nothing if unset."""
        if self.any_ is None:
            return []
        return [db.Log.name.in_(self.any_)]

1306 

1307 

class LogFilterLevel(PrefectFilterBaseModel):
    """Filter by `Log.level`."""

    # Inclusive lower bound on the numeric level.
    ge_: Optional[int] = Field(
        default=None,
        description="Include logs with a level greater than or equal to this level",
        examples=[20],
    )

    # Inclusive upper bound on the numeric level.
    le_: Optional[int] = Field(
        default=None,
        description="Include logs with a level less than or equal to this level",
        examples=[50],
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Build inclusive range clauses on `Log.level` for the set bounds."""
        level = db.Log.level
        clauses: list[sa.ColumnExpressionArgument[bool]] = []

        if self.ge_ is not None:
            clauses.append(level >= self.ge_)
        if self.le_ is not None:
            clauses.append(level <= self.le_)

        return clauses

1332 

1333 

class LogFilterTimestamp(PrefectFilterBaseModel):
    """Filter by `Log.timestamp`."""

    # Inclusive upper bound.
    before_: Optional[DateTime] = Field(
        default=None,
        description="Only include logs with a timestamp at or before this time",
    )
    # Inclusive lower bound.
    after_: Optional[DateTime] = Field(
        default=None,
        description="Only include logs with a timestamp at or after this time",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Build inclusive range clauses on `Log.timestamp` for the set bounds."""
        timestamp = db.Log.timestamp
        clauses: list[sa.ColumnExpressionArgument[bool]] = []

        if self.before_ is not None:
            clauses.append(timestamp <= self.before_)
        if self.after_ is not None:
            clauses.append(timestamp >= self.after_)

        return clauses

1355 

1356 

class LogFilterFlowRunId(PrefectFilterBaseModel):
    """Filter by `Log.flow_run_id`."""

    # Allow-list of flow run ids.
    any_: Optional[list[UUID]] = Field(
        default=None,
        description="A list of flow run IDs to include",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` clause on `Log.flow_run_id`, or nothing if unset."""
        if self.any_ is None:
            return []
        return [db.Log.flow_run_id.in_(self.any_)]

1371 

1372 

class LogFilterTaskRunId(PrefectFilterBaseModel):
    """Filter by `Log.task_run_id`."""

    # Allow-list of task run ids.
    any_: Optional[list[UUID]] = Field(
        default=None,
        description="A list of task run IDs to include",
    )

    # True selects logs with no task run id, False selects logs that have one.
    is_null_: Optional[bool] = Field(
        default=None,
        description="If true, only include logs without a task run id",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Build membership and nullness clauses on `Log.task_run_id`."""
        task_run_id = db.Log.task_run_id
        clauses: list[sa.ColumnExpressionArgument[bool]] = []

        if self.any_ is not None:
            clauses.append(task_run_id.in_(self.any_))
        if self.is_null_ is not None:
            if self.is_null_:
                clauses.append(task_run_id.is_(None))
            else:
                clauses.append(task_run_id.is_not(None))

        return clauses

1398 

1399 

class LogFilterTextSearch(PrefectFilterBaseModel):
    """Filter by text search across log content."""

    query: str = Field(
        description="Text search query string",
        examples=[
            "error",
            "error -debug",
            '"connection timeout"',
            "+required -excluded",
        ],
        max_length=200,
    )

    def includes(self, log: "Log") -> bool:
        """Check if this text filter includes the given log.

        The log's message and logger name are joined with a space and
        lower-cased; every parsed term is lower-cased and looked up as a
        literal substring of that text.

        Args:
            log: The log record to test.

        Returns:
            True when the log satisfies the include (any), exclude (none) and
            required (all) term groups of the parsed query.

        Raises:
            TypeError: If `log` is not a `Log` instance.
        """
        from prefect.server.schemas.core import Log

        if not isinstance(log, Log):
            raise TypeError(f"Expected Log object, got {type(log)}")

        parsed = parse_text_search_query(self.query)
        searchable_text = f"{log.message} {log.name}".lower()

        def mentions(term: str) -> bool:
            return term.lower() in searchable_text

        # Include terms (OR logic); an empty include group imposes no constraint.
        if parsed.include and not any(mentions(term) for term in parsed.include):
            return False

        # Exclude terms (NOT logic).
        if any(mentions(term) for term in parsed.exclude):
            return False

        # Required terms (AND logic - future feature).
        return all(mentions(term) for term in parsed.required)

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Build SQLAlchemy WHERE clauses for text search.

        Returns no clauses for a blank query. Otherwise emits up to three
        clauses: an OR over include terms, an AND of negated exclude terms,
        and an AND over required terms.
        """
        if not self.query.strip():
            return []

        parsed = parse_text_search_query(self.query)

        # Combined, lower-cased searchable text (message + name).
        haystack = sa.func.lower(sa.func.concat(db.Log.message, " ", db.Log.name))

        def matches(term: str) -> sa.ColumnElement[bool]:
            # `autoescape=True` makes `%` and `_` in a term match literally,
            # the same way `includes` treats them, instead of acting as LIKE
            # wildcards.
            return haystack.contains(term.lower(), autoescape=True)

        filters: list[sa.ColumnExpressionArgument[bool]] = []

        # Include terms (OR logic).
        if parsed.include:
            filters.append(sa.or_(*[matches(term) for term in parsed.include]))

        # Exclude terms (NOT logic).
        if parsed.exclude:
            filters.append(sa.and_(*[~matches(term) for term in parsed.exclude]))

        # Required terms (AND logic - future feature).
        if parsed.required:
            filters.append(sa.and_(*[matches(term) for term in parsed.required]))

        return filters

1501 

1502 

class LogFilter(PrefectOperatorFilterBaseModel):
    """Filter logs. Only logs matching all criteria will be returned"""

    level: Optional[LogFilterLevel] = Field(
        default=None, description="Filter criteria for `Log.level`"
    )
    timestamp: Optional[LogFilterTimestamp] = Field(
        default=None, description="Filter criteria for `Log.timestamp`"
    )
    flow_run_id: Optional[LogFilterFlowRunId] = Field(
        default=None, description="Filter criteria for `Log.flow_run_id`"
    )
    task_run_id: Optional[LogFilterTaskRunId] = Field(
        default=None, description="Filter criteria for `Log.task_run_id`"
    )
    text: Optional[LogFilterTextSearch] = Field(
        default=None, description="Filter criteria for text search across log content"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Compile the active column sub-filters, then add the text-search clauses."""
        column_criteria = (
            self.level,
            self.timestamp,
            self.flow_run_id,
            self.task_run_id,
        )
        clauses: list[sa.ColumnExpressionArgument[bool]] = [
            criterion.as_sql_filter()
            for criterion in column_criteria
            if criterion is not None
        ]

        if self.text is not None:
            # Unlike the criteria above, the text filter's clauses are spliced
            # in one by one instead of being wrapped into a single expression.
            clauses.extend(self.text._get_filter_list(db))

        return clauses

1539 

1540 

class FilterSet(PrefectBaseModel):
    """A collection of filters for common objects"""

    # Each member defaults to a freshly constructed filter of its own type
    # via `default_factory`, so none of them is ever `None`.
    flows: FlowFilter = Field(
        default_factory=FlowFilter,
        description="Filters that apply to flows",
    )
    flow_runs: FlowRunFilter = Field(
        default_factory=FlowRunFilter,
        description="Filters that apply to flow runs",
    )
    task_runs: TaskRunFilter = Field(
        default_factory=TaskRunFilter,
        description="Filters that apply to task runs",
    )
    deployments: DeploymentFilter = Field(
        default_factory=DeploymentFilter,
        description="Filters that apply to deployments",
    )

1557 

1558 

class BlockTypeFilterName(PrefectFilterBaseModel):
    """Filter by `BlockType.name`"""

    # Substring to look for; it is wrapped in `%` wildcards before matching.
    like_: Optional[str] = Field(
        default=None,
        description=(
            "A case-insensitive partial match. For example, "
            " passing 'marvin' will match "
            "'marvin', 'sad-Marvin', and 'marvin-robot'."
        ),
        examples=["marvin"],
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `ILIKE` clause on `BlockType.name`, or nothing if unset."""
        if self.like_ is None:
            return []
        pattern = f"%{self.like_}%"
        return [db.BlockType.name.ilike(pattern)]

1579 

1580 

class BlockTypeFilterSlug(PrefectFilterBaseModel):
    """Filter by `BlockType.slug`"""

    # Allow-list of slugs.
    any_: Optional[list[str]] = Field(
        default=None,
        description="A list of slugs to match",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` clause on `BlockType.slug`, or nothing if unset."""
        if self.any_ is None:
            return []
        return [db.BlockType.slug.in_(self.any_)]

1596 

1597 

class BlockTypeFilter(PrefectFilterBaseModel):
    """Filter BlockTypes"""

    name: Optional[BlockTypeFilterName] = Field(
        default=None, description="Filter criteria for `BlockType.name`"
    )

    slug: Optional[BlockTypeFilterSlug] = Field(
        default=None, description="Filter criteria for `BlockType.slug`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Compile the `name` and `slug` sub-filters that are set, in that order."""
        return [
            criterion.as_sql_filter()
            for criterion in (self.name, self.slug)
            if criterion is not None
        ]

1620 

1621 

class BlockSchemaFilterBlockTypeId(PrefectFilterBaseModel):
    """Filter by `BlockSchema.block_type_id`."""

    # Allow-list of block type ids.
    any_: Optional[list[UUID]] = Field(
        default=None,
        description="A list of block type ids to include",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` clause on `BlockSchema.block_type_id`, or nothing."""
        if self.any_ is None:
            return []
        return [db.BlockSchema.block_type_id.in_(self.any_)]

1636 

1637 

class BlockSchemaFilterId(PrefectFilterBaseModel):
    """Filter by BlockSchema.id"""

    # Allow-list of block schema ids.
    any_: Optional[list[UUID]] = Field(
        default=None,
        description="A list of IDs to include",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` clause on `BlockSchema.id`, or nothing if unset."""
        if self.any_ is None:
            return []
        return [db.BlockSchema.id.in_(self.any_)]

1652 

1653 

class BlockSchemaFilterCapabilities(PrefectFilterBaseModel):
    """Filter by `BlockSchema.capabilities`"""

    # Every listed capability must be present on the schema.
    all_: Optional[list[str]] = Field(
        default=None,
        examples=[["write-storage", "read-storage"]],
        description=(
            "A list of block capabilities. Block entities will be returned only if an"
            " associated block schema has a superset of the defined capabilities."
        ),
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return a `has_all` clause on `BlockSchema.capabilities`, or nothing."""
        if self.all_ is None:
            return []
        required = _as_array(self.all_)
        return [db.BlockSchema.capabilities.has_all(required)]

1673 

1674 

class BlockSchemaFilterVersion(PrefectFilterBaseModel):
    """Filter by `BlockSchema.version`"""

    # Allow-list of version strings.
    any_: Optional[list[str]] = Field(
        default=None,
        examples=[["2.0.0", "2.1.0"]],
        description="A list of block schema versions.",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` clause on `BlockSchema.version` when `any_` is set."""
        filters: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            filters.append(db.BlockSchema.version.in_(self.any_))
        return filters

1691 

1692 

class BlockSchemaFilter(PrefectOperatorFilterBaseModel):
    """Filter BlockSchemas"""

    block_type_id: Optional[BlockSchemaFilterBlockTypeId] = Field(
        default=None, description="Filter criteria for `BlockSchema.block_type_id`"
    )
    block_capabilities: Optional[BlockSchemaFilterCapabilities] = Field(
        default=None, description="Filter criteria for `BlockSchema.capabilities`"
    )
    id: Optional[BlockSchemaFilterId] = Field(
        default=None, description="Filter criteria for `BlockSchema.id`"
    )
    version: Optional[BlockSchemaFilterVersion] = Field(
        default=None, description="Filter criteria for `BlockSchema.version`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Compile every sub-filter that is set, in field declaration order."""
        criteria = (
            self.block_type_id,
            self.block_capabilities,
            self.id,
            self.version,
        )
        return [
            criterion.as_sql_filter()
            for criterion in criteria
            if criterion is not None
        ]

1724 

1725 

class BlockDocumentFilterIsAnonymous(PrefectFilterBaseModel):
    """Filter by `BlockDocument.is_anonymous`."""

    # Desired value of the anonymous flag; `None` leaves the criterion off.
    eq_: Optional[bool] = Field(
        default=None,
        description=(
            "Filter block documents for only those that are or are not anonymous."
        ),
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IS` comparison on `BlockDocument.is_anonymous`, or nothing."""
        if self.eq_ is None:
            return []
        return [db.BlockDocument.is_anonymous.is_(self.eq_)]

1743 

1744 

class BlockDocumentFilterBlockTypeId(PrefectFilterBaseModel):
    """Filter by `BlockDocument.block_type_id`."""

    # Allow-list of block type ids.
    any_: Optional[list[UUID]] = Field(
        default=None,
        description="A list of block type ids to include",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` clause on `BlockDocument.block_type_id`, or nothing."""
        if self.any_ is None:
            return []
        return [db.BlockDocument.block_type_id.in_(self.any_)]

1759 

1760 

class BlockDocumentFilterId(PrefectFilterBaseModel):
    """Filter by `BlockDocument.id`."""

    # Allow-list of block document ids.
    any_: Optional[list[UUID]] = Field(
        default=None,
        description="A list of block ids to include",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` clause on `BlockDocument.id`, or nothing if unset."""
        if self.any_ is None:
            return []
        return [db.BlockDocument.id.in_(self.any_)]

1775 

1776 

class BlockDocumentFilterName(PrefectFilterBaseModel):
    """Filter by `BlockDocument.name`."""

    # Exact-match allow-list of names.
    any_: Optional[list[str]] = Field(
        default=None,
        description="A list of block names to include",
    )
    # Pattern text; it is additionally wrapped in `%` on both sides before
    # being handed to `ilike`.
    like_: Optional[str] = Field(
        default=None,
        description=(
            "A string to match block names against. This can include "
            "SQL wildcard characters like `%` and `_`."
        ),
        examples=["my-block%"],
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Build `IN` and `ILIKE` clauses on `BlockDocument.name` for the set fields."""
        block_name = db.BlockDocument.name
        clauses: list[sa.ColumnExpressionArgument[bool]] = []

        if self.any_ is not None:
            clauses.append(block_name.in_(self.any_))
        if self.like_ is not None:
            pattern = f"%{self.like_}%"
            clauses.append(block_name.ilike(pattern))

        return clauses

1801 

1802 

class BlockDocumentFilter(PrefectOperatorFilterBaseModel):
    """Filter BlockDocuments. Only BlockDocuments matching all criteria will be returned"""

    id: Optional[BlockDocumentFilterId] = Field(
        default=None, description="Filter criteria for `BlockDocument.id`"
    )
    # Unlike the other criteria, this one is active by default: anonymous
    # blocks are excluded unless the caller overrides or clears it.
    is_anonymous: Optional[BlockDocumentFilterIsAnonymous] = Field(
        default=BlockDocumentFilterIsAnonymous(eq_=False),
        description=(
            "Filter criteria for `BlockDocument.is_anonymous`. "
            "Defaults to excluding anonymous blocks."
        ),
    )
    block_type_id: Optional[BlockDocumentFilterBlockTypeId] = Field(
        default=None, description="Filter criteria for `BlockDocument.block_type_id`"
    )
    name: Optional[BlockDocumentFilterName] = Field(
        default=None, description="Filter criteria for `BlockDocument.name`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return the SQL filter of every sub-filter that is set, in field order."""
        criteria = (self.id, self.is_anonymous, self.block_type_id, self.name)
        return [
            criterion.as_sql_filter()
            for criterion in criteria
            if criterion is not None
        ]

1837 

1838 

class WorkQueueFilterId(PrefectFilterBaseModel):
    """Filter by `WorkQueue.id`."""

    any_: Optional[list[UUID]] = Field(
        default=None,
        description="A list of work queue ids to include",
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` clause over `any_`, or no clauses when `any_` is unset."""
        if self.any_ is None:
            return []
        return [db.WorkQueue.id.in_(self.any_)]

1854 

1855 

class WorkQueueFilterName(PrefectFilterBaseModel):
    """Filter by `WorkQueue.name`."""

    any_: Optional[list[str]] = Field(
        default=None,
        description="A list of work queue names to include",
        examples=[["wq-1", "wq-2"]],
    )

    startswith_: Optional[list[str]] = Field(
        default=None,
        description=(
            "A list of case-insensitive starts-with matches. For example, "
            " passing 'marvin' will match "
            "'marvin', and 'Marvin-robot', but not 'sad-marvin'."
        ),
        examples=[["marvin", "Marvin-robot"]],
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Collect one clause per criterion that has been set.

        `any_` becomes an `IN` test. `startswith_` becomes a disjunction of
        `ILIKE '<prefix>%'` tests, one per supplied prefix.
        """
        filters: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            filters.append(db.WorkQueue.name.in_(self.any_))
        if self.startswith_ is not None:
            # Seed the disjunction with FALSE so an empty prefix list never
            # produces an argument-less `or_()` (deprecated in SQLAlchemy) and
            # instead matches nothing, mirroring how an empty `any_` behaves.
            # With at least one prefix the constant is folded away.
            filters.append(
                sa.or_(
                    sa.false(),
                    *[
                        db.WorkQueue.name.ilike(f"{prefix}%")
                        for prefix in self.startswith_
                    ],
                )
            )
        return filters

1888 

1889 

class WorkQueueFilter(PrefectOperatorFilterBaseModel):
    """Filter work queues. Only work queues matching all criteria will be
    returned"""

    id: Optional[WorkQueueFilterId] = Field(
        default=None, description="Filter criteria for `WorkQueue.id`"
    )

    name: Optional[WorkQueueFilterName] = Field(
        default=None, description="Filter criteria for `WorkQueue.name`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return the SQL filter of every sub-filter that is set, in field order."""
        criteria = (self.id, self.name)
        return [
            criterion.as_sql_filter()
            for criterion in criteria
            if criterion is not None
        ]

1913 

1914 

class WorkPoolFilterId(PrefectFilterBaseModel):
    """Filter by `WorkPool.id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of work pool ids to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` clause over `any_`, or no clauses when `any_` is unset."""
        if self.any_ is None:
            return []
        return [db.WorkPool.id.in_(self.any_)]

1929 

1930 

class WorkPoolFilterName(PrefectFilterBaseModel):
    """Filter by `WorkPool.name`."""

    any_: Optional[list[str]] = Field(
        default=None, description="A list of work pool names to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` clause over `any_`, or no clauses when `any_` is unset."""
        if self.any_ is None:
            return []
        return [db.WorkPool.name.in_(self.any_)]

1945 

1946 

class WorkPoolFilterType(PrefectFilterBaseModel):
    """Filter by `WorkPool.type`."""

    any_: Optional[list[str]] = Field(
        default=None, description="A list of work pool types to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` clause over `any_`, or no clauses when `any_` is unset."""
        if self.any_ is None:
            return []
        return [db.WorkPool.type.in_(self.any_)]

1961 

1962 

class WorkPoolFilter(PrefectOperatorFilterBaseModel):
    """Filter work pools. Only work pools matching all criteria will be returned"""

    id: Optional[WorkPoolFilterId] = Field(
        default=None, description="Filter criteria for `WorkPool.id`"
    )
    name: Optional[WorkPoolFilterName] = Field(
        default=None, description="Filter criteria for `WorkPool.name`"
    )
    type: Optional[WorkPoolFilterType] = Field(
        default=None, description="Filter criteria for `WorkPool.type`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return the SQL filter of every sub-filter that is set, in field order."""
        criteria = (self.id, self.name, self.type)
        return [
            criterion.as_sql_filter()
            for criterion in criteria
            if criterion is not None
        ]

1989 

1990 

class WorkerFilterWorkPoolId(PrefectFilterBaseModel):
    """Filter by `Worker.work_pool_id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of work pool ids to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` clause on `Worker.work_pool_id`, or no clauses when `any_` is unset."""
        filters: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            filters.append(db.Worker.work_pool_id.in_(self.any_))
        return filters

2005 

2006 

class WorkerFilterStatus(PrefectFilterBaseModel):
    """Filter by `Worker.status`."""

    any_: Optional[list[schemas.statuses.WorkerStatus]] = Field(
        default=None, description="A list of worker statuses to include"
    )
    not_any_: Optional[list[schemas.statuses.WorkerStatus]] = Field(
        default=None, description="A list of worker statuses to exclude"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Collect one clause per criterion that has been set.

        `any_` becomes an `IN` test and `not_any_` a `NOT IN` test; when both
        are set, both clauses are returned.
        """
        filters: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            filters.append(db.Worker.status.in_(self.any_))
        if self.not_any_ is not None:
            # `not_in` is the current SQLAlchemy spelling; `notin_` is only
            # kept as a backwards-compatible alias.
            filters.append(db.Worker.status.not_in(self.not_any_))
        return filters

2026 

2027 

class WorkerFilterLastHeartbeatTime(PrefectFilterBaseModel):
    """Filter by `Worker.last_heartbeat_time`."""

    # Both bounds are inclusive; either may be left unset.
    before_: Optional[DateTime] = Field(
        default=None,
        description=(
            "Only include processes whose last heartbeat was at or before this time"
        ),
    )
    after_: Optional[DateTime] = Field(
        default=None,
        description=(
            "Only include processes whose last heartbeat was at or after this time"
        ),
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return `<=` / `>=` comparisons for whichever bounds are set."""
        heartbeat = db.Worker.last_heartbeat_time
        bounds: list[sa.ColumnExpressionArgument[bool]] = []
        if self.before_ is not None:
            bounds.append(heartbeat <= self.before_)
        if self.after_ is not None:
            bounds.append(heartbeat >= self.after_)
        return bounds

2053 

2054 

class WorkerFilter(PrefectOperatorFilterBaseModel):
    """Filter workers. Only workers matching all criteria will be returned"""

    # worker_config_id: Optional[WorkerFilterWorkPoolId] = Field(
    #     default=None, description="Filter criteria for `Worker.worker_config_id`"
    # )

    last_heartbeat_time: Optional[WorkerFilterLastHeartbeatTime] = Field(
        default=None,
        description="Filter criteria for `Worker.last_heartbeat_time`",
    )

    status: Optional[WorkerFilterStatus] = Field(
        default=None, description="Filter criteria for `Worker.status`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return the SQL filter of every sub-filter that is set, in field order."""
        filters: list[sa.ColumnExpressionArgument[bool]] = []

        if self.last_heartbeat_time is not None:
            filters.append(self.last_heartbeat_time.as_sql_filter())

        if self.status is not None:
            filters.append(self.status.as_sql_filter())

        return filters

2083 

2084 

class ArtifactFilterId(PrefectFilterBaseModel):
    """Filter by `Artifact.id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of artifact ids to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` clause over `any_`, or no clauses when `any_` is unset."""
        if self.any_ is None:
            return []
        return [db.Artifact.id.in_(self.any_)]

2099 

2100 

class ArtifactFilterKey(PrefectFilterBaseModel):
    """Filter by `Artifact.key`."""

    any_: Optional[list[str]] = Field(
        default=None, description="A list of artifact keys to include"
    )

    like_: Optional[str] = Field(
        default=None,
        description=(
            "A string to match artifact keys against. This can include "
            "SQL wildcard characters like `%` and `_`."
        ),
        examples=["my-artifact-%"],
    )

    exists_: Optional[bool] = Field(
        default=None,
        description=(
            "If `true`, only include artifacts with a non-null key. If `false`, "
            "only include artifacts with a null key."
        ),
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Collect one clause per criterion that has been set.

        `any_` becomes an `IN` test, `like_` an `ILIKE` test with the text
        wrapped in `%` on both sides, and `exists_` an `IS NOT NULL` / `IS NULL`
        test on the key.
        """
        filters: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            filters.append(db.Artifact.key.in_(self.any_))
        if self.like_ is not None:
            filters.append(db.Artifact.key.ilike(f"%{self.like_}%"))
        if self.exists_ is not None:
            # `is_not` is the current SQLAlchemy spelling; `isnot` is only
            # kept as a backwards-compatible alias.
            filters.append(
                db.Artifact.key.is_not(None)
                if self.exists_
                else db.Artifact.key.is_(None)
            )
        return filters

2140 

2141 

class ArtifactFilterFlowRunId(PrefectFilterBaseModel):
    """Filter by `Artifact.flow_run_id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of flow run IDs to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` clause over `any_`, or no clauses when `any_` is unset."""
        if self.any_ is None:
            return []
        return [db.Artifact.flow_run_id.in_(self.any_)]

2156 

2157 

class ArtifactFilterTaskRunId(PrefectFilterBaseModel):
    """Filter by `Artifact.task_run_id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of task run IDs to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` clause over `any_`, or no clauses when `any_` is unset."""
        if self.any_ is None:
            return []
        return [db.Artifact.task_run_id.in_(self.any_)]

2172 

2173 

class ArtifactFilterType(PrefectFilterBaseModel):
    """Filter by `Artifact.type`."""

    any_: Optional[list[str]] = Field(
        default=None, description="A list of artifact types to include"
    )
    not_any_: Optional[list[str]] = Field(
        default=None, description="A list of artifact types to exclude"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Collect one clause per criterion that has been set.

        `any_` becomes an `IN` test and `not_any_` a `NOT IN` test; when both
        are set, both clauses are returned.
        """
        filters: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            filters.append(db.Artifact.type.in_(self.any_))
        if self.not_any_ is not None:
            # `not_in` is the current SQLAlchemy spelling; `notin_` is only
            # kept as a backwards-compatible alias.
            filters.append(db.Artifact.type.not_in(self.not_any_))
        return filters

2193 

2194 

class ArtifactFilter(PrefectOperatorFilterBaseModel):
    """Filter artifacts. Only artifacts matching all criteria will be returned"""

    id: Optional[ArtifactFilterId] = Field(
        default=None, description="Filter criteria for `Artifact.id`"
    )
    key: Optional[ArtifactFilterKey] = Field(
        default=None, description="Filter criteria for `Artifact.key`"
    )
    flow_run_id: Optional[ArtifactFilterFlowRunId] = Field(
        default=None, description="Filter criteria for `Artifact.flow_run_id`"
    )
    task_run_id: Optional[ArtifactFilterTaskRunId] = Field(
        default=None, description="Filter criteria for `Artifact.task_run_id`"
    )
    type: Optional[ArtifactFilterType] = Field(
        default=None, description="Filter criteria for `Artifact.type`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return the SQL filter of every sub-filter that is set, in field order."""
        criteria = (
            self.id,
            self.key,
            self.flow_run_id,
            self.task_run_id,
            self.type,
        )
        return [
            criterion.as_sql_filter()
            for criterion in criteria
            if criterion is not None
        ]

2231 

2232 

class ArtifactCollectionFilterLatestId(PrefectFilterBaseModel):
    """Filter by `ArtifactCollection.latest_id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of artifact ids to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` clause over `any_`, or no clauses when `any_` is unset."""
        if self.any_ is None:
            return []
        return [db.ArtifactCollection.latest_id.in_(self.any_)]

2247 

2248 

class ArtifactCollectionFilterKey(PrefectFilterBaseModel):
    """Filter by `ArtifactCollection.key`."""

    any_: Optional[list[str]] = Field(
        default=None, description="A list of artifact keys to include"
    )

    like_: Optional[str] = Field(
        default=None,
        description=(
            "A string to match artifact keys against. This can include "
            "SQL wildcard characters like `%` and `_`."
        ),
        examples=["my-artifact-%"],
    )

    exists_: Optional[bool] = Field(
        default=None,
        description=(
            "If `true`, only include artifacts with a non-null key. If `false`, "
            "only include artifacts with a null key. Should return all rows in "
            "the ArtifactCollection table if specified."
        ),
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Collect one clause per criterion that has been set.

        `any_` becomes an `IN` test, `like_` an `ILIKE` test with the text
        wrapped in `%` on both sides, and `exists_` an `IS NOT NULL` / `IS NULL`
        test on the key.
        """
        filters: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            filters.append(db.ArtifactCollection.key.in_(self.any_))
        if self.like_ is not None:
            filters.append(db.ArtifactCollection.key.ilike(f"%{self.like_}%"))
        if self.exists_ is not None:
            # `is_not` is the current SQLAlchemy spelling; `isnot` is only
            # kept as a backwards-compatible alias.
            filters.append(
                db.ArtifactCollection.key.is_not(None)
                if self.exists_
                else db.ArtifactCollection.key.is_(None)
            )
        return filters

2289 

2290 

class ArtifactCollectionFilterFlowRunId(PrefectFilterBaseModel):
    """Filter by `ArtifactCollection.flow_run_id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of flow run IDs to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` clause over `any_`, or no clauses when `any_` is unset."""
        if self.any_ is None:
            return []
        return [db.ArtifactCollection.flow_run_id.in_(self.any_)]

2305 

2306 

class ArtifactCollectionFilterTaskRunId(PrefectFilterBaseModel):
    """Filter by `ArtifactCollection.task_run_id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of task run IDs to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` clause over `any_`, or no clauses when `any_` is unset."""
        if self.any_ is None:
            return []
        return [db.ArtifactCollection.task_run_id.in_(self.any_)]

2321 

2322 

class ArtifactCollectionFilterType(PrefectFilterBaseModel):
    """Filter by `ArtifactCollection.type`."""

    any_: Optional[list[str]] = Field(
        default=None, description="A list of artifact types to include"
    )
    not_any_: Optional[list[str]] = Field(
        default=None, description="A list of artifact types to exclude"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Collect one clause per criterion that has been set.

        `any_` becomes an `IN` test and `not_any_` a `NOT IN` test; when both
        are set, both clauses are returned.
        """
        filters: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            filters.append(db.ArtifactCollection.type.in_(self.any_))
        if self.not_any_ is not None:
            # `not_in` is the current SQLAlchemy spelling; `notin_` is only
            # kept as a backwards-compatible alias.
            filters.append(db.ArtifactCollection.type.not_in(self.not_any_))
        return filters

2342 

2343 

class ArtifactCollectionFilter(PrefectOperatorFilterBaseModel):
    """Filter artifact collections. Only artifact collections matching all criteria will be returned"""

    latest_id: Optional[ArtifactCollectionFilterLatestId] = Field(
        default=None, description="Filter criteria for `Artifact.id`"
    )
    key: Optional[ArtifactCollectionFilterKey] = Field(
        default=None, description="Filter criteria for `Artifact.key`"
    )
    flow_run_id: Optional[ArtifactCollectionFilterFlowRunId] = Field(
        default=None, description="Filter criteria for `Artifact.flow_run_id`"
    )
    task_run_id: Optional[ArtifactCollectionFilterTaskRunId] = Field(
        default=None, description="Filter criteria for `Artifact.task_run_id`"
    )
    type: Optional[ArtifactCollectionFilterType] = Field(
        default=None, description="Filter criteria for `Artifact.type`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return the SQL filter of every sub-filter that is set, in field order."""
        criteria = (
            self.latest_id,
            self.key,
            self.flow_run_id,
            self.task_run_id,
            self.type,
        )
        return [
            criterion.as_sql_filter()
            for criterion in criteria
            if criterion is not None
        ]

2380 

2381 

class VariableFilterId(PrefectFilterBaseModel):
    """Filter by `Variable.id`."""

    any_: Optional[list[UUID]] = Field(
        default=None, description="A list of variable ids to include"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return an `IN` clause over `any_`, or no clauses when `any_` is unset."""
        if self.any_ is None:
            return []
        return [db.Variable.id.in_(self.any_)]

2396 

2397 

class VariableFilterName(PrefectFilterBaseModel):
    """Filter by `Variable.name`."""

    any_: Optional[list[str]] = Field(
        default=None, description="A list of variables names to include"
    )
    like_: Optional[str] = Field(
        default=None,
        description=(
            "A string to match variable names against. This can include "
            "SQL wildcard characters like `%` and `_`."
        ),
        examples=["my_variable_%"],
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Collect one clause per criterion that has been set.

        `any_` becomes an `IN` test; `like_` becomes an `ILIKE` test with the
        supplied text wrapped in `%` on both sides.
        """
        name_column = db.Variable.name
        clauses: list[sa.ColumnExpressionArgument[bool]] = []
        if self.any_ is not None:
            clauses.append(name_column.in_(self.any_))
        if self.like_ is not None:
            pattern = f"%{self.like_}%"
            clauses.append(name_column.ilike(pattern))
        return clauses

2422 

2423 

class VariableFilterTags(PrefectOperatorFilterBaseModel):
    """Filter by `Variable.tags`."""

    all_: Optional[list[str]] = Field(
        default=None,
        examples=[["tag-1", "tag-2"]],
        description=(
            "A list of tags. Variables will be returned only if their tags are a"
            " superset of the list"
        ),
    )
    is_null_: Optional[bool] = Field(
        default=None, description="If true, only include Variables without tags"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Collect one clause per criterion that has been set.

        `all_` becomes a `has_all` test against the tags cast to a string
        array. `is_null_` compares the tags to an empty list: equal when true,
        not equal when false.
        """
        clauses: list[sa.ColumnElement[bool]] = []
        if self.all_ is not None:
            required_tags = _as_array(self.all_)
            clauses.append(db.Variable.tags.has_all(required_tags))
        if self.is_null_ is not None:
            if self.is_null_:
                clauses.append(db.Variable.tags == [])
            else:
                clauses.append(db.Variable.tags != [])
        return clauses

2450 

2451 

class VariableFilter(PrefectOperatorFilterBaseModel):
    """Filter variables. Only variables matching all criteria will be returned"""

    id: Optional[VariableFilterId] = Field(
        default=None, description="Filter criteria for `Variable.id`"
    )
    name: Optional[VariableFilterName] = Field(
        default=None, description="Filter criteria for `Variable.name`"
    )
    tags: Optional[VariableFilterTags] = Field(
        default=None, description="Filter criteria for `Variable.tags`"
    )

    def _get_filter_list(
        self, db: "PrefectDBInterface"
    ) -> Iterable[sa.ColumnExpressionArgument[bool]]:
        """Return the SQL filter of every sub-filter that is set, in field order."""
        criteria = (self.id, self.name, self.tags)
        return [
            criterion.as_sql_filter()
            for criterion in criteria
            if criterion is not None
        ]