Coverage for /usr/local/lib/python3.12/site-packages/prefect/_experimental/plugins/__init__.py: 13%

57 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1""" 

2Experimental plugin system for Prefect. 

3 

4This module provides a startup hook system that allows third-party packages 

5to run initialization code before Prefect CLI commands, workers, or agents 

6start their main work. 

7 

8**Note: This is an experimental API and is subject to change.** 

9""" 

10 

11from __future__ import annotations 1a

12from typing import Any 1a

13 

14import anyio 1a

15 

16from prefect._experimental.plugins.apply import apply_setup_result, summarize_env 1a

17from prefect._experimental.plugins.diagnostics import SetupSummary 1a

18from prefect._experimental.plugins.manager import ( 1a

19 build_manager, 

20 call_async_hook, 

21 load_entry_point_plugins, 

22 register_hook, 

23) 

24from prefect._experimental.plugins.spec import ( 1a

25 PREFECT_PLUGIN_API_VERSION, 

26 HookContext, 

27 HookSpec, 

28 SetupResult, 

29) 

30from prefect.settings import get_current_settings 1a

31 

# Public API of the experimental plugin package: the startup entry point
# defined in this module, plus the names re-exported from the ``spec``,
# ``diagnostics`` and ``manager`` submodules imported above.
__all__ = [
    "run_startup_hooks",
    "HookContext",
    "SetupResult",
    "HookSpec",
    "SetupSummary",
    "PREFECT_PLUGIN_API_VERSION",
    "register_hook",
]

41 

42 

async def run_startup_hooks(ctx: HookContext) -> list[SetupSummary]:
    """
    Run all registered plugin startup hooks.

    This is the main entry point for the plugin system. It:
    1. Checks if plugins are enabled via configuration
    2. Discovers and loads plugins from entry points
    3. Calls setup_environment hooks (respecting timeouts)
    4. Applies environment changes from successful hooks
    5. Returns diagnostic summaries

    Args:
        ctx: Hook context. Its ``logger_factory`` is used to create the
            ``"prefect.plugins"`` logger, and the context itself is passed
            to every ``setup_environment`` hook.

    Returns:
        One SetupSummary per hook result, in the order the results were
        returned. The list is empty when plugins are disabled, when safe
        mode is on (plugins are loaded but hooks are not called), or when
        the hook call timed out or raised before producing any results.

    Raises:
        SystemExit: In strict mode, if any hook reported an error, or if a
            hook returned a result flagged ``required`` with an empty
            ``env``.
    """
    logger = ctx.logger_factory("prefect.plugins")
    settings = get_current_settings().experiments.plugins

    if not settings.enabled:
        logger.debug("Experimental plugins not enabled")
        return []

    logger.debug("Initializing experimental plugin system")
    pm = build_manager(HookSpec)
    allow = settings.allow
    deny = settings.deny
    load_entry_point_plugins(pm, allow=allow, deny=deny, logger=logger)

    summaries: list[SetupSummary] = []

    if settings.safe_mode:
        logger.info("Safe mode enabled - plugins loaded but hooks not called")
        return summaries

    # Call all hooks with timeout
    timeout = settings.setup_timeout_seconds
    # Stays empty if the call below times out or raises, so the loops that
    # follow simply have nothing to process.
    results: list[tuple[str, Any, Exception | None]] = []

    try:
        with anyio.move_on_after(timeout) as cancel_scope:
            results = await call_async_hook(pm, "setup_environment", ctx=ctx)

        if cancel_scope.cancel_called:
            logger.warning("Plugin setup timed out after %.1fs", timeout)
    except Exception as e:
        # Best-effort: a failure of the hook machinery itself is logged and
        # startup continues with whatever results were collected (none).
        logger.exception("Unexpected error during plugin setup: %s", e)

    # Process results
    for name, res, err in results:
        if err:
            logger.error("Plugin %s failed: %s", name, err)
            summaries.append(
                SetupSummary(plugin=name, env_preview={}, note=None, error=str(err))
            )
            continue

        if res is None:
            logger.debug("Plugin %s returned no changes", name)
            summaries.append(
                SetupSummary(plugin=name, env_preview={}, note=None, error=None)
            )
            continue

        try:
            apply_setup_result(res, logger)
            summaries.append(
                SetupSummary(
                    plugin=name,
                    env_preview=summarize_env(dict(res.env)),
                    note=res.note,
                    error=None,
                )
            )
            logger.debug("Plugin %s completed successfully", name)
        except Exception as e:
            logger.exception("Failed to apply result from plugin %s", name)
            # Keyword arguments keep this in line with the other summaries
            # and ensure ``error`` is supplied exactly once.
            summaries.append(
                SetupSummary(plugin=name, env_preview={}, note=None, error=str(e))
            )

    # Strict failure policy
    if settings.strict:
        for name, res, err in results:
            if err:
                raise SystemExit(f"[plugins] required plugin '{name}' failed: {err}")
            if res and getattr(res, "required", False) and not res.env:
                raise SystemExit(
                    f"[plugins] required plugin '{name}' returned SetupResult with "
                    f"required=True but empty env. If no changes are needed, return "
                    f"None instead of SetupResult."
                )

    logger.info("Plugin system initialization complete (%d plugins)", len(summaries))
    return summaries