Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/models/automations.py: 25%

166 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1import logging 1a

2from contextlib import asynccontextmanager 1a

3from typing import AsyncGenerator, Optional, Sequence, Union 1a

4from uuid import UUID 1a

5 

6import orjson 1a

7import sqlalchemy as sa 1a

8from sqlalchemy.ext.asyncio import AsyncSession 1a

9from typing_extensions import Literal, TypeAlias 1a

10 

11from prefect.logging import get_logger 1a

12from prefect.server.database import PrefectDBInterface, db_injector 1a

13from prefect.server.events import filters 1a

14from prefect.server.events.schemas.automations import ( 1a

15 Automation, 

16 AutomationPartialUpdate, 

17 AutomationSort, 

18 AutomationUpdate, 

19) 

20from prefect.server.utilities.database import get_dialect 1a

21from prefect.types._datetime import now 1a

22from prefect.utilities.asyncutils import run_coro_as_sync 1a

23 

# Module-level logger, obtained through Prefect's `get_logger` helper.
logger: logging.Logger = get_logger(__name__)

# The change-event keys that `_notify` passes on to `automation_changed`.
AutomationChangeEvent: TypeAlias = Literal[
    "automation__created",
    "automation__updated",
    "automation__deleted",
]

# Channel name used by `_notify` when issuing a Postgres NOTIFY.
AUTOMATION_CHANGES_CHANNEL = "prefect_automation_changes"

30 

31 

@asynccontextmanager
@db_injector
async def automations_session(
    db: PrefectDBInterface, begin_transaction: bool = False
) -> AsyncGenerator[AsyncSession, None]:
    """Open a database session for working with automations.

    Args:
        db: The database interface (callers in this module omit it, so it is
            presumably supplied by `db_injector`).
        begin_transaction: Forwarded unchanged to `db.session_context`.

    Yields:
        The session produced by `db.session_context`; the context is exited
        when the caller's `async with` block ends.
    """
    session_context = db.session_context(begin_transaction=begin_transaction)
    async with session_context as open_session:
        yield open_session

39 

40 

@db_injector
async def read_automations_for_workspace(
    db: PrefectDBInterface,
    session: AsyncSession,
    sort: AutomationSort = AutomationSort.NAME_ASC,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    automation_filter: Optional[filters.AutomationFilter] = None,
) -> Sequence[Automation]:
    """Read automations, optionally filtered and paginated.

    Args:
        db: The database interface.
        session: The session used to run the query.
        sort: Ordering applied through `db.Automation.sort_expression`.
        limit: Maximum number of rows; no LIMIT is applied when None.
        offset: Number of rows to skip; no OFFSET is applied when None.
        automation_filter: Optional filter whose `as_sql_filter()` result is
            added as a WHERE clause.

    Returns:
        The matching rows, each validated into an `Automation` schema object.
    """
    statement = sa.select(db.Automation)
    statement = statement.order_by(db.Automation.sort_expression(sort))

    if automation_filter:
        statement = statement.where(automation_filter.as_sql_filter())
    # Compare against None explicitly so that a value of 0 is still applied.
    if limit is not None:
        statement = statement.limit(limit)
    if offset is not None:
        statement = statement.offset(offset)

    rows = (await session.execute(statement)).scalars().all()
    return [Automation.model_validate(row, from_attributes=True) for row in rows]

67 

68 

@db_injector
async def count_automations_for_workspace(
    db: PrefectDBInterface,
    session: AsyncSession,
) -> int:
    """Count the rows in the automation table.

    Args:
        db: The database interface.
        session: The session used to run the query.

    Returns:
        The row count, or 0 when the query yields no (or a falsy) scalar.
    """
    count_statement = sa.select(sa.func.count(None)).select_from(db.Automation)
    total = (await session.execute(count_statement)).scalar()
    return total or 0

79 

80 

@db_injector
async def read_automation(
    db: PrefectDBInterface,
    session: AsyncSession,
    automation_id: UUID,
) -> Optional[Automation]:
    """Read a single automation by its ID.

    Args:
        db: The database interface.
        session: The session used to run the query.
        automation_id: ID of the automation to look up.

    Returns:
        The automation validated into an `Automation` schema object, or None
        when no row matches.
    """
    lookup = sa.select(db.Automation).where(db.Automation.id == automation_id)
    row = await session.scalar(lookup)
    return Automation.model_validate(row, from_attributes=True) if row else None

93 

94 

@db_injector
async def read_automation_by_id(
    db: PrefectDBInterface, session: AsyncSession, automation_id: UUID
) -> Optional[Automation]:
    """Look up one automation by primary key.

    Args:
        db: The database interface.
        session: The session used to run the query.
        automation_id: ID of the automation to look up.

    Returns:
        An `Automation` schema object built from the matching row, or None
        when there is no such row.
    """
    by_id = db.Automation.id == automation_id
    found = await session.scalar(sa.select(db.Automation).where(by_id))
    if found:
        return Automation.model_validate(found, from_attributes=True)
    return None

107 

108 

async def _notify(session: AsyncSession, automation: Automation, event: str):
    """Announce that an automation was created, updated, or deleted.

    On PostgreSQL a NOTIFY is executed on `AUTOMATION_CHANGES_CHANNEL` with a
    JSON payload holding the automation ID and the event. On any other dialect
    a one-shot `after_commit` listener is registered on the underlying sync
    session, which calls `automation_changed` once the session commits.

    Failures in either path are logged and not re-raised.

    Args:
        session: The async session the change was made in.
        automation: The automation that changed.
        event: One of "created", "updated" or "deleted"; any other value is
            logged as an error and ignored.
    """
    from prefect.server.events.triggers import automation_changed

    event_keys = {
        "created": "automation__created",
        "updated": "automation__updated",
        "deleted": "automation__deleted",
    }
    event_key: Optional[AutomationChangeEvent] = event_keys.get(event)
    if event_key is None:
        logger.error(
            f"Unknown event type '{event}' in _notify for automation {automation.id}"
        )
        return

    # How the change is propagated depends on the database dialect.
    sync_session = session.sync_session
    dialect_name = get_dialect(sync_session).name

    if dialect_name == "postgresql":
        # Only send NOTIFY here - the listener will update the cache.
        try:
            payload = {"automation_id": str(automation.id), "event_type": event}
            # Double any single quotes so the JSON can sit inside a SQL literal.
            payload_json = orjson.dumps(payload).decode().replace("'", "''")
            notify_statement = sa.text(
                f"NOTIFY {AUTOMATION_CHANGES_CHANNEL}, '{payload_json}'"
            )
            await session.execute(notify_statement)

            logger.debug(
                f"Sent Postgres NOTIFY on channel '{AUTOMATION_CHANGES_CHANNEL}' for automation {automation.id}, event: {event}"
            )
        except Exception as e:
            logger.error(
                f"Failed to send Postgres NOTIFY for automation {automation.id} on channel {AUTOMATION_CHANGES_CHANNEL}: {e}",
                exc_info=True,
            )
        return

    # Non-Postgres (e.g. SQLite): update the cache once the commit has happened.
    @sa.event.listens_for(sync_session, "after_commit", once=True)
    def update_cache_after_commit(session):
        try:
            run_coro_as_sync(automation_changed(automation.id, event_key))
        except Exception as e:
            logger.error(
                f"Failed to update in-memory cache for automation {automation.id}, event: {event}: {e}",
                exc_info=True,
            )

165 

166 

@db_injector
async def create_automation(
    db: PrefectDBInterface, session: AsyncSession, automation: Automation
) -> Automation:
    """Insert a new automation and announce its creation.

    Args:
        db: The database interface.
        session: The session the row is added to and flushed in.
        automation: The automation to persist.

    Returns:
        The automation re-validated from the flushed ORM row.
    """
    orm_row = db.Automation(**automation.model_dump())
    session.add(orm_row)
    # Flush before reading the row back into the schema model.
    await session.flush()

    created = Automation.model_validate(orm_row, from_attributes=True)
    await _sync_automation_related_resources(session, orm_row.id, created)
    await _notify(session, created, "created")
    return created

180 

181 

@db_injector
async def update_automation(
    db: PrefectDBInterface,
    session: AsyncSession,
    automation_update: Union[AutomationUpdate, AutomationPartialUpdate],
    automation_id: UUID,
) -> bool:
    """Apply a full or partial update to an existing automation.

    Args:
        db: The database interface.
        session: The session used for all statements.
        automation_update: The new values to write.
        automation_id: ID of the automation to update.

    Returns:
        False when the automation does not exist; otherwise whether the UPDATE
        statement affected at least one row.

    Raises:
        TypeError: If `automation_update` is neither an `AutomationUpdate` nor
            an `AutomationPartialUpdate`.
    """
    accepted_types = (AutomationUpdate, AutomationPartialUpdate)
    if not isinstance(automation_update, accepted_types):
        raise TypeError(
            "automation_update must be an AutomationUpdate or AutomationPartialUpdate, "
            f"not {type(automation_update)}"
        )

    existing = await read_automation(session, automation_id)
    if not existing:
        return False

    if isinstance(automation_update, AutomationPartialUpdate):
        # A partial update skips the full Automation validation, yet the change
        # may invalidate the combined result. Overlay the update on the stored
        # automation and validate that; a failure raises before anything is
        # written.
        merged = existing.model_dump(mode="json")
        merged.update(automation_update.model_dump(mode="json"))
        Automation.model_validate(merged)

    new_values = automation_update.model_dump_for_orm(exclude_unset=True)
    update_statement = (
        sa.update(db.Automation)
        .where(db.Automation.id == automation_id)
        .values(**new_values)
    )
    result = await session.execute(update_statement)

    # Related resources are only re-synced for full updates.
    if isinstance(automation_update, AutomationUpdate):
        await _sync_automation_related_resources(
            session, automation_id, automation_update
        )

    await _notify(session, existing, "updated")
    return result.rowcount > 0  # type: ignore

224 

225 

@db_injector
async def delete_automation(
    db: PrefectDBInterface,
    session: AsyncSession,
    automation_id: UUID,
) -> bool:
    """Delete one automation together with its child rows.

    Args:
        db: The database interface.
        session: The session used for all statements.
        automation_id: ID of the automation to delete.

    Returns:
        False when the automation does not exist, True once it was deleted.
    """
    existing = await read_automation(session, automation_id)
    if not existing:
        return False

    # Child tables are always cleared in this same order to prevent deadlocks
    # when multiple automations are deleted concurrently.
    child_tables = (
        db.AutomationBucket,
        db.AutomationRelatedResource,
        db.CompositeTriggerChildFiring,
    )
    for child_table in child_tables:
        await session.execute(
            sa.delete(child_table).where(child_table.automation_id == automation_id)
        )

    # With the children gone, remove the parent automation itself.
    await session.execute(
        sa.delete(db.Automation).where(db.Automation.id == automation_id)
    )
    await _sync_automation_related_resources(session, automation_id, None)

    await _notify(session, existing, "deleted")
    return True

264 

265 

@db_injector
async def delete_automations_for_workspace(
    db: PrefectDBInterface,
    session: AsyncSession,
) -> bool:
    """Delete every automation and all automation child rows.

    Args:
        db: The database interface.
        session: The session used for all statements.

    Returns:
        Whether the DELETE on the automation table removed at least one row.
    """
    # Read first so each deleted automation can be announced afterwards.
    doomed = await read_automations_for_workspace(session)

    # Child tables are always cleared in this same order to prevent deadlocks
    # when multiple workspace deletions occur concurrently.
    for child_table in (
        db.AutomationBucket,
        db.AutomationRelatedResource,
        db.CompositeTriggerChildFiring,
    ):
        await session.execute(sa.delete(child_table))

    # Now remove the automations themselves.
    result = await session.execute(sa.delete(db.Automation))
    for deleted in doomed:
        await _notify(session, deleted, "deleted")
    return result.rowcount > 0

286 

287 

@db_injector
async def disable_automations_for_workspace(
    db: PrefectDBInterface,
    session: AsyncSession,
) -> bool:
    """Set `enabled=False` on every automation.

    Args:
        db: The database interface.
        session: The session used for all statements.

    Returns:
        Whether the UPDATE touched at least one row.
    """
    # Read first; an "updated" notification is sent for each of these.
    affected = await read_automations_for_workspace(session)

    disable_all = sa.update(db.Automation).values(enabled=False)
    result = await session.execute(disable_all)

    for automation in affected:
        await _notify(session, automation, "updated")
    return result.rowcount > 0

298 

299 

@db_injector
async def disable_automation(
    db: PrefectDBInterface, session: AsyncSession, automation_id: UUID
) -> bool:
    """Set `enabled=False` on a single automation.

    Args:
        db: The database interface.
        session: The session used for all statements.
        automation_id: ID of the automation to disable.

    Returns:
        Whether the UPDATE touched at least one row.

    Raises:
        ValueError: If no automation with `automation_id` exists.
    """
    existing = await read_automation_by_id(
        session=session,
        automation_id=automation_id,
    )
    if not existing:
        raise ValueError(f"Automation with ID {automation_id} not found")

    disable_one = (
        sa.update(db.Automation)
        .where(db.Automation.id == automation_id)
        .values(enabled=False)
    )
    result = await session.execute(disable_one)

    await _notify(session, existing, "updated")
    return result.rowcount > 0

318 

319 

@db_injector
async def _sync_automation_related_resources(
    db: PrefectDBInterface,
    session: AsyncSession,
    automation_id: UUID,
    automation: Optional[Union[Automation, AutomationUpdate]],
):
    """Actively maintains the set of related resources for an automation.

    Removes the automation's existing non-owning `prefect.deployment.*`
    relations, then (when `automation` is given) recreates one relation per
    deployment referenced by its "selected" `RunDeployment` actions.

    Args:
        db: The database interface.
        session: The session used for all statements.
        automation_id: ID of the automation whose relations are synced.
        automation: The automation (or update) supplying the actions; when
            falsy, the existing relations are only removed.
    """
    from prefect.server.events import actions

    related = db.AutomationRelatedResource
    clear_stale = sa.delete(related).where(
        related.automation_id == automation_id,
        related.resource_id.like("prefect.deployment.%"),
        related.automation_owned_by_resource.is_(False),
    )
    await session.execute(
        clear_stale, execution_options={"synchronize_session": False}
    )

    if not automation:
        return

    # A set, so each deployment is related once even if several actions use it.
    selected_deployments = {
        action.deployment_id
        for action in automation.actions
        if isinstance(action, actions.RunDeployment) and action.source == "selected"
    }
    for deployment_id in selected_deployments:
        await relate_automation_to_resource(
            session, automation_id, f"prefect.deployment.{deployment_id}", False
        )

351 

352 

@db_injector
async def relate_automation_to_resource(
    db: PrefectDBInterface,
    session: AsyncSession,
    automation_id: UUID,
    resource_id: str,
    owned_by_resource: bool,
) -> None:
    """Upsert the relation between an automation and a resource.

    On conflict over (automation_id, resource_id) the existing row is updated:
    its ownership flag is OR-ed with `owned_by_resource`, and `updated` is set
    to the current UTC time.

    Args:
        db: The database interface.
        session: The session used to run the statement.
        automation_id: ID of the automation being related.
        resource_id: ID of the resource the automation relates to.
        owned_by_resource: Whether the resource owns the automation.
    """
    related = db.AutomationRelatedResource

    insert_statement = db.queries.insert(related).values(
        automation_id=automation_id,
        resource_id=resource_id,
        automation_owned_by_resource=owned_by_resource,
    )
    upsert_statement = insert_statement.on_conflict_do_update(
        index_elements=[related.automation_id, related.resource_id],
        set_={
            # OR with the stored value: an existing True is never reset to False.
            "automation_owned_by_resource": sa.or_(
                related.automation_owned_by_resource,
                sa.true() if owned_by_resource else sa.false(),
            ),
            "updated": now("UTC"),
        },
    )
    await session.execute(upsert_statement)

382 

383 

@db_injector
async def read_automations_related_to_resource(
    db: PrefectDBInterface,
    session: AsyncSession,
    resource_id: str,
    owned_by_resource: Optional[bool] = None,
    automation_filter: Optional[filters.AutomationFilter] = None,
) -> Sequence[Automation]:
    """Read the automations related to a given resource.

    Args:
        db: The database interface.
        session: The session used to run the query.
        resource_id: ID of the related resource to match.
        owned_by_resource: When not None, only relations whose ownership flag
            equals this value are matched.
        automation_filter: Optional filter whose `as_sql_filter()` result is
            added as a WHERE clause.

    Returns:
        The matching rows, each validated into an `Automation` schema object.
    """
    related = db.AutomationRelatedResource

    statement = (
        sa.select(db.Automation)
        .join(db.Automation.related_resources)
        .where(related.resource_id == resource_id)
    )
    # None means "either"; True and False each narrow the match.
    if owned_by_resource is not None:
        statement = statement.where(
            related.automation_owned_by_resource == owned_by_resource
        )
    if automation_filter:
        statement = statement.where(automation_filter.as_sql_filter())

    rows = (await session.execute(statement)).scalars().all()
    return [Automation.model_validate(row, from_attributes=True) for row in rows]

413 

414 

@db_injector
async def delete_automations_owned_by_resource(
    db: PrefectDBInterface,
    session: AsyncSession,
    resource_id: str,
    automation_filter: Optional[filters.AutomationFilter] = None,
) -> Sequence[UUID]:
    """Delete the automations that are owned by a resource.

    Args:
        db: The database interface.
        session: The session used for all statements.
        resource_id: ID of the owning resource.
        automation_filter: Optional filter narrowing which owned automations
            are deleted.

    Returns:
        The IDs of the automations that were selected for deletion; an empty
        list when the resource owns no matching automations.
    """
    automations = await read_automations_related_to_resource(
        session=session,
        resource_id=resource_id,
        owned_by_resource=True,
        automation_filter=automation_filter,
    )

    automation_ids = [automation.id for automation in automations]

    # Nothing owned: skip the DELETE, which would otherwise run with an empty
    # IN list and cost a round trip without removing any rows.
    if not automation_ids:
        return automation_ids

    await session.execute(
        sa.delete(db.Automation).where(db.Automation.id.in_(automation_ids))
    )

    for automation in automations:
        await _notify(session, automation, "deleted")

    return automation_ids

438 return automation_ids