Coverage for /usr/local/lib/python3.12/site-packages/prefect/deployments/flow_runs.py: 0%
86 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1from datetime import datetime
2from typing import TYPE_CHECKING, Any, Iterable, Optional, Union
3from uuid import UUID
5import anyio
6from opentelemetry import trace
8import prefect
9from prefect._result_records import ResultRecordMetadata
10from prefect.client.schemas import FlowRun, TaskRunResult
11from prefect.client.utilities import inject_client
12from prefect.context import FlowRunContext, TaskRunContext
13from prefect.logging import get_logger
14from prefect.states import Pending, Scheduled
15from prefect.tasks import Task
16from prefect.telemetry.run_telemetry import LABELS_TRACEPARENT_KEY, RunTelemetry
17from prefect.types._datetime import now
18from prefect.utilities.asyncutils import sync_compatible
19from prefect.utilities.slugify import slugify
def _is_instrumentation_enabled() -> bool:
    """
    Report whether OpenTelemetry instrumentation is enabled.

    Delegates to `opentelemetry.instrumentation.utils.is_instrumentation_enabled`.
    The helper is imported inside the function so that a failed import is
    reported as "not enabled" instead of propagating to the caller.

    Returns:
        The value returned by `is_instrumentation_enabled()`, or `False` when
        the helper cannot be imported.
    """
    try:
        from opentelemetry.instrumentation.utils import is_instrumentation_enabled

        return is_instrumentation_enabled()
    except ImportError:
        # `ModuleNotFoundError` is a subclass of `ImportError`, so this single
        # clause also covers the "package not installed" case.
        return False
# Names below are only needed for annotations, so they are imported for static
# type checkers only and never at runtime.
if TYPE_CHECKING:
    import logging

    from prefect.client.orchestration import PrefectClient
    from prefect.client.schemas.objects import FlowRun

# Rebuild the `StateCreate` model with `ResultRecordMetadata` supplied through
# the types namespace (presumably to resolve a forward reference — confirm
# against the model definition).
prefect.client.schemas.StateCreate.model_rebuild(
    _types_namespace={"ResultRecordMetadata": ResultRecordMetadata},
)

# Module-level logger named after this module.
logger: "logging.Logger" = get_logger(__name__)
@sync_compatible
@inject_client
async def run_deployment(
    name: Union[str, UUID],
    client: Optional["PrefectClient"] = None,
    parameters: Optional[dict[str, Any]] = None,
    scheduled_time: Optional[datetime] = None,
    flow_run_name: Optional[str] = None,
    timeout: Optional[float] = None,
    poll_interval: Optional[float] = 5,
    tags: Optional[Iterable[str]] = None,
    idempotency_key: Optional[str] = None,
    work_queue_name: Optional[str] = None,
    as_subflow: Optional[bool] = True,
    job_variables: Optional[dict[str, Any]] = None,
) -> "FlowRun":
    """
    Create a flow run for a deployment and return it after completion or a timeout.

    By default, this function blocks until the flow run finishes executing.
    Specify a timeout (in seconds) to wait for the flow run to execute before
    returning flow run metadata. To return immediately, without waiting for the
    flow run to execute, set `timeout=0`.

    Note that if you specify a timeout, this function will return the flow run
    metadata whether or not the flow run finished executing.

    If called within a flow or task, the flow run this function creates will
    be linked to the current flow run as a subflow. Disable this behavior by
    passing `as_subflow=False`.

    Args:
        name: The deployment id or deployment name in the form:
            `"flow name/deployment name"`
        parameters: Parameter overrides for this flow run. Merged with the deployment
            defaults.
        scheduled_time: The time to schedule the flow run for, defaults to scheduling
            the flow run to start now.
        flow_run_name: A name for the created flow run
        timeout: The amount of time to wait (in seconds) for the flow run to
            complete before returning. Setting `timeout` to 0 will return the flow
            run metadata immediately. Setting `timeout` to None will allow this
            function to poll indefinitely. Defaults to None.
        poll_interval: The number of seconds between polls. Passing `None`
            falls back to the default of 5 seconds.
        tags: A list of tags to associate with this flow run; tags can be used in
            automations and for organizational purposes.
        idempotency_key: A unique value to recognize retries of the same run, and
            prevent creating multiple flow runs.
        work_queue_name: The name of a work queue to use for this run. Defaults to
            the default work queue for the deployment.
        as_subflow: Whether to link the flow run as a subflow of the current
            flow or task run.
        job_variables: A dictionary of dot delimited infrastructure overrides that
            will be applied at runtime; for example `env.CONFIG_KEY=config_value` or
            `namespace='prefect'`

    Returns:
        The created flow run. With `timeout=0` this is the object returned by
        the create call; otherwise it is the most recently polled flow run,
        which is in a final state unless the timeout elapsed first.

    Raises:
        ValueError: If `timeout` is negative.
    """
    if timeout is not None and timeout < 0:
        raise ValueError("`timeout` cannot be negative")

    # The signature permits `poll_interval=None`, but `anyio.sleep(None)` would
    # raise a TypeError only after the flow run had already been created.
    # Fall back to the signature default instead.
    if poll_interval is None:
        poll_interval = 5

    if scheduled_time is None:
        scheduled_time = now("UTC")

    parameters = parameters or {}

    # Resolve `name` to a deployment id when it is a UUID or a UUID string;
    # otherwise it is treated as a "flow name/deployment name" identifier.
    deployment_id: Optional[UUID] = None

    if isinstance(name, UUID):
        deployment_id = name
    else:
        try:
            deployment_id = UUID(name)
        except ValueError:
            # Not a UUID string; look the deployment up by name below.
            pass

    if deployment_id is not None:
        deployment = await client.read_deployment(deployment_id=deployment_id)
    else:
        deployment = await client.read_deployment_by_name(name)

    flow_run_ctx = FlowRunContext.get()
    task_run_ctx = TaskRunContext.get()
    if as_subflow and (flow_run_ctx or task_run_ctx):
        # TODO: this logic can likely be simplified by using `Task.create_run`
        from prefect.utilities._engine import dynamic_key_for_task_run
        from prefect.utilities.engine import collect_task_run_inputs

        # This was called from a flow. Link the flow run as a subflow.
        task_inputs = {
            k: await collect_task_run_inputs(v) for k, v in parameters.items()
        }

        # Track parent task if this is being called from within a task
        # This enables the execution graph to properly display the deployment
        # flow run as nested under the calling task
        if task_run_ctx:
            # The task run is only considered a parent if it is in the same
            # flow run (otherwise the child is in a subflow, so the subflow
            # serves as the parent) or if there is no flow run
            if not flow_run_ctx or (
                task_run_ctx.task_run.flow_run_id
                == getattr(flow_run_ctx.flow_run, "id", None)
            ):
                task_inputs["__parents__"] = [
                    TaskRunResult(id=task_run_ctx.task_run.id)
                ]

        if deployment_id is not None:
            flow = await client.read_flow(deployment.flow_id)
            deployment_name = f"{flow.name}/{deployment.name}"
        else:
            deployment_name = name

        # Generate a task in the parent flow run to represent the result of the subflow
        dummy_task = Task(
            name=deployment_name,
            fn=lambda: None,
            version=deployment.version,
        )
        # Override the default task key to include the deployment name
        dummy_task.task_key = f"{__name__}.run_deployment.{slugify(deployment_name)}"
        # NOTE(review): other accesses in this function guard against
        # `flow_run_ctx.flow_run` being None, but this one does not — confirm
        # whether a flow run context without a flow run can reach this point.
        flow_run_id = (
            flow_run_ctx.flow_run.id
            if flow_run_ctx
            else task_run_ctx.task_run.flow_run_id
        )
        dynamic_key = (
            dynamic_key_for_task_run(flow_run_ctx, dummy_task)
            if flow_run_ctx
            else task_run_ctx.task_run.dynamic_key
        )
        parent_task_run = await client.create_task_run(
            task=dummy_task,
            flow_run_id=flow_run_id,
            dynamic_key=dynamic_key,
            task_inputs=task_inputs,
            state=Pending(),
        )
        parent_task_run_id = parent_task_run.id
    else:
        parent_task_run_id = None

    # Pick the traceparent to attach to the new flow run: prefer the label on
    # the current flow run, then the current OpenTelemetry span when
    # instrumentation is enabled, otherwise attach none.
    if flow_run_ctx and flow_run_ctx.flow_run:
        traceparent = flow_run_ctx.flow_run.labels.get(LABELS_TRACEPARENT_KEY)
    elif _is_instrumentation_enabled():
        traceparent = RunTelemetry.traceparent_from_span(span=trace.get_current_span())
    else:
        traceparent = None

    trace_labels = {LABELS_TRACEPARENT_KEY: traceparent} if traceparent else {}

    flow_run = await client.create_flow_run_from_deployment(
        deployment.id,
        parameters=parameters,
        state=Scheduled(scheduled_time=scheduled_time),
        name=flow_run_name,
        tags=tags,
        idempotency_key=idempotency_key,
        parent_task_run_id=parent_task_run_id,
        work_queue_name=work_queue_name,
        job_variables=job_variables,
        labels=trace_labels,
    )

    flow_run_id = flow_run.id

    if timeout == 0:
        return flow_run

    # Poll until the flow run reaches a final state or the timeout elapses;
    # `timeout=None` means there is no deadline.
    with anyio.move_on_after(timeout):
        while True:
            flow_run = await client.read_flow_run(flow_run_id)
            flow_state = flow_run.state
            if flow_state and flow_state.is_final():
                return flow_run
            await anyio.sleep(poll_interval)

    # Timed out: return the most recently read flow run.
    return flow_run