Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/models/configuration.py: 57%
21 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1from typing import Optional 1a
3import sqlalchemy as sa 1a
4from sqlalchemy.ext.asyncio import AsyncSession 1a
6from prefect.server import schemas 1a
7from prefect.server.database import PrefectDBInterface, db_injector, orm_models 1a
@db_injector
async def write_configuration(
    db: PrefectDBInterface,
    session: AsyncSession,
    configuration: schemas.core.Configuration,
) -> orm_models.Configuration:
    """Create or update the stored configuration row for ``configuration.key``.

    Looks up an existing row by key. If one is found its value is
    overwritten; otherwise a new ORM object is built from the given key and
    value. The row is added to the session and the session is flushed, then
    the cached configuration value for that key is cleared.

    Args:
        db: Database interface supplied by ``db_injector``.
        session: Async database session used for the lookup and the flush.
        configuration: Schema object carrying the key and value to store.

    Returns:
        The ORM configuration object that was updated or created.
    """
    key = configuration.key

    # Look for a row that already holds this key.
    lookup = sa.select(db.Configuration).where(db.Configuration.key == key)
    row = (await session.execute(lookup)).scalar()  # type: ignore

    if not row:
        # Nothing stored yet: build a fresh ORM object.
        row = db.Configuration(key=key, value=configuration.value)
    else:
        # Key already present: overwrite its value in place.
        row.value = configuration.value

    session.add(row)
    await session.flush()

    # Drop the cached value for this key now that a new one has been written.
    db.queries.clear_configuration_value_cache_for_key(key=key)

    return row
@db_injector
async def read_configuration(
    db: PrefectDBInterface,
    session: AsyncSession,
    key: str,
) -> Optional[schemas.core.Configuration]:
    """Read the configuration stored under ``key``.

    Args:
        db: Database interface supplied by ``db_injector``.
        session: Async database session passed through to the value query.
        key: Configuration key to look up.

    Returns:
        A ``schemas.core.Configuration`` holding the key and its value, or
        ``None`` when the value query returns ``None``.
    """
    stored = await db.queries.read_configuration_value(session=session, key=key)
    if stored is None:
        return None
    return schemas.core.Configuration(key=key, value=stored)