Coverage for /usr/local/lib/python3.12/site-packages/prefect/runtime/deployment.py: 20%

76 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1""" 

2Access attributes of the current deployment run dynamically. 

3 

4Note that if a deployment is not currently being run, all attributes will return empty values. 

5 

6You can mock the runtime attributes for testing purposes by setting environment variables 

7prefixed with `PREFECT__RUNTIME__DEPLOYMENT`. 

8 

9Example usage: 

10 ```python 

11 from prefect.runtime import deployment 

12 

13 def get_task_runner(): 

14 task_runner_config = deployment.parameters.get("runner_config", "default config here") 

15 return DummyTaskRunner(task_runner_specs=task_runner_config) 

16 ``` 

17 

18Available attributes: 

19 - `id`: the deployment's unique ID 

20 - `name`: the deployment's name 

21 - `version`: the deployment's version 

22 - `flow_run_id`: the current flow run ID for this deployment 

23 - `parameters`: the parameters that were passed to this run; note that these do not necessarily 

24 include default values set on the flow function, only the parameter values set on the deployment 

25 object or those directly provided via API for this run 

26""" 

27 

28from __future__ import annotations 1a

29 

30import os 1a

31from typing import TYPE_CHECKING, Any, Callable, List, Optional 1a

32 

33from prefect._internal.concurrency.api import create_call, from_sync 1a

34from prefect.client.orchestration import get_client 1a

35from prefect.context import FlowRunContext, _deployment_id, _deployment_parameters 1a

36 

37from .flow_run import _get_flow_run 1a

38 

39if TYPE_CHECKING: 39 ↛ 40line 39 didn't jump to line 40 because the condition on line 39 was never true1a

40 from prefect.client.schemas.responses import DeploymentResponse 

41 

# Public attribute names of this submodule; `__dir__` reports these, sorted.
__all__ = [
    "id",
    "flow_run_id",
    "name",
    "parameters",
    "version",
]

43 

# Module-level cache of deployment responses keyed by deployment ID;
# filled and read by `_get_deployment`.
CACHED_DEPLOYMENT: dict[str, "DeploymentResponse"] = dict()

45 

46 

47type_cast: dict[ 1a

48 type[bool] | type[int] | type[float] | type[str] | type[None], Callable[[Any], Any] 

49] = { 

50 bool: lambda x: x.lower() == "true", 

51 int: int, 

52 float: float, 

53 str: str, 

54 # for optional defined attributes, when real value is NoneType, use str 

55 type(None): str, 

56} 

57 

58 

def __getattr__(name: str) -> Any:
    """
    Attribute accessor for this submodule; note that imports also work with this:

        from prefect.runtime.deployment import id

    An environment variable named `PREFECT__RUNTIME__DEPLOYMENT__<NAME>`
    (attribute name upper-cased) overrides the attribute. For names registered
    in `FIELDS`, the override is cast to the type of the real value using
    `type_cast`; for any other name the raw string is returned unchanged.

    Raises:
        AttributeError: `name` is not in `FIELDS` and no override is set.
        ValueError: an override is set, but the type of the real value has no
            converter in `type_cast`.
    """

    func = FIELDS.get(name)

    # if `name` is an attribute but it is mocked through environment variable, the mocked type will be str,
    # which might be different from original one. For consistency, cast env var to the same type
    env_key = f"PREFECT__RUNTIME__DEPLOYMENT__{name.upper()}"

    if func is None:
        # unknown attribute: only an environment override can supply a value
        mocked_value = os.environ.get(env_key)
        if mocked_value is None:
            raise AttributeError(f"{__name__} has no attribute {name!r}")
        return mocked_value

    # the real value is resolved even when mocked, because its type selects
    # the converter applied to the override
    real_value = func()
    mocked_value = os.environ.get(env_key)
    if mocked_value is None:
        return real_value

    # Only the converter lookup is guarded: an error raised by the converter
    # itself must not be reported as "cannot be mocked".
    try:
        cast_func = type_cast[type(real_value)]
    except KeyError:
        raise ValueError(
            "This runtime context attribute cannot be mocked using an"
            " environment variable. Please use monkeypatch instead."
        ) from None
    return cast_func(mocked_value)

92 

93 

def __dir__() -> List[str]:
    """Return the names listed in `__all__` as a new, sorted list."""
    names = list(__all__)
    names.sort()
    return names

96 

97 

async def _get_deployment(deployment_id: str) -> "DeploymentResponse":
    """
    Return the deployment with the given ID, reading it through a client on a
    cache miss and storing the response in `CACHED_DEPLOYMENT`.
    """
    # deployments won't change between calls so let's avoid the lifecycle of a client
    cached = CACHED_DEPLOYMENT.get(deployment_id)
    if cached:
        return cached

    async with get_client() as client:
        response = await client.read_deployment(deployment_id)
        CACHED_DEPLOYMENT[deployment_id] = response
    return CACHED_DEPLOYMENT[deployment_id]

105 

106 

def get_id() -> Optional[str]:
    """
    Return the ID of the current deployment as a string, or None when it
    cannot be determined.

    Sources are tried in order: the `_deployment_id` context variable, a
    `deployment_id` attribute on the flow run context, and finally the flow
    run read via `_get_flow_run` for the current flow run ID.
    """
    # Check deployment context var first (avoids API calls in nested flows)
    from_context_var = _deployment_id.get()
    if from_context_var:
        return str(from_context_var)

    # NOTE(review): this reads `deployment_id` off the context object itself,
    # whereas `get_flow_run_id` reads the run through `.flow_run` — confirm
    # which attribute path is intended.
    run_context = FlowRunContext.get()
    from_run_context = getattr(run_context, "deployment_id", None)
    if from_run_context is not None:
        return str(from_run_context)

    run_id = get_flow_run_id()
    if run_id is None:
        return None

    # fall back to loading the flow run, blocking until the call completes
    pending = from_sync.call_soon_in_loop_thread(create_call(_get_flow_run, run_id))
    flow_run = pending.result()
    return str(flow_run.deployment_id) if flow_run.deployment_id else None

127 

128 

def get_parameters() -> dict[str, Any]:
    """
    Return the parameters of the current run, or an empty dict when there is
    no current flow run ID or the flow run has no parameters.
    """
    # Check deployment context var first (avoids API calls in nested flows)
    from_context_var = _deployment_parameters.get()
    if from_context_var:
        return from_context_var

    run_id = get_flow_run_id()
    if run_id is None:
        return {}

    # load the flow run, blocking until the call completes
    pending = from_sync.call_soon_in_loop_thread(create_call(_get_flow_run, run_id))
    flow_run = pending.result()
    return flow_run.parameters or {}

142 

143 

def get_name() -> Optional[str]:
    """Return the current deployment's name, or None when no deployment ID is available."""
    deployment_id = get_id()
    if deployment_id is None:
        return None

    # resolve the (possibly cached) deployment, blocking until the call completes
    pending = from_sync.call_soon_in_loop_thread(
        create_call(_get_deployment, deployment_id)
    )
    return pending.result().name

154 

155 

def get_version() -> Optional[str]:
    """Return the current deployment's version, or None when no deployment ID is available."""
    deployment_id = get_id()
    if deployment_id is None:
        return None

    # resolve the (possibly cached) deployment, blocking until the call completes
    pending = from_sync.call_soon_in_loop_thread(
        create_call(_get_deployment, deployment_id)
    )
    return pending.result().version

166 

167 

def get_flow_run_id() -> Optional[str]:
    """
    Return the current flow run ID as a string.

    Uses the active flow run context when there is one; otherwise falls back
    to the `PREFECT__FLOW_RUN_ID` environment variable (None if it is unset).
    """
    context = FlowRunContext.get()
    if context is None:
        return os.getenv("PREFECT__FLOW_RUN_ID")
    return str(context.flow_run.id)

173 

174 

# Maps each attribute name to the zero-argument getter that computes it;
# consulted by the module-level `__getattr__`.
FIELDS: dict[str, Callable[[], Any]] = dict(
    id=get_id,
    flow_run_id=get_flow_run_id,
    parameters=get_parameters,
    name=get_name,
    version=get_version,
)