Coverage for /usr/local/lib/python3.12/site-packages/prefect/events/filters.py: 38%

117 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1from __future__ import annotations 1a

2 

3import datetime 1a

4from typing import Optional, Union 1a

5from uuid import UUID 1a

6 

7from pydantic import Field 1a

8 

9import prefect.types._datetime 1a

10from prefect._internal.schemas.bases import PrefectBaseModel 1a

11from prefect.types import DateTime 1a

12from prefect.utilities.collections import AutoEnum 1a

13 

14from .schemas.events import Event, Resource, ResourceSpecification 1a

15 

16 

class AutomationFilterCreated(PrefectBaseModel):
    """Filter by `Automation.created`."""

    # Optional cut-off datetime for the creation time; unset (None) by default.
    before_: Optional[DateTime] = Field(
        description="Only include automations created before this datetime",
        default=None,
    )

24 

25 

class AutomationFilterName(PrefectBaseModel):
    """Filter by `Automation.name`."""

    # Optional list of candidate names; unset (None) by default. Per the field
    # description, an automation qualifies when its name matches any entry.
    any_: Optional[list[str]] = Field(
        default=None,
        description="Only include automations with names that match any of these strings",
    )

33 

34 

class AutomationFilter(PrefectBaseModel):
    # Groups the per-attribute automation filters. Both members are optional
    # and default to None.

    # Criteria applied to the automation's name.
    name: Optional[AutomationFilterName] = Field(
        description="Filter criteria for `Automation.name`",
        default=None,
    )

    # Criteria applied to the automation's creation time.
    created: Optional[AutomationFilterCreated] = Field(
        description="Filter criteria for `Automation.created`",
        default=None,
    )

42 

43 

class EventDataFilter(PrefectBaseModel, extra="forbid"):  # type: ignore[call-arg]
    """A base class for filtering event data."""

    def get_filters(self) -> list["EventDataFilter"]:
        """Collect the `EventDataFilter` instances held in this model's fields.

        Every declared model field is inspected. A field holding a list
        contributes each of its elements; any other value is treated as a
        single candidate. Candidates that are not `EventDataFilter` instances
        (including None) are ignored.
        """
        collected: list[EventDataFilter] = []

        for field_name in self.__class__.model_fields:
            value = getattr(self, field_name)

            # Any embedded list of filters are flattened and thus ANDed together
            candidates = value if isinstance(value, list) else [value]

            collected.extend(
                candidate
                for candidate in candidates
                if isinstance(candidate, EventDataFilter)
            )

        return collected

    def includes(self, event: Event) -> bool:
        """Does the given event match the criteria of every nested filter?

        Stops at the first nested filter that rejects the event. With no
        nested filters the event is considered included.
        """
        for nested in self.get_filters():
            if not nested.includes(event):
                return False
        return True

    def excludes(self, event: Event) -> bool:
        """Would this filter exclude the given event? Exact negation of `includes`."""
        included = self.includes(event)
        return not included

72 

73 

class EventOccurredFilter(EventDataFilter):
    # Lower bound of the time window. When not supplied, the default is
    # computed per instance as start_of_day(now("UTC")) minus 180 days.
    since: DateTime = Field(
        description="Only include events after this time (inclusive)",
        default_factory=lambda: (
            prefect.types._datetime.start_of_day(prefect.types._datetime.now("UTC"))
            - datetime.timedelta(days=180)
        ),
    )

    # Upper bound of the time window. When not supplied, the default is
    # computed per instance as now("UTC").
    until: DateTime = Field(
        description="Only include events prior to this time (inclusive)",
        default_factory=lambda: prefect.types._datetime.now("UTC"),
    )

    def includes(self, event: Event) -> bool:
        """Return whether `event.occurred` falls between `since` and `until`.

        Both bounds are inclusive.
        """
        occurred = event.occurred
        return self.since <= occurred and occurred <= self.until

89 

90 

class EventNameFilter(EventDataFilter):
    # Allow-list of name prefixes.
    prefix: Optional[list[str]] = Field(
        description="Only include events matching one of these prefixes",
        default=None,
    )

    # Deny-list of name prefixes.
    exclude_prefix: Optional[list[str]] = Field(
        description="Exclude events matching one of these prefixes",
        default=None,
    )

    # Allow-list of exact names.
    name: Optional[list[str]] = Field(
        description="Only include events matching one of these names exactly",
        default=None,
    )

    # Deny-list of exact names.
    exclude_name: Optional[list[str]] = Field(
        description="Exclude events matching one of these names exactly",
        default=None,
    )

    def includes(self, event: Event) -> bool:
        """Return whether the event's name (`event.event`) passes every set criterion.

        Criteria that are None or empty are skipped. The event is rejected
        when its name fails an allow-list (`prefix`, `name`) or hits a
        deny-list (`exclude_prefix`, `exclude_name`).
        """
        event_name = event.event

        # str.startswith accepts a tuple and succeeds if any entry is a prefix.
        if self.prefix and not event_name.startswith(tuple(self.prefix)):
            return False

        if self.exclude_prefix and event_name.startswith(tuple(self.exclude_prefix)):
            return False

        if self.name and event_name not in self.name:
            return False

        if self.exclude_name and event_name in self.exclude_name:
            return False

        return True

125 

126 

class EventResourceFilter(EventDataFilter):
    # Allow-list of exact resource IDs.
    id: Optional[list[str]] = Field(
        description="Only include events for resources with these IDs",
        default=None,
    )

    # Allow-list of resource ID prefixes.
    id_prefix: Optional[list[str]] = Field(
        description="Only include events for resources with IDs starting with these prefixes.",
        default=None,
    )

    # Label specification the event's resource must match.
    labels: Optional[ResourceSpecification] = Field(
        description="Only include events for resources with these labels",
        default=None,
    )

    # Not consulted by `includes` below.
    distinct: bool = Field(
        description="Only include events for distinct resources",
        default=False,
    )

    def includes(self, event: Event) -> bool:
        """Return whether `event.resource` passes every set criterion.

        Criteria that are falsy (None or empty) are skipped; `id`,
        `id_prefix` and `labels` must all pass when set.
        """
        resource = event.resource

        if self.id and resource.id not in self.id:
            return False

        # str.startswith accepts a tuple and succeeds if any entry is a prefix.
        if self.id_prefix and not resource.id.startswith(tuple(self.id_prefix)):
            return False

        if self.labels and not self.labels.matches(resource):
            return False

        return True

161 

162 

class EventRelatedFilter(EventDataFilter):
    # Declarative criteria for an event's related resources. This class
    # defines no `includes` override of its own.

    # Allow-list of related-resource IDs.
    id: Optional[list[str]] = Field(
        description="Only include events for related resources with these IDs",
        default=None,
    )

    # Allow-list of roles.
    role: Optional[list[str]] = Field(
        description="Only include events for related resources in these roles",
        default=None,
    )

    # Pairs of strings. NOTE(review): the order within each pair (resource ID
    # vs. role) is not shown here - confirm against the consumer.
    resources_in_roles: Optional[list[tuple[str, str]]] = Field(
        description="Only include events with specific related resources in specific roles",
        default=None,
    )

    # Label specification for related resources.
    labels: Optional[ResourceSpecification] = Field(
        description="Only include events for related resources with these labels",
        default=None,
    )

182 

183 

class EventAnyResourceFilter(EventDataFilter):
    # Allow-list of exact resource IDs.
    id: Optional[list[str]] = Field(
        description="Only include events for resources with these IDs",
        default=None,
    )

    # Allow-list of resource ID prefixes.
    id_prefix: Optional[list[str]] = Field(
        description="Only include events for resources with IDs starting with these prefixes",
        default=None,
    )

    # NOTE(review): the description says "related resources", but `includes`
    # applies this to the primary resource as well - confirm the wording.
    labels: Optional[ResourceSpecification] = Field(
        description="Only include events for related resources with these labels",
        default=None,
    )

    def includes(self, event: Event) -> bool:
        """Return whether the primary or any related resource satisfies this filter.

        A single resource has to pass all set criteria (see `_includes`).
        """
        candidates = [event.resource] + event.related
        return any(self._includes(candidate) for candidate in candidates)

    def _includes(self, resource: Resource) -> bool:
        """Return whether one resource passes every set criterion.

        Criteria that are falsy (None or empty) are skipped.
        """
        if self.id and resource.id not in self.id:
            return False

        # str.startswith accepts a tuple and succeeds if any entry is a prefix.
        if self.id_prefix and not resource.id.startswith(tuple(self.id_prefix)):
            return False

        if self.labels and not self.labels.matches(resource):
            return False

        return True

219 

220 

class EventIDFilter(EventDataFilter):
    # Allow-list of event IDs.
    id: Optional[list[UUID]] = Field(
        description="Only include events with one of these IDs",
        default=None,
    )

    def includes(self, event: Event) -> bool:
        """Return whether the event's ID is allowed.

        A None or empty `id` list allows every event; otherwise `event.id`
        must equal one of the listed IDs.
        """
        return not self.id or event.id in self.id

232 

233 

class EventTextFilter(EventDataFilter):
    """Filter by text search across event content."""

    # Required query string (no default), limited to 200 characters.
    # This class defines no `includes` override of its own.
    query: str = Field(
        max_length=200,
        description="Text search query string",
        examples=[
            "error",
            "error -debug",
            '"connection timeout"',
            "+required -excluded",
        ],
    )

247 

248 

class EventOrder(AutoEnum):
    # The two orderings an event query may request; each member's value is
    # spelled the same as its name.
    ASC = "ASC"
    DESC = "DESC"

252 

253 

254class EventFilter(EventDataFilter): 1a

255 occurred: EventOccurredFilter = Field( 1a

256 default_factory=lambda: EventOccurredFilter(), 

257 description="Filter criteria for when the events occurred", 

258 ) 

259 event: Optional[EventNameFilter] = Field( 1a

260 default=None, 

261 description="Filter criteria for the event name", 

262 ) 

263 resource: Optional[EventResourceFilter] = Field( 1a

264 default=None, 

265 description="Filter criteria for the resource of the event", 

266 ) 

267 related: Optional[Union[EventRelatedFilter, list[EventRelatedFilter]]] = Field( 1a

268 default=None, 

269 description="Filter criteria for the related resources of the event", 

270 ) 

271 any_resource: Optional[ 1a

272 Union[EventAnyResourceFilter, list[EventAnyResourceFilter]] 

273 ] = Field( 

274 default=None, 

275 description="Filter criteria for any resource involved in the event", 

276 ) 

277 id: EventIDFilter = Field( 1a

278 default_factory=lambda: EventIDFilter(id=[]), 

279 description="Filter criteria for the events' ID", 

280 ) 

281 text: Optional[EventTextFilter] = Field( 1a

282 default=None, 

283 description="Filter criteria for text search across event content", 

284 ) 

285 

286 order: EventOrder = Field( 1a

287 default=EventOrder.DESC, 

288 description="The order to return filtered events", 

289 )