Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/models/automations.py: 59%
166 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1import logging 1c
2from contextlib import asynccontextmanager 1c
3from typing import AsyncGenerator, Optional, Sequence, Union 1c
4from uuid import UUID 1c
6import orjson 1c
7import sqlalchemy as sa 1c
8from sqlalchemy.ext.asyncio import AsyncSession 1c
9from typing_extensions import Literal, TypeAlias 1c
11from prefect.logging import get_logger 1c
12from prefect.server.database import PrefectDBInterface, db_injector 1c
13from prefect.server.events import filters 1c
14from prefect.server.events.schemas.automations import ( 1c
15 Automation,
16 AutomationPartialUpdate,
17 AutomationSort,
18 AutomationUpdate,
19)
20from prefect.server.utilities.database import get_dialect 1c
21from prefect.types._datetime import now 1c
22from prefect.utilities.asyncutils import run_coro_as_sync 1c
# Module-level logger used by the notification helpers in this file.
logger: logging.Logger = get_logger(__name__)

# The closed set of change-event keys handed to `automation_changed` by
# `_notify` when it refreshes the in-memory cache.
AutomationChangeEvent: TypeAlias = Literal[
    "automation__created",
    "automation__updated",
    "automation__deleted",
]

# Channel name interpolated into the Postgres NOTIFY statement in `_notify`.
AUTOMATION_CHANGES_CHANNEL = "prefect_automation_changes"
@asynccontextmanager
@db_injector
async def automations_session(
    db: PrefectDBInterface, begin_transaction: bool = False
) -> AsyncGenerator[AsyncSession, None]:
    """Yield a database session obtained from ``db.session_context``.

    Args:
        db: Database interface; presumably supplied by ``db_injector`` rather
            than by the caller.
        begin_transaction: Forwarded unchanged to ``db.session_context``.

    Yields:
        The session produced by the underlying session context manager.
    """
    session_context = db.session_context(begin_transaction=begin_transaction)
    async with session_context as active_session:
        yield active_session
@db_injector
async def read_automations_for_workspace(
    db: PrefectDBInterface,
    session: AsyncSession,
    sort: AutomationSort = AutomationSort.NAME_ASC,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    automation_filter: Optional[filters.AutomationFilter] = None,
) -> Sequence[Automation]:
    """Read automations, optionally filtered, sorted and paginated.

    Args:
        db: Database interface providing the ``Automation`` ORM model.
        session: Session used to execute the query.
        sort: Sort order, translated via ``db.Automation.sort_expression``.
        limit: Maximum number of rows; no LIMIT is applied when ``None``.
        offset: Number of rows to skip; no OFFSET is applied when ``None``.
        automation_filter: Optional filter whose ``as_sql_filter()`` result is
            added as a WHERE clause.

    Returns:
        The matching rows validated into ``Automation`` schema objects.
    """
    statement = sa.select(db.Automation).order_by(
        db.Automation.sort_expression(sort)
    )

    if automation_filter:
        statement = statement.where(automation_filter.as_sql_filter())
    if limit is not None:
        statement = statement.limit(limit)
    if offset is not None:
        statement = statement.offset(offset)

    rows = (await session.execute(statement)).scalars().all()
    return [Automation.model_validate(row, from_attributes=True) for row in rows]
@db_injector
async def count_automations_for_workspace(
    db: PrefectDBInterface,
    session: AsyncSession,
) -> int:
    """Count the rows in the automation table.

    Args:
        db: Database interface providing the ``Automation`` ORM model.
        session: Session used to execute the count query.

    Returns:
        The scalar result of the count query, or ``0`` when it is falsy.
    """
    count_statement = sa.select(sa.func.count(None)).select_from(db.Automation)
    total = (await session.execute(count_statement)).scalar()
    return total if total else 0
@db_injector
async def read_automation(
    db: PrefectDBInterface,
    session: AsyncSession,
    automation_id: UUID,
) -> Optional[Automation]:
    """Load a single automation by its primary key.

    Args:
        db: Database interface providing the ``Automation`` ORM model.
        session: Session used to execute the lookup.
        automation_id: ID of the automation to read.

    Returns:
        The automation validated into the ``Automation`` schema, or ``None``
        when no row matches.
    """
    lookup = sa.select(db.Automation).where(db.Automation.id == automation_id)
    row = await session.scalar(lookup)
    if row:
        return Automation.model_validate(row, from_attributes=True)
    return None
@db_injector
async def read_automation_by_id(
    db: PrefectDBInterface, session: AsyncSession, automation_id: UUID
) -> Optional[Automation]:
    """Load a single automation by its ID.

    This is kept as a separate entry point for existing callers, but it
    delegates to ``read_automation`` so the two lookups cannot drift apart
    (they previously carried identical copies of the same query).

    Args:
        db: Database interface; unused here but kept so the signature stays
            compatible with ``db_injector`` and existing callers.
        session: Session used to execute the lookup.
        automation_id: ID of the automation to read.

    Returns:
        The automation validated into the ``Automation`` schema, or ``None``
        when no row matches.
    """
    return await read_automation(session, automation_id)
async def _notify(session: AsyncSession, automation: Automation, event: str):
    """Announce that an automation was created, updated or deleted.

    On PostgreSQL a ``NOTIFY`` is sent on ``AUTOMATION_CHANGES_CHANNEL``; on
    any other dialect (SQLite, per the original comment) a one-shot
    ``after_commit`` listener is registered that calls ``automation_changed``.
    Failures in either path are logged and not re-raised.

    Args:
        session: Session in which the change was made.
        automation: The affected automation; only its ``id`` is used.
        event: One of ``"created"``, ``"updated"`` or ``"deleted"``. Any other
            value is logged as an error and otherwise ignored.
    """
    from prefect.server.events.triggers import automation_changed

    # Map the short event name to the key expected by `automation_changed`.
    event_key: Optional[AutomationChangeEvent] = {
        "created": "automation__created",
        "updated": "automation__updated",
        "deleted": "automation__deleted",
    }.get(event)
    if event_key is None:
        logger.error(
            f"Unknown event type '{event}' in _notify for automation {automation.id}"
        )
        return

    # Handle cache updates based on database type
    sync_session = session.sync_session
    dialect_name = get_dialect(sync_session).name

    if dialect_name != "postgresql":
        # For SQLite, we need to update the cache after commit
        @sa.event.listens_for(sync_session, "after_commit", once=True)
        def update_cache_after_commit(session):
            try:
                run_coro_as_sync(automation_changed(automation.id, event_key))
            except Exception as e:
                logger.error(
                    f"Failed to update in-memory cache for automation {automation.id}, event: {event}: {e}",
                    exc_info=True,
                )

        return

    # For PostgreSQL, only send NOTIFY - the listener will update the cache
    try:
        payload = {
            "automation_id": str(automation.id),
            "event_type": event,
        }
        # Double any single quotes so the payload can sit inside a SQL
        # string literal.
        payload_json = orjson.dumps(payload).decode().replace("'", "''")
        notify_statement = sa.text(
            f"NOTIFY {AUTOMATION_CHANGES_CHANNEL}, '{payload_json}'"
        )
        await session.execute(notify_statement)

        logger.debug(
            f"Sent Postgres NOTIFY on channel '{AUTOMATION_CHANGES_CHANNEL}' for automation {automation.id}, event: {event}"
        )
    except Exception as e:
        logger.error(
            f"Failed to send Postgres NOTIFY for automation {automation.id} on channel {AUTOMATION_CHANGES_CHANNEL}: {e}",
            exc_info=True,
        )
@db_injector
async def create_automation(
    db: PrefectDBInterface, session: AsyncSession, automation: Automation
) -> Automation:
    """Insert a new automation and announce its creation.

    Args:
        db: Database interface providing the ``Automation`` ORM model.
        session: Session the new row is added to and flushed in.
        automation: Schema object describing the automation to create.

    Returns:
        The automation re-validated from the flushed ORM row.
    """
    orm_automation = db.Automation(**automation.model_dump())
    session.add(orm_automation)
    # Flush before re-validating so the returned schema is built from the
    # row as the session holds it after the INSERT.
    await session.flush()

    created = Automation.model_validate(orm_automation, from_attributes=True)

    await _sync_automation_related_resources(session, orm_automation.id, created)
    await _notify(session, created, "created")
    return created
@db_injector
async def update_automation(
    db: PrefectDBInterface,
    session: AsyncSession,
    automation_update: Union[AutomationUpdate, AutomationPartialUpdate],
    automation_id: UUID,
) -> bool:
    """Apply a full or partial update to an existing automation.

    Args:
        db: Database interface providing the ``Automation`` ORM model.
        session: Session used for the read and the UPDATE.
        automation_update: The full or partial update to apply.
        automation_id: ID of the automation to update.

    Returns:
        ``False`` when the automation does not exist; otherwise whether the
        UPDATE statement affected at least one row.

    Raises:
        TypeError: If ``automation_update`` is neither an ``AutomationUpdate``
            nor an ``AutomationPartialUpdate``.
    """
    accepted_types = (AutomationUpdate, AutomationPartialUpdate)
    if not isinstance(automation_update, accepted_types):
        raise TypeError(
            "automation_update must be an AutomationUpdate or AutomationPartialUpdate, "
            f"not {type(automation_update)}"
        )

    existing = await read_automation(session, automation_id)
    if not existing:
        return False

    if isinstance(automation_update, AutomationPartialUpdate):
        # Partial updates won't go through the full Automation/AutomationCore
        # validation, which could change due to one of these updates. Here we
        # apply the partial update on top of the existing automation and parse
        # the merged result to see if anything fails validation.
        merged = existing.model_dump(mode="json")
        merged.update(automation_update.model_dump(mode="json"))
        Automation.model_validate(merged)

    new_values = automation_update.model_dump_for_orm(exclude_unset=True)
    update_statement = (
        sa.update(db.Automation)
        .where(db.Automation.id == automation_id)
        .values(**new_values)
    )
    result = await session.execute(update_statement)

    if isinstance(automation_update, AutomationUpdate):
        await _sync_automation_related_resources(
            session, automation_id, automation_update
        )

    await _notify(session, existing, "updated")
    return result.rowcount > 0  # type: ignore
@db_injector
async def delete_automation(
    db: PrefectDBInterface,
    session: AsyncSession,
    automation_id: UUID,
) -> bool:
    """Delete one automation together with its dependent rows.

    Args:
        db: Database interface providing the ORM models.
        session: Session used to execute the DELETE statements.
        automation_id: ID of the automation to delete.

    Returns:
        ``False`` when the automation does not exist, ``True`` once it and its
        child rows have been deleted.
    """
    existing = await read_automation(session, automation_id)
    if not existing:
        return False

    # Delete child tables in a consistent order to prevent deadlocks
    # when multiple automations are deleted concurrently
    child_models = (
        db.AutomationBucket,
        db.AutomationRelatedResource,
        db.CompositeTriggerChildFiring,
    )
    for child_model in child_models:
        await session.execute(
            sa.delete(child_model).where(child_model.automation_id == automation_id)
        )

    # Now delete the parent automation
    await session.execute(
        sa.delete(db.Automation).where(db.Automation.id == automation_id)
    )
    await _sync_automation_related_resources(session, automation_id, None)

    await _notify(session, existing, "deleted")
    return True
@db_injector
async def delete_automations_for_workspace(
    db: PrefectDBInterface,
    session: AsyncSession,
) -> bool:
    """Delete every automation and all dependent rows.

    Args:
        db: Database interface providing the ORM models.
        session: Session used to execute the DELETE statements.

    Returns:
        Whether the DELETE on the automation table affected at least one row.
    """
    # Read the automations first so a "deleted" notification can be sent for
    # each one after the rows are gone.
    doomed = await read_automations_for_workspace(session)

    # Delete child tables in a consistent order to prevent deadlocks
    # when multiple workspace deletions occur concurrently
    for child_model in (
        db.AutomationBucket,
        db.AutomationRelatedResource,
        db.CompositeTriggerChildFiring,
    ):
        await session.execute(sa.delete(child_model))

    # Now delete all automations
    result = await session.execute(sa.delete(db.Automation))

    for deleted in doomed:
        await _notify(session, deleted, "deleted")
    return result.rowcount > 0
@db_injector
async def disable_automations_for_workspace(
    db: PrefectDBInterface,
    session: AsyncSession,
) -> bool:
    """Set ``enabled=False`` on every automation and announce each update.

    Args:
        db: Database interface providing the ``Automation`` ORM model.
        session: Session used to execute the UPDATE.

    Returns:
        Whether the UPDATE affected at least one row.
    """
    affected = await read_automations_for_workspace(session)

    disable_all = sa.update(db.Automation).values(enabled=False)
    result = await session.execute(disable_all)

    for automation in affected:
        await _notify(session, automation, "updated")
    return result.rowcount > 0
@db_injector
async def disable_automation(
    db: PrefectDBInterface, session: AsyncSession, automation_id: UUID
) -> bool:
    """Set ``enabled=False`` on a single automation and announce the update.

    Args:
        db: Database interface providing the ``Automation`` ORM model.
        session: Session used for the lookup and the UPDATE.
        automation_id: ID of the automation to disable.

    Returns:
        Whether the UPDATE affected at least one row.

    Raises:
        ValueError: If no automation with ``automation_id`` exists.
    """
    existing = await read_automation_by_id(
        session=session, automation_id=automation_id
    )
    if not existing:
        raise ValueError(f"Automation with ID {automation_id} not found")

    disable_one = (
        sa.update(db.Automation)
        .where(db.Automation.id == automation_id)
        .values(enabled=False)
    )
    result = await session.execute(disable_one)

    await _notify(session, existing, "updated")
    return result.rowcount > 0
@db_injector
async def _sync_automation_related_resources(
    db: PrefectDBInterface,
    session: AsyncSession,
    automation_id: UUID,
    automation: Optional[Union[Automation, AutomationUpdate]],
):
    """Actively maintains the set of related resources for an automation

    Removes the automation's non-owned ``prefect.deployment.*`` relations,
    then, if an automation is given, re-creates one relation per deployment
    referenced by its "selected" ``RunDeployment`` actions.

    Args:
        db: Database interface providing the ORM models.
        session: Session used to execute the statements.
        automation_id: ID of the automation whose relations are synced.
        automation: Current automation or update; when falsy, existing
            relations are only removed.
    """
    from prefect.server.events import actions

    related = db.AutomationRelatedResource
    clear_statement = sa.delete(related).where(
        related.automation_id == automation_id,
        related.resource_id.like("prefect.deployment.%"),
        related.automation_owned_by_resource.is_(False),
    )
    await session.execute(
        clear_statement,
        execution_options={"synchronize_session": False},
    )

    if not automation:
        return

    # A set de-duplicates deployments referenced by more than one action.
    selected_deployment_ids = {
        action.deployment_id
        for action in automation.actions
        if isinstance(action, actions.RunDeployment) and action.source == "selected"
    }
    for deployment_id in selected_deployment_ids:
        await relate_automation_to_resource(
            session, automation_id, f"prefect.deployment.{deployment_id}", False
        )
@db_injector
async def relate_automation_to_resource(
    db: PrefectDBInterface,
    session: AsyncSession,
    automation_id: UUID,
    resource_id: str,
    owned_by_resource: bool,
) -> None:
    """Upsert the relation between an automation and a resource.

    On conflict over ``(automation_id, resource_id)`` the ownership flag is
    OR-ed with the existing value, so a relation that is already owned stays
    owned, and ``updated`` is set to the current UTC time.

    Args:
        db: Database interface providing the ORM model and insert builder.
        session: Session used to execute the upsert.
        automation_id: ID of the automation being related.
        resource_id: ID of the related resource.
        owned_by_resource: Whether the resource owns the automation.
    """
    related = db.AutomationRelatedResource
    owned_literal = sa.true() if owned_by_resource else sa.false()

    insert_statement = db.queries.insert(related).values(
        automation_id=automation_id,
        resource_id=resource_id,
        automation_owned_by_resource=owned_by_resource,
    )
    upsert_statement = insert_statement.on_conflict_do_update(
        index_elements=[related.automation_id, related.resource_id],
        set_={
            "automation_owned_by_resource": sa.or_(
                related.automation_owned_by_resource,
                owned_literal,
            ),
            "updated": now("UTC"),
        },
    )
    await session.execute(upsert_statement)
@db_injector
async def read_automations_related_to_resource(
    db: PrefectDBInterface,
    session: AsyncSession,
    resource_id: str,
    owned_by_resource: Optional[bool] = None,
    automation_filter: Optional[filters.AutomationFilter] = None,
) -> Sequence[Automation]:
    """Read the automations related to a given resource.

    Args:
        db: Database interface providing the ORM models.
        session: Session used to execute the query.
        resource_id: ID of the related resource to match.
        owned_by_resource: When not ``None``, only relations whose ownership
            flag equals this value are matched.
        automation_filter: Optional filter whose ``as_sql_filter()`` result is
            added as a WHERE clause.

    Returns:
        The matching rows validated into ``Automation`` schema objects.
    """
    related = db.AutomationRelatedResource
    statement = (
        sa.select(db.Automation)
        .join(db.Automation.related_resources)
        .where(related.resource_id == resource_id)
    )

    if owned_by_resource is not None:
        statement = statement.where(
            related.automation_owned_by_resource == owned_by_resource
        )
    if automation_filter:
        statement = statement.where(automation_filter.as_sql_filter())

    rows = (await session.execute(statement)).scalars().all()
    return [Automation.model_validate(row, from_attributes=True) for row in rows]
@db_injector
async def delete_automations_owned_by_resource(
    db: PrefectDBInterface,
    session: AsyncSession,
    resource_id: str,
    automation_filter: Optional[filters.AutomationFilter] = None,
) -> Sequence[UUID]:
    """Delete every automation owned by the given resource.

    Args:
        db: Database interface providing the ``Automation`` ORM model.
        session: Session used for the lookup and the DELETE.
        resource_id: ID of the owning resource.
        automation_filter: Optional filter narrowing which owned automations
            are deleted.

    Returns:
        The IDs of the deleted automations; empty when nothing matched.
    """
    automations = await read_automations_related_to_resource(
        session=session,
        resource_id=resource_id,
        owned_by_resource=True,
        automation_filter=automation_filter,
    )

    automation_ids = [automation.id for automation in automations]
    if not automation_ids:
        # Nothing is owned by this resource: skip the DELETE with an empty
        # IN list, which could never match a row.
        return automation_ids

    await session.execute(
        sa.delete(db.Automation).where(db.Automation.id.in_(automation_ids))
    )

    for automation in automations:
        await _notify(session, automation, "deleted")

    return automation_ids