Coverage for /usr/local/lib/python3.12/site-packages/prefect/settings/sources.py: 70%

165 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1import os 1a

2import sys 1a

3import warnings 1a

4from pathlib import Path 1a

5from typing import Any, Dict, List, Mapping, Optional, Sequence, Tuple, Type 1a

6 

7import dotenv 1a

8import toml 1a

9from cachetools import TTLCache 1a

10from pydantic import AliasChoices 1a

11from pydantic.fields import FieldInfo 1a

12from pydantic_settings import ( 1a

13 BaseSettings, 

14 DotEnvSettingsSource, 

15 EnvSettingsSource, 

16 PydanticBaseSettingsSource, 

17) 

18from pydantic_settings.sources import ( 1a

19 ENV_FILE_SENTINEL, 

20 ConfigFileSourceMixin, 

21 DotenvType, 

22) 

23 

24from prefect.settings.constants import DEFAULT_PREFECT_HOME, DEFAULT_PROFILES_PATH 1a

25from prefect.utilities.collections import get_from_dict 1a

26 

# Short-lived cache of parsed TOML documents (at most 100 entries, 60 s TTL).
_file_cache: TTLCache[str, dict[str, Any]] = TTLCache(maxsize=100, ttl=60)


def _read_toml_file(path: Path) -> dict[str, Any]:
    """Load a TOML file, reusing a recently parsed copy when one is cached.

    The cache key includes the file's modification time, so a change to the
    file's mtime yields a new key and the file is parsed again.

    Args:
        path: Location of the TOML file to read.

    Returns:
        The parsed TOML document as a dictionary.

    Raises:
        FileNotFoundError: If `path` does not exist (raised by `path.stat()`).
    """
    modified_time = path.stat().st_mtime
    cache_key = f"toml_file:{path}:{modified_time}"
    # Compare against None instead of relying on truthiness: an empty TOML
    # file parses to `{}`, which is falsy and would otherwise be treated as a
    # cache miss and re-parsed on every call.
    cached = _file_cache.get(cache_key)
    if cached is not None:
        return cached
    data = toml.load(path)  # type: ignore
    _file_cache[cache_key] = data
    return data

39 

40 

class EnvFilterSettingsSource(EnvSettingsSource):
    """
    Custom pydantic settings source to filter out specific environment variables.

    All validation aliases are loaded from environment variables by default. We use
    `AliasPath` to maintain the ability to set fields via model initialization, but
    those shouldn't be loaded from environment variables. This loader allows us to
    say which environment variables should be ignored.
    """

    def __init__(
        self,
        settings_cls: type[BaseSettings],
        case_sensitive: Optional[bool] = None,
        env_prefix: Optional[str] = None,
        env_nested_delimiter: Optional[str] = None,
        env_ignore_empty: Optional[bool] = None,
        env_parse_none_str: Optional[str] = None,
        env_parse_enums: Optional[bool] = None,
        env_filter: Optional[List[str]] = None,
    ) -> None:
        """Load environment variables, then drop the ones named in `env_filter`.

        Args:
            settings_cls: Settings model this source provides values for.
            case_sensitive: Forwarded to `EnvSettingsSource`.
            env_prefix: Forwarded to `EnvSettingsSource`.
            env_nested_delimiter: Forwarded to `EnvSettingsSource`.
            env_ignore_empty: Forwarded to `EnvSettingsSource`.
            env_parse_none_str: Forwarded to `EnvSettingsSource`.
            env_parse_enums: Forwarded to `EnvSettingsSource`.
            env_filter: Names of environment variables to remove from the
                loaded mapping. `None` or an empty list disables filtering.
        """
        # Forward by keyword rather than by position so each value stays bound
        # to the intended parameter even if the parent signature gains or
        # reorders parameters.
        super().__init__(
            settings_cls,
            case_sensitive=case_sensitive,
            env_prefix=env_prefix,
            env_nested_delimiter=env_nested_delimiter,
            env_ignore_empty=env_ignore_empty,
            env_parse_none_str=env_parse_none_str,
            env_parse_enums=env_parse_enums,
        )
        self.env_vars: Mapping[str, str | None]
        if env_filter:
            if isinstance(self.env_vars, dict):
                # Plain dict: remove the filtered keys in place.
                for key in env_filter:
                    self.env_vars.pop(key, None)
            else:
                # Other mappings may be read-only, so build a filtered copy.
                # NOTE(review): this branch lower-cases keys before comparing
                # while the dict branch matches keys exactly — confirm callers
                # pass lower-case names.
                self.env_vars = {
                    key: value
                    for key, value in self.env_vars.items()  # type: ignore
                    if key.lower() not in env_filter
                }

82 

83 

class FilteredDotEnvSettingsSource(DotEnvSettingsSource):
    """Dotenv settings source that discards blacklisted variables after loading."""

    def __init__(
        self,
        settings_cls: type[BaseSettings],
        env_file: Optional[DotenvType] = ENV_FILE_SENTINEL,
        env_file_encoding: Optional[str] = None,
        case_sensitive: Optional[bool] = None,
        env_prefix: Optional[str] = None,
        env_nested_delimiter: Optional[str] = None,
        env_ignore_empty: Optional[bool] = None,
        env_parse_none_str: Optional[str] = None,
        env_parse_enums: Optional[bool] = None,
        env_blacklist: Optional[List[str]] = None,
    ) -> None:
        """Load dotenv variables, then drop the ones named in `env_blacklist`.

        Args:
            settings_cls: Settings model this source provides values for.
            env_file: Forwarded to `DotEnvSettingsSource`.
            env_file_encoding: Forwarded to `DotEnvSettingsSource`.
            case_sensitive: Forwarded to `DotEnvSettingsSource`.
            env_prefix: Forwarded to `DotEnvSettingsSource`.
            env_nested_delimiter: Forwarded to `DotEnvSettingsSource`.
            env_ignore_empty: Forwarded to `DotEnvSettingsSource`.
            env_parse_none_str: Forwarded to `DotEnvSettingsSource`.
            env_parse_enums: Forwarded to `DotEnvSettingsSource`.
            env_blacklist: Names of variables to remove from the loaded
                mapping. `None` or an empty list disables filtering.
        """
        # Forward by keyword rather than by position so each value stays bound
        # to the intended parameter even if the parent signature gains or
        # reorders parameters.
        super().__init__(
            settings_cls,
            env_file=env_file,
            env_file_encoding=env_file_encoding,
            case_sensitive=case_sensitive,
            env_prefix=env_prefix,
            env_nested_delimiter=env_nested_delimiter,
            env_ignore_empty=env_ignore_empty,
            env_parse_none_str=env_parse_none_str,
            env_parse_enums=env_parse_enums,
        )
        self.env_blacklist = env_blacklist
        if self.env_blacklist:
            if isinstance(self.env_vars, dict):
                # Plain dict: remove the blacklisted keys in place.
                for key in self.env_blacklist:
                    self.env_vars.pop(key, None)
            else:
                # Other mappings may be read-only, so build a filtered copy.
                # NOTE(review): this branch lower-cases keys before comparing
                # while the dict branch matches keys exactly — confirm callers
                # pass lower-case names.
                self.env_vars: dict[str, str | None] = {
                    key: value
                    for key, value in self.env_vars.items()  # type: ignore
                    if key.lower() not in self.env_blacklist
                }

120 

121 

class ProfileSettingsTomlLoader(PydanticBaseSettingsSource):
    """
    Custom pydantic settings source to load profile settings from a toml file.

    See https://docs.pydantic.dev/latest/concepts/pydantic_settings/#customise-settings-sources
    """

    def __init__(self, settings_cls: Type[BaseSettings]):
        """Resolve the profiles file and load the active profile's settings.

        Args:
            settings_cls: Settings model this source provides values for.
        """
        super().__init__(settings_cls)
        self.settings_cls = settings_cls
        self.profiles_path: Path = _get_profiles_path()
        self.profile_settings: dict[str, Any] = self._load_profile_settings()

    def _load_profile_settings(self) -> Dict[str, Any]:
        """Helper method to load the profile settings from the profiles.toml file.

        Returns:
            The settings table of the active profile. Falls back to the default
            profile when the profiles file does not exist or the active profile
            is unset or unknown, and to an empty dict when the file is not
            valid TOML (a warning is emitted in that case).
        """
        if not self.profiles_path.exists():
            return self._get_default_profile()

        try:
            all_profile_data = _read_toml_file(self.profiles_path)
        except toml.TomlDecodeError:
            warnings.warn(
                f"Failed to load profiles from {self.profiles_path}. Please ensure the file is valid TOML."
            )
            return {}

        # A `prefect --profile NAME ...` invocation takes precedence over the
        # PREFECT_PROFILE environment variable and the file's "active" key.
        if (
            sys.argv[0].endswith("/prefect")
            and len(sys.argv) >= 3
            and sys.argv[1] == "--profile"
        ):
            active_profile = sys.argv[2]
        else:
            active_profile = os.environ.get("PREFECT_PROFILE") or all_profile_data.get(
                "active"
            )

        profiles_data = all_profile_data.get("profiles", {})

        if not active_profile or active_profile not in profiles_data:
            return self._get_default_profile()
        return profiles_data[active_profile]

    def _get_default_profile(self) -> Dict[str, Any]:
        """Helper method to get the default profile.

        Reads `DEFAULT_PROFILES_PATH` and returns the table of its "active"
        profile ("ephemeral" when no active profile is declared), or an empty
        dict when that table is absent.
        """
        default_profile_data = _read_toml_file(DEFAULT_PROFILES_PATH)
        default_profile = default_profile_data.get("active", "ephemeral")
        assert isinstance(default_profile, str)
        return default_profile_data.get("profiles", {}).get(default_profile, {})

    def get_field_value(
        self, field: FieldInfo, field_name: str
    ) -> Tuple[Any, str, bool]:
        """Concrete implementation to get the field value from the profile settings.

        Args:
            field: Field metadata from the settings model.
            field_name: Name of the field on the settings model.

        Returns:
            A `(value, key, is_complex)` tuple. `value` is `None` when the
            profile does not define the field.
        """
        if field.validation_alias:
            # Use the validation alias as the key to ensure the profile value
            # does not override higher priority sources. Lower priority sources
            # that use the field name can override higher priority sources that
            # use the validation alias, as seen in
            # https://github.com/PrefectHQ/prefect/issues/15981
            if isinstance(field.validation_alias, str):
                value = self.profile_settings.get(field.validation_alias.upper())
                if value is not None:
                    return value, field.validation_alias, self.field_is_complex(field)
            elif isinstance(field.validation_alias, AliasChoices):
                # Default to None so an AliasChoices without any string choice
                # does not raise StopIteration; in that case the loop below
                # never returns and the field-name lookup is used instead.
                lowest_priority_alias = next(
                    (
                        choice
                        for choice in reversed(field.validation_alias.choices)
                        if isinstance(choice, str)
                    ),
                    None,
                )
                for alias in field.validation_alias.choices:
                    if not isinstance(alias, str):
                        continue
                    value = self.profile_settings.get(alias.upper())
                    if value is not None:
                        return (
                            value,
                            lowest_priority_alias,
                            self.field_is_complex(field),
                        )

        # No alias matched: look the field up by its prefixed, upper-cased name.
        name = f"{self.config.get('env_prefix', '')}{field_name.upper()}"
        value = self.profile_settings.get(name)
        return value, field_name, self.field_is_complex(field)

    def __call__(self) -> Dict[str, Any]:
        """Called by pydantic to get the settings from our custom source.

        Returns:
            Prepared values for every field the profile defines, or an empty
            dict when running in test mode.
        """
        if _is_test_mode():
            return {}
        profile_settings: Dict[str, Any] = {}
        for field_name, field in self.settings_cls.model_fields.items():
            value, key, is_complex = self.get_field_value(field, field_name)
            if value is not None:
                prepared_value = self.prepare_field_value(
                    field_name, field, value, is_complex
                )
                profile_settings[key] = prepared_value
        return profile_settings

221 

222 

# File name used when the settings model does not configure a `toml_file`.
DEFAULT_PREFECT_TOML_PATH = Path("prefect.toml")


class TomlConfigSettingsSourceBase(PydanticBaseSettingsSource, ConfigFileSourceMixin):
    """Shared field-resolution logic for settings sources backed by TOML data.

    This base starts with an empty `toml_data`; the values are looked up from
    whatever dictionary is stored there.
    """

    def __init__(self, settings_cls: Type[BaseSettings]):
        """Store the settings class and start with no TOML data."""
        super().__init__(settings_cls)
        self.settings_cls = settings_cls
        self.toml_data: dict[str, Any] = {}

    def _read_file(self, path: Path) -> dict[str, Any]:
        """Parse a single TOML file through the module's cached reader."""
        return _read_toml_file(path)

    def get_field_value(
        self, field: FieldInfo, field_name: str
    ) -> tuple[Any, str, bool]:
        """Concrete implementation to get the field value from toml data.

        Returns:
            A `(value, key, is_complex)` tuple. `value` is `None` when the
            field is missing or holds a table.
        """
        found = self.toml_data.get(field_name)
        # A dict is likely a nested settings object, which a nested source
        # will handle, so it is reported as "no value" here.
        if found is None or isinstance(found, dict):
            return None, field_name, self.field_is_complex(field)

        # Use the validation alias as the key to ensure this value does not
        # override higher priority sources. Lower priority sources that use the
        # field name can override higher priority sources that use the
        # validation alias, as seen in
        # https://github.com/PrefectHQ/prefect/issues/15981
        alias = field.validation_alias
        key = field_name
        if isinstance(alias, str):
            if alias:
                key = alias
        elif isinstance(alias, AliasChoices):
            # Pick the last string choice, keeping the field name if none exists.
            key = next(
                (choice for choice in reversed(alias.choices) if isinstance(choice, str)),
                field_name,
            )
        return found, key, self.field_is_complex(field)

    def __call__(self) -> dict[str, Any]:
        """Called by pydantic to get the settings from our custom source."""
        collected: dict[str, Any] = {}
        for field_name, field in self.settings_cls.model_fields.items():
            value, key, is_complex = self.get_field_value(field, field_name)
            if value is None:
                continue
            collected[key] = self.prepare_field_value(
                field_name, field, value, is_complex
            )
        return collected

272 

273 

class PrefectTomlConfigSettingsSource(TomlConfigSettingsSourceBase):
    """Custom pydantic settings source to load settings from a prefect.toml file"""

    def __init__(
        self,
        settings_cls: Type[BaseSettings],
    ):
        """Read the configured TOML file(s) and descend into the table header.

        The file location comes from the model config key `toml_file`
        (defaulting to `DEFAULT_PREFECT_TOML_PATH`) and the table to read from
        comes from `prefect_toml_table_header` (defaulting to the document root).
        """
        super().__init__(settings_cls)
        model_config = settings_cls.model_config
        self.toml_file_path: Path | str | Sequence[Path | str] | None = (
            model_config.get("toml_file", DEFAULT_PREFECT_TOML_PATH)
        )
        self.toml_table_header: tuple[str, ...] = model_config.get(
            "prefect_toml_table_header", tuple()
        )
        # Walk down one table per header segment; a missing segment yields {}.
        table: dict[str, Any] = self._read_files(self.toml_file_path)
        for segment in self.toml_table_header:
            table = table.get(segment, {})
        self.toml_data: dict[str, Any] = table

291 

292 

class PyprojectTomlConfigSettingsSource(TomlConfigSettingsSourceBase):
    """Custom pydantic settings source to load settings from a pyproject.toml file"""

    def __init__(
        self,
        settings_cls: Type[BaseSettings],
    ):
        """Read `pyproject.toml` and descend into the configured table header.

        The table to read from comes from the model config key
        `pyproject_toml_table_header`, defaulting to `("tool", "prefect")`.
        """
        super().__init__(settings_cls)
        self.toml_file_path: Path = Path("pyproject.toml")
        self.toml_table_header: tuple[str, ...] = settings_cls.model_config.get(
            "pyproject_toml_table_header", ("tool", "prefect")
        )
        # Walk down one table per header segment; a missing segment yields {}.
        table: dict[str, Any] = self._read_files(self.toml_file_path)
        for segment in self.toml_table_header:
            table = table.get(segment, {})
        self.toml_data: dict[str, Any] = table

308 

309 

def _is_test_mode() -> bool:
    """Check if the current process is in test mode.

    Returns:
        True when at least one of the test-mode environment variables below is
        set to a non-empty string.
    """
    test_mode_variables = (
        "PREFECT_TEST_MODE",
        "PREFECT_UNIT_TEST_MODE",
        "PREFECT_TESTING_UNIT_TEST_MODE",
        "PREFECT_TESTING_TEST_MODE",
    )
    return any(os.getenv(variable) for variable in test_mode_variables)

318 

319 

def _get_profiles_path() -> Path:
    """Helper to get the profiles path.

    The first match wins, in this order:

    1. test mode: `DEFAULT_PROFILES_PATH`
    2. the `PREFECT_PROFILES_PATH` environment variable
    3. `PREFECT_PROFILES_PATH` in a local `.env` file
    4. `profiles_path` in `prefect.toml`
    5. `tool.prefect.profiles_path` in `pyproject.toml`
    6. `profiles.toml` inside the directory named by `PREFECT_HOME`
    7. `profiles.toml` inside `DEFAULT_PREFECT_HOME` if it exists, otherwise
       `DEFAULT_PROFILES_PATH`
    """
    if _is_test_mode():
        return DEFAULT_PROFILES_PATH

    from_environment = os.getenv("PREFECT_PROFILES_PATH")
    if from_environment:
        return Path(from_environment)

    from_dotenv = dotenv.dotenv_values(".env").get("PREFECT_PROFILES_PATH")
    if from_dotenv:
        return Path(from_dotenv)

    # TOML files are consulted lazily: pyproject.toml is only read when
    # prefect.toml does not supply a path.
    toml_locations = (
        ("prefect.toml", ["profiles_path"]),
        ("pyproject.toml", ["tool", "prefect", "profiles_path"]),
    )
    for toml_file, key_path in toml_locations:
        from_toml = _get_profiles_path_from_toml(toml_file, key_path)
        if from_toml:
            return Path(from_toml)

    prefect_home = os.environ.get("PREFECT_HOME")
    if prefect_home:
        return Path(prefect_home) / "profiles.toml"

    home_profiles = DEFAULT_PREFECT_HOME / "profiles.toml"
    return home_profiles if home_profiles.exists() else DEFAULT_PROFILES_PATH

342 

343 

def _get_profiles_path_from_toml(path: str, keys: List[str]) -> Optional[str]:
    """Helper to get the profiles path from a toml file.

    Args:
        path: Location of the TOML file to inspect.
        keys: Key path handed to `get_from_dict` to locate the value.

    Returns:
        `None` when the file does not exist; otherwise whatever
        `get_from_dict` returns for `keys` (presumably `None` when the key
        path is absent — verify against `get_from_dict`).
    """
    try:
        document = _read_toml_file(Path(path))
    except FileNotFoundError:
        # A missing file simply means no path is configured there.
        return None
    else:
        return get_from_dict(document, keys)