Coverage for /usr/local/lib/python3.12/site-packages/prefect/client/orchestration/_flows/client.py: 25%
75 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1from __future__ import annotations 1a
3from typing import TYPE_CHECKING, Any 1a
5from httpx import HTTPStatusError, RequestError 1a
7from prefect.client.orchestration.base import BaseAsyncClient, BaseClient 1a
8from prefect.exceptions import ObjectNotFound 1a
10if TYPE_CHECKING: 10 ↛ 11line 10 didn't jump to line 11 because the condition on line 10 was never true1a
11 from uuid import UUID
13 from prefect.client.schemas.filters import (
14 DeploymentFilter,
15 FlowFilter,
16 FlowRunFilter,
17 TaskRunFilter,
18 WorkPoolFilter,
19 WorkQueueFilter,
20 )
21 from prefect.client.schemas.objects import (
22 Flow,
23 )
24 from prefect.client.schemas.sorting import FlowSort
25 from prefect.flows import Flow as FlowObject
28class FlowClient(BaseClient): 1a
29 def create_flow(self, flow: "FlowObject[Any, Any]") -> "UUID": 1a
30 """
31 Create a flow in the Prefect API.
33 Args:
34 flow: a `Flow` object
36 Raises:
37 httpx.RequestError: if a flow was not created for any reason
39 Returns:
40 the ID of the flow in the backend
41 """
42 return self.create_flow_from_name(flow.name)
44 def create_flow_from_name(self, flow_name: str) -> "UUID": 1a
45 """
46 Create a flow in the Prefect API.
48 Args:
49 flow_name: the name of the new flow
51 Raises:
52 httpx.RequestError: if a flow was not created for any reason
54 Returns:
55 the ID of the flow in the backend
56 """
57 from prefect.client.schemas.actions import FlowCreate
59 flow_data = FlowCreate(name=flow_name)
60 response = self.request(
61 "POST", "/flows/", json=flow_data.model_dump(mode="json")
62 )
64 flow_id = response.json().get("id")
65 if not flow_id:
66 raise RequestError(f"Malformed response: {response}")
68 # Return the id of the created flow
69 from uuid import UUID
71 return UUID(flow_id)
73 def read_flow(self, flow_id: "UUID") -> "Flow": 1a
74 """
75 Query the Prefect API for a flow by id.
77 Args:
78 flow_id: the flow ID of interest
80 Returns:
81 a Flow model representation of the flow
82 """
83 response = self.request("GET", "/flows/{id}", path_params={"id": flow_id})
84 from prefect.client.schemas.objects import Flow
86 return Flow.model_validate(response.json())
88 def delete_flow(self, flow_id: "UUID") -> None: 1a
89 """
90 Delete a flow by UUID.
92 Args:
93 flow_id: ID of the flow to be deleted
94 Raises:
95 prefect.exceptions.ObjectNotFound: If request returns 404
96 httpx.RequestError: If requests fail
97 """
98 try:
99 self.request("DELETE", "/flows/{id}", path_params={"id": flow_id})
100 except HTTPStatusError as e:
101 if e.response.status_code == 404:
102 raise ObjectNotFound(http_exc=e) from e
103 else:
104 raise
106 def read_flows( 1a
107 self,
108 *,
109 flow_filter: "FlowFilter | None" = None,
110 flow_run_filter: "FlowRunFilter | None" = None,
111 task_run_filter: "TaskRunFilter | None" = None,
112 deployment_filter: "DeploymentFilter | None" = None,
113 work_pool_filter: "WorkPoolFilter | None" = None,
114 work_queue_filter: "WorkQueueFilter | None" = None,
115 sort: "FlowSort | None" = None,
116 limit: int | None = None,
117 offset: int = 0,
118 ) -> list["Flow"]:
119 """
120 Query the Prefect API for flows. Only flows matching all criteria will
121 be returned.
123 Args:
124 flow_filter: filter criteria for flows
125 flow_run_filter: filter criteria for flow runs
126 task_run_filter: filter criteria for task runs
127 deployment_filter: filter criteria for deployments
128 work_pool_filter: filter criteria for work pools
129 work_queue_filter: filter criteria for work pool queues
130 sort: sort criteria for the flows
131 limit: limit for the flow query
132 offset: offset for the flow query
134 Returns:
135 a list of Flow model representations of the flows
136 """
137 body: dict[str, Any] = {
138 "flows": flow_filter.model_dump(mode="json") if flow_filter else None,
139 "flow_runs": (
140 flow_run_filter.model_dump(mode="json", exclude_unset=True)
141 if flow_run_filter
142 else None
143 ),
144 "task_runs": (
145 task_run_filter.model_dump(mode="json") if task_run_filter else None
146 ),
147 "deployments": (
148 deployment_filter.model_dump(mode="json") if deployment_filter else None
149 ),
150 "work_pools": (
151 work_pool_filter.model_dump(mode="json") if work_pool_filter else None
152 ),
153 "work_queues": (
154 work_queue_filter.model_dump(mode="json") if work_queue_filter else None
155 ),
156 "sort": sort,
157 "limit": limit,
158 "offset": offset,
159 }
161 response = self.request("POST", "/flows/filter", json=body)
162 from prefect.client.schemas.objects import Flow
164 return Flow.model_validate_list(response.json())
166 def read_flow_by_name( 1a
167 self,
168 flow_name: str,
169 ) -> "Flow":
170 """
171 Query the Prefect API for a flow by name.
173 Args:
174 flow_name: the name of a flow
176 Returns:
177 a fully hydrated Flow model
178 """
179 response = self.request(
180 "GET", "/flows/name/{name}", path_params={"name": flow_name}
181 )
182 from prefect.client.schemas.objects import Flow
184 return Flow.model_validate(response.json())
187class FlowAsyncClient(BaseAsyncClient): 1a
188 async def create_flow(self, flow: "FlowObject[Any, Any]") -> "UUID": 1a
189 """
190 Create a flow in the Prefect API.
192 Args:
193 flow: a `Flow` object
195 Raises:
196 httpx.RequestError: if a flow was not created for any reason
198 Returns:
199 the ID of the flow in the backend
200 """
201 return await self.create_flow_from_name(flow.name)
203 async def create_flow_from_name(self, flow_name: str) -> "UUID": 1a
204 """
205 Create a flow in the Prefect API.
207 Args:
208 flow_name: the name of the new flow
210 Raises:
211 httpx.RequestError: if a flow was not created for any reason
213 Returns:
214 the ID of the flow in the backend
215 """
216 from prefect.client.schemas.actions import FlowCreate
218 flow_data = FlowCreate(name=flow_name)
219 response = await self.request(
220 "POST", "/flows/", json=flow_data.model_dump(mode="json")
221 )
223 flow_id = response.json().get("id")
224 if not flow_id:
225 raise RequestError(f"Malformed response: {response}")
227 # Return the id of the created flow
228 from uuid import UUID
230 return UUID(flow_id)
232 async def read_flow(self, flow_id: "UUID") -> "Flow": 1a
233 """
234 Query the Prefect API for a flow by id.
236 Args:
237 flow_id: the flow ID of interest
239 Returns:
240 a Flow model representation of the flow
241 """
242 response = await self.request("GET", "/flows/{id}", path_params={"id": flow_id})
243 from prefect.client.schemas.objects import Flow
245 return Flow.model_validate(response.json())
247 async def delete_flow(self, flow_id: "UUID") -> None: 1a
248 """
249 Delete a flow by UUID.
251 Args:
252 flow_id: ID of the flow to be deleted
253 Raises:
254 prefect.exceptions.ObjectNotFound: If request returns 404
255 httpx.RequestError: If requests fail
256 """
257 try:
258 await self.request("DELETE", "/flows/{id}", path_params={"id": flow_id})
259 except HTTPStatusError as e:
260 if e.response.status_code == 404:
261 raise ObjectNotFound(http_exc=e) from e
262 else:
263 raise
265 async def read_flows( 1a
266 self,
267 *,
268 flow_filter: "FlowFilter | None" = None,
269 flow_run_filter: "FlowRunFilter | None" = None,
270 task_run_filter: "TaskRunFilter | None" = None,
271 deployment_filter: "DeploymentFilter | None" = None,
272 work_pool_filter: "WorkPoolFilter | None" = None,
273 work_queue_filter: "WorkQueueFilter | None" = None,
274 sort: "FlowSort | None" = None,
275 limit: int | None = None,
276 offset: int = 0,
277 ) -> list["Flow"]:
278 """
279 Query the Prefect API for flows. Only flows matching all criteria will
280 be returned.
282 Args:
283 flow_filter: filter criteria for flows
284 flow_run_filter: filter criteria for flow runs
285 task_run_filter: filter criteria for task runs
286 deployment_filter: filter criteria for deployments
287 work_pool_filter: filter criteria for work pools
288 work_queue_filter: filter criteria for work pool queues
289 sort: sort criteria for the flows
290 limit: limit for the flow query
291 offset: offset for the flow query
293 Returns:
294 a list of Flow model representations of the flows
295 """
296 body: dict[str, Any] = {
297 "flows": flow_filter.model_dump(mode="json") if flow_filter else None,
298 "flow_runs": (
299 flow_run_filter.model_dump(mode="json", exclude_unset=True)
300 if flow_run_filter
301 else None
302 ),
303 "task_runs": (
304 task_run_filter.model_dump(mode="json") if task_run_filter else None
305 ),
306 "deployments": (
307 deployment_filter.model_dump(mode="json") if deployment_filter else None
308 ),
309 "work_pools": (
310 work_pool_filter.model_dump(mode="json") if work_pool_filter else None
311 ),
312 "work_queues": (
313 work_queue_filter.model_dump(mode="json") if work_queue_filter else None
314 ),
315 "sort": sort,
316 "limit": limit,
317 "offset": offset,
318 }
320 response = await self.request("POST", "/flows/filter", json=body)
321 from prefect.client.schemas.objects import Flow
323 return Flow.model_validate_list(response.json())
325 async def read_flow_by_name( 1a
326 self,
327 flow_name: str,
328 ) -> "Flow":
329 """
330 Query the Prefect API for a flow by name.
332 Args:
333 flow_name: the name of a flow
335 Returns:
336 a fully hydrated Flow model
337 """
338 response = await self.request(
339 "GET", "/flows/name/{name}", path_params={"name": flow_name}
340 )
341 from prefect.client.schemas.objects import Flow
343 return Flow.model_validate(response.json())