Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/automations.py: 31%
93 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1from typing import Optional, Sequence 1a
2from uuid import UUID 1a
4from fastapi import Body, Depends, HTTPException, Path 1a
5from fastapi.exceptions import RequestValidationError 1a
6from pydantic import ValidationError 1a
8from prefect._internal.compatibility.starlette import status 1a
9from prefect.server.api.dependencies import LimitBody 1a
10from prefect.server.api.validation import ( 1a
11 validate_job_variables_for_run_deployment_action,
12)
13from prefect.server.database import PrefectDBInterface, provide_database_interface 1a
14from prefect.server.events import actions 1a
15from prefect.server.events.filters import AutomationFilter, AutomationFilterCreated 1a
16from prefect.server.events.models import automations as automations_models 1a
17from prefect.server.events.schemas.automations import ( 1a
18 Automation,
19 AutomationCreate,
20 AutomationPartialUpdate,
21 AutomationSort,
22 AutomationUpdate,
23)
24from prefect.server.exceptions import ObjectNotFoundError 1a
25from prefect.server.utilities.server import PrefectRouter 1a
26from prefect.types._datetime import now 1a
27from prefect.utilities.schema_tools.validation import ( 1a
28 ValidationError as JSONSchemaValidationError,
29)
# Router that collects the automation endpoints defined in this module.
# Every route registered on it lives under the "/automations" prefix and is
# tagged "Automations"; no router-level dependencies are attached.
router: PrefectRouter = PrefectRouter(
    dependencies=[],
    tags=["Automations"],
    prefix="/automations",
)
@router.post("/", status_code=status.HTTP_201_CREATED)
async def create_automation(
    automation: AutomationCreate,
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> Automation:
    """
    Create an automation.

    For more information, see https://docs.prefect.io/v3/concepts/automations.
    """
    # Client-supplied trigger IDs are discarded before anything is stored.
    automation.trigger.reset_ids()

    # Validate the job variables of every RunDeployment action that names a
    # deployment and carries a non-empty set of job variables. Schema failures
    # are collected so that all of them are reported in one response.
    validation_errors = []
    for candidate in automation.actions:
        if not isinstance(candidate, actions.RunDeployment):
            continue
        if candidate.deployment_id is None:
            continue
        if candidate.job_variables is None or candidate.job_variables == {}:
            continue

        # Each qualifying action is validated in its own session.
        async with db.session_context() as validation_session:
            try:
                await validate_job_variables_for_run_deployment_action(
                    validation_session, candidate
                )
            except JSONSchemaValidationError as schema_error:
                validation_errors.append(str(schema_error))

    if validation_errors:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f"Error creating automation: {' '.join(validation_errors)}",
        )

    # "owner_resource" is removed from the dumped fields before they are
    # handed to the Automation constructor; it is recorded separately below
    # as an ownership relation.
    fields = automation.model_dump()
    owner = fields.pop("owner_resource", None)

    async with db.session_context(begin_transaction=True) as session:
        created = await automations_models.create_automation(
            session=session,
            automation=Automation(**fields),
        )

        # The relation is written with the same session as the automation.
        if owner:
            await automations_models.relate_automation_to_resource(
                session,
                automation_id=created.id,
                resource_id=owner,
                owned_by_resource=True,
            )

    return created
@router.put("/{id:uuid}", status_code=status.HTTP_204_NO_CONTENT)
async def update_automation(
    automation: AutomationUpdate,
    automation_id: UUID = Path(..., alias="id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    # Replace the automation identified by the "id" path parameter with the
    # supplied definition. Responds with 204 and no body on success.
    #
    # Raises:
    #   HTTPException (422): one or more RunDeployment actions carry job
    #     variables that fail schema validation.
    #   ObjectNotFoundError: the model layer reported that nothing was updated.

    # reset any client-provided IDs on the provided triggers
    automation.trigger.reset_ids()

    # Validate every RunDeployment action that names a deployment and has
    # job variables set (an empty mapping is still passed to the validator
    # here). Schema failures are collected so they are reported together.
    errors = []
    for action in automation.actions:
        if (
            isinstance(action, actions.RunDeployment)
            and action.deployment_id is not None
            and action.job_variables is not None
        ):
            async with db.session_context() as session:
                try:
                    await validate_job_variables_for_run_deployment_action(
                        session, action
                    )
                except JSONSchemaValidationError as exc:
                    errors.append(str(exc))
    if errors:
        # This is the update endpoint, so the message says "updating"; the
        # previous text ("creating") was carried over from the create handler.
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f"Error updating automation: {' '.join(errors)}",
        )

    async with db.session_context(begin_transaction=True) as session:
        updated = await automations_models.update_automation(
            session=session,
            automation_update=automation,
            automation_id=automation_id,
        )

    # A falsy result from the model layer is treated as "no such automation".
    if not updated:
        raise ObjectNotFoundError("Automation not found")
@router.patch(
    "/{id:uuid}",
    status_code=status.HTTP_204_NO_CONTENT,
)
async def patch_automation(
    automation: AutomationPartialUpdate,
    automation_id: UUID = Path(..., alias="id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    # Apply a partial update to the automation identified by the "id" path
    # parameter. Responds with 204 and no body on success.
    #
    # Raises:
    #   RequestValidationError: applying the update raised a pydantic
    #     ValidationError.
    #   ObjectNotFoundError: the model layer reported that nothing was updated.
    try:
        async with db.session_context(begin_transaction=True) as session:
            updated = await automations_models.update_automation(
                session=session,
                automation_update=automation,
                automation_id=automation_id,
            )
    except ValidationError as e:
        # Re-raise as a request validation failure, echoing the submitted
        # body. Chaining with "from e" keeps the pydantic error attached as
        # __cause__ so the original traceback is not lost.
        raise RequestValidationError(
            errors=e.errors(),
            body=automation.model_dump(mode="json"),
        ) from e

    # A falsy result from the model layer is treated as "no such automation".
    if not updated:
        raise ObjectNotFoundError("Automation not found")
@router.delete(
    "/{id:uuid}",
    status_code=status.HTTP_204_NO_CONTENT,
)
async def delete_automation(
    automation_id: UUID = Path(..., alias="id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    # Delete the automation identified by the "id" path parameter inside a
    # transaction. Responds with 204 and no body on success; raises
    # ObjectNotFoundError when the model layer reports nothing was deleted.
    async with db.session_context(begin_transaction=True) as session:
        was_deleted = await automations_models.delete_automation(
            automation_id=automation_id,
            session=session,
        )

    if was_deleted:
        return
    raise ObjectNotFoundError("Automation not found")
@router.post("/filter")
async def read_automations(
    sort: AutomationSort = Body(AutomationSort.NAME_ASC),
    limit: int = LimitBody(),
    offset: int = Body(0, ge=0),
    automations: Optional[AutomationFilter] = None,
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> Sequence[Automation]:
    # Query automations using the sort order, limit, offset and optional
    # filter taken from the request body, and return whatever the model
    # layer yields. Sorting defaults to AutomationSort.NAME_ASC and the
    # offset defaults to 0 (negative offsets are rejected by ge=0).
    query_options = {
        "sort": sort,
        "limit": limit,
        "offset": offset,
        "automation_filter": automations,
    }
    async with db.session_context() as session:
        matching = await automations_models.read_automations_for_workspace(
            session=session,
            **query_options,
        )
        return matching
@router.post("/count")
async def count_automations(
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> int:
    # Return the automation count reported by the model layer, read within
    # a single session.
    async with db.session_context() as session:
        total = await automations_models.count_automations_for_workspace(
            session=session
        )
        return total
@router.get("/{id:uuid}")
async def read_automation(
    automation_id: UUID = Path(..., alias="id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> Automation:
    # Fetch the automation identified by the "id" path parameter and return
    # it; raises ObjectNotFoundError when the lookup yields a falsy result.
    async with db.session_context() as session:
        found = await automations_models.read_automation(
            automation_id=automation_id,
            session=session,
        )

    if found:
        return found
    raise ObjectNotFoundError("Automation not found")
@router.get("/related-to/{resource_id:str}")
async def read_automations_related_to_resource(
    resource_id: str = Path(..., alias="resource_id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> Sequence[Automation]:
    # Return the automations the model layer reports as related to the
    # resource named by the "resource_id" path parameter.
    async with db.session_context() as session:
        related = await automations_models.read_automations_related_to_resource(
            resource_id=resource_id,
            session=session,
        )
        return related
@router.delete("/owned-by/{resource_id:str}", status_code=status.HTTP_202_ACCEPTED)
async def delete_automations_owned_by_resource(
    resource_id: str = Path(..., alias="resource_id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    # Ask the model layer to delete the automations owned by the resource
    # named in the path, inside a transaction. The request is narrowed by a
    # filter on creation time: created before the current UTC time, taken
    # once the session is open. Responds with 202 and no body.
    async with db.session_context(begin_transaction=True) as session:
        created_before_now = AutomationFilterCreated(before_=now("UTC"))
        await automations_models.delete_automations_owned_by_resource(
            session,
            resource_id=resource_id,
            automation_filter=AutomationFilter(created=created_before_now),
        )