Coverage for /usr/local/lib/python3.12/site-packages/prefect/client/orchestration/_flow_runs/client.py: 14%
257 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1from __future__ import annotations 1a
3from collections.abc import Iterable 1a
4from typing import TYPE_CHECKING, Any 1a
6import httpx 1a
7from typing_extensions import TypeVar 1a
9from prefect.client.orchestration.base import BaseAsyncClient, BaseClient 1a
10from prefect.exceptions import ObjectNotFound 1a
12T = TypeVar("T") 1a
13R = TypeVar("R", infer_variance=True) 1a
15if TYPE_CHECKING: 15 ↛ 16line 15 didn't jump to line 16 because the condition on line 15 was never true1a
16 from uuid import UUID
18 from prefect.client.schemas import FlowRun, OrchestrationResult
19 from prefect.client.schemas.filters import (
20 DeploymentFilter,
21 FlowFilter,
22 FlowRunFilter,
23 TaskRunFilter,
24 WorkPoolFilter,
25 WorkQueueFilter,
26 )
27 from prefect.client.schemas.objects import (
28 FlowRunInput,
29 FlowRunPolicy,
30 )
31 from prefect.client.schemas.sorting import (
32 FlowRunSort,
33 )
34 from prefect.flows import Flow as FlowObject
35 from prefect.states import State
36 from prefect.types import KeyValueLabelsField
39class FlowRunClient(BaseClient): 1a
40 def create_flow_run( 1a
41 self,
42 flow: "FlowObject[Any, R]",
43 name: str | None = None,
44 parameters: dict[str, Any] | None = None,
45 context: dict[str, Any] | None = None,
46 tags: "Iterable[str] | None" = None,
47 parent_task_run_id: "UUID | None" = None,
48 state: "State[R] | None" = None,
49 work_pool_name: str | None = None,
50 work_queue_name: str | None = None,
51 job_variables: dict[str, Any] | None = None,
52 ) -> "FlowRun":
53 """
54 Create a flow run for a flow.
56 Args:
57 flow: The flow model to create the flow run for
58 name: An optional name for the flow run
59 parameters: Parameter overrides for this flow run.
60 context: Optional run context data
61 tags: a list of tags to apply to this flow run
62 parent_task_run_id: if a subflow run is being created, the placeholder task
63 run identifier in the parent flow
64 state: The initial state for the run. If not provided, defaults to
65 `Pending`.
66 work_pool_name: The name of the work pool to run the flow run in.
67 work_queue_name: The name of the work queue to place the flow run in.
68 job_variables: The job variables to use when setting up flow run infrastructure.
70 Raises:
71 httpx.RequestError: if the Prefect API does not successfully create a run for any reason
73 Returns:
74 The flow run model
75 """
76 from prefect.client.schemas.actions import FlowCreate, FlowRunCreate
77 from prefect.client.schemas.objects import Flow, FlowRun, FlowRunPolicy
78 from prefect.states import Pending, to_state_create
80 parameters = parameters or {}
81 context = context or {}
83 if state is None:
84 state = Pending()
86 # Retrieve the flow id
88 flow_data = FlowCreate(name=flow.name)
89 response = self.request(
90 "POST", "/flows/", json=flow_data.model_dump(mode="json")
91 )
92 flow_id = Flow.model_validate(response.json()).id
94 flow_run_create = FlowRunCreate(
95 flow_id=flow_id,
96 flow_version=flow.version,
97 name=name,
98 parameters=parameters,
99 context=context,
100 tags=list(tags or []),
101 parent_task_run_id=parent_task_run_id,
102 state=to_state_create(state),
103 empirical_policy=FlowRunPolicy(
104 retries=flow.retries,
105 retry_delay=int(flow.retry_delay_seconds or 0),
106 ),
107 )
109 if work_pool_name is not None:
110 flow_run_create.work_pool_name = work_pool_name
111 if work_queue_name is not None:
112 flow_run_create.work_queue_name = work_queue_name
113 if job_variables is not None:
114 flow_run_create.job_variables = job_variables
116 flow_run_create_json = flow_run_create.model_dump(
117 mode="json", exclude_unset=True
118 )
119 response = self.request("POST", "/flow_runs/", json=flow_run_create_json)
121 flow_run = FlowRun.model_validate(response.json())
123 # Restore the parameters to the local objects to retain expectations about
124 # Python objects
125 flow_run.parameters = parameters
127 return flow_run
129 def update_flow_run( 1a
130 self,
131 flow_run_id: "UUID",
132 flow_version: str | None = None,
133 parameters: dict[str, Any] | None = None,
134 name: str | None = None,
135 tags: "Iterable[str] | None" = None,
136 empirical_policy: "FlowRunPolicy | None" = None,
137 infrastructure_pid: str | None = None,
138 job_variables: dict[str, Any] | None = None,
139 ) -> httpx.Response:
140 """
141 Update a flow run's details.
143 Args:
144 flow_run_id: The identifier for the flow run to update.
145 flow_version: A new version string for the flow run.
146 parameters: A dictionary of parameter values for the flow run. This will not
147 be merged with any existing parameters.
148 name: A new name for the flow run.
149 empirical_policy: A new flow run orchestration policy. This will not be
150 merged with any existing policy.
151 tags: An iterable of new tags for the flow run. These will not be merged with
152 any existing tags.
153 infrastructure_pid: The id of flow run as returned by an
154 infrastructure block.
156 Returns:
157 an `httpx.Response` object from the PATCH request
158 """
159 params: dict[str, Any] = {}
160 if flow_version is not None:
161 params["flow_version"] = flow_version
162 if parameters is not None:
163 params["parameters"] = parameters
164 if name is not None:
165 params["name"] = name
166 if tags is not None:
167 params["tags"] = tags
168 if empirical_policy is not None:
169 params["empirical_policy"] = empirical_policy
170 if infrastructure_pid:
171 params["infrastructure_pid"] = infrastructure_pid
172 if job_variables is not None:
173 params["job_variables"] = job_variables
175 from prefect.client.schemas.actions import FlowRunUpdate
177 flow_run_data = FlowRunUpdate(**params)
179 return self.request(
180 "PATCH",
181 "/flow_runs/{id}",
182 path_params={"id": flow_run_id},
183 json=flow_run_data.model_dump(mode="json", exclude_unset=True),
184 )
186 def delete_flow_run( 1a
187 self,
188 flow_run_id: "UUID",
189 ) -> None:
190 """
191 Delete a flow run by UUID.
193 Args:
194 flow_run_id: The flow run UUID of interest.
195 Raises:
196 ObjectNotFound: If request returns 404
197 httpx.RequestError: If requests fails
198 """
199 try:
200 self.request("DELETE", "/flow_runs/{id}", path_params={"id": flow_run_id})
201 except httpx.HTTPStatusError as e:
202 if e.response.status_code == 404:
203 raise ObjectNotFound(http_exc=e) from e
204 else:
205 raise
207 def read_flow_run(self, flow_run_id: "UUID") -> "FlowRun": 1a
208 """
209 Query the Prefect API for a flow run by id.
211 Args:
212 flow_run_id: the flow run ID of interest
214 Returns:
215 a Flow Run model representation of the flow run
216 """
217 try:
218 response = self.request(
219 "GET", "/flow_runs/{id}", path_params={"id": flow_run_id}
220 )
221 except httpx.HTTPStatusError as e:
222 if e.response.status_code == 404:
223 raise ObjectNotFound(http_exc=e) from e
224 else:
225 raise
226 from prefect.client.schemas.objects import FlowRun
228 return FlowRun.model_validate(response.json())
230 def resume_flow_run( 1a
231 self, flow_run_id: "UUID", run_input: dict[str, Any] | None = None
232 ) -> "OrchestrationResult[Any]":
233 """
234 Resumes a paused flow run.
236 Args:
237 flow_run_id: the flow run ID of interest
238 run_input: the input to resume the flow run with
240 Returns:
241 an OrchestrationResult model representation of state orchestration output
242 """
243 try:
244 response = self.request(
245 "POST",
246 "/flow_runs/{id}/resume",
247 path_params={"id": flow_run_id},
248 json={"run_input": run_input},
249 )
250 except httpx.HTTPStatusError:
251 raise
252 from prefect.client.schemas import OrchestrationResult
254 result: OrchestrationResult[Any] = OrchestrationResult.model_validate(
255 response.json()
256 )
257 return result
259 def read_flow_runs( 1a
260 self,
261 *,
262 flow_filter: "FlowFilter | None" = None,
263 flow_run_filter: "FlowRunFilter | None" = None,
264 task_run_filter: "TaskRunFilter | None" = None,
265 deployment_filter: "DeploymentFilter | None" = None,
266 work_pool_filter: "WorkPoolFilter | None" = None,
267 work_queue_filter: "WorkQueueFilter | None" = None,
268 sort: "FlowRunSort | None" = None,
269 limit: int | None = None,
270 offset: int = 0,
271 ) -> "list[FlowRun]":
272 """
273 Query the Prefect API for flow runs. Only flow runs matching all criteria will
274 be returned.
276 Args:
277 flow_filter: filter criteria for flows
278 flow_run_filter: filter criteria for flow runs
279 task_run_filter: filter criteria for task runs
280 deployment_filter: filter criteria for deployments
281 work_pool_filter: filter criteria for work pools
282 work_queue_filter: filter criteria for work pool queues
283 sort: sort criteria for the flow runs
284 limit: limit for the flow run query
285 offset: offset for the flow run query
287 Returns:
288 a list of Flow Run model representations
289 of the flow runs
290 """
291 body: dict[str, Any] = {
292 "flows": flow_filter.model_dump(mode="json") if flow_filter else None,
293 "flow_runs": (
294 flow_run_filter.model_dump(mode="json", exclude_unset=True)
295 if flow_run_filter
296 else None
297 ),
298 "task_runs": (
299 task_run_filter.model_dump(mode="json") if task_run_filter else None
300 ),
301 "deployments": (
302 deployment_filter.model_dump(mode="json") if deployment_filter else None
303 ),
304 "work_pools": (
305 work_pool_filter.model_dump(mode="json") if work_pool_filter else None
306 ),
307 "work_pool_queues": (
308 work_queue_filter.model_dump(mode="json") if work_queue_filter else None
309 ),
310 "sort": sort,
311 "limit": limit,
312 "offset": offset,
313 }
315 response = self.request("POST", "/flow_runs/filter", json=body)
316 from prefect.client.schemas.objects import FlowRun
318 return FlowRun.model_validate_list(response.json())
320 def count_flow_runs( 1a
321 self,
322 *,
323 flow_filter: "FlowFilter | None" = None,
324 flow_run_filter: "FlowRunFilter | None" = None,
325 task_run_filter: "TaskRunFilter | None" = None,
326 deployment_filter: "DeploymentFilter | None" = None,
327 work_pool_filter: "WorkPoolFilter | None" = None,
328 work_queue_filter: "WorkQueueFilter | None" = None,
329 ) -> int:
330 """
331 Returns the count of flow runs matching all criteria for flow runs.
333 Args:
334 flow_filter: filter criteria for flows
335 flow_run_filter: filter criteria for flow runs
336 task_run_filter: filter criteria for task runs
337 deployment_filter: filter criteria for deployments
338 work_pool_filter: filter criteria for work pools
339 work_queue_filter: filter criteria for work pool queues
341 Returns:
342 count of flow runs
343 """
344 body: dict[str, Any] = {
345 "flows": flow_filter.model_dump(mode="json") if flow_filter else None,
346 "flow_runs": (
347 flow_run_filter.model_dump(mode="json", exclude_unset=True)
348 if flow_run_filter
349 else None
350 ),
351 "task_runs": (
352 task_run_filter.model_dump(mode="json") if task_run_filter else None
353 ),
354 "deployments": (
355 deployment_filter.model_dump(mode="json") if deployment_filter else None
356 ),
357 "work_pools": (
358 work_pool_filter.model_dump(mode="json") if work_pool_filter else None
359 ),
360 "work_pool_queues": (
361 work_queue_filter.model_dump(mode="json") if work_queue_filter else None
362 ),
363 }
365 response = self.request("POST", "/flow_runs/count", json=body)
366 return response.json()
368 def set_flow_run_state( 1a
369 self,
370 flow_run_id: "UUID | str",
371 state: "State[T]",
372 force: bool = False,
373 ) -> "OrchestrationResult[T]":
374 """
375 Set the state of a flow run.
377 Args:
378 flow_run_id: the id of the flow run
379 state: the state to set
380 force: if True, disregard orchestration logic when setting the state,
381 forcing the Prefect API to accept the state
383 Returns:
384 an OrchestrationResult model representation of state orchestration output
385 """
386 from uuid import UUID, uuid4
388 from prefect.states import to_state_create
390 flow_run_id = (
391 flow_run_id if isinstance(flow_run_id, UUID) else UUID(flow_run_id)
392 )
393 state_create = to_state_create(state)
394 state_create.state_details.flow_run_id = flow_run_id
395 state_create.state_details.transition_id = uuid4()
396 try:
397 response = self.request(
398 "POST",
399 "/flow_runs/{id}/set_state",
400 path_params={"id": flow_run_id},
401 json=dict(
402 state=state_create.model_dump(mode="json", serialize_as_any=True),
403 force=force,
404 ),
405 )
406 except httpx.HTTPStatusError as e:
407 if e.response.status_code == 404:
408 raise ObjectNotFound(http_exc=e) from e
409 else:
410 raise
411 from prefect.client.schemas import OrchestrationResult
413 result: OrchestrationResult[T] = OrchestrationResult.model_validate(
414 response.json()
415 )
416 return result
418 def read_flow_run_states(self, flow_run_id: "UUID") -> "list[State]": 1a
419 """
420 Query for the states of a flow run
422 Args:
423 flow_run_id: the id of the flow run
425 Returns:
426 a list of State model representations
427 of the flow run states
428 """
429 response = self.request(
430 "GET", "/flow_run_states/", params=dict(flow_run_id=str(flow_run_id))
431 )
432 from prefect.states import State
434 return State.model_validate_list(response.json())
436 def set_flow_run_name(self, flow_run_id: "UUID", name: str) -> httpx.Response: 1a
437 from prefect.client.schemas.actions import FlowRunUpdate
439 flow_run_data = FlowRunUpdate(name=name)
440 return self.request(
441 "PATCH",
442 "/flow_runs/{id}",
443 path_params={"id": flow_run_id},
444 json=flow_run_data.model_dump(mode="json", exclude_unset=True),
445 )
447 def create_flow_run_input( 1a
448 self, flow_run_id: "UUID", key: str, value: str, sender: str | None = None
449 ) -> None:
450 """
451 Creates a flow run input.
453 Args:
454 flow_run_id: The flow run id.
455 key: The input key.
456 value: The input value.
457 sender: The sender of the input.
458 """
460 # Initialize the input to ensure that the key is valid.
461 FlowRunInput(flow_run_id=flow_run_id, key=key, value=value)
463 response = self.request(
464 "POST",
465 "/flow_runs/{id}/input",
466 path_params={"id": flow_run_id},
467 json={"key": key, "value": value, "sender": sender},
468 )
469 response.raise_for_status()
471 def filter_flow_run_input( 1a
472 self, flow_run_id: "UUID", key_prefix: str, limit: int, exclude_keys: "set[str]"
473 ) -> "list[FlowRunInput]":
474 response = self.request(
475 "POST",
476 "/flow_runs/{id}/input/filter",
477 path_params={"id": flow_run_id},
478 json={
479 "prefix": key_prefix,
480 "limit": limit,
481 "exclude_keys": list(exclude_keys),
482 },
483 )
484 response.raise_for_status()
485 from prefect.client.schemas.objects import FlowRunInput
487 return FlowRunInput.model_validate_list(response.json())
489 def read_flow_run_input(self, flow_run_id: "UUID", key: str) -> str: 1a
490 """
491 Reads a flow run input.
493 Args:
494 flow_run_id: The flow run id.
495 key: The input key.
496 """
497 response = self.request(
498 "GET",
499 "/flow_runs/{id}/input/{key}",
500 path_params={"id": flow_run_id, "key": key},
501 )
502 response.raise_for_status()
503 return response.content.decode()
505 def delete_flow_run_input(self, flow_run_id: "UUID", key: str) -> None: 1a
506 """
507 Deletes a flow run input.
509 Args:
510 flow_run_id: The flow run id.
511 key: The input key.
512 """
513 response = self.request(
514 "DELETE",
515 "/flow_runs/{id}/input/{key}",
516 path_params={"id": flow_run_id, "key": key},
517 )
518 response.raise_for_status()
520 def update_flow_run_labels( 1a
521 self, flow_run_id: "UUID", labels: "KeyValueLabelsField"
522 ) -> None:
523 """
524 Updates the labels of a flow run.
525 """
527 response = self.request(
528 "PATCH",
529 "/flow_runs/{id}/labels",
530 path_params={"id": flow_run_id},
531 json=labels,
532 )
533 response.raise_for_status()
536class FlowRunAsyncClient(BaseAsyncClient): 1a
537 async def create_flow_run( 1a
538 self,
539 flow: "FlowObject[Any, R]",
540 name: str | None = None,
541 parameters: dict[str, Any] | None = None,
542 context: dict[str, Any] | None = None,
543 tags: "Iterable[str] | None" = None,
544 parent_task_run_id: "UUID | None" = None,
545 state: "State[R] | None" = None,
546 work_pool_name: str | None = None,
547 work_queue_name: str | None = None,
548 job_variables: dict[str, Any] | None = None,
549 ) -> "FlowRun":
550 """
551 Create a flow run for a flow.
553 Args:
554 flow: The flow model to create the flow run for
555 name: An optional name for the flow run
556 parameters: Parameter overrides for this flow run.
557 context: Optional run context data
558 tags: a list of tags to apply to this flow run
559 parent_task_run_id: if a subflow run is being created, the placeholder task
560 run identifier in the parent flow
561 state: The initial state for the run. If not provided, defaults to
562 `Pending`.
563 work_pool_name: The name of the work pool to run the flow run in.
564 work_queue_name: The name of the work queue to place the flow run in.
565 job_variables: The job variables to use when setting up flow run infrastructure.
567 Raises:
568 httpx.RequestError: if the Prefect API does not successfully create a run for any reason
570 Returns:
571 The flow run model
572 """
573 from prefect.client.schemas.actions import FlowCreate, FlowRunCreate
574 from prefect.client.schemas.objects import Flow, FlowRun, FlowRunPolicy
575 from prefect.states import Pending, to_state_create
577 parameters = parameters or {}
578 context = context or {}
580 if state is None:
581 state = Pending()
583 # Retrieve the flow id
585 flow_data = FlowCreate(name=flow.name)
586 response = await self.request(
587 "POST", "/flows/", json=flow_data.model_dump(mode="json")
588 )
589 flow_id = Flow.model_validate(response.json()).id
591 flow_run_create = FlowRunCreate(
592 flow_id=flow_id,
593 flow_version=flow.version,
594 name=name,
595 parameters=parameters,
596 context=context,
597 tags=list(tags or []),
598 parent_task_run_id=parent_task_run_id,
599 state=to_state_create(state),
600 empirical_policy=FlowRunPolicy(
601 retries=flow.retries,
602 retry_delay=int(flow.retry_delay_seconds or 0),
603 ),
604 )
606 if work_pool_name is not None:
607 flow_run_create.work_pool_name = work_pool_name
608 if work_queue_name is not None:
609 flow_run_create.work_queue_name = work_queue_name
610 if job_variables is not None:
611 flow_run_create.job_variables = job_variables
613 flow_run_create_json = flow_run_create.model_dump(
614 mode="json", exclude_unset=True
615 )
616 response = await self.request("POST", "/flow_runs/", json=flow_run_create_json)
618 flow_run = FlowRun.model_validate(response.json())
620 # Restore the parameters to the local objects to retain expectations about
621 # Python objects
622 flow_run.parameters = parameters
624 return flow_run
626 async def update_flow_run( 1a
627 self,
628 flow_run_id: "UUID",
629 flow_version: str | None = None,
630 parameters: dict[str, Any] | None = None,
631 name: str | None = None,
632 tags: "Iterable[str] | None" = None,
633 empirical_policy: "FlowRunPolicy | None" = None,
634 infrastructure_pid: str | None = None,
635 job_variables: dict[str, Any] | None = None,
636 ) -> httpx.Response:
637 """
638 Update a flow run's details.
640 Args:
641 flow_run_id: The identifier for the flow run to update.
642 flow_version: A new version string for the flow run.
643 parameters: A dictionary of parameter values for the flow run. This will not
644 be merged with any existing parameters.
645 name: A new name for the flow run.
646 empirical_policy: A new flow run orchestration policy. This will not be
647 merged with any existing policy.
648 tags: An iterable of new tags for the flow run. These will not be merged with
649 any existing tags.
650 infrastructure_pid: The id of flow run as returned by an
651 infrastructure block.
653 Returns:
654 an `httpx.Response` object from the PATCH request
655 """
656 params: dict[str, Any] = {}
657 if flow_version is not None:
658 params["flow_version"] = flow_version
659 if parameters is not None:
660 params["parameters"] = parameters
661 if name is not None:
662 params["name"] = name
663 if tags is not None:
664 params["tags"] = tags
665 if empirical_policy is not None:
666 params["empirical_policy"] = empirical_policy
667 if infrastructure_pid:
668 params["infrastructure_pid"] = infrastructure_pid
669 if job_variables is not None:
670 params["job_variables"] = job_variables
671 from prefect.client.schemas.actions import FlowRunUpdate
673 flow_run_data = FlowRunUpdate(**params)
675 return await self.request(
676 "PATCH",
677 "/flow_runs/{id}",
678 path_params={"id": flow_run_id},
679 json=flow_run_data.model_dump(mode="json", exclude_unset=True),
680 )
682 async def delete_flow_run( 1a
683 self,
684 flow_run_id: "UUID",
685 ) -> None:
686 """
687 Delete a flow run by UUID.
689 Args:
690 flow_run_id: The flow run UUID of interest.
691 Raises:
692 ObjectNotFound: If request returns 404
693 httpx.RequestError: If requests fails
694 """
695 try:
696 await self.request(
697 "DELETE", "/flow_runs/{id}", path_params={"id": flow_run_id}
698 )
699 except httpx.HTTPStatusError as e:
700 if e.response.status_code == 404:
701 raise ObjectNotFound(http_exc=e) from e
702 else:
703 raise
705 async def read_flow_run(self, flow_run_id: "UUID") -> "FlowRun": 1a
706 """
707 Query the Prefect API for a flow run by id.
709 Args:
710 flow_run_id: the flow run ID of interest
712 Returns:
713 a Flow Run model representation of the flow run
714 """
715 try:
716 response = await self.request(
717 "GET", "/flow_runs/{id}", path_params={"id": flow_run_id}
718 )
719 except httpx.HTTPStatusError as e:
720 if e.response.status_code == 404:
721 raise ObjectNotFound(http_exc=e) from e
722 else:
723 raise
724 from prefect.client.schemas.objects import FlowRun
726 return FlowRun.model_validate(response.json())
728 async def resume_flow_run( 1a
729 self, flow_run_id: "UUID", run_input: dict[str, Any] | None = None
730 ) -> "OrchestrationResult[Any]":
731 """
732 Resumes a paused flow run.
734 Args:
735 flow_run_id: the flow run ID of interest
736 run_input: the input to resume the flow run with
738 Returns:
739 an OrchestrationResult model representation of state orchestration output
740 """
741 try:
742 response = await self.request(
743 "POST",
744 "/flow_runs/{id}/resume",
745 path_params={"id": flow_run_id},
746 json={"run_input": run_input},
747 )
748 except httpx.HTTPStatusError:
749 raise
750 from prefect.client.schemas import OrchestrationResult
752 result: OrchestrationResult[Any] = OrchestrationResult.model_validate(
753 response.json()
754 )
755 return result
757 async def read_flow_runs( 1a
758 self,
759 *,
760 flow_filter: "FlowFilter | None" = None,
761 flow_run_filter: "FlowRunFilter | None" = None,
762 task_run_filter: "TaskRunFilter | None" = None,
763 deployment_filter: "DeploymentFilter | None" = None,
764 work_pool_filter: "WorkPoolFilter | None" = None,
765 work_queue_filter: "WorkQueueFilter | None" = None,
766 sort: "FlowRunSort | None" = None,
767 limit: int | None = None,
768 offset: int = 0,
769 ) -> "list[FlowRun]":
770 """
771 Query the Prefect API for flow runs. Only flow runs matching all criteria will
772 be returned.
774 Args:
775 flow_filter: filter criteria for flows
776 flow_run_filter: filter criteria for flow runs
777 task_run_filter: filter criteria for task runs
778 deployment_filter: filter criteria for deployments
779 work_pool_filter: filter criteria for work pools
780 work_queue_filter: filter criteria for work pool queues
781 sort: sort criteria for the flow runs
782 limit: limit for the flow run query
783 offset: offset for the flow run query
785 Returns:
786 a list of Flow Run model representations
787 of the flow runs
788 """
789 body: dict[str, Any] = {
790 "flows": flow_filter.model_dump(mode="json") if flow_filter else None,
791 "flow_runs": (
792 flow_run_filter.model_dump(mode="json", exclude_unset=True)
793 if flow_run_filter
794 else None
795 ),
796 "task_runs": (
797 task_run_filter.model_dump(mode="json") if task_run_filter else None
798 ),
799 "deployments": (
800 deployment_filter.model_dump(mode="json") if deployment_filter else None
801 ),
802 "work_pools": (
803 work_pool_filter.model_dump(mode="json") if work_pool_filter else None
804 ),
805 "work_pool_queues": (
806 work_queue_filter.model_dump(mode="json") if work_queue_filter else None
807 ),
808 "sort": sort,
809 "limit": limit,
810 "offset": offset,
811 }
813 response = await self.request("POST", "/flow_runs/filter", json=body)
814 from prefect.client.schemas.objects import FlowRun
816 return FlowRun.model_validate_list(response.json())
818 async def count_flow_runs( 1a
819 self,
820 *,
821 flow_filter: "FlowFilter | None" = None,
822 flow_run_filter: "FlowRunFilter | None" = None,
823 task_run_filter: "TaskRunFilter | None" = None,
824 deployment_filter: "DeploymentFilter | None" = None,
825 work_pool_filter: "WorkPoolFilter | None" = None,
826 work_queue_filter: "WorkQueueFilter | None" = None,
827 ) -> int:
828 """
829 Returns the count of flow runs matching all criteria for flow runs.
831 Args:
832 flow_filter: filter criteria for flows
833 flow_run_filter: filter criteria for flow runs
834 task_run_filter: filter criteria for task runs
835 deployment_filter: filter criteria for deployments
836 work_pool_filter: filter criteria for work pools
837 work_queue_filter: filter criteria for work pool queues
839 Returns:
840 count of flow runs
841 """
842 body: dict[str, Any] = {
843 "flows": flow_filter.model_dump(mode="json") if flow_filter else None,
844 "flow_runs": (
845 flow_run_filter.model_dump(mode="json", exclude_unset=True)
846 if flow_run_filter
847 else None
848 ),
849 "task_runs": (
850 task_run_filter.model_dump(mode="json") if task_run_filter else None
851 ),
852 "deployments": (
853 deployment_filter.model_dump(mode="json") if deployment_filter else None
854 ),
855 "work_pools": (
856 work_pool_filter.model_dump(mode="json") if work_pool_filter else None
857 ),
858 "work_pool_queues": (
859 work_queue_filter.model_dump(mode="json") if work_queue_filter else None
860 ),
861 }
863 response = await self.request("POST", "/flow_runs/count", json=body)
864 return response.json()
866 async def set_flow_run_state( 1a
867 self,
868 flow_run_id: "UUID | str",
869 state: "State[T]",
870 force: bool = False,
871 ) -> "OrchestrationResult[T]":
872 """
873 Set the state of a flow run.
875 Args:
876 flow_run_id: the id of the flow run
877 state: the state to set
878 force: if True, disregard orchestration logic when setting the state,
879 forcing the Prefect API to accept the state
881 Returns:
882 an OrchestrationResult model representation of state orchestration output
883 """
884 from uuid import UUID, uuid4
886 from prefect.states import to_state_create
888 flow_run_id = (
889 flow_run_id if isinstance(flow_run_id, UUID) else UUID(flow_run_id)
890 )
891 state_create = to_state_create(state)
892 state_create.state_details.flow_run_id = flow_run_id
893 state_create.state_details.transition_id = uuid4()
894 try:
895 response = await self.request(
896 "POST",
897 "/flow_runs/{id}/set_state",
898 path_params={"id": flow_run_id},
899 json=dict(
900 state=state_create.model_dump(mode="json", serialize_as_any=True),
901 force=force,
902 ),
903 )
904 except httpx.HTTPStatusError as e:
905 if e.response.status_code == 404:
906 raise ObjectNotFound(http_exc=e) from e
907 else:
908 raise
909 from prefect.client.schemas import OrchestrationResult
911 result: OrchestrationResult[T] = OrchestrationResult.model_validate(
912 response.json()
913 )
914 return result
916 async def read_flow_run_states(self, flow_run_id: "UUID") -> "list[State]": 1a
917 """
918 Query for the states of a flow run
920 Args:
921 flow_run_id: the id of the flow run
923 Returns:
924 a list of State model representations
925 of the flow run states
926 """
927 response = await self.request(
928 "GET", "/flow_run_states/", params=dict(flow_run_id=str(flow_run_id))
929 )
930 from prefect.states import State
932 return State.model_validate_list(response.json())
934 async def set_flow_run_name(self, flow_run_id: "UUID", name: str) -> httpx.Response: 1a
935 from prefect.client.schemas.actions import FlowRunUpdate
937 flow_run_data = FlowRunUpdate(name=name)
938 return await self.request(
939 "PATCH",
940 "/flow_runs/{id}",
941 path_params={"id": flow_run_id},
942 json=flow_run_data.model_dump(mode="json", exclude_unset=True),
943 )
945 async def create_flow_run_input( 1a
946 self, flow_run_id: "UUID", key: str, value: str, sender: str | None = None
947 ) -> None:
948 """
949 Creates a flow run input.
951 Args:
952 flow_run_id: The flow run id.
953 key: The input key.
954 value: The input value.
955 sender: The sender of the input.
956 """
958 # Initialize the input to ensure that the key is valid.
959 from prefect.client.schemas.objects import FlowRunInput
961 FlowRunInput(flow_run_id=flow_run_id, key=key, value=value)
963 response = await self.request(
964 "POST",
965 "/flow_runs/{id}/input",
966 path_params={"id": flow_run_id},
967 json={"key": key, "value": value, "sender": sender},
968 )
969 response.raise_for_status()
971 async def filter_flow_run_input( 1a
972 self, flow_run_id: "UUID", key_prefix: str, limit: int, exclude_keys: "set[str]"
973 ) -> "list[FlowRunInput]":
974 response = await self.request(
975 "POST",
976 "/flow_runs/{id}/input/filter",
977 path_params={"id": flow_run_id},
978 json={
979 "prefix": key_prefix,
980 "limit": limit,
981 "exclude_keys": list(exclude_keys),
982 },
983 )
984 response.raise_for_status()
985 from prefect.client.schemas.objects import FlowRunInput
987 return FlowRunInput.model_validate_list(response.json())
989 async def read_flow_run_input(self, flow_run_id: "UUID", key: str) -> str: 1a
990 """
991 Reads a flow run input.
993 Args:
994 flow_run_id: The flow run id.
995 key: The input key.
996 """
997 response = await self.request(
998 "GET",
999 "/flow_runs/{id}/input/{key}",
1000 path_params={"id": flow_run_id, "key": key},
1001 )
1002 response.raise_for_status()
1003 return response.content.decode()
1005 async def delete_flow_run_input(self, flow_run_id: "UUID", key: str) -> None: 1a
1006 """
1007 Deletes a flow run input.
1009 Args:
1010 flow_run_id: The flow run id.
1011 key: The input key.
1012 """
1013 response = await self.request(
1014 "DELETE",
1015 "/flow_runs/{id}/input/{key}",
1016 path_params={"id": flow_run_id, "key": key},
1017 )
1018 response.raise_for_status()
1020 async def update_flow_run_labels( 1a
1021 self, flow_run_id: "UUID", labels: "KeyValueLabelsField"
1022 ) -> None:
1023 """
1024 Updates the labels of a flow run.
1025 """
1027 response = await self.request(
1028 "PATCH",
1029 "/flow_runs/{id}/labels",
1030 path_params={"id": flow_run_id},
1031 json=labels,
1032 )
1033 response.raise_for_status()