Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/automations.py: 82%
93 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1from typing import Optional, Sequence 1b
2from uuid import UUID 1b
4from fastapi import Body, Depends, HTTPException, Path 1b
5from fastapi.exceptions import RequestValidationError 1b
6from pydantic import ValidationError 1b
8from prefect._internal.compatibility.starlette import status 1b
9from prefect.server.api.dependencies import LimitBody 1b
10from prefect.server.api.validation import ( 1b
11 validate_job_variables_for_run_deployment_action,
12)
13from prefect.server.database import PrefectDBInterface, provide_database_interface 1b
14from prefect.server.events import actions 1b
15from prefect.server.events.filters import AutomationFilter, AutomationFilterCreated 1b
16from prefect.server.events.models import automations as automations_models 1b
17from prefect.server.events.schemas.automations import ( 1b
18 Automation,
19 AutomationCreate,
20 AutomationPartialUpdate,
21 AutomationSort,
22 AutomationUpdate,
23)
24from prefect.server.exceptions import ObjectNotFoundError 1b
25from prefect.server.utilities.server import PrefectRouter 1b
26from prefect.types._datetime import now 1b
27from prefect.utilities.schema_tools.validation import ( 1b
28 ValidationError as JSONSchemaValidationError,
29)
31router: PrefectRouter = PrefectRouter( 1b
32 prefix="/automations",
33 tags=["Automations"],
34 dependencies=[],
35)
38@router.post("/", status_code=status.HTTP_201_CREATED) 1b
39async def create_automation( 1b
40 automation: AutomationCreate,
41 db: PrefectDBInterface = Depends(provide_database_interface),
42) -> Automation:
43 """
44 Create an automation.
46 For more information, see https://docs.prefect.io/v3/concepts/automations.
47 """
48 # reset any client-provided IDs on the provided triggers
49 automation.trigger.reset_ids() 1adc
51 errors = [] 1adc
52 for action in automation.actions: 1adc
53 if ( 53 ↛ 59line 53 didn't jump to line 59 because the condition on line 53 was never true
54 isinstance(action, actions.RunDeployment)
55 and action.deployment_id is not None
56 and action.job_variables is not None
57 and action.job_variables != {}
58 ):
59 async with db.session_context() as session:
60 try:
61 await validate_job_variables_for_run_deployment_action(
62 session, action
63 )
64 except JSONSchemaValidationError as exc:
65 errors.append(str(exc))
67 if errors: 67 ↛ 68line 67 didn't jump to line 68 because the condition on line 67 was never true1adc
68 raise HTTPException(
69 status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
70 detail=f"Error creating automation: {' '.join(errors)}",
71 )
73 automation_dict = automation.model_dump() 1adc
74 owner_resource = automation_dict.pop("owner_resource", None) 1adc
76 async with db.session_context(begin_transaction=True) as session: 1adc
77 created_automation = await automations_models.create_automation( 1adc
78 session=session,
79 automation=Automation(
80 **automation_dict,
81 ),
82 )
84 if owner_resource:
85 await automations_models.relate_automation_to_resource(
86 session,
87 automation_id=created_automation.id,
88 resource_id=owner_resource,
89 owned_by_resource=True,
90 )
92 return created_automation 1adc
95@router.put("/{id:uuid}", status_code=status.HTTP_204_NO_CONTENT) 1b
96async def update_automation( 1b
97 automation: AutomationUpdate,
98 automation_id: UUID = Path(..., alias="id"),
99 db: PrefectDBInterface = Depends(provide_database_interface),
100) -> None:
101 # reset any client-provided IDs on the provided triggers
102 automation.trigger.reset_ids() 1ac
104 errors = [] 1ac
105 for action in automation.actions: 1ac
106 if ( 106 ↛ 111line 106 didn't jump to line 111 because the condition on line 106 was never true
107 isinstance(action, actions.RunDeployment)
108 and action.deployment_id is not None
109 and action.job_variables is not None
110 ):
111 async with db.session_context() as session:
112 try:
113 await validate_job_variables_for_run_deployment_action(
114 session, action
115 )
116 except JSONSchemaValidationError as exc:
117 errors.append(str(exc))
118 if errors: 118 ↛ 119line 118 didn't jump to line 119 because the condition on line 118 was never true1ac
119 raise HTTPException(
120 status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
121 detail=f"Error creating automation: {' '.join(errors)}",
122 )
124 async with db.session_context(begin_transaction=True) as session: 1ac
125 updated = await automations_models.update_automation( 1ac
126 session=session,
127 automation_update=automation,
128 automation_id=automation_id,
129 )
131 if not updated: 1ac
132 raise ObjectNotFoundError("Automation not found") 1a
135@router.patch( 1b
136 "/{id:uuid}",
137 status_code=status.HTTP_204_NO_CONTENT,
138)
139async def patch_automation( 1b
140 automation: AutomationPartialUpdate,
141 automation_id: UUID = Path(..., alias="id"),
142 db: PrefectDBInterface = Depends(provide_database_interface),
143) -> None:
144 try: 1ac
145 async with db.session_context(begin_transaction=True) as session: 1ac
146 updated = await automations_models.update_automation( 1ac
147 session=session,
148 automation_update=automation,
149 automation_id=automation_id,
150 )
151 except ValidationError as e:
152 raise RequestValidationError(
153 errors=e.errors(),
154 body=automation.model_dump(mode="json"),
155 )
157 if not updated: 1ac
158 raise ObjectNotFoundError("Automation not found") 1a
161@router.delete( 1b
162 "/{id:uuid}",
163 status_code=status.HTTP_204_NO_CONTENT,
164)
165async def delete_automation( 1b
166 automation_id: UUID = Path(..., alias="id"),
167 db: PrefectDBInterface = Depends(provide_database_interface),
168) -> None:
169 async with db.session_context(begin_transaction=True) as session: 1ac
170 deleted = await automations_models.delete_automation( 1ac
171 session=session,
172 automation_id=automation_id,
173 )
175 if not deleted: 1ac
176 raise ObjectNotFoundError("Automation not found") 1a
179@router.post("/filter") 1b
180async def read_automations( 1b
181 sort: AutomationSort = Body(AutomationSort.NAME_ASC),
182 limit: int = LimitBody(),
183 offset: int = Body(0, ge=0),
184 automations: Optional[AutomationFilter] = None,
185 db: PrefectDBInterface = Depends(provide_database_interface),
186) -> Sequence[Automation]:
187 async with db.session_context() as session: 1adc
188 return await automations_models.read_automations_for_workspace( 1adc
189 session=session,
190 sort=sort,
191 limit=limit,
192 offset=offset,
193 automation_filter=automations,
194 )
197@router.post("/count") 1b
198async def count_automations( 1b
199 db: PrefectDBInterface = Depends(provide_database_interface),
200) -> int:
201 async with db.session_context() as session: 1a
202 return await automations_models.count_automations_for_workspace(session=session) 1a
205@router.get("/{id:uuid}") 1b
206async def read_automation( 1b
207 automation_id: UUID = Path(..., alias="id"),
208 db: PrefectDBInterface = Depends(provide_database_interface),
209) -> Automation:
210 async with db.session_context() as session: 1ac
211 automation = await automations_models.read_automation( 1ac
212 session=session,
213 automation_id=automation_id,
214 )
216 if not automation: 1ac
217 raise ObjectNotFoundError("Automation not found") 1a
218 return automation 1ac
221@router.get("/related-to/{resource_id:str}") 1b
222async def read_automations_related_to_resource( 1b
223 resource_id: str = Path(..., alias="resource_id"),
224 db: PrefectDBInterface = Depends(provide_database_interface),
225) -> Sequence[Automation]:
226 async with db.session_context() as session: 1ac
227 return await automations_models.read_automations_related_to_resource( 1ac
228 session=session,
229 resource_id=resource_id,
230 )
233@router.delete("/owned-by/{resource_id:str}", status_code=status.HTTP_202_ACCEPTED) 1b
234async def delete_automations_owned_by_resource( 1b
235 resource_id: str = Path(..., alias="resource_id"),
236 db: PrefectDBInterface = Depends(provide_database_interface),
237) -> None:
238 async with db.session_context(begin_transaction=True) as session: 1ac
239 await automations_models.delete_automations_owned_by_resource( 1ac
240 session,
241 resource_id=resource_id,
242 automation_filter=AutomationFilter(
243 created=AutomationFilterCreated(before_=now("UTC"))
244 ),
245 )