Coverage for /usr/local/lib/python3.12/site-packages/prefect/runtime/deployment.py: 20%
76 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1"""
2Access attributes of the current deployment run dynamically.
4Note that if a deployment is not currently being run, all attributes will return empty values.
6You can mock the runtime attributes for testing purposes by setting environment variables
7prefixed with `PREFECT__RUNTIME__DEPLOYMENT`.
9Example usage:
10 ```python
11 from prefect.runtime import deployment
13 def get_task_runner():
14 task_runner_config = deployment.parameters.get("runner_config", "default config here")
15 return DummyTaskRunner(task_runner_specs=task_runner_config)
16 ```
18Available attributes:
19 - `id`: the deployment's unique ID
20 - `name`: the deployment's name
21 - `version`: the deployment's version
22 - `flow_run_id`: the current flow run ID for this deployment
23 - `parameters`: the parameters that were passed to this run; note that these do not necessarily
24 include default values set on the flow function, only the parameter values set on the deployment
25 object or those directly provided via API for this run
26"""
28from __future__ import annotations 1a
30import os 1a
31from typing import TYPE_CHECKING, Any, Callable, List, Optional 1a
33from prefect._internal.concurrency.api import create_call, from_sync 1a
34from prefect.client.orchestration import get_client 1a
35from prefect.context import FlowRunContext, _deployment_id, _deployment_parameters 1a
37from .flow_run import _get_flow_run 1a
39if TYPE_CHECKING: 39 ↛ 40line 39 didn't jump to line 40 because the condition on line 39 was never true1a
40 from prefect.client.schemas.responses import DeploymentResponse
42__all__ = ["id", "flow_run_id", "name", "parameters", "version"] 1a
44CACHED_DEPLOYMENT: dict[str, "DeploymentResponse"] = {} 1a
47type_cast: dict[ 1a
48 type[bool] | type[int] | type[float] | type[str] | type[None], Callable[[Any], Any]
49] = {
50 bool: lambda x: x.lower() == "true",
51 int: int,
52 float: float,
53 str: str,
54 # for optional defined attributes, when real value is NoneType, use str
55 type(None): str,
56}
def __getattr__(name: str) -> Any:
    """
    Attribute accessor for this submodule; note that imports also work with this:

        from prefect.runtime.deployment import id

    The attribute is resolved through its getter in `FIELDS`. If the environment
    variable `PREFECT__RUNTIME__DEPLOYMENT__<NAME>` is set, its value is returned
    instead, cast to the type of the real value so mocked and real values agree.

    Args:
        name: the attribute being looked up on this module

    Returns:
        The real value, the type-cast mocked value, or (for names that are not in
        `FIELDS` but do have a matching environment variable) the raw string.

    Raises:
        AttributeError: if `name` is not a known field and no matching environment
            variable is set.
        ValueError: if the attribute is mocked but the type of its real value has
            no converter in `type_cast`.
    """
    func = FIELDS.get(name)

    # if `name` is an attribute but it is mocked through environment variable, the mocked type will be str,
    # which might be different from original one. For consistency, cast env var to the same type
    env_key = f"PREFECT__RUNTIME__DEPLOYMENT__{name.upper()}"

    if func is None:
        if env_key in os.environ:
            return os.environ[env_key]
        raise AttributeError(f"{__name__} has no attribute {name!r}")

    # The getter runs even when the value is mocked: the type of the real value
    # decides which converter applies to the mocked string.
    real_value = func()
    if env_key not in os.environ:
        return real_value

    mocked_value = os.environ[env_key]
    # Only the table lookup signals "unsupported type"; the conversion itself is
    # kept outside the `try` so its own errors surface unchanged.
    try:
        cast_func = type_cast[type(real_value)]
    except KeyError:
        # The KeyError is an implementation detail, so it is dropped from the
        # traceback shown to the caller.
        raise ValueError(
            "This runtime context attribute cannot be mocked using an"
            " environment variable. Please use monkeypatch instead."
        ) from None
    return cast_func(mocked_value)
def __dir__() -> List[str]:
    """
    List the names in `__all__`, in sorted order.
    """
    public_names = list(__all__)
    public_names.sort()
    return public_names
async def _get_deployment(deployment_id: str) -> "DeploymentResponse":
    """
    Read the deployment with the given ID, reusing the cached response if present.

    A response read through the client is stored in `CACHED_DEPLOYMENT` under its ID.
    """
    # deployments won't change between calls, so a cache hit skips the client lifecycle
    cached = CACHED_DEPLOYMENT.get(deployment_id)
    if cached:
        return cached

    async with get_client() as client:
        deployment = await client.read_deployment(deployment_id)
        CACHED_DEPLOYMENT[deployment_id] = deployment
        return deployment
def get_id() -> Optional[str]:
    """
    Return the ID of the deployment behind the current run, as a string.

    Sources are tried in order: the deployment ID context variable, the active
    flow run context, and finally the flow run read back by its run ID.

    Returns:
        The deployment ID, or None when there is no current flow run or the flow
        run has no deployment ID.
    """
    # Check deployment context var first (avoids API calls in nested flows)
    if deployment_id := _deployment_id.get():
        return str(deployment_id)

    flow_run_ctx = FlowRunContext.get()
    # The original lookup on the context object itself is kept for compatibility.
    deployment_id = getattr(flow_run_ctx, "deployment_id", None)
    if deployment_id is None:
        # The context carries the run as `.flow_run` (`get_flow_run_id` reads its
        # `.id` the same way), so use the ID already in hand before reading the
        # flow run again. Both `getattr` defaults keep a missing context or a
        # missing run from raising.
        deployment_id = getattr(
            getattr(flow_run_ctx, "flow_run", None), "deployment_id", None
        )
    if deployment_id is not None:
        return str(deployment_id)

    run_id = get_flow_run_id()
    if run_id is None:
        return None

    flow_run = from_sync.call_soon_in_loop_thread(
        create_call(_get_flow_run, run_id)
    ).result()
    if flow_run.deployment_id:
        return str(flow_run.deployment_id)
    return None
def get_parameters() -> dict[str, Any]:
    """
    Return the parameters of the current run.

    Non-empty parameters held in the deployment parameters context variable are
    returned as-is; otherwise they are taken from the flow run read back by its
    run ID. An empty dict is returned when there is no current flow run ID or the
    flow run has no parameters.
    """
    # The context var is consulted first (avoids API calls in nested flows)
    context_params = _deployment_parameters.get()
    if context_params:
        return context_params

    run_id = get_flow_run_id()
    if run_id is None:
        return {}

    fetch_call = create_call(_get_flow_run, run_id)
    flow_run = from_sync.call_soon_in_loop_thread(fetch_call).result()
    return flow_run.parameters or {}
def get_name() -> Optional[str]:
    """
    Return the name of the current deployment, or None when `get_id()` yields no ID.
    """
    deployment_id = get_id()
    if deployment_id is None:
        return None

    lookup = create_call(_get_deployment, deployment_id)
    deployment = from_sync.call_soon_in_loop_thread(lookup).result()
    return deployment.name
def get_version() -> Optional[str]:
    """
    Return the version of the current deployment, or None when `get_id()` yields no ID.
    """
    deployment_id = get_id()
    if deployment_id is None:
        return None

    lookup = create_call(_get_deployment, deployment_id)
    deployment = from_sync.call_soon_in_loop_thread(lookup).result()
    return deployment.version
def get_flow_run_id() -> Optional[str]:
    """
    Return the current flow run ID as a string.

    The ID comes from the active flow run context when there is one; otherwise the
    `PREFECT__FLOW_RUN_ID` environment variable is returned (None if it is unset).
    """
    context = FlowRunContext.get()
    if context is None:
        return os.getenv("PREFECT__FLOW_RUN_ID")
    return str(context.flow_run.id)
# Maps each attribute name served by the module-level `__getattr__` to the
# zero-argument getter that produces its value.
FIELDS: dict[str, Callable[[], Any]] = dict(
    id=get_id,
    flow_run_id=get_flow_run_id,
    parameters=get_parameters,
    name=get_name,
    version=get_version,
)