Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/models/automations.py: 25%
166 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1import logging 1a
2from contextlib import asynccontextmanager 1a
3from typing import AsyncGenerator, Optional, Sequence, Union 1a
4from uuid import UUID 1a
6import orjson 1a
7import sqlalchemy as sa 1a
8from sqlalchemy.ext.asyncio import AsyncSession 1a
9from typing_extensions import Literal, TypeAlias 1a
11from prefect.logging import get_logger 1a
12from prefect.server.database import PrefectDBInterface, db_injector 1a
13from prefect.server.events import filters 1a
14from prefect.server.events.schemas.automations import ( 1a
15 Automation,
16 AutomationPartialUpdate,
17 AutomationSort,
18 AutomationUpdate,
19)
20from prefect.server.utilities.database import get_dialect 1a
21from prefect.types._datetime import now 1a
22from prefect.utilities.asyncutils import run_coro_as_sync 1a
24logger: logging.Logger = get_logger(__name__) 1a
26AutomationChangeEvent: TypeAlias = Literal[ 1a
27 "automation__created", "automation__updated", "automation__deleted"
28]
29AUTOMATION_CHANGES_CHANNEL = "prefect_automation_changes" 1a
32@asynccontextmanager 1a
33@db_injector 1a
34async def automations_session( 1a
35 db: PrefectDBInterface, begin_transaction: bool = False
36) -> AsyncGenerator[AsyncSession, None]:
37 async with db.session_context(begin_transaction=begin_transaction) as session: 1bc
38 yield session 1bc
41@db_injector 1a
42async def read_automations_for_workspace( 1a
43 db: PrefectDBInterface,
44 session: AsyncSession,
45 sort: AutomationSort = AutomationSort.NAME_ASC,
46 limit: Optional[int] = None,
47 offset: Optional[int] = None,
48 automation_filter: Optional[filters.AutomationFilter] = None,
49) -> Sequence[Automation]:
50 query = sa.select(db.Automation)
52 query = query.order_by(db.Automation.sort_expression(sort))
54 if automation_filter:
55 query = query.where(automation_filter.as_sql_filter())
56 if limit is not None:
57 query = query.limit(limit)
58 if offset is not None:
59 query = query.offset(offset)
61 result = await session.execute(query)
63 return [
64 Automation.model_validate(a, from_attributes=True)
65 for a in result.scalars().all()
66 ]
69@db_injector 1a
70async def count_automations_for_workspace( 1a
71 db: PrefectDBInterface,
72 session: AsyncSession,
73) -> int:
74 query = sa.select(sa.func.count(None)).select_from(db.Automation)
76 result = await session.execute(query)
78 return result.scalar() or 0
81@db_injector 1a
82async def read_automation( 1a
83 db: PrefectDBInterface,
84 session: AsyncSession,
85 automation_id: UUID,
86) -> Optional[Automation]:
87 automation = await session.scalar(
88 sa.select(db.Automation).where(db.Automation.id == automation_id)
89 )
90 if not automation:
91 return None
92 return Automation.model_validate(automation, from_attributes=True)
95@db_injector 1a
96async def read_automation_by_id( 1a
97 db: PrefectDBInterface, session: AsyncSession, automation_id: UUID
98) -> Optional[Automation]:
99 automation = await session.scalar(
100 sa.select(db.Automation).where(
101 db.Automation.id == automation_id,
102 )
103 )
104 if not automation:
105 return None
106 return Automation.model_validate(automation, from_attributes=True)
109async def _notify(session: AsyncSession, automation: Automation, event: str): 1a
110 from prefect.server.events.triggers import automation_changed
112 event_key: AutomationChangeEvent
113 if event == "created":
114 event_key = "automation__created"
115 elif event == "updated":
116 event_key = "automation__updated"
117 elif event == "deleted":
118 event_key = "automation__deleted"
119 else:
120 logger.error(
121 f"Unknown event type '{event}' in _notify for automation {automation.id}"
122 )
123 return
125 # Handle cache updates based on database type
126 sync_session = session.sync_session
127 dialect_name = get_dialect(sync_session).name
129 if dialect_name == "postgresql":
130 # For PostgreSQL, only send NOTIFY - the listener will update the cache
131 try:
132 payload_json = (
133 orjson.dumps(
134 {
135 "automation_id": str(automation.id),
136 "event_type": event,
137 }
138 )
139 .decode()
140 .replace("'", "''")
141 )
142 await session.execute(
143 sa.text(f"NOTIFY {AUTOMATION_CHANGES_CHANNEL}, '{payload_json}'")
144 )
146 logger.debug(
147 f"Sent Postgres NOTIFY on channel '{AUTOMATION_CHANGES_CHANNEL}' for automation {automation.id}, event: {event}"
148 )
149 except Exception as e:
150 logger.error(
151 f"Failed to send Postgres NOTIFY for automation {automation.id} on channel {AUTOMATION_CHANGES_CHANNEL}: {e}",
152 exc_info=True,
153 )
154 else:
155 # For SQLite, we need to update the cache after commit
156 @sa.event.listens_for(sync_session, "after_commit", once=True)
157 def update_cache_after_commit(session):
158 try:
159 run_coro_as_sync(automation_changed(automation.id, event_key))
160 except Exception as e:
161 logger.error(
162 f"Failed to update in-memory cache for automation {automation.id}, event: {event}: {e}",
163 exc_info=True,
164 )
167@db_injector 1a
168async def create_automation( 1a
169 db: PrefectDBInterface, session: AsyncSession, automation: Automation
170) -> Automation:
171 new_automation = db.Automation(**automation.model_dump())
172 session.add(new_automation)
173 await session.flush()
174 automation = Automation.model_validate(new_automation, from_attributes=True)
176 await _sync_automation_related_resources(session, new_automation.id, automation)
178 await _notify(session, automation, "created")
179 return automation
182@db_injector 1a
183async def update_automation( 1a
184 db: PrefectDBInterface,
185 session: AsyncSession,
186 automation_update: Union[AutomationUpdate, AutomationPartialUpdate],
187 automation_id: UUID,
188) -> bool:
189 if not isinstance(automation_update, (AutomationUpdate, AutomationPartialUpdate)):
190 raise TypeError(
191 "automation_update must be an AutomationUpdate or AutomationPartialUpdate, "
192 f"not {type(automation_update)}"
193 )
195 automation = await read_automation(session, automation_id)
196 if not automation:
197 return False
199 if isinstance(automation_update, AutomationPartialUpdate):
200 # Partial updates won't go through the full Automation/AutomationCore
201 # validation, which could change due to one of these updates. Here we attempt
202 # to apply and parse the final effect of the partial update to the existing
203 # automation to see if anything fails validation.
204 Automation.model_validate(
205 {
206 **automation.model_dump(mode="json"),
207 **automation_update.model_dump(mode="json"),
208 }
209 )
211 result = await session.execute(
212 sa.update(db.Automation)
213 .where(db.Automation.id == automation_id)
214 .values(**automation_update.model_dump_for_orm(exclude_unset=True))
215 )
217 if isinstance(automation_update, AutomationUpdate):
218 await _sync_automation_related_resources(
219 session, automation_id, automation_update
220 )
222 await _notify(session, automation, "updated")
223 return result.rowcount > 0 # type: ignore
226@db_injector 1a
227async def delete_automation( 1a
228 db: PrefectDBInterface,
229 session: AsyncSession,
230 automation_id: UUID,
231) -> bool:
232 automation = await read_automation(session, automation_id)
233 if not automation:
234 return False
236 # Delete child tables in a consistent order to prevent deadlocks
237 # when multiple automations are deleted concurrently
238 await session.execute(
239 sa.delete(db.AutomationBucket).where(
240 db.AutomationBucket.automation_id == automation_id,
241 )
242 )
243 await session.execute(
244 sa.delete(db.AutomationRelatedResource).where(
245 db.AutomationRelatedResource.automation_id == automation_id,
246 )
247 )
248 await session.execute(
249 sa.delete(db.CompositeTriggerChildFiring).where(
250 db.CompositeTriggerChildFiring.automation_id == automation_id,
251 )
252 )
254 # Now delete the parent automation
255 await session.execute(
256 sa.delete(db.Automation).where(
257 db.Automation.id == automation_id,
258 )
259 )
260 await _sync_automation_related_resources(session, automation_id, None)
262 await _notify(session, automation, "deleted")
263 return True
266@db_injector 1a
267async def delete_automations_for_workspace( 1a
268 db: PrefectDBInterface,
269 session: AsyncSession,
270) -> bool:
271 automations = await read_automations_for_workspace(
272 session,
273 )
275 # Delete child tables in a consistent order to prevent deadlocks
276 # when multiple workspace deletions occur concurrently
277 await session.execute(sa.delete(db.AutomationBucket))
278 await session.execute(sa.delete(db.AutomationRelatedResource))
279 await session.execute(sa.delete(db.CompositeTriggerChildFiring))
281 # Now delete all automations
282 result = await session.execute(sa.delete(db.Automation))
283 for automation in automations:
284 await _notify(session, automation, "deleted")
285 return result.rowcount > 0
288@db_injector 1a
289async def disable_automations_for_workspace( 1a
290 db: PrefectDBInterface,
291 session: AsyncSession,
292) -> bool:
293 automations = await read_automations_for_workspace(session)
294 result = await session.execute(sa.update(db.Automation).values(enabled=False))
295 for automation in automations:
296 await _notify(session, automation, "updated")
297 return result.rowcount > 0
300@db_injector 1a
301async def disable_automation( 1a
302 db: PrefectDBInterface, session: AsyncSession, automation_id: UUID
303) -> bool:
304 automation = await read_automation_by_id(
305 session=session,
306 automation_id=automation_id,
307 )
308 if not automation:
309 raise ValueError(f"Automation with ID {automation_id} not found")
311 result = await session.execute(
312 sa.update(db.Automation)
313 .where(db.Automation.id == automation_id)
314 .values(enabled=False)
315 )
316 await _notify(session, automation, "updated")
317 return result.rowcount > 0
320@db_injector 1a
321async def _sync_automation_related_resources( 1a
322 db: PrefectDBInterface,
323 session: AsyncSession,
324 automation_id: UUID,
325 automation: Optional[Union[Automation, AutomationUpdate]],
326):
327 """Actively maintains the set of related resources for an automation"""
328 from prefect.server.events import actions
330 await session.execute(
331 sa.delete(db.AutomationRelatedResource).where(
332 db.AutomationRelatedResource.automation_id == automation_id,
333 db.AutomationRelatedResource.resource_id.like("prefect.deployment.%"),
334 db.AutomationRelatedResource.automation_owned_by_resource.is_(False),
335 ),
336 execution_options={"synchronize_session": False},
337 )
339 if not automation:
340 return
342 deployment_ids = set(
343 action.deployment_id
344 for action in automation.actions
345 if isinstance(action, actions.RunDeployment) and action.source == "selected"
346 )
347 for deployment_id in deployment_ids:
348 await relate_automation_to_resource(
349 session, automation_id, f"prefect.deployment.{deployment_id}", False
350 )
353@db_injector 1a
354async def relate_automation_to_resource( 1a
355 db: PrefectDBInterface,
356 session: AsyncSession,
357 automation_id: UUID,
358 resource_id: str,
359 owned_by_resource: bool,
360) -> None:
361 await session.execute(
362 db.queries.insert(db.AutomationRelatedResource)
363 .values(
364 automation_id=automation_id,
365 resource_id=resource_id,
366 automation_owned_by_resource=owned_by_resource,
367 )
368 .on_conflict_do_update(
369 index_elements=[
370 db.AutomationRelatedResource.automation_id,
371 db.AutomationRelatedResource.resource_id,
372 ],
373 set_=dict(
374 automation_owned_by_resource=sa.or_(
375 db.AutomationRelatedResource.automation_owned_by_resource,
376 sa.true() if owned_by_resource else sa.false(),
377 ),
378 updated=now("UTC"),
379 ),
380 )
381 )
384@db_injector 1a
385async def read_automations_related_to_resource( 1a
386 db: PrefectDBInterface,
387 session: AsyncSession,
388 resource_id: str,
389 owned_by_resource: Optional[bool] = None,
390 automation_filter: Optional[filters.AutomationFilter] = None,
391) -> Sequence[Automation]:
392 query = (
393 sa.select(db.Automation)
394 .join(db.Automation.related_resources)
395 .where(
396 db.AutomationRelatedResource.resource_id == resource_id,
397 )
398 )
399 if owned_by_resource is not None:
400 query = query.where(
401 db.AutomationRelatedResource.automation_owned_by_resource
402 == owned_by_resource
403 )
405 if automation_filter:
406 query = query.where(automation_filter.as_sql_filter())
408 result = await session.execute(query)
409 return [
410 Automation.model_validate(a, from_attributes=True)
411 for a in result.scalars().all()
412 ]
415@db_injector 1a
416async def delete_automations_owned_by_resource( 1a
417 db: PrefectDBInterface,
418 session: AsyncSession,
419 resource_id: str,
420 automation_filter: Optional[filters.AutomationFilter] = None,
421) -> Sequence[UUID]:
422 automations = await read_automations_related_to_resource(
423 session=session,
424 resource_id=resource_id,
425 owned_by_resource=True,
426 automation_filter=automation_filter,
427 )
429 automation_ids = [automation.id for automation in automations]
431 await session.execute(
432 sa.delete(db.Automation).where(db.Automation.id.in_(automation_ids))
433 )
435 for automation in automations:
436 await _notify(session, automation, "deleted")
438 return automation_ids