Coverage for /usr/local/lib/python3.12/site-packages/prefect/input/actions.py: 35%

60 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1import inspect 1a

2from typing import TYPE_CHECKING, Any, Optional, Set 1a

3from uuid import UUID 1a

4 

5import orjson 1a

6import pydantic 1a

7 

8from prefect.client.utilities import client_injector 1a

9from prefect.context import FlowRunContext 1a

10from prefect.exceptions import PrefectHTTPStatusError 1a

11from prefect.utilities.asyncutils import sync_compatible 1a

12 

13if TYPE_CHECKING: 13 ↛ 14line 13 didn't jump to line 14 because the condition on line 13 was never true1a

14 from prefect.client.orchestration import PrefectClient 

15 

16 

17from prefect._internal.pydantic.v2_schema import is_v2_model 1a

18 

19 

def ensure_flow_run_id(flow_run_id: Optional[UUID] = None) -> UUID:
    """Resolve the flow run ID to operate on.

    Args:
        - flow_run_id (UUID): an explicit flow run ID. When truthy it is
            returned unchanged and the flow run context is never consulted.

    Returns:
        The given `flow_run_id`, or the ID of the flow run found on the
        current `FlowRunContext`.

    Raises:
        RuntimeError: if no ID was given and there is either no active
            `FlowRunContext` or the context has no flow run attached.
    """
    if flow_run_id:
        return flow_run_id

    # No explicit ID: fall back to the flow run of the active context.
    active = FlowRunContext.get()
    if active is not None and active.flow_run is not None:
        return active.flow_run.id

    raise RuntimeError("Must either provide a flow run ID or be within a flow run.")

29 

30 

@sync_compatible
async def create_flow_run_input_from_model(
    key: str,
    model_instance: pydantic.BaseModel,
    flow_run_id: Optional[UUID] = None,
    sender: Optional[str] = None,
):
    """
    Create a flow run input from a pydantic model instance. The model is
    dumped to JSON, parsed back into plain JSON-safe data, and handed to
    `create_flow_run_input`.

    Args:
        - key (str): the flow run input key
        - model_instance (pydantic.BaseModel): the model whose data is stored
        - flow_run_id (UUID): the, optional, flow run ID; passed through
            unchanged to `create_flow_run_input`
        - sender (str): the, optional, sender of the input. If not given and
            a flow run is available on the current `FlowRunContext`, defaults
            to `prefect.flow-run.<flow run id>`; otherwise it stays `None`.
    """
    if sender is None:
        run_context = FlowRunContext.get()
        flow_run = None if run_context is None else run_context.flow_run
        if flow_run is not None:
            sender = f"prefect.flow-run.{flow_run.id}"

    # Pydantic v2 models expose `model_dump_json`; other models use `json`.
    raw_json = (
        model_instance.model_dump_json()
        if is_v2_model(model_instance)
        else model_instance.json()
    )
    payload = orjson.loads(raw_json)

    pending = create_flow_run_input(
        key=key, value=payload, flow_run_id=flow_run_id, sender=sender
    )
    if TYPE_CHECKING:
        # Only seen by the type checker: narrows the call result to a coroutine.
        assert inspect.iscoroutine(pending)
    await pending

54 

55 

@sync_compatible
@client_injector
async def create_flow_run_input(
    client: "PrefectClient",
    key: str,
    value: Any,
    flow_run_id: Optional[UUID] = None,
    sender: Optional[str] = None,
):
    """
    Create a new flow run input. The given `value` is serialized to a JSON
    string with `orjson` and sent to the client as the flow run input value.

    Args:
        - key (str): the flow run input key
        - value (Any): the flow run input value
        - flow_run_id (UUID): the, optional, flow run ID. If not given it is
            resolved by `ensure_flow_run_id`.
        - sender (str): the, optional, sender of the input; forwarded to the
            client unchanged.
    """
    resolved_run_id = ensure_flow_run_id(flow_run_id)
    serialized = orjson.dumps(value).decode()

    await client.create_flow_run_input(
        flow_run_id=resolved_run_id,
        key=key,
        sender=sender,
        value=serialized,
    )

83 

84 

@sync_compatible
@client_injector
async def filter_flow_run_input(
    client: "PrefectClient",
    key_prefix: str,
    limit: int = 1,
    exclude_keys: Optional[Set[str]] = None,
    flow_run_id: Optional[UUID] = None,
):
    """Filter flow run inputs by key prefix.

    Args:
        - key_prefix (str): the key prefix forwarded to the client
        - limit (int): the limit forwarded to the client; defaults to 1
        - exclude_keys (Set[str]): keys to exclude; an empty set is sent
            when not given
        - flow_run_id (UUID): the, optional, flow run ID. If not given it is
            resolved by `ensure_flow_run_id`.

    Returns:
        Whatever `client.filter_flow_run_input` returns.
    """
    excluded = set() if exclude_keys is None else exclude_keys
    run_id = ensure_flow_run_id(flow_run_id)

    matches = await client.filter_flow_run_input(
        flow_run_id=run_id,
        key_prefix=key_prefix,
        limit=limit,
        exclude_keys=excluded,
    )
    return matches

105 

106 

@sync_compatible
@client_injector
async def read_flow_run_input(
    client: "PrefectClient", key: str, flow_run_id: Optional[UUID] = None
) -> Any:
    """Read a flow run input and decode its JSON value.

    Args:
        - key (str): the flow run input key
        - flow_run_id (UUID): the, optional, flow run ID. If not given it is
            resolved by `ensure_flow_run_id`.

    Returns:
        The JSON-decoded value, or `None` when the client responds with a
        404 status. Any other `PrefectHTTPStatusError` is re-raised.
    """
    run_id = ensure_flow_run_id(flow_run_id)

    try:
        raw = await client.read_flow_run_input(flow_run_id=run_id, key=key)
    except PrefectHTTPStatusError as http_error:
        # Only a 404 is turned into `None`; every other status propagates.
        if http_error.response.status_code != 404:
            raise
        return None

    return orjson.loads(raw)

128 

129 

@sync_compatible
@client_injector
async def delete_flow_run_input(
    client: "PrefectClient", key: str, flow_run_id: Optional[UUID] = None
):
    """Delete a flow run input.

    Args:
        - key (str): the flow run input key
        - flow_run_id (UUID): the, optional, flow run ID. If not given it is
            resolved by `ensure_flow_run_id`.
    """
    run_id = ensure_flow_run_id(flow_run_id)
    await client.delete_flow_run_input(
        flow_run_id=run_id,
        key=key,
    )