Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/models/events.py: 58%

115 statements  

coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

from datetime import datetime, timedelta
from typing import Any, Dict, List, MutableMapping, Optional, Set, Union
from uuid import UUID

from cachetools import TTLCache
from sqlalchemy.ext.asyncio import AsyncSession

from prefect._internal.uuid7 import uuid7
from prefect.server import models, schemas
from prefect.server.database.orm_models import (
    ORMDeployment,
    ORMFlow,
    ORMFlowRun,
    ORMFlowRunState,
    ORMTaskRun,
    ORMTaskRunState,
    ORMWorkPool,
    ORMWorkQueue,
)
from prefect.server.events.schemas.events import Event
from prefect.server.models import deployments
from prefect.server.schemas.statuses import DeploymentStatus
from prefect.settings import PREFECT_API_EVENTS_RELATED_RESOURCE_CACHE_TTL
from prefect.types._datetime import DateTime, now
from prefect.utilities.text import truncated_to

ResourceData = Dict[str, Dict[str, Any]]
RelatedResourceList = List[Dict[str, str]]


# Some users use state messages to convey error messages and large results; let's
# truncate them so they don't blow out the size of a message
TRUNCATE_STATE_MESSAGES_AT = 100_000


_flow_run_resource_data_cache: MutableMapping[UUID, ResourceData] = TTLCache(
    maxsize=1000,
    ttl=PREFECT_API_EVENTS_RELATED_RESOURCE_CACHE_TTL.value().total_seconds(),
)


async def flow_run_state_change_event(
    session: AsyncSession,
    occurred: datetime,
    flow_run: ORMFlowRun,
    initial_state_id: Optional[UUID],
    initial_state: Optional[schemas.states.State],
    validated_state_id: Optional[UUID],
    validated_state: schemas.states.State,
) -> Event:
    return Event(
        occurred=occurred,
        event=f"prefect.flow-run.{validated_state.name}",
        resource={
            "prefect.resource.id": f"prefect.flow-run.{flow_run.id}",
            "prefect.resource.name": flow_run.name,
            "prefect.run-count": str(flow_run.run_count),
            "prefect.state-message": truncated_to(
                TRUNCATE_STATE_MESSAGES_AT, validated_state.message
            ),
            "prefect.state-name": validated_state.name or "",
            "prefect.state-timestamp": (
                validated_state.timestamp.isoformat()
                if validated_state.timestamp
                else None
            ),
            "prefect.state-type": validated_state.type.value,
        },
        related=await _flow_run_related_resources_from_orm(
            session=session, flow_run=flow_run
        ),
        payload={
            "intended": {
                "from": _state_type(initial_state),
                "to": _state_type(validated_state),
            },
            "initial_state": state_payload(initial_state),
            "validated_state": state_payload(validated_state),
        },
        # Here we use the state's ID as the ID of the event as well, in order to
        # establish the ordering of the state-change events
        id=validated_state_id,
        follows=initial_state_id if _timing_is_tight(occurred, initial_state) else None,
    )
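As a rough illustration of what this produces, a Completed state on a flow run would yield an event shaped approximately like the dict below; all values are invented for illustration, and the related resources and payload come from the helpers further down.

example_flow_run_state_change = {
    "event": "prefect.flow-run.Completed",
    "resource": {
        "prefect.resource.id": "prefect.flow-run.1f3a0000-0000-0000-0000-000000000000",  # hypothetical run ID
        "prefect.resource.name": "quiet-falcon",  # hypothetical run name
        "prefect.run-count": "1",
        "prefect.state-message": "All states completed.",  # capped at TRUNCATE_STATE_MESSAGES_AT characters
        "prefect.state-name": "Completed",
        "prefect.state-timestamp": "2025-12-05T10:48:00+00:00",
        "prefect.state-type": "COMPLETED",
    },
    "payload": {
        "intended": {"from": "RUNNING", "to": "COMPLETED"},
        "initial_state": {"type": "RUNNING", "name": "Running"},
        "validated_state": {"type": "COMPLETED", "name": "Completed"},
    },
    # "related", "id", and "follows" are omitted here; they are filled in as shown above.
}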


async def _flow_run_related_resources_from_orm(
    session: AsyncSession, flow_run: ORMFlowRun
) -> RelatedResourceList:
    resource_data = _flow_run_resource_data_cache.get(flow_run.id)
    if not resource_data:  # coverage: always true in this run; the cache-hit path was never taken
        flow = await models.flows.read_flow(session=session, flow_id=flow_run.flow_id)
        deployment: Optional[ORMDeployment] = None
        if flow_run.deployment_id:
            deployment = await deployments.read_deployment(
                session, deployment_id=flow_run.deployment_id
            )

        work_queue = None
        if flow_run.work_queue_id:
            work_queue = await models.work_queues.read_work_queue(
                session, work_queue_id=flow_run.work_queue_id
            )

        work_pool = work_queue.work_pool if work_queue is not None else None

        task_run: Optional[ORMTaskRun] = None
        if flow_run.parent_task_run_id:
            task_run = await models.task_runs.read_task_run(
                session,
                task_run_id=flow_run.parent_task_run_id,
            )

        resource_data = _as_resource_data(
            flow_run, flow, deployment, work_queue, work_pool, task_run
        )
        _flow_run_resource_data_cache[flow_run.id] = resource_data

    return _resource_data_as_related_resources(
        resource_data,
        excluded_kinds=["flow-run"],
    ) + _provenance_as_related_resources(flow_run.created_by)


def _as_resource_data(
    flow_run: ORMFlowRun,
    flow: Union[ORMFlow, schemas.core.Flow, None],
    deployment: Union[ORMDeployment, schemas.responses.DeploymentResponse, None],
    work_queue: Union[ORMWorkQueue, schemas.responses.WorkQueueResponse, None],
    work_pool: Union[ORMWorkPool, schemas.core.WorkPool, None],
    task_run: Union[ORMTaskRun, schemas.core.TaskRun, None] = None,
) -> ResourceData:
    return {
        "flow-run": {
            "id": str(flow_run.id),
            "name": flow_run.name,
            "tags": flow_run.tags if flow_run.tags else [],
            "role": "flow-run",
        },
        "flow": (
            {
                "id": str(flow.id),
                "name": flow.name,
                "tags": flow.tags if flow.tags else [],
                "role": "flow",
            }
            if flow
            else {}
        ),
        "deployment": (
            {
                "id": str(deployment.id),
                "name": deployment.name,
                "tags": deployment.tags if deployment.tags else [],
                "role": "deployment",
            }
            if deployment
            else {}
        ),
        "work-queue": (
            {
                "id": str(work_queue.id),
                "name": work_queue.name,
                "tags": [],
                "role": "work-queue",
            }
            if work_queue
            else {}
        ),
        "work-pool": (
            {
                "id": str(work_pool.id),
                "name": work_pool.name,
                "tags": [],
                "role": "work-pool",
                "type": work_pool.type,
            }
            if work_pool
            else {}
        ),
        "task-run": (
            {
                "id": str(task_run.id),
                "name": task_run.name,
                "tags": task_run.tags if task_run.tags else [],
                "role": "task-run",
            }
            if task_run
            else {}
        ),
    }


def _resource_data_as_related_resources(
    resource_data: ResourceData,
    excluded_kinds: Optional[List[str]] = None,
) -> RelatedResourceList:
    related = []
    tags: Set[str] = set()

    if excluded_kinds is None:  # coverage: never true in this run
        excluded_kinds = []

    for kind, data in resource_data.items():
        tags |= set(data.get("tags", []))

        if kind in excluded_kinds or not data:
            continue

        related_resource = {
            "prefect.resource.id": f"prefect.{kind}.{data['id']}",
            "prefect.resource.role": data["role"],
            "prefect.resource.name": data["name"],
        }

        if kind == "work-pool":  # coverage: never true in this run
            related_resource["prefect.work-pool.type"] = data["type"]

        related.append(related_resource)

    related += [
        {
            "prefect.resource.id": f"prefect.tag.{tag}",
            "prefect.resource.role": "tag",
        }
        for tag in sorted(tags)
    ]

    return related


def _provenance_as_related_resources(
    created_by: Optional[schemas.core.CreatedBy],
) -> RelatedResourceList:
    if not created_by:  # coverage: always true in this run
        return []

    resource_id: str

    if created_by.type == "DEPLOYMENT":
        resource_id = f"prefect.deployment.{created_by.id}"
    elif created_by.type == "AUTOMATION":
        resource_id = f"prefect.automation.{created_by.id}"
    else:
        return []

    related = {
        "prefect.resource.id": resource_id,
        "prefect.resource.role": "creator",
    }
    if created_by.display_value:
        related["prefect.resource.name"] = created_by.display_value
    return [related]


def _state_type(
    state: Union[ORMFlowRunState, ORMTaskRunState, Optional[schemas.states.State]],
) -> Optional[str]:
    return str(state.type.value) if state else None


def state_payload(state: Optional[schemas.states.State]) -> Optional[Dict[str, str]]:
    """Given a State, return the essential string parts of it for use in an
    event payload"""
    if not state:
        return None
    payload: Dict[str, str] = {"type": state.type.value}
    if state.name:  # coverage: always true in this run
        payload["name"] = state.name
    if state.message:  # coverage: never true in this run
        payload["message"] = truncated_to(TRUNCATE_STATE_MESSAGES_AT, state.message)
    if state.is_paused():  # coverage: never true in this run
        payload["pause_reschedule"] = str(state.state_details.pause_reschedule).lower()
    return payload
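A quick sketch of what state_payload yields, assuming the server-side state constructors schemas.states.Running() and schemas.states.Failed() (used elsewhere in the server models); values shown are illustrative:

from prefect.server import schemas

# A fresh Running state: type and name are always present.
print(state_payload(schemas.states.Running()))
# -> {'type': 'RUNNING', 'name': 'Running'}

# A Failed state with a very long message: the message is included but capped
# near TRUNCATE_STATE_MESSAGES_AT characters by truncated_to.
failed = schemas.states.Failed(message="boom " * 50_000)
print(state_payload(failed))  # message field is truncated rather than sent whole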


def _timing_is_tight(
    occurred: datetime,
    initial_state: Union[
        ORMFlowRunState, ORMTaskRunState, Optional[schemas.states.State]
    ],
) -> bool:
    # Only connect events with event.follows if the timing here is tight, which will
    # help us resolve the order of these events if they happen to be delivered out of
    # order. If the preceding state change happened a while back, don't worry about
    # it because the order is very likely to be unambiguous.
    TIGHT_TIMING = timedelta(minutes=5)
    if initial_state and initial_state.timestamp:  # coverage: never true in this run
        return bool(-TIGHT_TIMING < (occurred - initial_state.timestamp) < TIGHT_TIMING)

    return False
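In other words, follows is only set when the new event occurred within five minutes, in either direction, of the initial state's timestamp; a small sketch with hypothetical timestamps:

from datetime import datetime, timedelta, timezone

state_ts = datetime(2025, 12, 5, 10, 48, tzinfo=timezone.utc)
TIGHT_TIMING = timedelta(minutes=5)

# Two minutes later: inside the window, so the two events get linked via follows.
assert -TIGHT_TIMING < (state_ts + timedelta(minutes=2)) - state_ts < TIGHT_TIMING

# An hour later: outside the window, so ordering is left to the timestamps alone.
assert not (-TIGHT_TIMING < (state_ts + timedelta(hours=1)) - state_ts < TIGHT_TIMING)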


async def deployment_status_event(
    session: AsyncSession,
    deployment_id: UUID,
    status: DeploymentStatus,
    occurred: DateTime,
) -> Event:
    deployment = await models.deployments.read_deployment(
        session=session, deployment_id=deployment_id
    )
    assert deployment
    flow = await models.flows.read_flow(session=session, flow_id=deployment.flow_id)
    work_queue = (
        await models.workers.read_work_queue(
            session=session,
            work_queue_id=deployment.work_queue_id,
        )
        if deployment.work_queue_id
        else None
    )

    work_pool = (
        await models.workers.read_work_pool(
            session=session,
            work_pool_id=work_queue.work_pool_id,
        )
        if work_queue and work_queue.work_pool_id
        else None
    )

    related_work_queue_and_pool_info = []

    if flow is not None:
        related_work_queue_and_pool_info.append(
            {
                "prefect.resource.id": f"prefect.flow.{flow.id}",
                "prefect.resource.name": flow.name,
                "prefect.resource.role": "flow",
            }
        )

    if work_queue is not None:
        related_work_queue_and_pool_info.append(
            {
                "prefect.resource.id": f"prefect.work-queue.{work_queue.id}",
                "prefect.resource.name": work_queue.name,
                "prefect.resource.role": "work-queue",
            }
        )

    if work_pool is not None:
        related_work_queue_and_pool_info.append(
            {
                "prefect.resource.id": f"prefect.work-pool.{work_pool.id}",
                "prefect.resource.name": work_pool.name,
                "prefect.work-pool.type": work_pool.type,
                "prefect.resource.role": "work-pool",
            }
        )

    return Event(
        occurred=occurred,
        event=f"prefect.deployment.{status.in_kebab_case()}",
        resource={
            "prefect.resource.id": f"prefect.deployment.{deployment.id}",
            "prefect.resource.name": f"{deployment.name}",
        },
        related=related_work_queue_and_pool_info,
        id=uuid7(),
    )


async def work_queue_status_event(
    session: AsyncSession,
    work_queue: "ORMWorkQueue",
    occurred: DateTime,
) -> Event:
    related_work_pool_info: List[Dict[str, Any]] = []

    if work_queue.work_pool_id:  # coverage: always true in this run
        work_pool = await models.workers.read_work_pool(
            session=session,
            work_pool_id=work_queue.work_pool_id,
        )

        if work_pool and work_pool.id and work_pool.name:  # coverage: always true in this run
            related_work_pool_info.append(
                {
                    "prefect.resource.id": f"prefect.work-pool.{work_pool.id}",
                    "prefect.resource.name": work_pool.name,
                    "prefect.work-pool.type": work_pool.type,
                    "prefect.resource.role": "work-pool",
                }
            )

    return Event(
        occurred=occurred,
        event=f"prefect.work-queue.{work_queue.status.in_kebab_case()}",
        resource={
            "prefect.resource.id": f"prefect.work-queue.{work_queue.id}",
            "prefect.resource.name": work_queue.name,
            "prefect.resource.role": "work-queue",
        },
        related=related_work_pool_info,
        id=uuid7(),
    )


async def work_pool_status_event(
    event_id: UUID,
    occurred: DateTime,
    pre_update_work_pool: Optional["ORMWorkPool"],
    work_pool: "ORMWorkPool",
) -> Event:
    assert work_pool.status

    return Event(
        id=event_id,
        occurred=occurred,
        event=f"prefect.work-pool.{work_pool.status.in_kebab_case()}",
        resource={
            "prefect.resource.id": f"prefect.work-pool.{work_pool.id}",
            "prefect.resource.name": work_pool.name,
            "prefect.work-pool.type": work_pool.type,
        },
        follows=_get_recent_preceding_work_pool_event_id(pre_update_work_pool),
    )


def _get_recent_preceding_work_pool_event_id(
    work_pool: Optional["ORMWorkPool"],
) -> Optional[UUID]:
    """
    Returns the preceding event ID if the work pool transitioned status
    recently to help ensure correct event ordering.
    """
    if not work_pool:
        return None

    time_since_last_event = timedelta(hours=24)
    if work_pool.last_transitioned_status_at:
        time_since_last_event = now("UTC") - work_pool.last_transitioned_status_at

    return (
        work_pool.last_status_event_id
        if time_since_last_event < timedelta(minutes=10)
        else None
    )
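The effect of the ten-minute window above: if the pool's status flips again shortly after its previous transition, the new event records the earlier status event in follows; otherwise no link is made. A sketch with hypothetical stand-in objects (real callers pass ORMWorkPool rows; only the two attributes the helper reads are modelled here):

from datetime import timedelta
from types import SimpleNamespace
from uuid import uuid4

from prefect.types._datetime import now

recently_flipped = SimpleNamespace(
    last_transitioned_status_at=now("UTC") - timedelta(minutes=3),
    last_status_event_id=uuid4(),
)
stale = SimpleNamespace(
    last_transitioned_status_at=now("UTC") - timedelta(hours=2),
    last_status_event_id=uuid4(),
)

# Inside the ten-minute window: the previous status event ID is returned.
assert _get_recent_preceding_work_pool_event_id(recently_flipped) is not None
# Outside the window: no follows link is established.
assert _get_recent_preceding_work_pool_event_id(stale) is None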