Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/filters.py: 19%

411 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1from __future__ import annotations 1a

2 

3import sys 1a

4from collections.abc import Iterable 1a

5from dataclasses import dataclass, field 1a

6from datetime import datetime, timedelta 1a

7from typing import TYPE_CHECKING, Optional, Sequence, Union 1a

8from uuid import UUID 1a

9from zoneinfo import ZoneInfo 1a

10 

11import sqlalchemy as sa 1a

12from pydantic import Field, PrivateAttr 1a

13from sqlalchemy.sql import Select 1a

14 

15import prefect.types._datetime 1a

16from prefect.server.database import PrefectDBInterface, db_injector 1a

17from prefect.server.schemas.filters import ( 1a

18 PrefectFilterBaseModel, 

19 PrefectOperatorFilterBaseModel, 

20) 

21from prefect.server.utilities.schemas.bases import PrefectBaseModel 1a

22from prefect.server.utilities.text_search_parser import ( 1a

23 parse_text_search_query, 

24) 

25from prefect.types import DateTime 1a

26from prefect.utilities.collections import AutoEnum 1a

27 

28from .schemas.events import Event, Resource, ResourceSpecification 1a

29 

30if TYPE_CHECKING: 30 ↛ 31line 30 didn't jump to line 31 because the condition on line 30 was never true1a

31 from sqlalchemy.sql.expression import ColumnElement, ColumnExpressionArgument 

32 

33 

34class AutomationFilterCreated(PrefectFilterBaseModel): 1a

35 """Filter by `Automation.created`.""" 

36 

37 before_: Optional[DateTime] = Field( 1a

38 default=None, 

39 description="Only include automations created before this datetime", 

40 ) 

41 

42 def _get_filter_list( 1a

43 self, db: PrefectDBInterface 

44 ) -> Iterable[sa.ColumnElement[bool]]: 

45 if self.before_ is not None: 

46 return [db.Automation.created <= self.before_] 

47 return () 

48 

49 

50class AutomationFilterName(PrefectFilterBaseModel): 1a

51 """Filter by `Automation.created`.""" 

52 

53 any_: Optional[list[str]] = Field( 1a

54 default=None, 

55 description="Only include automations with names that match any of these strings", 

56 ) 

57 

58 def _get_filter_list(self, db: PrefectDBInterface) -> list[sa.ColumnElement[bool]]: 1a

59 if self.any_ is not None: 

60 return [db.Automation.name.in_(self.any_)] 

61 return [] 

62 

63 

64class AutomationFilterTags(PrefectOperatorFilterBaseModel): 1a

65 """Filter by `Automation.tags`.""" 

66 

67 all_: Optional[list[str]] = Field( 1a

68 default=None, 

69 examples=[["tag-1", "tag-2"]], 

70 description="A list of tags. Automations will be returned only if their tags are a superset of the list", 

71 ) 

72 any_: Optional[list[str]] = Field( 1a

73 default=None, 

74 examples=[["tag-1", "tag-2"]], 

75 description="A list of tags. Automations will be returned if their tags contain any of the tags in the list", 

76 ) 

77 is_null_: Optional[bool] = Field( 1a

78 default=None, description="If true, only include automations without tags" 

79 ) 

80 

81 def _get_filter_list( 1a

82 self, db: PrefectDBInterface 

83 ) -> Iterable[sa.ColumnExpressionArgument[bool]]: 

84 from prefect.server.schemas.filters import _as_array 

85 

86 filters: list[sa.ColumnElement[bool]] = [] 

87 if self.all_ is not None: 

88 filters.append(db.Automation.tags.has_all(_as_array(self.all_))) 

89 if self.any_ is not None: 

90 filters.append(db.Automation.tags.has_any(_as_array(self.any_))) 

91 if self.is_null_ is not None: 

92 filters.append( 

93 db.Automation.tags == [] if self.is_null_ else db.Automation.tags != [] 

94 ) 

95 return filters 

96 

97 

98class AutomationFilter(PrefectOperatorFilterBaseModel): 1a

99 name: Optional[AutomationFilterName] = Field( 1a

100 default=None, description="Filter criteria for `Automation.name`" 

101 ) 

102 created: Optional[AutomationFilterCreated] = Field( 1a

103 default=None, description="Filter criteria for `Automation.created`" 

104 ) 

105 tags: Optional[AutomationFilterTags] = Field( 1a

106 default=None, description="Filter criteria for `Automation.tags`" 

107 ) 

108 

109 def _get_filter_list( 1a

110 self, db: PrefectDBInterface 

111 ) -> Iterable[sa.ColumnExpressionArgument[bool]]: 

112 filters: list[sa.ColumnExpressionArgument[bool]] = [] 

113 

114 if self.name is not None: 

115 filters.append(self.name.as_sql_filter()) 

116 if self.created is not None: 

117 filters.append(self.created.as_sql_filter()) 

118 if self.tags is not None: 

119 filters.append(self.tags.as_sql_filter()) 

120 

121 return filters 

122 

123 

124class EventDataFilter(PrefectBaseModel, extra="forbid"): 1a

125 """A base class for filtering event data.""" 

126 

127 _top_level_filter: Optional[Select[tuple[UUID]]] = PrivateAttr(None) 1a

128 

129 def get_filters(self) -> list["EventDataFilter"]: 1a

130 filters: list[EventDataFilter] = [] 

131 for filter in [ 

132 getattr(self, name) for name in self.__class__.model_fields.keys() 

133 ]: 

134 # Any embedded list of filters are flattened and thus ANDed together 

135 subfilters: list[EventDataFilter] = ( 

136 filter if isinstance(filter, list) else [filter] 

137 ) 

138 

139 for subfilter in subfilters: 

140 if not isinstance(subfilter, EventDataFilter): 

141 continue 

142 

143 subfilter._top_level_filter = self._top_level_filter 

144 filters.append(subfilter) 

145 

146 return filters 

147 

148 def includes(self, event: Event) -> bool: 1a

149 """Does the given event match the criteria of this filter?""" 

150 return all(filter.includes(event) for filter in self.get_filters()) 

151 

152 def excludes(self, event: Event) -> bool: 1a

153 """Would the given filter exclude this event?""" 

154 return not self.includes(event) 

155 

156 def build_where_clauses(self) -> Sequence["ColumnExpressionArgument[bool]"]: 1a

157 """Convert the criteria to a WHERE clause.""" 

158 clauses: list["ColumnExpressionArgument[bool]"] = [] 

159 for filter in self.get_filters(): 

160 clauses.extend(filter.build_where_clauses()) 

161 return clauses 

162 

163 

164class EventOccurredFilter(EventDataFilter): 1a

165 since: DateTime = Field( 1a

166 default_factory=lambda: prefect.types._datetime.start_of_day( 

167 prefect.types._datetime.now("UTC") 

168 ) 

169 - timedelta(days=180), 

170 description="Only include events after this time (inclusive)", 

171 ) 

172 until: DateTime = Field( 1a

173 default_factory=lambda: prefect.types._datetime.now("UTC"), 

174 description="Only include events prior to this time (inclusive)", 

175 ) 

176 

177 def clamp(self, max_duration: timedelta) -> None: 1a

178 """Limit how far the query can look back based on the given duration""" 

179 # Using datetime.now() instead of prefect.types._datetime.now() to avoid 

180 # dropping timezone information which happens if pendulum is used 

181 earliest = datetime.now(ZoneInfo("UTC")) - max_duration 

182 self.since = max(earliest, self.since) 

183 

184 def includes(self, event: Event) -> bool: 1a

185 return self.since <= event.occurred <= self.until 

186 

187 @db_injector 1a

188 def build_where_clauses( 1a

189 self, db: PrefectDBInterface 

190 ) -> Sequence["ColumnExpressionArgument[bool]"]: 

191 return [ 

192 db.Event.occurred >= self.since, 

193 db.Event.occurred <= self.until, 

194 ] 

195 

196 

197class EventNameFilter(EventDataFilter): 1a

198 prefix: Optional[list[str]] = Field( 1a

199 default=None, description="Only include events matching one of these prefixes" 

200 ) 

201 exclude_prefix: Optional[list[str]] = Field( 1a

202 default=None, description="Exclude events matching one of these prefixes" 

203 ) 

204 

205 name: Optional[list[str]] = Field( 1a

206 default=None, 

207 description="Only include events matching one of these names exactly", 

208 ) 

209 exclude_name: Optional[list[str]] = Field( 1a

210 default=None, description="Exclude events matching one of these names exactly" 

211 ) 

212 

213 def includes(self, event: Event) -> bool: 1a

214 if self.prefix: 

215 if not any(event.event.startswith(prefix) for prefix in self.prefix): 

216 return False 

217 

218 if self.exclude_prefix: 

219 if any(event.event.startswith(prefix) for prefix in self.exclude_prefix): 

220 return False 

221 

222 if self.name: 

223 if not any(event.event == name for name in self.name): 

224 return False 

225 

226 if self.exclude_name: 

227 if any(event.event == name for name in self.exclude_name): 

228 return False 

229 

230 return True 

231 

232 @db_injector 1a

233 def build_where_clauses( 1a

234 self, db: PrefectDBInterface 

235 ) -> Sequence["ColumnExpressionArgument[bool]"]: 

236 filters: list["ColumnExpressionArgument[bool]"] = [] 

237 

238 if self.prefix: 

239 filters.append( 

240 sa.or_(*(db.Event.event.startswith(prefix) for prefix in self.prefix)) 

241 ) 

242 

243 if self.exclude_prefix: 

244 filters.extend( 

245 sa.not_(db.Event.event.startswith(prefix)) 

246 for prefix in self.exclude_prefix 

247 ) 

248 

249 if self.name: 

250 filters.append(db.Event.event.in_(self.name)) 

251 

252 if self.exclude_name: 

253 filters.append(db.Event.event.not_in(self.exclude_name)) 

254 

255 return filters 

256 

257 

258@dataclass 1a

259class LabelSet: 1a

260 simple: list[str] = field(default_factory=list) 1a

261 prefixes: list[str] = field(default_factory=list) 1a

262 

263 

264@dataclass 1a

265class LabelOperations: 1a

266 values: list[str] 1a

267 positive: LabelSet = field(default_factory=LabelSet) 1a

268 negative: LabelSet = field(default_factory=LabelSet) 1a

269 

270 def __post_init__(self) -> None: 1a

271 for value in self.values: 

272 label_set = self.positive 

273 if value.startswith("!"): 

274 label_set = self.negative 

275 value = value[1:] 

276 

277 if value.endswith("*"): 

278 label_set.prefixes.append(value.rstrip("*")) 

279 else: 

280 label_set.simple.append(value) 

281 

282 

283class EventResourceFilter(EventDataFilter): 1a

284 id: Optional[list[str]] = Field( 1a

285 default=None, description="Only include events for resources with these IDs" 

286 ) 

287 id_prefix: Optional[list[str]] = Field( 1a

288 default=None, 

289 description=( 

290 "Only include events for resources with IDs starting with these prefixes." 

291 ), 

292 ) 

293 labels: Optional[ResourceSpecification] = Field( 1a

294 default=None, description="Only include events for resources with these labels" 

295 ) 

296 distinct: bool = Field( 1a

297 default=False, 

298 description="Only include events for distinct resources", 

299 ) 

300 

301 def includes(self, event: Event) -> bool: 1a

302 if self.id: 

303 if not any(event.resource.id == resource_id for resource_id in self.id): 

304 return False 

305 

306 if self.id_prefix: 

307 if not any( 

308 event.resource.id.startswith(prefix) for prefix in self.id_prefix 

309 ): 

310 return False 

311 

312 if self.labels: 

313 if not self.labels.matches(event.resource): 

314 return False 

315 

316 return True 

317 

318 @db_injector 1a

319 def build_where_clauses( 1a

320 self, db: PrefectDBInterface 

321 ) -> Sequence["ColumnExpressionArgument[bool]"]: 

322 filters: list["ColumnExpressionArgument[bool]"] = [] 

323 

324 # If we're doing an exact or prefix search on resource_id, this is efficient 

325 # enough to do on the events table without going to the event_resources table 

326 

327 if self.id: 

328 filters.append(db.Event.resource_id.in_(self.id)) 

329 

330 if self.id_prefix: 

331 filters.append( 

332 sa.or_( 

333 *( 

334 db.Event.resource_id.startswith(prefix) 

335 for prefix in self.id_prefix 

336 ) 

337 ) 

338 ) 

339 

340 if self.labels: 

341 labels = self.labels.deepcopy() 

342 

343 # We are explicitly searching for the primary resource here so the 

344 # resource_role must be '' 

345 label_filters = [db.EventResource.resource_role == ""] 

346 

347 # On the event_resources table, resource_id is unpacked 

348 # into a column, so we should search for it there 

349 if resource_ids := labels.pop("prefect.resource.id", None): 

350 label_ops = LabelOperations(resource_ids) 

351 

352 resource_id_column = db.EventResource.resource_id 

353 

354 if values := label_ops.positive.simple: 

355 label_filters.append(resource_id_column.in_(values)) 

356 if values := label_ops.negative.simple: 

357 label_filters.append(resource_id_column.not_in(values)) 

358 for prefix in label_ops.positive.prefixes: 

359 label_filters.append(resource_id_column.startswith(prefix)) 

360 for prefix in label_ops.negative.prefixes: 

361 label_filters.append(sa.not_(resource_id_column.startswith(prefix))) 

362 

363 if labels: 

364 for _, (label, values) in enumerate(labels.items()): 

365 # Empty label value arrays should match nothing 

366 if not values: 

367 label_filters.append(sa.false()) 

368 continue 

369 

370 label_ops = LabelOperations(values) 

371 

372 label_column = db.EventResource.resource[label].astext 

373 

374 # With negative labels, the resource _must_ have the label 

375 if label_ops.negative.simple or label_ops.negative.prefixes: 

376 label_filters.append(label_column.is_not(None)) 

377 

378 if values := label_ops.positive.simple: 

379 label_filters.append(label_column.in_(values)) 

380 if values := label_ops.negative.simple: 

381 label_filters.append(label_column.notin_(values)) 

382 for prefix in label_ops.positive.prefixes: 

383 label_filters.append(label_column.startswith(prefix)) 

384 for prefix in label_ops.negative.prefixes: 

385 label_filters.append(sa.not_(label_column.startswith(prefix))) 

386 

387 assert self._top_level_filter is not None 

388 filters.append( 

389 db.Event.id.in_(self._top_level_filter.where(*label_filters)) 

390 ) 

391 

392 return filters 

393 

394 

395class EventRelatedFilter(EventDataFilter): 1a

396 id: Optional[list[str]] = Field( 1a

397 None, description="Only include events for related resources with these IDs" 

398 ) 

399 role: Optional[list[str]] = Field( 1a

400 None, description="Only include events for related resources in these roles" 

401 ) 

402 resources_in_roles: Optional[list[tuple[str, str]]] = Field( 1a

403 None, 

404 description=( 

405 "Only include events with specific related resources in specific roles" 

406 ), 

407 ) 

408 labels: Optional[ResourceSpecification] = Field( 1a

409 None, description="Only include events for related resources with these labels" 

410 ) 

411 

412 @db_injector 1a

413 def build_where_clauses( 1a

414 self, db: PrefectDBInterface 

415 ) -> Sequence["ColumnExpressionArgument[bool]"]: 

416 filters: list["ColumnExpressionArgument[bool]"] = [] 

417 

418 if self.id: 

419 filters.append(db.EventResource.resource_id.in_(self.id)) 

420 

421 if self.role: 

422 filters.append(db.EventResource.resource_role.in_(self.role)) 

423 

424 if self.resources_in_roles: 

425 filters.append( 

426 sa.or_( 

427 *( 

428 sa.and_( 

429 db.EventResource.resource_id == resource_id, 

430 db.EventResource.resource_role == role, 

431 ) 

432 for resource_id, role in self.resources_in_roles 

433 ) 

434 ) 

435 ) 

436 

437 if self.labels: 

438 label_filters: list[ColumnElement[bool]] = [] 

439 labels = self.labels.deepcopy() 

440 

441 # On the event_resources table, resource_id and resource_role are unpacked 

442 # into columns, so we should search there for them 

443 if resource_ids := labels.pop("prefect.resource.id", None): 

444 label_ops = LabelOperations(resource_ids) 

445 

446 resource_id_column = db.EventResource.resource_id 

447 

448 if values := label_ops.positive.simple: 

449 label_filters.append(resource_id_column.in_(values)) 

450 if values := label_ops.negative.simple: 

451 label_filters.append(resource_id_column.notin_(values)) 

452 for prefix in label_ops.positive.prefixes: 

453 label_filters.append(resource_id_column.startswith(prefix)) 

454 for prefix in label_ops.negative.prefixes: 

455 label_filters.append(sa.not_(resource_id_column.startswith(prefix))) 

456 

457 if roles := labels.pop("prefect.resource.role", None): 

458 label_filters.append(db.EventResource.resource_role.in_(roles)) 

459 

460 if labels: 

461 for _, (label, values) in enumerate(labels.items()): 

462 # Empty label value arrays should match nothing 

463 if not values: 

464 label_filters.append(sa.false()) 

465 continue 

466 

467 label_ops = LabelOperations(values) 

468 

469 label_column = db.EventResource.resource[label].astext 

470 

471 if label_ops.negative.simple or label_ops.negative.prefixes: 

472 label_filters.append(label_column.is_not(None)) 

473 

474 if values := label_ops.positive.simple: 

475 label_filters.append(label_column.in_(values)) 

476 if values := label_ops.negative.simple: 

477 label_filters.append(label_column.notin_(values)) 

478 for prefix in label_ops.positive.prefixes: 

479 label_filters.append(label_column.startswith(prefix)) 

480 for prefix in label_ops.negative.prefixes: 

481 label_filters.append(sa.not_(label_column.startswith(prefix))) 

482 

483 filters.append(sa.and_(*label_filters)) 

484 

485 if filters: 

486 # This filter is explicitly searching for related resources, so if no other 

487 # role is specified, and we're doing any kind of filtering with this filter, 

488 # also filter out primary resources (those with an empty role) for any of 

489 # these queries 

490 if not self.role: 

491 filters.append(db.EventResource.resource_role != "") 

492 

493 assert self._top_level_filter is not None 

494 filters = [db.Event.id.in_(self._top_level_filter.where(*filters))] 

495 

496 return filters 

497 

498 

499class EventAnyResourceFilter(EventDataFilter): 1a

500 id: Optional[list[str]] = Field( 1a

501 default=None, description="Only include events for resources with these IDs" 

502 ) 

503 id_prefix: Optional[list[str]] = Field( 1a

504 default=None, 

505 description=( 

506 "Only include events for resources with IDs starting with these prefixes" 

507 ), 

508 ) 

509 labels: Optional[ResourceSpecification] = Field( 1a

510 default=None, 

511 description="Only include events for related resources with these labels", 

512 ) 

513 

514 def includes(self, event: Event) -> bool: 1a

515 resources = [event.resource] + event.related 

516 if not any(self._includes(resource) for resource in resources): 

517 return False 

518 return True 

519 

520 def _includes(self, resource: Resource) -> bool: 1a

521 if self.id: 

522 if not any(resource.id == resource_id for resource_id in self.id): 

523 return False 

524 

525 if self.id_prefix: 

526 if not any(resource.id.startswith(prefix) for prefix in self.id_prefix): 

527 return False 

528 

529 if self.labels: 

530 if not self.labels.matches(resource): 

531 return False 

532 

533 return True 

534 

535 @db_injector 1a

536 def build_where_clauses( 1a

537 self, db: PrefectDBInterface 

538 ) -> Sequence["ColumnExpressionArgument[bool]"]: 

539 filters: list["ColumnExpressionArgument[bool]"] = [] 

540 

541 if self.id: 

542 filters.append(db.EventResource.resource_id.in_(self.id)) 

543 

544 if self.id_prefix: 

545 filters.append( 

546 sa.or_( 

547 *[ 

548 db.EventResource.resource_id.startswith(prefix) 

549 for prefix in self.id_prefix 

550 ] 

551 ) 

552 ) 

553 

554 if self.labels: 

555 label_filters: list[ColumnElement[bool]] = [] 

556 labels = self.labels.deepcopy() 

557 

558 # On the event_resources table, resource_id and resource_role are unpacked 

559 # into columns, so we should search there for them 

560 if resource_ids := labels.pop("prefect.resource.id", None): 

561 label_ops = LabelOperations(resource_ids) 

562 

563 resource_id_column = db.EventResource.resource_id 

564 

565 if values := label_ops.positive.simple: 

566 label_filters.append(resource_id_column.in_(values)) 

567 if values := label_ops.negative.simple: 

568 label_filters.append(resource_id_column.notin_(values)) 

569 for prefix in label_ops.positive.prefixes: 

570 label_filters.append(resource_id_column.startswith(prefix)) 

571 for prefix in label_ops.negative.prefixes: 

572 label_filters.append(sa.not_(resource_id_column.startswith(prefix))) 

573 

574 if roles := labels.pop("prefect.resource.role", None): 

575 label_filters.append(db.EventResource.resource_role.in_(roles)) 

576 

577 if labels: 

578 for _, (label, values) in enumerate(labels.items()): 

579 # Empty label value arrays should match nothing 

580 if not values: 

581 label_filters.append(sa.false()) 

582 continue 

583 

584 label_ops = LabelOperations(values) 

585 

586 label_column = db.EventResource.resource[label].astext 

587 

588 if label_ops.negative.simple or label_ops.negative.prefixes: 

589 label_filters.append(label_column.is_not(None)) 

590 

591 if values := label_ops.positive.simple: 

592 label_filters.append(label_column.in_(values)) 

593 if values := label_ops.negative.simple: 

594 label_filters.append(label_column.notin_(values)) 

595 for prefix in label_ops.positive.prefixes: 

596 label_filters.append(label_column.startswith(prefix)) 

597 for prefix in label_ops.negative.prefixes: 

598 label_filters.append(sa.not_(label_column.startswith(prefix))) 

599 

600 filters.append(sa.and_(*label_filters)) 

601 

602 if filters: 

603 assert self._top_level_filter is not None 

604 filters = [db.Event.id.in_(self._top_level_filter.where(*filters))] 

605 

606 return filters 

607 

608 

609class EventIDFilter(EventDataFilter): 1a

610 id: Optional[list[UUID]] = Field( 1a

611 default=None, description="Only include events with one of these IDs" 

612 ) 

613 

614 def includes(self, event: Event) -> bool: 1a

615 if self.id: 

616 if not any(event.id == id for id in self.id): 

617 return False 

618 

619 return True 

620 

621 @db_injector 1a

622 def build_where_clauses( 1a

623 self, db: PrefectDBInterface 

624 ) -> Sequence["ColumnExpressionArgument[bool]"]: 

625 filters: list["ColumnExpressionArgument[bool]"] = [] 

626 

627 if self.id: 

628 filters.append(db.Event.id.in_(self.id)) 

629 

630 return filters 

631 

632 

633class EventTextFilter(EventDataFilter): 1a

634 """Filter by text search across event content.""" 

635 

636 query: str = Field( 1a

637 description="Text search query string", 

638 examples=[ 

639 "error", 

640 "error -debug", 

641 '"connection timeout"', 

642 "+required -excluded", 

643 ], 

644 max_length=200, 

645 ) 

646 

647 def includes(self, event: Event) -> bool: 1a

648 """Check if this text filter includes the given event.""" 

649 # Parse query into components 

650 parsed = parse_text_search_query(self.query) 

651 

652 # Build searchable text from all event string fields 

653 searchable_text = self._build_searchable_text(event).lower() 

654 

655 # Check include terms (OR logic) 

656 if parsed.include: 

657 include_match = any( 

658 term.lower() in searchable_text for term in parsed.include 

659 ) 

660 if not include_match: 

661 return False 

662 

663 # Check exclude terms (NOT logic) 

664 if parsed.exclude: 

665 exclude_match = any( 

666 term.lower() in searchable_text for term in parsed.exclude 

667 ) 

668 if exclude_match: 

669 return False 

670 

671 # Check required terms (AND logic - future feature) 

672 if parsed.required: 

673 required_match = all( 

674 term.lower() in searchable_text for term in parsed.required 

675 ) 

676 if not required_match: 

677 return False 

678 

679 return True 

680 

681 def _build_searchable_text(self, event: Event) -> str: 1a

682 """Build searchable text from all relevant event fields""" 

683 text_parts = [ 

684 event.event, # Event type/name 

685 ] 

686 

687 # Add resource VALUES only (not keys) 

688 text_parts.extend(event.resource.root.values()) 

689 

690 # Add related resource VALUES only (not keys) 

691 for related in event.related: 

692 text_parts.extend(related.root.values()) 

693 

694 # Add full payload as string (can include keys and values) 

695 text_parts.append(str(event.payload)) 

696 

697 return " ".join(text_parts) 

698 

699 @db_injector 1a

700 def build_where_clauses( 1a

701 self, db: PrefectDBInterface 

702 ) -> Sequence["ColumnExpressionArgument[bool]"]: 

703 """Build SQLAlchemy WHERE clauses for text search""" 

704 filters: list["ColumnExpressionArgument[bool]"] = [] 

705 

706 if not self.query.strip(): 

707 return filters 

708 

709 parsed = parse_text_search_query(self.query) 

710 

711 # Build combined searchable text field 

712 # This concatenates event, resource VALUES only, related resource VALUES only, and full payload 

713 event_field = db.Event.event 

714 

715 payload_field = sa.cast(db.Event.payload, sa.Text) 

716 

717 # For now, include resource and related as JSON strings - this will find the data 

718 # even though it also includes keys. We can optimize later to extract only values. 

719 resource_field = sa.cast(db.Event.resource, sa.Text) 

720 related_field = sa.cast(db.Event.related, sa.Text) 

721 

722 # Combine all searchable fields 

723 searchable_field = sa.func.concat( 

724 event_field, " ", resource_field, " ", related_field, " ", payload_field 

725 ) 

726 

727 # Handle include terms (OR logic) 

728 if parsed.include: 

729 include_conditions = [] 

730 for term in parsed.include: 

731 include_conditions.append( 

732 sa.func.lower(searchable_field).contains(term.lower()) 

733 ) 

734 

735 if include_conditions: 

736 filters.append(sa.or_(*include_conditions)) 

737 

738 # Handle exclude terms (NOT logic) 

739 if parsed.exclude: 

740 exclude_conditions = [] 

741 for term in parsed.exclude: 

742 exclude_conditions.append( 

743 ~sa.func.lower(searchable_field).contains(term.lower()) 

744 ) 

745 

746 if exclude_conditions: 

747 filters.append(sa.and_(*exclude_conditions)) 

748 

749 # Handle required terms (AND logic - future feature) 

750 if parsed.required: 

751 required_conditions = [] 

752 for term in parsed.required: 

753 required_conditions.append( 

754 sa.func.lower(searchable_field).contains(term.lower()) 

755 ) 

756 

757 if required_conditions: 

758 filters.append(sa.and_(*required_conditions)) 

759 

760 return filters 

761 

762 

763class EventOrder(AutoEnum): 1a

764 ASC = "ASC" 1a

765 DESC = "DESC" 1a

766 

767 

768class EventFilter(EventDataFilter): 1a

769 occurred: EventOccurredFilter = Field( 1a

770 default_factory=lambda: EventOccurredFilter(), 

771 description="Filter criteria for when the events occurred", 

772 ) 

773 event: Optional[EventNameFilter] = Field( 1a

774 default=None, 

775 description="Filter criteria for the event name", 

776 ) 

777 resource: Optional[EventResourceFilter] = Field( 1a

778 default=None, description="Filter criteria for the resource of the event" 

779 ) 

780 related: Optional[Union[EventRelatedFilter, list[EventRelatedFilter]]] = Field( 1a

781 default=None, 

782 description="Filter criteria for the related resources of the event", 

783 ) 

784 any_resource: Optional[ 1a

785 Union[EventAnyResourceFilter, list[EventAnyResourceFilter]] 

786 ] = Field( 

787 default=None, 

788 description="Filter criteria for any resource involved in the event", 

789 ) 

790 id: EventIDFilter = Field( 1a

791 default_factory=lambda: EventIDFilter(), 

792 description="Filter criteria for the events' ID", 

793 ) 

794 text: Optional[EventTextFilter] = Field( 1a

795 default=None, 

796 description="Filter criteria for text search across event content", 

797 ) 

798 

799 order: EventOrder = Field( 1a

800 default=EventOrder.DESC, 

801 description="The order to return filtered events", 

802 ) 

803 

804 @db_injector 1a

805 def build_where_clauses( 1a

806 self, db: PrefectDBInterface 

807 ) -> Sequence["ColumnExpressionArgument[bool]"]: 

808 self._top_level_filter = self._scoped_event_resources(db) 

809 result = super().build_where_clauses() 

810 self._top_level_filter = None 

811 return result 

812 

813 def _scoped_event_resources(self, db: PrefectDBInterface) -> Select[tuple[UUID]]: 1a

814 """Returns an event_resources query that is scoped to this filter's scope by occurred.""" 

815 query = sa.select(db.EventResource.event_id).where( 

816 db.EventResource.occurred >= self.occurred.since, 

817 db.EventResource.occurred <= self.occurred.until, 

818 ) 

819 return query 

820 

821 @property 1a

822 def logical_limit(self) -> int: 1a

823 """The logical limit for this query, which is a maximum number of rows that it 

824 _could_ return (regardless of what the caller has requested). May be used as 

825 an optimization for DB queries""" 

826 if self.id and self.id.id: 

827 # If we're asking for a specific set of IDs, the most we could get back is 

828 # that number of rows 

829 return len(self.id.id) 

830 

831 return sys.maxsize