Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/database/dependencies.py: 67%

195 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1""" 

2Injected database interface dependencies 

3""" 

4 

5from collections.abc import Generator 1a

6from contextlib import ExitStack, contextmanager 1a

7from functools import wraps 1a

8from typing import ( 1a

9 TYPE_CHECKING, 

10 Any, 

11 Callable, 

12 Generic, 

13 Optional, 

14 Union, 

15 cast, 

16 overload, 

17) 

18 

19from typing_extensions import ( 1a

20 Concatenate, 

21 Never, 

22 ParamSpec, 

23 Self, 

24 TypeAlias, 

25 TypedDict, 

26 TypeVar, 

27) 

28 

29from prefect.server.database.configurations import ( 1a

30 AioSqliteConfiguration, 

31 AsyncPostgresConfiguration, 

32 BaseDatabaseConfiguration, 

33) 

34from prefect.server.utilities.database import get_dialect 1a

35from prefect.server.utilities.schemas import PrefectDescriptorBase 1a

36from prefect.settings import PREFECT_API_DATABASE_CONNECTION_URL 1a

37 

38if TYPE_CHECKING: 38 ↛ 39line 38 didn't jump to line 39 because the condition on line 38 was never true1a

39 from prefect.server.database.interface import PrefectDBInterface 

40 from prefect.server.database.orm_models import BaseORMConfiguration 

41 from prefect.server.database.query_components import BaseQueryComponents 

42 

43P = ParamSpec("P") 1a

44R = TypeVar("R", infer_variance=True) 1a

45T = TypeVar("T", infer_variance=True) 1a

46 

47_Function = Callable[P, R] 1a

48_Method = Callable[Concatenate[T, P], R] 1a

49_DBFunction: TypeAlias = Callable[Concatenate["PrefectDBInterface", P], R] 1a

50_DBMethod: TypeAlias = Callable[Concatenate[T, "PrefectDBInterface", P], R] 1a

51 

52 

53class _ModelDependencies(TypedDict): 1a

54 database_config: Optional[BaseDatabaseConfiguration] 1a

55 query_components: Optional["BaseQueryComponents"] 1a

56 orm: Optional["BaseORMConfiguration"] 1a

57 interface_class: Optional[type["PrefectDBInterface"]] 1a

58 

59 

60MODELS_DEPENDENCIES: _ModelDependencies = { 1a

61 "database_config": None, 

62 "query_components": None, 

63 "orm": None, 

64 "interface_class": None, 

65} 

66 

67 

68def provide_database_interface() -> "PrefectDBInterface": 1a

69 """ 

70 Get the current Prefect REST API database interface. 

71 

72 If components of the interface are not set, defaults will be inferred 

73 based on the dialect of the connection URL. 

74 """ 

75 from prefect.server.database.interface import PrefectDBInterface 1aebcd

76 from prefect.server.database.orm_models import ( 1aebcd

77 AioSqliteORMConfiguration, 

78 AsyncPostgresORMConfiguration, 

79 ) 

80 from prefect.server.database.query_components import ( 1aebcd

81 AioSqliteQueryComponents, 

82 AsyncPostgresQueryComponents, 

83 ) 

84 

85 connection_url = PREFECT_API_DATABASE_CONNECTION_URL.value() 1aebcd

86 

87 database_config = MODELS_DEPENDENCIES.get("database_config") 1aebcd

88 query_components = MODELS_DEPENDENCIES.get("query_components") 1aebcd

89 orm = MODELS_DEPENDENCIES.get("orm") 1aebcd

90 interface_class = MODELS_DEPENDENCIES.get("interface_class") 1aebcd

91 dialect = get_dialect(connection_url) 1aebcd

92 

93 if database_config is None: 1aebcd

94 if dialect.name == "postgresql": 94 ↛ 95line 94 didn't jump to line 95 because the condition on line 94 was never true1a

95 database_config = AsyncPostgresConfiguration(connection_url=connection_url) 

96 elif dialect.name == "sqlite": 96 ↛ 99line 96 didn't jump to line 99 because the condition on line 96 was always true1a

97 database_config = AioSqliteConfiguration(connection_url=connection_url) 1a

98 else: 

99 raise ValueError( 

100 "Unable to infer database configuration from provided dialect. Got" 

101 f" dialect name {dialect.name!r}" 

102 ) 

103 

104 MODELS_DEPENDENCIES["database_config"] = database_config 1a

105 

106 if query_components is None: 1aebcd

107 if dialect.name == "postgresql": 107 ↛ 108line 107 didn't jump to line 108 because the condition on line 107 was never true1a

108 query_components = AsyncPostgresQueryComponents() 

109 elif dialect.name == "sqlite": 109 ↛ 112line 109 didn't jump to line 112 because the condition on line 109 was always true1a

110 query_components = AioSqliteQueryComponents() 1a

111 else: 

112 raise ValueError( 

113 "Unable to infer query components from provided dialect. Got dialect" 

114 f" name {dialect.name!r}" 

115 ) 

116 

117 MODELS_DEPENDENCIES["query_components"] = query_components 1a

118 

119 if orm is None: 1aebcd

120 if dialect.name == "postgresql": 120 ↛ 121line 120 didn't jump to line 121 because the condition on line 120 was never true1a

121 orm = AsyncPostgresORMConfiguration() 

122 elif dialect.name == "sqlite": 122 ↛ 125line 122 didn't jump to line 125 because the condition on line 122 was always true1a

123 orm = AioSqliteORMConfiguration() 1a

124 else: 

125 raise ValueError( 

126 "Unable to infer orm configuration from provided dialect. Got dialect" 

127 f" name {dialect.name!r}" 

128 ) 

129 

130 MODELS_DEPENDENCIES["orm"] = orm 1a

131 

132 if interface_class is None: 132 ↛ 135line 132 didn't jump to line 135 because the condition on line 132 was always true1aebcd

133 interface_class = PrefectDBInterface 1aebcd

134 

135 return interface_class( 1aebcd

136 database_config=database_config, 

137 query_components=query_components, 

138 orm=orm, 

139 ) 

140 

141 

142def inject_db(fn: Callable[P, R]) -> Callable[P, R]: 1a

143 """ 

144 Decorator that provides a database interface to a function. 

145 

146 The decorated function _must_ take a `db` kwarg and if a db is passed 

147 when called it will be used instead of creating a new one. 

148 

149 """ 

150 

151 # NOTE: this wrapper will not pass a iscoroutinefunction() 

152 # check unless the caller first uses inspect.unwrap() 

153 # or we start using inspect.markcoroutinefunction() (Python 3.12) 

154 # In the past this has only been an issue when @inject_db 

155 # was being used in tests. 

156 # 

157 # If this becomes an issue again in future, use the @db_injector decorator 

158 # instead. 

159 

160 @wraps(fn) 1a

161 def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: 1a

162 if "db" not in kwargs or kwargs["db"] is None: 162 ↛ 163line 162 didn't jump to line 163 because the condition on line 162 was never true1b

163 kwargs["db"] = provide_database_interface() 

164 return fn(*args, **kwargs) 1b

165 

166 return wrapper 1a

167 

168 

169@overload 1a

170def db_injector(func: _DBMethod[T, P, R]) -> _Method[T, P, R]: ... 170 ↛ exitline 170 didn't return from function 'db_injector' because 1a

171 

172 

173@overload 1a

174def db_injector(func: _DBFunction[P, R]) -> _Function[P, R]: ... 174 ↛ exitline 174 didn't return from function 'db_injector' because 1a

175 

176 

177def db_injector( 1a

178 func: Union[_DBMethod[T, P, R], _DBFunction[P, R]], 

179) -> Union[_Method[T, P, R], _Function[P, R]]: 

180 """ 

181 Decorator to inject a PrefectDBInterface instance as the first positional 

182 argument to the decorated function. 

183 

184 Unlike `inject_db`, which injects the database connection as a keyword 

185 argument, `db_injector` adds it explicitly as the first positional 

186 argument. This change enhances type hinting by making the dependency on 

187 PrefectDBInterface explicit in the function signature. 

188 

189 When decorating a coroutine function, the result will continue to pass the 

190 iscoroutinefunction() test. 

191 

192 Args: 

193 func: The function or method to decorate. 

194 

195 Returns: 

196 A wrapped descriptor object which injects the PrefectDBInterface instance 

197 as the first argument to the function or method. This handles method 

198 binding transparently. 

199 

200 """ 

201 return DBInjector(func) 1ab

202 

203 

204class _FuncWrapper(Generic[P, R]): 1a

205 """Mixin class to delegate all attribute access to a wrapped function 

206 

207 This helps compatibility and echos what the Python method wrapper object 

208 does, and makes subclasses transarent to many introspection techniques. 

209 

210 """ 

211 

212 __slots__ = "_func" 1a

213 

214 def __init__(self, func: Callable[P, R]) -> None: 1a

215 object.__setattr__(self, "_func", func) 1abc

216 

217 @property 1a

218 def __wrapped__(self) -> Callable[P, R]: 1a

219 """Access the underlying wrapped function""" 

220 return self._func 

221 

222 if not TYPE_CHECKING: 222 ↛ exitline 222 didn't exit class '_FuncWrapper' because the condition on line 222 was always true1a

223 # Attribute hooks are guarded against typecheckers which then tend to 

224 # mark the class as 'anything goes' otherwise. 

225 

226 def __getattr__(self, name: str) -> Any: 1a

227 return getattr(self._func, name) 1a

228 

229 def __setattr__(self, name: str, value: Any) -> None: 1a

230 setattr(self._func, name, value) 

231 

232 def __delattr__(self, name: str) -> None: 1a

233 delattr(self._func, name) 

234 

235 

236# Descriptor object responsible for injecting the PrefectDBInterface instance. 

237# It has no docstring to encourage Python to find the wrapped callable docstring 

238# instead. 

239class DBInjector( 1a

240 PrefectDescriptorBase, 

241 _FuncWrapper[P, R], 

242 Generic[T, P, R], 

243): 

244 __slots__ = ("__name__",) 1a

245 

246 __name__: str 1a

247 

248 if TYPE_CHECKING: 248 ↛ 250line 248 didn't jump to line 250 because the condition on line 248 was never true1a

249 

250 @overload 

251 def __new__(cls, func: _DBMethod[T, P, R]) -> "DBInjector[T, P, R]": ... 

252 

253 @overload 

254 def __new__(cls, func: _DBFunction[P, R]) -> "DBInjector[None, P, R]": ... 

255 

256 def __new__( 

257 cls, func: Union[_DBMethod[T, P, R], _DBFunction[P, R]] 

258 ) -> Union["DBInjector[T, P, R]", "DBInjector[None, P, R]"]: ... 

259 

260 def __init__(self, func: Union[_DBMethod[T, P, R], _DBFunction[P, R]]) -> None: 1a

261 super().__init__(cast(Callable[P, R], func)) 1ab

262 

263 def __call__(self, *args: P.args, **kwargs: P.kwargs) -> R: 1a

264 db = provide_database_interface() 1bcd

265 func = cast(_DBFunction[P, R], self._func) 1bcd

266 return func(db, *args, **kwargs) 1bcd

267 

268 def __set_name__(self, owner: type[T], name: str) -> None: 1a

269 object.__setattr__(self, "__name__", name) 1a

270 

271 @overload 1a

272 def __get__(self, instance: None, owner: type[T]) -> Self: ... 272 ↛ exitline 272 didn't return from function '__get__' because 1a

273 

274 @overload 1a

275 def __get__( 275 ↛ exitline 275 didn't return from function '__get__' because 1a

276 self, instance: T, owner: Optional[type[T]] = None 

277 ) -> "_DBInjectorMethod[T, P, R]": ... 

278 

279 @overload 1a

280 def __get__(self, instance: None, owner: None) -> Never: ... 280 ↛ exitline 280 didn't return from function '__get__' because 1a

281 

282 def __get__( 1a

283 self, instance: Optional[T], owner: Optional[type[T]] = None 

284 ) -> Union[Self, "_DBInjectorMethod[T, P, R]"]: 

285 if instance is None: 1abc

286 if owner is None: 286 ↛ 287line 286 didn't jump to line 287 because the condition on line 286 was never true1a

287 raise TypeError("__get__(None, None) is invalid") 

288 return self 1a

289 return _DBInjectorMethod(instance, self._func.__get__(instance)) 1bc

290 

291 def __repr__(self) -> str: 1a

292 return f"<DBInjector({self._func.__qualname__} at {id(self):#x}>" 

293 

294 # The __doc__ property can't be defined in a mix-in. 

295 

296 @property 1a

297 def __doc__(self) -> Optional[str]: 1a

298 return getattr(self._func, "__doc__", None) 1abc

299 

300 @__doc__.setter 1a

301 def __doc__(self, doc: Optional[str]) -> None: 1a

302 self._func.__doc__ = doc 

303 

304 @__doc__.deleter 1a

305 def __doc__(self) -> None: # type: ignore # pyright doesn't like the override but it is fine 1a

306 self._func.__doc__ = None 

307 

308 

309# Proxy object to handle db interface injecting for bound methods. 

310class _DBInjectorMethod(_FuncWrapper[P, R], Generic[T, P, R]): 1a

311 __slots__ = ("_owner",) 1a

312 _owner: T 1a

313 

314 def __init__(self, owner: T, func: _DBFunction[P, R]) -> None: 1a

315 super().__init__(cast(Callable[P, R], func)) 1bc

316 object.__setattr__(self, "_owner", owner) 1bc

317 

318 def __call__(self, *args: P.args, **kwargs: P.kwargs) -> R: 1a

319 db = provide_database_interface() 1bc

320 func = cast(_DBFunction[P, R], self._func) 1bc

321 return func(db, *args, **kwargs) 1bc

322 

323 @property 1a

324 def __self__(self) -> T: 1a

325 return self._owner 

326 

327 def __repr__(self) -> str: 1a

328 return f"<bound _DBInjectorMethod({self._func.__qualname__} at {id(self):#x}>" 

329 

330 # The __doc__ property can't be defined in a mix-in. 

331 

332 @property 1a

333 def __doc__(self) -> Optional[str]: 1a

334 return getattr(self._func, "__doc__", None) 

335 

336 @__doc__.setter 1a

337 def __doc__(self, doc: Optional[str]) -> None: 1a

338 self._func.__doc__ = doc 

339 

340 @__doc__.deleter 1a

341 def __doc__(self) -> None: # type: ignore # pyright doesn't like the override but it is fine 1a

342 self._func.__doc__ = None 

343 

344 

345@contextmanager 1a

346def temporary_database_config( 1a

347 tmp_database_config: Optional[BaseDatabaseConfiguration], 

348) -> Generator[None, object, None]: 

349 """ 

350 Temporarily override the Prefect REST API database configuration. 

351 When the context is closed, the existing database configuration will 

352 be restored. 

353 

354 Args: 

355 tmp_database_config: Prefect REST API database configuration to inject. 

356 

357 """ 

358 starting_config = MODELS_DEPENDENCIES["database_config"] 

359 try: 

360 MODELS_DEPENDENCIES["database_config"] = tmp_database_config 

361 yield 

362 finally: 

363 MODELS_DEPENDENCIES["database_config"] = starting_config 

364 

365 

366@contextmanager 1a

367def temporary_query_components( 1a

368 tmp_queries: Optional["BaseQueryComponents"], 

369) -> Generator[None, object, None]: 

370 """ 

371 Temporarily override the Prefect REST API database query components. 

372 When the context is closed, the existing query components will 

373 be restored. 

374 

375 Args: 

376 tmp_queries: Prefect REST API query components to inject. 

377 

378 """ 

379 starting_queries = MODELS_DEPENDENCIES["query_components"] 

380 try: 

381 MODELS_DEPENDENCIES["query_components"] = tmp_queries 

382 yield 

383 finally: 

384 MODELS_DEPENDENCIES["query_components"] = starting_queries 

385 

386 

387@contextmanager 1a

388def temporary_orm_config( 1a

389 tmp_orm_config: Optional["BaseORMConfiguration"], 

390) -> Generator[None, object, None]: 

391 """ 

392 Temporarily override the Prefect REST API ORM configuration. 

393 When the context is closed, the existing orm configuration will 

394 be restored. 

395 

396 Args: 

397 tmp_orm_config: Prefect REST API ORM configuration to inject. 

398 

399 """ 

400 starting_orm_config = MODELS_DEPENDENCIES["orm"] 

401 try: 

402 MODELS_DEPENDENCIES["orm"] = tmp_orm_config 

403 yield 

404 finally: 

405 MODELS_DEPENDENCIES["orm"] = starting_orm_config 

406 

407 

408@contextmanager 1a

409def temporary_interface_class( 1a

410 tmp_interface_class: Optional[type["PrefectDBInterface"]], 

411) -> Generator[None, object, None]: 

412 """ 

413 Temporarily override the Prefect REST API interface class When the context is closed, 

414 the existing interface will be restored. 

415 

416 Args: 

417 tmp_interface_class: Prefect REST API interface class to inject. 

418 

419 """ 

420 starting_interface_class = MODELS_DEPENDENCIES["interface_class"] 

421 try: 

422 MODELS_DEPENDENCIES["interface_class"] = tmp_interface_class 

423 yield 

424 finally: 

425 MODELS_DEPENDENCIES["interface_class"] = starting_interface_class 

426 

427 

428@contextmanager 1a

429def temporary_database_interface( 1a

430 tmp_database_config: Optional[BaseDatabaseConfiguration] = None, 

431 tmp_queries: Optional["BaseQueryComponents"] = None, 

432 tmp_orm_config: Optional["BaseORMConfiguration"] = None, 

433 tmp_interface_class: Optional[type["PrefectDBInterface"]] = None, 

434) -> Generator[None, object, None]: 

435 """ 

436 Temporarily override the Prefect REST API database interface. 

437 

438 Any interface components that are not explicitly provided will be 

439 cleared and inferred from the Prefect REST API database connection string 

440 dialect. 

441 

442 When the context is closed, the existing database interface will 

443 be restored. 

444 

445 Args: 

446 tmp_database_config: An optional Prefect REST API database configuration to inject. 

447 tmp_orm_config: An optional Prefect REST API ORM configuration to inject. 

448 tmp_queries: Optional Prefect REST API query components to inject. 

449 tmp_interface_class: Optional database interface class to inject 

450 

451 """ 

452 with ExitStack() as stack: 

453 stack.enter_context( 

454 temporary_database_config(tmp_database_config=tmp_database_config) 

455 ) 

456 stack.enter_context(temporary_query_components(tmp_queries=tmp_queries)) 

457 stack.enter_context(temporary_orm_config(tmp_orm_config=tmp_orm_config)) 

458 stack.enter_context( 

459 temporary_interface_class(tmp_interface_class=tmp_interface_class) 

460 ) 

461 yield 

462 

463 

464def set_database_config(database_config: Optional[BaseDatabaseConfiguration]) -> None: 1a

465 """Set Prefect REST API database configuration.""" 

466 MODELS_DEPENDENCIES["database_config"] = database_config 

467 

468 

469def set_query_components(query_components: Optional["BaseQueryComponents"]) -> None: 1a

470 """Set Prefect REST API query components.""" 

471 MODELS_DEPENDENCIES["query_components"] = query_components 

472 

473 

474def set_orm_config(orm_config: Optional["BaseORMConfiguration"]) -> None: 1a

475 """Set Prefect REST API orm configuration.""" 

476 MODELS_DEPENDENCIES["orm"] = orm_config 

477 

478 

479def set_interface_class(interface_class: Optional[type["PrefectDBInterface"]]) -> None: 1a

480 """Set Prefect REST API interface class.""" 

481 MODELS_DEPENDENCIES["interface_class"] = interface_class