Coverage for /usr/local/lib/python3.12/site-packages/prefect/client/orchestration/_automations/client.py: 16%
150 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1from __future__ import annotations 1a
3from typing import TYPE_CHECKING 1a
5from httpx import HTTPStatusError 1a
7from prefect.client.orchestration.base import BaseAsyncClient, BaseClient 1a
8from prefect.exceptions import ObjectNotFound 1a
10if TYPE_CHECKING: 10 ↛ 11line 10 didn't jump to line 11 because the condition on line 10 was never true1a
11 from uuid import UUID
13 from prefect.events.schemas.automations import Automation, AutomationCore
16class AutomationClient(BaseClient): 1a
17 def create_automation(self, automation: "AutomationCore") -> "UUID": 1a
18 """Creates an automation in Prefect Cloud."""
19 response = self.request(
20 "POST",
21 "/automations/",
22 json=automation.model_dump(mode="json"),
23 )
24 from uuid import UUID
26 return UUID(response.json()["id"])
28 def update_automation( 1a
29 self, automation_id: "UUID", automation: "AutomationCore"
30 ) -> None:
31 """Updates an automation in Prefect Cloud."""
32 response = self.request(
33 "PUT",
34 "/automations/{id}",
35 path_params={"id": automation_id},
36 json=automation.model_dump(mode="json", exclude_unset=True),
37 )
38 response.raise_for_status()
40 def read_automations(self) -> list["Automation"]: 1a
41 response = self.request("POST", "/automations/filter")
42 response.raise_for_status()
43 from prefect.events.schemas.automations import Automation
45 return Automation.model_validate_list(response.json())
47 def find_automation(self, id_or_name: "str | UUID") -> "Automation | None": 1a
48 from uuid import UUID
50 if isinstance(id_or_name, str):
51 name = id_or_name
52 try:
53 id = UUID(id_or_name)
54 except ValueError:
55 id = None
56 else:
57 id = id_or_name
58 name = str(id)
60 if id:
61 try:
62 automation = self.read_automation(id)
63 return automation
64 except HTTPStatusError as e:
65 if e.response.status_code == 404:
66 raise ObjectNotFound(http_exc=e) from e
68 automations = self.read_automations()
70 # Look for it by an exact name
71 for automation in automations:
72 if automation.name == name:
73 return automation
75 # Look for it by a case-insensitive name
76 for automation in automations:
77 if automation.name.lower() == name.lower():
78 return automation
80 return None
82 def read_automation(self, automation_id: "UUID | str") -> "Automation | None": 1a
83 response = self.request(
84 "GET", "/automations/{id}", path_params={"id": automation_id}
85 )
86 if response.status_code == 404:
87 return None
88 response.raise_for_status()
89 from prefect.events.schemas.automations import Automation
91 return Automation.model_validate(response.json())
93 def read_automations_by_name(self, name: str) -> list["Automation"]: 1a
94 """
95 Query the Prefect API for an automation by name. Only automations matching the provided name will be returned.
97 Args:
98 name: the name of the automation to query
100 Returns:
101 a list of Automation model representations of the automations
102 """
103 from prefect.client.schemas.sorting import AutomationSort
104 from prefect.events.filters import (
105 AutomationFilter,
106 AutomationFilterName,
107 )
109 automation_filter = AutomationFilter(name=AutomationFilterName(any_=[name]))
111 response = self.request(
112 "POST",
113 "/automations/filter",
114 json={
115 "sort": AutomationSort.UPDATED_DESC,
116 "automations": automation_filter.model_dump(mode="json")
117 if automation_filter
118 else None,
119 },
120 )
122 response.raise_for_status()
123 from prefect.events.schemas.automations import Automation
125 return Automation.model_validate_list(response.json())
127 def pause_automation(self, automation_id: "UUID") -> None: 1a
128 response = self.request(
129 "PATCH",
130 "/automations/{id}",
131 path_params={"id": automation_id},
132 json={"enabled": False},
133 )
134 response.raise_for_status()
136 def resume_automation(self, automation_id: "UUID") -> None: 1a
137 response = self.request(
138 "PATCH",
139 "/automations/{id}",
140 path_params={"id": automation_id},
141 json={"enabled": True},
142 )
143 response.raise_for_status()
145 def delete_automation(self, automation_id: "UUID") -> None: 1a
146 response = self.request(
147 "DELETE",
148 "/automations/{id}",
149 path_params={"id": automation_id},
150 )
151 if response.status_code == 404:
152 return
154 response.raise_for_status()
156 def read_resource_related_automations(self, resource_id: str) -> list["Automation"]: 1a
157 response = self.request(
158 "GET",
159 "/automations/related-to/{resource_id}",
160 path_params={"resource_id": resource_id},
161 )
162 response.raise_for_status()
163 from prefect.events.schemas.automations import Automation
165 return Automation.model_validate_list(response.json())
167 def delete_resource_owned_automations(self, resource_id: str) -> None: 1a
168 self.request(
169 "DELETE",
170 "/automations/owned-by/{resource_id}",
171 path_params={"resource_id": resource_id},
172 )
175class AutomationAsyncClient(BaseAsyncClient): 1a
176 async def create_automation(self, automation: "AutomationCore") -> "UUID": 1a
177 """Creates an automation in Prefect Cloud."""
178 response = await self.request(
179 "POST",
180 "/automations/",
181 json=automation.model_dump(mode="json"),
182 )
183 from uuid import UUID
185 return UUID(response.json()["id"])
187 async def update_automation( 1a
188 self, automation_id: "UUID", automation: "AutomationCore"
189 ) -> None:
190 """Updates an automation in Prefect Cloud."""
191 response = await self.request(
192 "PUT",
193 "/automations/{id}",
194 path_params={"id": automation_id},
195 json=automation.model_dump(mode="json", exclude_unset=True),
196 )
197 response.raise_for_status()
199 async def read_automations(self) -> list["Automation"]: 1a
200 response = await self.request("POST", "/automations/filter")
201 response.raise_for_status()
202 from prefect.events.schemas.automations import Automation
204 return Automation.model_validate_list(response.json())
206 async def find_automation(self, id_or_name: "str | UUID") -> "Automation | None": 1a
207 from uuid import UUID
209 if isinstance(id_or_name, str):
210 name = id_or_name
211 try:
212 id = UUID(id_or_name)
213 except ValueError:
214 id = None
215 else:
216 id = id_or_name
217 name = str(id)
219 if id:
220 try:
221 automation = await self.read_automation(id)
222 return automation
223 except HTTPStatusError as e:
224 if e.response.status_code == 404:
225 raise ObjectNotFound(http_exc=e) from e
227 automations = await self.read_automations()
229 # Look for it by an exact name
230 for automation in automations:
231 if automation.name == name:
232 return automation
234 # Look for it by a case-insensitive name
235 for automation in automations:
236 if automation.name.lower() == name.lower():
237 return automation
239 return None
241 async def read_automation(self, automation_id: "UUID | str") -> "Automation | None": 1a
242 response = await self.request(
243 "GET", "/automations/{id}", path_params={"id": automation_id}
244 )
245 if response.status_code == 404:
246 return None
247 response.raise_for_status()
248 from prefect.events.schemas.automations import Automation
250 return Automation.model_validate(response.json())
252 async def read_automations_by_name(self, name: str) -> list["Automation"]: 1a
253 """
254 Query the Prefect API for an automation by name. Only automations matching the provided name will be returned.
256 Args:
257 name: the name of the automation to query
259 Returns:
260 a list of Automation model representations of the automations
261 """
262 from prefect.client.schemas.sorting import AutomationSort
263 from prefect.events.filters import (
264 AutomationFilter,
265 AutomationFilterName,
266 )
268 automation_filter = AutomationFilter(name=AutomationFilterName(any_=[name]))
270 response = await self.request(
271 "POST",
272 "/automations/filter",
273 json={
274 "sort": AutomationSort.UPDATED_DESC,
275 "automations": automation_filter.model_dump(mode="json")
276 if automation_filter
277 else None,
278 },
279 )
281 response.raise_for_status()
282 from prefect.events.schemas.automations import Automation
284 return Automation.model_validate_list(response.json())
286 async def pause_automation(self, automation_id: "UUID") -> None: 1a
287 response = await self.request(
288 "PATCH",
289 "/automations/{id}",
290 path_params={"id": automation_id},
291 json={"enabled": False},
292 )
293 response.raise_for_status()
295 async def resume_automation(self, automation_id: "UUID") -> None: 1a
296 response = await self.request(
297 "PATCH",
298 "/automations/{id}",
299 path_params={"id": automation_id},
300 json={"enabled": True},
301 )
302 response.raise_for_status()
304 async def delete_automation(self, automation_id: "UUID") -> None: 1a
305 response = await self.request(
306 "DELETE",
307 "/automations/{id}",
308 path_params={"id": automation_id},
309 )
310 if response.status_code == 404:
311 return
313 response.raise_for_status()
315 async def read_resource_related_automations( 1a
316 self, resource_id: str
317 ) -> list["Automation"]:
318 response = await self.request(
319 "GET",
320 "/automations/related-to/{resource_id}",
321 path_params={"resource_id": resource_id},
322 )
323 response.raise_for_status()
324 from prefect.events.schemas.automations import Automation
326 return Automation.model_validate_list(response.json())
328 async def delete_resource_owned_automations(self, resource_id: str) -> None: 1a
329 await self.request(
330 "DELETE",
331 "/automations/owned-by/{resource_id}",
332 path_params={"resource_id": resource_id},
333 )