Coverage for /usr/local/lib/python3.12/site-packages/prefect/deployments/flow_runs.py: 0%

86 statements  

coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

from datetime import datetime
from typing import TYPE_CHECKING, Any, Iterable, Optional, Union
from uuid import UUID

import anyio
from opentelemetry import trace

import prefect
from prefect._result_records import ResultRecordMetadata
from prefect.client.schemas import FlowRun, TaskRunResult
from prefect.client.utilities import inject_client
from prefect.context import FlowRunContext, TaskRunContext
from prefect.logging import get_logger
from prefect.states import Pending, Scheduled
from prefect.tasks import Task
from prefect.telemetry.run_telemetry import LABELS_TRACEPARENT_KEY, RunTelemetry
from prefect.types._datetime import now
from prefect.utilities.asyncutils import sync_compatible
from prefect.utilities.slugify import slugify

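# Optional dependency check: the OpenTelemetry instrumentation helpers ship
# separately from the core API, so a failed import is treated as
# "instrumentation disabled" rather than an error.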
def _is_instrumentation_enabled() -> bool:
    try:
        from opentelemetry.instrumentation.utils import is_instrumentation_enabled

        return is_instrumentation_enabled()
    except (ImportError, ModuleNotFoundError):
        return False


if TYPE_CHECKING:
    from prefect.client.orchestration import PrefectClient
    from prefect.client.schemas.objects import FlowRun

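# Rebuild the StateCreate model so its forward reference to
# `ResultRecordMetadata` resolves against the namespace supplied here.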
prefect.client.schemas.StateCreate.model_rebuild(
    _types_namespace={
        "ResultRecordMetadata": ResultRecordMetadata,
    }
)


if TYPE_CHECKING:
    import logging

logger: "logging.Logger" = get_logger(__name__)

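# `sync_compatible` allows this coroutine to be called from synchronous code;
# `inject_client` supplies a Prefect client when one is not passed explicitly.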
@sync_compatible
@inject_client
async def run_deployment(
    name: Union[str, UUID],
    client: Optional["PrefectClient"] = None,
    parameters: Optional[dict[str, Any]] = None,
    scheduled_time: Optional[datetime] = None,
    flow_run_name: Optional[str] = None,
    timeout: Optional[float] = None,
    poll_interval: Optional[float] = 5,
    tags: Optional[Iterable[str]] = None,
    idempotency_key: Optional[str] = None,
    work_queue_name: Optional[str] = None,
    as_subflow: Optional[bool] = True,
    job_variables: Optional[dict[str, Any]] = None,
) -> "FlowRun":
    """
    Create a flow run for a deployment and return it after completion or a timeout.

    By default, this function blocks until the flow run finishes executing.
    Specify a timeout (in seconds) to wait for the flow run to execute before
    returning flow run metadata. To return immediately, without waiting for the
    flow run to execute, set `timeout=0`.

    Note that if you specify a timeout, this function will return the flow run
    metadata whether or not the flow run finished executing.

    If called within a flow or task, the flow run this function creates will
    be linked to the current flow run as a subflow. Disable this behavior by
    passing `as_subflow=False`.

    Args:
        name: The deployment id or deployment name in the form:
            `"flow name/deployment name"`
        parameters: Parameter overrides for this flow run. Merged with the deployment
            defaults.
        scheduled_time: The time to schedule the flow run for, defaults to scheduling
            the flow run to start now.
        flow_run_name: A name for the created flow run
        timeout: The amount of time to wait (in seconds) for the flow run to
            complete before returning. Setting `timeout` to 0 will return the flow
            run metadata immediately. Setting `timeout` to None will allow this
            function to poll indefinitely. Defaults to None.
        poll_interval: The number of seconds between polls
        tags: A list of tags to associate with this flow run; tags can be used in
            automations and for organizational purposes.
        idempotency_key: A unique value to recognize retries of the same run, and
            prevent creating multiple flow runs.
        work_queue_name: The name of a work queue to use for this run. Defaults to
            the default work queue for the deployment.
        as_subflow: Whether to link the flow run as a subflow of the current
            flow or task run.
        job_variables: A dictionary of dot delimited infrastructure overrides that
            will be applied at runtime; for example `env.CONFIG_KEY=config_value` or
            `namespace='prefect'`
    """
    if timeout is not None and timeout < 0:
        raise ValueError("`timeout` cannot be negative")

    if scheduled_time is None:
        scheduled_time = now("UTC")

    parameters = parameters or {}

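    # `name` may be a deployment id (UUID or UUID string) or a
    # "flow name/deployment name" string; try the id form first.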
    deployment_id = None

    if isinstance(name, UUID):
        deployment_id = name
    else:
        try:
            deployment_id = UUID(name)
        except ValueError:
            pass

    if deployment_id:
        deployment = await client.read_deployment(deployment_id=deployment_id)
    else:
        deployment = await client.read_deployment_by_name(name)

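    # Capture any active flow/task run context so the new flow run can be
    # linked to it as a subflow.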
    flow_run_ctx = FlowRunContext.get()
    task_run_ctx = TaskRunContext.get()
    if as_subflow and (flow_run_ctx or task_run_ctx):
        # TODO: this logic can likely be simplified by using `Task.create_run`
        from prefect.utilities._engine import dynamic_key_for_task_run
        from prefect.utilities.engine import collect_task_run_inputs

        # This was called from a flow. Link the flow run as a subflow.
        task_inputs = {
            k: await collect_task_run_inputs(v) for k, v in parameters.items()
        }

        # Track parent task if this is being called from within a task
        # This enables the execution graph to properly display the deployment
        # flow run as nested under the calling task
        if task_run_ctx:
            # The task run is only considered a parent if it is in the same
            # flow run (otherwise the child is in a subflow, so the subflow
            # serves as the parent) or if there is no flow run
            if not flow_run_ctx or (
                task_run_ctx.task_run.flow_run_id
                == getattr(flow_run_ctx.flow_run, "id", None)
            ):
                task_inputs["__parents__"] = [
                    TaskRunResult(id=task_run_ctx.task_run.id)
                ]

        if deployment_id:
            flow = await client.read_flow(deployment.flow_id)
            deployment_name = f"{flow.name}/{deployment.name}"
        else:
            deployment_name = name

        # Generate a task in the parent flow run to represent the result of the subflow
        dummy_task = Task(
            name=deployment_name,
            fn=lambda: None,
            version=deployment.version,
        )
        # Override the default task key to include the deployment name
        dummy_task.task_key = f"{__name__}.run_deployment.{slugify(deployment_name)}"
        flow_run_id = (
            flow_run_ctx.flow_run.id
            if flow_run_ctx
            else task_run_ctx.task_run.flow_run_id
        )
        dynamic_key = (
            dynamic_key_for_task_run(flow_run_ctx, dummy_task)
            if flow_run_ctx
            else task_run_ctx.task_run.dynamic_key
        )
        parent_task_run = await client.create_task_run(
            task=dummy_task,
            flow_run_id=flow_run_id,
            dynamic_key=dynamic_key,
            task_inputs=task_inputs,
            state=Pending(),
        )
        parent_task_run_id = parent_task_run.id
    else:
        parent_task_run_id = None

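    # Propagate trace context: prefer the parent flow run's traceparent label,
    # otherwise derive one from the current OpenTelemetry span when
    # instrumentation is enabled.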
    if flow_run_ctx and flow_run_ctx.flow_run:
        traceparent = flow_run_ctx.flow_run.labels.get(LABELS_TRACEPARENT_KEY)
    elif _is_instrumentation_enabled():
        traceparent = RunTelemetry.traceparent_from_span(span=trace.get_current_span())
    else:
        traceparent = None

    trace_labels = {LABELS_TRACEPARENT_KEY: traceparent} if traceparent else {}

    flow_run = await client.create_flow_run_from_deployment(
        deployment.id,
        parameters=parameters,
        state=Scheduled(scheduled_time=scheduled_time),
        name=flow_run_name,
        tags=tags,
        idempotency_key=idempotency_key,
        parent_task_run_id=parent_task_run_id,
        work_queue_name=work_queue_name,
        job_variables=job_variables,
        labels=trace_labels,
    )

    flow_run_id = flow_run.id

    if timeout == 0:
        return flow_run

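    # Poll until the flow run reaches a final state; `move_on_after(None)`
    # never times out, so polling continues indefinitely by default.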
    with anyio.move_on_after(timeout):
        while True:
            flow_run = await client.read_flow_run(flow_run_id)
            flow_state = flow_run.state
            if flow_state and flow_state.is_final():
                return flow_run
            await anyio.sleep(poll_interval)

    return flow_run
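
A minimal usage sketch (not part of this module): calling `run_deployment` from a flow links the triggered run as a subflow and waits for it to finish. The deployment name "my-flow/my-deployment" and the `user_id` parameter below are hypothetical.

from prefect import flow
from prefect.deployments import run_deployment


@flow
def orchestrate():
    # Trigger the deployment and block for up to 300 seconds; because this is
    # called inside a flow, the created run is linked as a subflow by default.
    child_run = run_deployment(
        name="my-flow/my-deployment",  # hypothetical deployment name
        parameters={"user_id": 123},  # hypothetical parameter override
        timeout=300,
        poll_interval=10,
    )
    # The returned FlowRun carries the final (or latest) state metadata.
    print(child_run.state)


if __name__ == "__main__":
    orchestrate()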