Coverage for /usr/local/lib/python3.12/site-packages/prefect/input/actions.py: 35%
60 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1import inspect 1a
2from typing import TYPE_CHECKING, Any, Optional, Set 1a
3from uuid import UUID 1a
5import orjson 1a
6import pydantic 1a
8from prefect.client.utilities import client_injector 1a
9from prefect.context import FlowRunContext 1a
10from prefect.exceptions import PrefectHTTPStatusError 1a
11from prefect.utilities.asyncutils import sync_compatible 1a
13if TYPE_CHECKING: 13 ↛ 14line 13 didn't jump to line 14 because the condition on line 13 was never true1a
14 from prefect.client.orchestration import PrefectClient
17from prefect._internal.pydantic.v2_schema import is_v2_model 1a
def ensure_flow_run_id(flow_run_id: Optional[UUID] = None) -> UUID:
    """
    Resolve the flow run ID to operate on.

    Args:
        - flow_run_id (UUID): the, optional, flow run ID. If not given, the
            ID is taken from the current `FlowRunContext`.

    Returns:
        The given `flow_run_id`, or the ID of the flow run in the current
        context.

    Raises:
        RuntimeError: if no flow run ID was given and there is no current
            flow run context (or the context has no flow run).
    """
    # Test for "not provided" explicitly rather than by truthiness, matching
    # the `is None` checks used for the other optional arguments in this
    # module.
    if flow_run_id is not None:
        return flow_run_id

    context = FlowRunContext.get()
    if context is None or context.flow_run is None:
        raise RuntimeError("Must either provide a flow run ID or be within a flow run.")

    return context.flow_run.id
@sync_compatible
async def create_flow_run_input_from_model(
    key: str,
    model_instance: pydantic.BaseModel,
    flow_run_id: Optional[UUID] = None,
    sender: Optional[str] = None,
):
    """
    Create a flow run input from a pydantic model instance.

    The model is dumped to JSON, parsed back into plain JSON-safe data, and
    handed to `create_flow_run_input`.

    Args:
        - key (str): the flow run input key
        - model_instance (pydantic.BaseModel): the model to store
        - flow_run_id (UUID): the, optional, flow run ID, passed through to
            `create_flow_run_input`
        - sender (str): the, optional, sender. If not given and a flow run
            context is active, defaults to `prefect.flow-run.<flow run id>`.
    """
    if sender is None:
        run_context = FlowRunContext.get()
        if run_context is not None and run_context.flow_run is not None:
            sender = f"prefect.flow-run.{run_context.flow_run.id}"

    # Pydantic v2 and v1 models expose different JSON dump methods.
    serialized = (
        model_instance.model_dump_json()
        if is_v2_model(model_instance)
        else model_instance.json()
    )
    payload = orjson.loads(serialized)

    pending = create_flow_run_input(
        key=key, value=payload, flow_run_id=flow_run_id, sender=sender
    )
    # Only evaluated by static type checkers; never executed at runtime.
    if TYPE_CHECKING:
        assert inspect.iscoroutine(pending)
    await pending
@sync_compatible
@client_injector
async def create_flow_run_input(
    client: "PrefectClient",
    key: str,
    value: Any,
    flow_run_id: Optional[UUID] = None,
    sender: Optional[str] = None,
):
    """
    Create a new flow run input. The given `value` is serialized to a JSON
    string and stored as the flow run input value.

    Args:
        - key (str): the flow run input key
        - value (Any): the flow run input value
        - flow_run_id (UUID): the, optional, flow run ID. If not given, it
            is resolved via `ensure_flow_run_id`.
        - sender (str): the, optional, sender, passed through to the client
            unchanged
    """
    target_run_id = ensure_flow_run_id(flow_run_id)
    # orjson produces bytes; the client is given the decoded string.
    encoded_value = orjson.dumps(value).decode()

    await client.create_flow_run_input(
        flow_run_id=target_run_id,
        key=key,
        sender=sender,
        value=encoded_value,
    )
@sync_compatible
@client_injector
async def filter_flow_run_input(
    client: "PrefectClient",
    key_prefix: str,
    limit: int = 1,
    exclude_keys: Optional[Set[str]] = None,
    flow_run_id: Optional[UUID] = None,
):
    """
    Query the flow run inputs of a flow run by key prefix.

    Args:
        - key_prefix (str): the key prefix passed to the client filter
        - limit (int): the limit passed to the client filter
        - exclude_keys (Set[str]): the, optional, keys to exclude. Defaults
            to an empty set.
        - flow_run_id (UUID): the, optional, flow run ID. If not given, it
            is resolved via `ensure_flow_run_id`.

    Returns:
        Whatever the client's `filter_flow_run_input` call returns.
    """
    excluded = set() if exclude_keys is None else exclude_keys
    target_run_id = ensure_flow_run_id(flow_run_id)

    return await client.filter_flow_run_input(
        flow_run_id=target_run_id,
        key_prefix=key_prefix,
        limit=limit,
        exclude_keys=excluded,
    )
@sync_compatible
@client_injector
async def read_flow_run_input(
    client: "PrefectClient", key: str, flow_run_id: Optional[UUID] = None
) -> Any:
    """Read a flow run input.

    Args:
        - key (str): the flow run input key
        - flow_run_id (UUID): the, optional, flow run ID. If not given, it
            is resolved via `ensure_flow_run_id`.

    Returns:
        The JSON-decoded input value, or `None` when the client reports a
        404 for the key. Any other HTTP status error is re-raised.
    """
    target_run_id = ensure_flow_run_id(flow_run_id)

    try:
        raw_value = await client.read_flow_run_input(
            flow_run_id=target_run_id, key=key
        )
    except PrefectHTTPStatusError as exc:
        # Only a 404 is translated to None; everything else propagates.
        if exc.response.status_code != 404:
            raise
        return None

    return orjson.loads(raw_value)
@sync_compatible
@client_injector
async def delete_flow_run_input(
    client: "PrefectClient", key: str, flow_run_id: Optional[UUID] = None
):
    """Delete a flow run input.

    Args:
        - key (str): the flow run input key
        - flow_run_id (UUID): the, optional, flow run ID. If not given, it
            is resolved via `ensure_flow_run_id`.
    """
    target_run_id = ensure_flow_run_id(flow_run_id)

    await client.delete_flow_run_input(flow_run_id=target_run_id, key=key)