Coverage for /usr/local/lib/python3.12/site-packages/prefect/_experimental/plugins/manager.py: 22%
54 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1"""
2Plugin manager using pluggy with async bridge.
3"""
5from __future__ import annotations 1a
7import importlib.metadata as md 1a
8import inspect 1a
9import logging 1a
10import sys 1a
11from typing import Any 1a
13import pluggy 1a
14from packaging.specifiers import InvalidSpecifier, SpecifierSet 1a
16from prefect._experimental.plugins.spec import PREFECT_PLUGIN_API_VERSION 1a
# Project name shared by the pluggy PluginManager and the hook-implementation
# marker below; both are constructed with this same value.
PM_PROJECT_NAME = "prefect-experimental"
# Entry point group that load_entry_point_plugins() queries to discover plugins.
ENTRYPOINTS_GROUP = "prefect.plugins"

# pluggy hook-implementation marker bound to PM_PROJECT_NAME.
register_hook = pluggy.HookimplMarker(PM_PROJECT_NAME)
def build_manager(hookspecs: type) -> pluggy.PluginManager:
    """
    Build a pluggy PluginManager for this project with ``hookspecs`` registered.

    Args:
        hookspecs: Class (or protocol) that declares the hook specifications

    Returns:
        A new PluginManager created under ``PM_PROJECT_NAME`` with the given
        hook specifications already added
    """
    manager = pluggy.PluginManager(PM_PROJECT_NAME)
    manager.add_hookspecs(hookspecs)
    return manager
def load_entry_point_plugins(
    pm: pluggy.PluginManager,
    *,
    allow: set[str] | None,
    deny: set[str] | None,
    logger: logging.Logger,
) -> None:
    """
    Register every eligible plugin advertised under ``ENTRYPOINTS_GROUP``.

    An entry point is skipped when it is filtered out by ``allow``/``deny`` or
    when its declared API requirement excludes the current plugin API version.
    Any exception raised while loading or registering one plugin is logged and
    does not prevent the remaining entry points from being processed.

    Args:
        pm: The PluginManager that receives the loaded plugins
        allow: When non-empty, only entry points named in this set are loaded
        deny: When non-empty, entry points named in this set are skipped
        logger: Logger used for skip, success, and failure messages
    """
    # The ``group`` keyword is used on Python 3.10+; on 3.9 the result of
    # entry_points() is treated as dict-like and looked up by group name.
    if sys.version_info < (3, 10):
        candidates = md.entry_points().get(ENTRYPOINTS_GROUP, [])
    else:
        candidates = md.entry_points(group=ENTRYPOINTS_GROUP)

    for ep in candidates:
        name = ep.name

        if allow and name not in allow:
            logger.debug("Skipping plugin %s (not in allow list)", name)
            continue
        if deny and name in deny:
            logger.debug("Skipping plugin %s (in deny list)", name)
            continue

        try:
            plugin = ep.load()

            # Best-effort version fence: plugins that declare nothing are
            # assumed to target the default range.
            requires = getattr(plugin, "PREFECT_PLUGIN_API_REQUIRES", ">=0.1,<1")

            try:
                compatible = PREFECT_PLUGIN_API_VERSION in SpecifierSet(requires)
            except InvalidSpecifier:
                # An unparsable requirement disables the check rather than
                # blocking the plugin.
                logger.debug(
                    "Plugin %s has invalid version specifier %r, ignoring version check",
                    name,
                    requires,
                )
                compatible = True

            if not compatible:
                logger.warning(
                    "Skipping plugin %s: requires API version %s, current version is %s",
                    name,
                    requires,
                    PREFECT_PLUGIN_API_VERSION,
                )
                continue

            pm.register(plugin, name=name)
            logger.debug(
                "Loaded plugin %s (requires API %s, current %s)",
                name,
                requires,
                PREFECT_PLUGIN_API_VERSION,
            )
        except Exception:
            # One broken plugin must not stop discovery of the others.
            logger.exception("Failed to load plugin %s", name)
async def call_async_hook(
    pm: pluggy.PluginManager, hook_name: str, **kwargs: Any
) -> list[tuple[str, Any, Exception | None]]:
    """
    Call every implementation of a hook, awaiting any awaitable result.

    Each registered implementation is invoked directly with ``kwargs``, in the
    order returned by ``get_hookimpls()``. Synchronous return values are used
    as-is; coroutines and any other awaitable (futures, tasks, objects that
    define ``__await__``) are awaited before being recorded. A failure in one
    implementation is captured in its result tuple and does not stop the
    remaining implementations from running.

    Args:
        pm: The PluginManager
        hook_name: Name of the hook to call
        **kwargs: Arguments to pass to each hook implementation

    Returns:
        List of tuples: (plugin_name, result, exception)
        - If successful: (name, result, None)
        - If failed: (name, None, exception)

    Raises:
        AttributeError: If ``pm.hook`` has no attribute named ``hook_name``.
    """
    hook = getattr(pm.hook, hook_name)
    results: list[tuple[str, Any, Exception | None]] = []
    for impl in hook.get_hookimpls():
        try:
            res = impl.function(**kwargs)
            # isawaitable (rather than iscoroutine) so that futures, tasks and
            # custom awaitables are resolved instead of leaking un-awaited
            # objects into the results.
            if inspect.isawaitable(res):
                res = await res
        except Exception as exc:
            results.append((impl.plugin_name, None, exc))
        else:
            results.append((impl.plugin_name, res, None))
    return results