Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/database/dependencies.py: 67%
195 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1"""
2Injected database interface dependencies
3"""
5from collections.abc import Generator 1a
6from contextlib import ExitStack, contextmanager 1a
7from functools import wraps 1a
8from typing import ( 1a
9 TYPE_CHECKING,
10 Any,
11 Callable,
12 Generic,
13 Optional,
14 Union,
15 cast,
16 overload,
17)
19from typing_extensions import ( 1a
20 Concatenate,
21 Never,
22 ParamSpec,
23 Self,
24 TypeAlias,
25 TypedDict,
26 TypeVar,
27)
29from prefect.server.database.configurations import ( 1a
30 AioSqliteConfiguration,
31 AsyncPostgresConfiguration,
32 BaseDatabaseConfiguration,
33)
34from prefect.server.utilities.database import get_dialect 1a
35from prefect.server.utilities.schemas import PrefectDescriptorBase 1a
36from prefect.settings import PREFECT_API_DATABASE_CONNECTION_URL 1a
38if TYPE_CHECKING: 38 ↛ 39line 38 didn't jump to line 39 because the condition on line 38 was never true1a
39 from prefect.server.database.interface import PrefectDBInterface
40 from prefect.server.database.orm_models import BaseORMConfiguration
41 from prefect.server.database.query_components import BaseQueryComponents
# Generic placeholders shared by the decorator typing in this module:
#   P - the parameters of the decorated callable (excluding injected ones)
#   R - its return type
#   T - the type a decorated method is bound to
P = ParamSpec("P")
R = TypeVar("R", infer_variance=True)
T = TypeVar("T", infer_variance=True)

# Callable shapes without a database argument (plain function / method).
_Function = Callable[P, R]
_Method = Callable[Concatenate[T, P], R]

# The same shapes with a PrefectDBInterface inserted ahead of the remaining
# parameters; this is what `db_injector` accepts.
_DBFunction: TypeAlias = Callable[Concatenate["PrefectDBInterface", P], R]
_DBMethod: TypeAlias = Callable[Concatenate[T, "PrefectDBInterface", P], R]
class _ModelDependencies(TypedDict):
    """Shape of the module-level registry of database interface components.

    Every entry may be None, meaning "not set".
    """

    # Database configuration object.
    database_config: Optional[BaseDatabaseConfiguration]
    # Query components object.
    query_components: Optional["BaseQueryComponents"]
    # ORM configuration object.
    orm: Optional["BaseORMConfiguration"]
    # Class used to build the interface (a class, not an instance).
    interface_class: Optional[type["PrefectDBInterface"]]
# Process-wide registry of the interface components. Everything starts unset;
# entries are filled in by `provide_database_interface` and by the `set_*` /
# `temporary_*` helpers in this module.
MODELS_DEPENDENCIES: _ModelDependencies = _ModelDependencies(
    database_config=None,
    query_components=None,
    orm=None,
    interface_class=None,
)
def provide_database_interface() -> "PrefectDBInterface":
    """
    Build the Prefect REST API database interface from the registered components.

    The database configuration, query components and ORM configuration are
    read from `MODELS_DEPENDENCIES`. Any of them that is unset is created from
    the dialect of `PREFECT_API_DATABASE_CONNECTION_URL` and written back to
    `MODELS_DEPENDENCIES`, so subsequent calls reuse the same object. An unset
    interface class falls back to `PrefectDBInterface`; that fallback is not
    written back.

    Returns:
        An instance of the interface class constructed from the three
        components.

    Raises:
        ValueError: If a component has to be inferred and the dialect name is
            neither "postgresql" nor "sqlite".
    """
    # These modules are only imported under TYPE_CHECKING at module level, so
    # the runtime imports happen here.
    from prefect.server.database.interface import PrefectDBInterface
    from prefect.server.database.orm_models import (
        AioSqliteORMConfiguration,
        AsyncPostgresORMConfiguration,
    )
    from prefect.server.database.query_components import (
        AioSqliteQueryComponents,
        AsyncPostgresQueryComponents,
    )

    url = PREFECT_API_DATABASE_CONNECTION_URL.value()

    database_config = MODELS_DEPENDENCIES.get("database_config")
    query_components = MODELS_DEPENDENCIES.get("query_components")
    orm = MODELS_DEPENDENCIES.get("orm")
    interface_class = MODELS_DEPENDENCIES.get("interface_class")
    dialect = get_dialect(url)

    if database_config is None:
        config_cls = {
            "postgresql": AsyncPostgresConfiguration,
            "sqlite": AioSqliteConfiguration,
        }.get(dialect.name)
        if config_cls is None:
            raise ValueError(
                "Unable to infer database configuration from provided dialect. Got"
                f" dialect name {dialect.name!r}"
            )
        database_config = config_cls(connection_url=url)
        MODELS_DEPENDENCIES["database_config"] = database_config

    if query_components is None:
        queries_cls = {
            "postgresql": AsyncPostgresQueryComponents,
            "sqlite": AioSqliteQueryComponents,
        }.get(dialect.name)
        if queries_cls is None:
            raise ValueError(
                "Unable to infer query components from provided dialect. Got dialect"
                f" name {dialect.name!r}"
            )
        query_components = queries_cls()
        MODELS_DEPENDENCIES["query_components"] = query_components

    if orm is None:
        orm_cls = {
            "postgresql": AsyncPostgresORMConfiguration,
            "sqlite": AioSqliteORMConfiguration,
        }.get(dialect.name)
        if orm_cls is None:
            raise ValueError(
                "Unable to infer orm configuration from provided dialect. Got dialect"
                f" name {dialect.name!r}"
            )
        orm = orm_cls()
        MODELS_DEPENDENCIES["orm"] = orm

    # The default interface class is deliberately not stored in the registry.
    chosen_interface = (
        PrefectDBInterface if interface_class is None else interface_class
    )
    return chosen_interface(
        database_config=database_config,
        query_components=query_components,
        orm=orm,
    )
def inject_db(fn: Callable[P, R]) -> Callable[P, R]:
    """
    Decorator that provides a database interface to a function.

    The decorated function _must_ take a `db` kwarg. If the caller passes a
    non-None `db` keyword argument it is used as-is; otherwise an interface
    is obtained from `provide_database_interface()` and passed as `db`.

    Only the keyword form is inspected: a database passed positionally is not
    detected by the wrapper.

    Args:
        fn: The function to wrap.

    Returns:
        A wrapper around `fn` that fills in the `db` keyword argument.
    """
    # Imported locally: nothing else in this module needs `inspect`.
    import inspect

    @wraps(fn)
    def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
        # Covers both "db not passed" and "db=None".
        if kwargs.get("db") is None:
            kwargs["db"] = provide_database_interface()
        return fn(*args, **kwargs)

    # NOTE: `wrapper` is a plain function, so on its own it does not pass an
    # iscoroutinefunction() check even when `fn` is a coroutine function.
    # Where inspect.markcoroutinefunction() exists (Python 3.12+) the wrapper
    # is marked so the check succeeds; on older versions callers still need
    # inspect.unwrap(), or should use the @db_injector decorator instead.
    mark_coroutine = getattr(inspect, "markcoroutinefunction", None)
    if mark_coroutine is not None and inspect.iscoroutinefunction(fn):
        mark_coroutine(wrapper)

    return wrapper
@overload
def db_injector(func: _DBMethod[T, P, R]) -> _Method[T, P, R]: ...


@overload
def db_injector(func: _DBFunction[P, R]) -> _Function[P, R]: ...


def db_injector(
    func: Union[_DBMethod[T, P, R], _DBFunction[P, R]],
) -> Union[_Method[T, P, R], _Function[P, R]]:
    """
    Decorator that supplies a PrefectDBInterface as the first positional
    argument of the decorated function (after `self` for methods).

    `inject_db` passes the interface through a `db` keyword argument;
    `db_injector` instead makes it an explicit leading parameter, so the
    dependency on PrefectDBInterface is visible in the function signature and
    to type checkers. The two overloads describe the method and the plain
    function form respectively.

    When decorating a coroutine function, the result will continue to pass the
    iscoroutinefunction() test.

    Args:
        func: The function or method to decorate.

    Returns:
        A `DBInjector` descriptor wrapping `func`. Calling it injects the
        interface; accessing it through an instance handles method binding.
    """
    injector = DBInjector(func)
    return injector
class _FuncWrapper(Generic[P, R]):
    """Mixin class to delegate all attribute access to a wrapped function

    This helps compatibility and echoes what the Python method wrapper object
    does, and makes subclasses transparent to many introspection techniques.

    The wrapped callable lives in the `_func` slot. Attribute reads that fail
    normal lookup, and all attribute writes and deletes, are forwarded to it.
    """

    __slots__ = ("_func",)

    def __init__(self, func: Callable[P, R]) -> None:
        # Go through object.__setattr__: this class's own __setattr__ would
        # forward the assignment to the wrapped function instead.
        object.__setattr__(self, "_func", func)

    @property
    def __wrapped__(self) -> Callable[P, R]:
        """Access the underlying wrapped function"""
        return self._func

    if not TYPE_CHECKING:
        # Attribute hooks are guarded against typecheckers which then tend to
        # mark the class as 'anything goes' otherwise.

        def __getattr__(self, name: str) -> Any:
            # Only called when normal lookup fails. If `_func` itself is the
            # missing attribute (an instance created without __init__, as
            # copy/pickle machinery does), delegating to `self._func` would
            # re-enter this hook until RecursionError; fail cleanly instead.
            if name == "_func":
                raise AttributeError(
                    f"{type(self).__name__!r} object has no attribute {name!r}"
                )
            return getattr(self._func, name)

        def __setattr__(self, name: str, value: Any) -> None:
            setattr(self._func, name, value)

        def __delattr__(self, name: str) -> None:
            delattr(self._func, name)
236# Descriptor object responsible for injecting the PrefectDBInterface instance.
237# It has no docstring to encourage Python to find the wrapped callable docstring
238# instead.
class DBInjector(
    PrefectDescriptorBase,
    _FuncWrapper[P, R],
    Generic[T, P, R],
):
    # `__name__` is filled in by __set_name__ when the injector is assigned as
    # a class attribute.
    __slots__ = ("__name__",)

    __name__: str

    if TYPE_CHECKING:
        # Typing-only constructors: tell type checkers which DBInjector
        # parametrisation results from a method vs. a plain function.

        @overload
        def __new__(cls, func: _DBMethod[T, P, R]) -> "DBInjector[T, P, R]": ...

        @overload
        def __new__(cls, func: _DBFunction[P, R]) -> "DBInjector[None, P, R]": ...

        def __new__(
            cls, func: Union[_DBMethod[T, P, R], _DBFunction[P, R]]
        ) -> Union["DBInjector[T, P, R]", "DBInjector[None, P, R]"]: ...

    def __init__(self, func: Union[_DBMethod[T, P, R], _DBFunction[P, R]]) -> None:
        super().__init__(cast(Callable[P, R], func))

    # Direct call (not through an instance): obtain an interface and pass it
    # as the first positional argument.
    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> R:
        db = provide_database_interface()
        func = cast(_DBFunction[P, R], self._func)
        return func(db, *args, **kwargs)

    def __set_name__(self, owner: type[T], name: str) -> None:
        # object.__setattr__ bypasses the delegating __setattr__ hook so the
        # name is stored on the injector itself.
        object.__setattr__(self, "__name__", name)

    @overload
    def __get__(self, instance: None, owner: type[T]) -> Self: ...

    @overload
    def __get__(
        self, instance: T, owner: Optional[type[T]] = None
    ) -> "_DBInjectorMethod[T, P, R]": ...

    @overload
    def __get__(self, instance: None, owner: None) -> Never: ...

    # Descriptor access: class-level access returns the injector itself,
    # instance-level access returns a proxy around the bound method.
    def __get__(
        self, instance: Optional[T], owner: Optional[type[T]] = None
    ) -> Union[Self, "_DBInjectorMethod[T, P, R]"]:
        if instance is None:
            if owner is None:
                raise TypeError("__get__(None, None) is invalid")
            return self
        return _DBInjectorMethod(instance, self._func.__get__(instance))

    def __repr__(self) -> str:
        # Closing parenthesis after the qualified name keeps the repr balanced.
        return f"<DBInjector({self._func.__qualname__}) at {id(self):#x}>"

    # The __doc__ property can't be defined in a mix-in.

    @property
    def __doc__(self) -> Optional[str]:
        return getattr(self._func, "__doc__", None)

    @__doc__.setter
    def __doc__(self, doc: Optional[str]) -> None:
        self._func.__doc__ = doc

    @__doc__.deleter
    def __doc__(self) -> None:  # type: ignore # pyright doesn't like the override but it is fine
        # "Deleting" the docstring resets it to None on the wrapped function.
        self._func.__doc__ = None
309# Proxy object to handle db interface injecting for bound methods.
class _DBInjectorMethod(_FuncWrapper[P, R], Generic[T, P, R]):
    # `_owner` is the instance the method was looked up on; the inherited
    # `_func` slot holds the already-bound method.
    __slots__ = ("_owner",)
    _owner: T

    def __init__(self, owner: T, func: _DBFunction[P, R]) -> None:
        super().__init__(cast(Callable[P, R], func))
        # object.__setattr__ bypasses the delegating __setattr__ hook so the
        # owner is stored on the proxy itself.
        object.__setattr__(self, "_owner", owner)

    # Obtain an interface at call time and pass it as the first argument of
    # the bound method.
    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> R:
        db = provide_database_interface()
        func = cast(_DBFunction[P, R], self._func)
        return func(db, *args, **kwargs)

    @property
    def __self__(self) -> T:
        return self._owner

    def __repr__(self) -> str:
        # Closing parenthesis after the qualified name keeps the repr balanced.
        return f"<bound _DBInjectorMethod({self._func.__qualname__}) at {id(self):#x}>"

    # The __doc__ property can't be defined in a mix-in.

    @property
    def __doc__(self) -> Optional[str]:
        return getattr(self._func, "__doc__", None)

    @__doc__.setter
    def __doc__(self, doc: Optional[str]) -> None:
        self._func.__doc__ = doc

    @__doc__.deleter
    def __doc__(self) -> None:  # type: ignore # pyright doesn't like the override but it is fine
        # "Deleting" the docstring resets it to None on the wrapped callable.
        self._func.__doc__ = None
@contextmanager
def temporary_database_config(
    tmp_database_config: Optional[BaseDatabaseConfiguration],
) -> Generator[None, object, None]:
    """
    Swap in a different Prefect REST API database configuration for the
    duration of the `with` block.

    The configuration registered when the context is entered is put back on
    exit, including when the block raises.

    Args:
        tmp_database_config: Prefect REST API database configuration to inject.
    """
    previous = MODELS_DEPENDENCIES["database_config"]
    MODELS_DEPENDENCIES["database_config"] = tmp_database_config
    try:
        yield
    finally:
        MODELS_DEPENDENCIES["database_config"] = previous
@contextmanager
def temporary_query_components(
    tmp_queries: Optional["BaseQueryComponents"],
) -> Generator[None, object, None]:
    """
    Swap in different Prefect REST API database query components for the
    duration of the `with` block.

    The query components registered when the context is entered are put back
    on exit, including when the block raises.

    Args:
        tmp_queries: Prefect REST API query components to inject.
    """
    previous = MODELS_DEPENDENCIES["query_components"]
    MODELS_DEPENDENCIES["query_components"] = tmp_queries
    try:
        yield
    finally:
        MODELS_DEPENDENCIES["query_components"] = previous
@contextmanager
def temporary_orm_config(
    tmp_orm_config: Optional["BaseORMConfiguration"],
) -> Generator[None, object, None]:
    """
    Swap in a different Prefect REST API ORM configuration for the duration
    of the `with` block.

    The ORM configuration registered when the context is entered is put back
    on exit, including when the block raises.

    Args:
        tmp_orm_config: Prefect REST API ORM configuration to inject.
    """
    previous = MODELS_DEPENDENCIES["orm"]
    MODELS_DEPENDENCIES["orm"] = tmp_orm_config
    try:
        yield
    finally:
        MODELS_DEPENDENCIES["orm"] = previous
@contextmanager
def temporary_interface_class(
    tmp_interface_class: Optional[type["PrefectDBInterface"]],
) -> Generator[None, object, None]:
    """
    Swap in a different Prefect REST API interface class for the duration of
    the `with` block.

    The interface class registered when the context is entered is put back on
    exit, including when the block raises.

    Args:
        tmp_interface_class: Prefect REST API interface class to inject.
    """
    previous = MODELS_DEPENDENCIES["interface_class"]
    MODELS_DEPENDENCIES["interface_class"] = tmp_interface_class
    try:
        yield
    finally:
        MODELS_DEPENDENCIES["interface_class"] = previous
@contextmanager
def temporary_database_interface(
    tmp_database_config: Optional[BaseDatabaseConfiguration] = None,
    tmp_queries: Optional["BaseQueryComponents"] = None,
    tmp_orm_config: Optional["BaseORMConfiguration"] = None,
    tmp_interface_class: Optional[type["PrefectDBInterface"]] = None,
) -> Generator[None, object, None]:
    """
    Temporarily override every component of the Prefect REST API database
    interface at once.

    A component that is not passed (left as None) is cleared for the duration
    of the block, so it will be inferred from the Prefect REST API database
    connection string dialect the next time the interface is built.

    On exit all four components are restored to what they were when the
    context was entered.

    Args:
        tmp_database_config: An optional Prefect REST API database configuration to inject.
        tmp_queries: Optional Prefect REST API query components to inject.
        tmp_orm_config: An optional Prefect REST API ORM configuration to inject.
        tmp_interface_class: Optional database interface class to inject
    """
    # Creating these context managers runs none of their bodies; the
    # overrides take effect only when each one is entered below, in order.
    overrides = (
        temporary_database_config(tmp_database_config=tmp_database_config),
        temporary_query_components(tmp_queries=tmp_queries),
        temporary_orm_config(tmp_orm_config=tmp_orm_config),
        temporary_interface_class(tmp_interface_class=tmp_interface_class),
    )
    with ExitStack() as stack:
        for override in overrides:
            stack.enter_context(override)
        yield
def set_database_config(database_config: Optional[BaseDatabaseConfiguration]) -> None:
    """
    Register the Prefect REST API database configuration.

    Args:
        database_config: Configuration to store in `MODELS_DEPENDENCIES`;
            None clears the entry.
    """
    MODELS_DEPENDENCIES["database_config"] = database_config
def set_query_components(query_components: Optional["BaseQueryComponents"]) -> None:
    """
    Register the Prefect REST API query components.

    Args:
        query_components: Query components to store in `MODELS_DEPENDENCIES`;
            None clears the entry.
    """
    MODELS_DEPENDENCIES["query_components"] = query_components
def set_orm_config(orm_config: Optional["BaseORMConfiguration"]) -> None:
    """
    Register the Prefect REST API ORM configuration.

    Args:
        orm_config: ORM configuration to store in `MODELS_DEPENDENCIES`;
            None clears the entry.
    """
    MODELS_DEPENDENCIES["orm"] = orm_config
def set_interface_class(interface_class: Optional[type["PrefectDBInterface"]]) -> None:
    """
    Register the Prefect REST API interface class.

    Args:
        interface_class: Interface class to store in `MODELS_DEPENDENCIES`;
            None clears the entry.
    """
    MODELS_DEPENDENCIES["interface_class"] = interface_class