Coverage for /usr/local/lib/python3.12/site-packages/prefect/utilities/importtools.py: 24%

221 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1import ast 1a

2import importlib 1a

3import importlib.util 1a

4import os 1a

5import sys 1a

6import threading 1a

7import warnings 1a

8from collections.abc import Iterable, Sequence 1a

9from importlib.abc import Loader, MetaPathFinder 1a

10from importlib.machinery import ModuleSpec 1a

11from logging import Logger 1a

12from pathlib import Path 1a

13from types import ModuleType 1a

14from typing import TYPE_CHECKING, Any, Callable, NamedTuple, Optional 1a

15 

16from prefect.exceptions import ScriptError 1a

17from prefect.logging.loggers import get_logger 1a

18 

# Module-level logger; the import helpers in this file emit debug output through it.
logger: Logger = get_logger(__name__)

# Lock guarding `sys.path` mutation. Starts unset and is created on the first call to
# `_get_sys_path_lock()`.
_sys_path_lock: Optional[threading.Lock] = None

22 

23 

def _get_sys_path_lock() -> threading.Lock:
    """Return the module-wide `sys.path` lock, creating it on first use."""
    global _sys_path_lock
    if _sys_path_lock is not None:
        return _sys_path_lock
    # First call: create the lock and publish it through the module global.
    _sys_path_lock = threading.Lock()
    return _sys_path_lock

30 

31 

def to_qualified_name(obj: Any) -> str:
    """
    Build the dotted import path of an object.

    The result is the object's `__module__` joined to its `__qualname__` by a ".".

    Args:
        obj (Any): an importable Python object

    Returns:
        str: the fully-qualified name, e.g. "json.decoder.JSONDecoder"
    """
    module_path = obj.__module__
    qualified_name = obj.__qualname__
    return module_path + "." + qualified_name

44 

45 

def from_qualified_name(name: str) -> Any:
    """
    Resolve a fully-qualified dotted name to the object it refers to.

    The whole name is first tried as a module ("module" or "module.sub_module").
    If that import fails and the name contains a ".", the last component is looked
    up as an attribute of the module named by everything before it.

    Args:
        name: The fully-qualified name of the object to import.

    Returns:
        the imported module or object

    Raises:
        ImportError: If the name has no "." and cannot be imported as a module, or
            if the parent module of a dotted name cannot be imported.
        AttributeError: If the parent module has no attribute with the final name.

    Examples:
        ```python
        obj = from_qualified_name("random.randint")
        import random
        obj == random.randint
        # True
        ```
    """
    try:
        return importlib.import_module(name)
    except ImportError:
        # A bare name has no parent module to fall back on.
        if "." not in name:
            raise

    # Treat the final component as an attribute of the module before it.
    parent_path, _, attribute = name.rpartition(".")
    parent = importlib.import_module(parent_path)
    return getattr(parent, attribute)

77 

78 

def load_script_as_module(path: str) -> ModuleType:
    """Execute the Python script at `path` and return it as a module.

    The module is registered in `sys.modules` under the script's file stem, or under
    a generated `__prefect_loader_<id>__` name when that stem is already present in
    `sys.modules`. The script's parent directory and the current working directory
    are inserted at the front of `sys.path` while holding the global sys.path lock
    so that imports inside the script can resolve.

    If executing the script fails, the partially-initialized module is removed from
    `sys.modules` again so a later attempt does not find a broken entry under that
    name.

    Args:
        path: Path to the script; must end in ".py".

    Returns:
        The executed module.

    Raises:
        ValueError: If `path` does not end in ".py".
        ScriptError: Wraps any `Exception` raised while executing the script.
    """
    if not path.endswith(".py"):
        raise ValueError(f"The provided path does not point to a python file: {path!r}")

    parent_path = str(Path(path).resolve().parent)
    working_directory = os.getcwd()

    module_name = os.path.splitext(Path(path).name)[0]

    # fall back in case of filenames with the same names as modules
    if module_name in sys.modules:
        module_name = f"__prefect_loader_{id(path)}__"

    spec = importlib.util.spec_from_file_location(
        module_name,
        path,
        submodule_search_locations=[parent_path, working_directory],
    )
    if TYPE_CHECKING:
        assert spec is not None
        assert spec.loader is not None

    module = importlib.util.module_from_spec(spec)
    sys.modules[module_name] = module

    loaded = False
    try:
        # NOTE(review): the lock stays held while the script runs and threading.Lock
        # is not reentrant, so a script that calls back into this function on the
        # same thread would block - confirm this cannot happen.
        with _get_sys_path_lock():
            # NOTE(review): these entries are not removed afterwards - confirm that
            # leaving them on sys.path is intended.
            sys.path.insert(0, working_directory)
            sys.path.insert(0, parent_path)
            spec.loader.exec_module(module)
        loaded = True
    except Exception as exc:
        raise ScriptError(user_exc=exc, path=path) from exc
    finally:
        # Do not leave a half-executed module registered; only drop the entry if it
        # is still the module created above.
        if not loaded and sys.modules.get(module_name) is module:
            del sys.modules[module_name]

    return module

121 

122 

def load_module(module_name: str) -> ModuleType:
    """
    Import `module_name` with the current working directory temporarily on `sys.path`.

    The working directory is inserted at the front of `sys.path` for the duration of
    the import, so modules located there can be found, and removed again afterwards
    whether or not the import succeeds.
    """
    cwd = os.getcwd()
    sys.path.insert(0, cwd)

    try:
        imported = importlib.import_module(module_name)
    finally:
        sys.path.remove(cwd)
    return imported

136 

137 

def import_object(import_path: str) -> Any:
    """
    Resolve an import path to the object it names.

    Accepted formats:
      - module.object
      - module:object
      - /path/to/script.py:object
      - module:object.method
      - /path/to/script.py:object.method

    A path containing ".py:" is loaded as a script; anything else is imported as a
    module. The part after the last ":" (or after the last "." when there is no ":")
    is then walked attribute by attribute.

    This function is not thread safe as it modifies the 'sys' module during execution.

    Raises:
        ValueError: If the path contains neither ":" nor ".".
    """
    if ".py:" in import_path:
        script_path, _, object_path = import_path.rpartition(":")
        module = load_script_as_module(script_path)
    elif ":" in import_path:
        module_name, _, object_path = import_path.rpartition(":")
        module = load_module(module_name)
    elif "." in import_path:
        module_name, _, object_path = import_path.rpartition(".")
        module = load_module(module_name)
    else:
        raise ValueError(
            f"Invalid format for object import. Received {import_path!r}."
        )

    # Walk nested attributes, e.g. "object.method"
    target = module
    for attribute in object_path.split("."):
        target = getattr(target, attribute)

    return target

173 

174 

class DelayedImportErrorModule(ModuleType):
    """
    A fake module returned by `lazy_import` when the module cannot be found. When any
    of the module's attributes are accessed, we will throw a `ModuleNotFoundError`.

    The one exception is `__file__`, which raises a plain `AttributeError` so that
    `hasattr(module, "__file__")` style probes report it as absent.

    Adapted from [lazy_loader][1]

    [1]: https://github.com/scientific-python/lazy_loader
    """

    def __init__(self, error_message: str, help_message: Optional[str] = None) -> None:
        """
        Args:
            error_message: Message of the `ModuleNotFoundError` raised on attribute
                access.
            help_message: Used as the module docstring; a default text is used when
                empty or not given.
        """
        self.__error_message = error_message
        if not help_message:
            help_message = "Import errors for this module are only reported when used."
        super().__init__("DelayedImportErrorModule", help_message)

    def __getattr__(self, attr: str) -> Any:
        """Raise for every attribute that normal lookup did not find."""
        if attr == "__file__":
            # `ModuleType` defines no `__getattr__` to delegate to, so raise the
            # attribute error explicitly and name the missing attribute.
            raise AttributeError(
                f"module {type(self).__name__!r} has no attribute {attr!r}"
            )
        raise ModuleNotFoundError(self.__error_message)

195 

196 

def lazy_import(
    name: str, error_on_import: bool = False, help_message: Optional[str] = None
) -> ModuleType:
    """
    Create a lazily-imported module to use in place of the module of the given name.
    Use this to retain module-level imports for libraries that we don't want to
    actually import until they are needed.

    NOTE: Lazy-loading a subpackage can cause the subpackage to be imported
    twice if another non-lazy import also imports the subpackage. For example,
    using both `lazy_import("docker.errors")` and `import docker.errors` in the
    same codebase will import `docker.errors` twice and can lead to unexpected
    behavior, e.g. type check failures and import-time side effects running
    twice.

    Adapted from the [Python documentation][1] and [lazy_loader][2]

    [1]: https://docs.python.org/3/library/importlib.html#implementing-lazy-imports
    [2]: https://github.com/scientific-python/lazy_loader

    Args:
        name: The name of the module to import lazily.
        error_on_import: If True, raise immediately when no spec is found for the
            module instead of returning a `DelayedImportErrorModule`.
        help_message: Optional extra text appended to the "No module named" error
            and passed on to the `DelayedImportErrorModule`.

    Returns:
        The module already present in `sys.modules`, a lazily-loading module, or a
        `DelayedImportErrorModule` when no spec is found for the module.

    Raises:
        ModuleNotFoundError: If no spec is found for the module and
            `error_on_import` is True.
    """

    try:
        return sys.modules[name]
    except KeyError:
        pass

    if "." in name:
        warnings.warn(
            "Lazy importing subpackages can lead to unexpected behavior.",
            RuntimeWarning,
        )

    spec = importlib.util.find_spec(name)

    if spec is None:
        import_error_message = f"No module named '{name}'."
        # Only append the help text when there is one, so the message never ends
        # in a literal "None".
        if help_message:
            import_error_message = f"{import_error_message}\n{help_message}"

        if error_on_import:
            raise ModuleNotFoundError(import_error_message)

        return DelayedImportErrorModule(import_error_message, help_message)

    module = importlib.util.module_from_spec(spec)
    # Register before executing, as in the lazy-import recipe linked above.
    sys.modules[name] = module

    if TYPE_CHECKING:
        assert spec.loader is not None
    loader = importlib.util.LazyLoader(spec.loader)
    loader.exec_module(module)

    return module

248 

249 

class AliasedModuleDefinition(NamedTuple):
    """
    One alias entry consumed by `AliasedModuleFinder`.

    Args:
        alias: The import name to create
        real: The import name of the module the alias refers to
        callback: Optional function called with the imported alias name when the
            alias module is loaded
    """

    # Name that users import.
    alias: str
    # Name of the module that actually gets imported.
    real: str
    # Hook called on load; `None` disables it.
    callback: Optional[Callable[[str], None]]

263 

264 

class AliasedModuleFinder(MetaPathFinder):
    """Meta path finder that maps aliased import names onto their real modules."""

    def __init__(self, aliases: Iterable[AliasedModuleDefinition]):
        """
        See `AliasedModuleDefinition` for alias specification.

        Aliases apply to all modules nested within an alias.
        """
        self.aliases: list[AliasedModuleDefinition] = list(aliases)

    def find_spec(
        self,
        fullname: str,
        path: Optional[Sequence[str]] = None,
        target: Optional[ModuleType] = None,
    ) -> Optional[ModuleSpec]:
        """
        The fullname is the imported path, e.g. "foo.bar". If there is an alias "phi"
        for "foo" then on import of "phi.bar" we will find the spec for "foo.bar" and
        create a new spec for "phi.bar" that points to "foo.bar".

        An alias only matches the alias name itself or names nested below it
        ("phi" or "phi.<...>"); a module that merely shares the prefix, such as
        "phil", is left alone.

        Returns:
            A spec for the aliased name, or None when no alias applies or the real
            module has no spec, so that the remaining finders are consulted.
        """
        for alias, real, callback in self.aliases:
            # Match on a package boundary, not on a raw string prefix.
            if fullname != alias and not fullname.startswith(alias + "."):
                continue

            # Swap the alias prefix for the real module name.
            real_name = real + fullname[len(alias) :]
            real_spec = importlib.util.find_spec(real_name)
            if real_spec is None:
                # Nothing to point the alias at; let the import fail normally.
                continue

            # Create a new spec for the alias
            return ModuleSpec(
                fullname,
                AliasedModuleLoader(fullname, callback, real_spec),
                origin=real_spec.origin,
                is_package=real_spec.submodule_search_locations is not None,
            )
        return None

297 

298 

class AliasedModuleLoader(Loader):
    """Loader that registers the real module in `sys.modules` under an alias name."""

    def __init__(
        self,
        alias: str,
        callback: Optional[Callable[[str], None]],
        real_spec: ModuleSpec,
    ):
        """
        Args:
            alias: The name to register the real module under.
            callback: Optional function called with `alias` when the module is loaded.
            real_spec: Spec of the real module; its `name` is what gets imported.
        """
        self.alias = alias
        self.callback = callback
        self.real_spec = real_spec

    def exec_module(self, module: ModuleType) -> None:
        """
        Import the real module and register it under the alias.

        The `module` argument is not used: the entry stored in `sys.modules` for the
        alias is the real module itself.
        """
        real_module = importlib.import_module(self.real_spec.name)
        notify = self.callback
        if notify is not None:
            notify(self.alias)
        sys.modules[self.alias] = real_module

315 

316 

def safe_load_namespace(
    source_code: str, filepath: Optional[str] = None
) -> dict[str, Any]:
    """
    Safely load a namespace from source code, optionally handling relative imports.

    Top-level imports are performed and top-level class, function, and assignment
    statements are executed one at a time. Failed imports and failing statements are
    logged at debug level and skipped. `if __name__ == "__main__":` blocks are
    ignored.

    If a `filepath` is provided, `sys.path` is modified to support relative imports.
    Changes to `sys.path` are reverted after completion, but this function is not thread safe
    and use of it in threaded contexts may result in undesirable behavior.

    Args:
        source_code: The source code to load
        filepath: Optional file path of the source code. If provided, enables relative imports.

    Returns:
        The namespace loaded from the source code.
    """
    parsed_code = ast.parse(source_code)

    namespace: dict[str, Any] = {"__name__": "prefect_safe_namespace_loader"}

    # Remove the body of the if __name__ == "__main__": block
    parsed_code.body = [node for node in parsed_code.body if not _is_main_block(node)]

    package_name: Optional[str] = None
    original_sys_path: Optional[list[str]] = None

    if filepath:
        # Setup for relative imports: the file's directory name is used as the
        # package name and its parent directory is made importable.
        file_dir = os.path.dirname(os.path.abspath(filepath))
        package_name = os.path.basename(file_dir)
        parent_dir = os.path.dirname(file_dir)

        # Save original sys.path and modify it
        original_sys_path = sys.path.copy()
        sys.path.insert(0, parent_dir)
        sys.path.insert(0, file_dir)

    try:
        for node in parsed_code.body:
            if isinstance(node, ast.Import):
                _load_plain_imports(node, namespace)
            elif isinstance(node, ast.ImportFrom):
                _load_from_import(node, namespace, package_name)

        # Handle local definitions
        for node in parsed_code.body:
            if isinstance(node, (ast.ClassDef, ast.FunctionDef, ast.Assign)):
                try:
                    code = compile(
                        ast.Module(body=[node], type_ignores=[]),
                        filename="<ast>",
                        mode="exec",
                    )
                    exec(code, namespace)
                except Exception as e:
                    logger.debug("Failed to compile: %s", e)

    finally:
        # Restore original sys.path if it was modified. Compare against None so an
        # originally empty sys.path is restored as well.
        if original_sys_path is not None:
            sys.path[:] = original_sys_path

    return namespace


def _load_plain_imports(node: ast.Import, namespace: dict[str, Any]) -> None:
    """Perform an `import a, b.c as d` statement, binding the results in `namespace`."""
    for alias in node.names:
        module_name = alias.name
        try:
            module = importlib.import_module(module_name)
            if alias.asname:
                namespace[alias.asname] = module
            else:
                # `import a.b` binds the top-level package `a`, not "a.b".
                top_level_name = module_name.partition(".")[0]
                if top_level_name != module_name:
                    module = importlib.import_module(top_level_name)
                namespace[top_level_name] = module
            logger.debug("Successfully imported %s", module_name)
        except ImportError as e:
            logger.debug("Failed to import %s: %s", module_name, e)


def _load_from_import(
    node: ast.ImportFrom, namespace: dict[str, Any], package_name: Optional[str]
) -> None:
    """Perform a `from x import y` statement, binding the results in `namespace`."""
    module_name = node.module or ""
    try:
        module = _import_from_source_module(node, package_name)

        for alias in node.names:
            name = alias.name
            if name == "*":
                # Handle 'from module import *'
                namespace.update(
                    {k: v for k, v in module.__dict__.items() if not k.startswith("_")}
                )
                continue

            try:
                attribute = getattr(module, name)
            except AttributeError as e:
                # Like the import statement, fall back to importing a submodule of
                # that name before giving up.
                try:
                    attribute = importlib.import_module(f"{module.__name__}.{name}")
                except ImportError:
                    logger.debug(
                        "Failed to retrieve %s from %s: %s",
                        name,
                        module_name,
                        e,
                    )
                    continue
            namespace[alias.asname or name] = attribute
    except ImportError as e:
        logger.debug("Failed to import from %s: %s", module_name, e)


def _import_from_source_module(
    node: ast.ImportFrom, package_name: Optional[str]
) -> ModuleType:
    """Import the module named on the left side of a `from x import y` statement."""
    module_name = node.module or ""

    if node.level > 0 and package_name is not None:
        # For relative imports, use the parent package to inform the import
        package_parts = package_name.split(".")
        if len(package_parts) < node.level:
            raise ImportError("Attempted relative import beyond top-level package")
        if node.level > 1:
            package_parts = package_parts[: (1 - node.level)]
        parent_package = ".".join(package_parts)
        if module_name:
            return importlib.import_module(f".{module_name}", package=parent_package)
        # `from . import name`: the source is the package itself.
        return importlib.import_module(parent_package)

    if not module_name:
        # `from . import name` without a file path cannot be resolved; report it as
        # an import failure rather than passing an empty name to importlib.
        raise ImportError("Relative import with no known parent package")

    # Absolute imports are handled as normal
    return importlib.import_module(module_name)

475 

476 

def _is_main_block(node: ast.AST) -> bool:
    """
    Check if the node is an `if __name__ == "__main__":` block.

    Only a single `==` comparison between the name `__name__` and the constant
    "__main__" counts, in either operand order. Other comparisons such as
    `__name__ != "__main__"` are not main blocks.
    """
    if not isinstance(node, ast.If):
        return False

    test = node.test
    if not (
        isinstance(test, ast.Compare)
        and len(test.ops) == 1
        and isinstance(test.ops[0], ast.Eq)
    ):
        return False

    operands = (test.left, test.comparators[0])
    compares_name = any(
        isinstance(operand, ast.Name) and operand.id == "__name__"
        for operand in operands
    )
    compares_main = any(
        isinstance(operand, ast.Constant) and operand.value == "__main__"
        for operand in operands
    )
    return compares_name and compares_main