Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/models/configuration.py: 57%

21 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1from typing import Optional 1a

2 

3import sqlalchemy as sa 1a

4from sqlalchemy.ext.asyncio import AsyncSession 1a

5 

6from prefect.server import schemas 1a

7from prefect.server.database import PrefectDBInterface, db_injector, orm_models 1a

8 

9 

10@db_injector 1a

11async def write_configuration( 1a

12 db: PrefectDBInterface, 

13 session: AsyncSession, 

14 configuration: schemas.core.Configuration, 

15) -> orm_models.Configuration: 

16 # first see if the key already exists 

17 query = sa.select(db.Configuration).where(db.Configuration.key == configuration.key) 1b

18 result = await session.execute(query) # type: ignore 1b

19 existing_configuration = result.scalar() 

20 # if it exists, update its value 

21 if existing_configuration: 

22 existing_configuration.value = configuration.value 

23 # else create a new ORM object 

24 else: 

25 existing_configuration = db.Configuration( 

26 key=configuration.key, value=configuration.value 

27 ) 

28 session.add(existing_configuration) 

29 await session.flush() 1b

30 

31 # clear the cache for this key after writing a value 

32 db.queries.clear_configuration_value_cache_for_key(key=configuration.key) 

33 

34 return existing_configuration 

35 

36 

37@db_injector 1a

38async def read_configuration( 1a

39 db: PrefectDBInterface, 

40 session: AsyncSession, 

41 key: str, 

42) -> Optional[schemas.core.Configuration]: 

43 value = await db.queries.read_configuration_value(session=session, key=key) 1cb

44 return ( 

45 schemas.core.Configuration(key=key, value=value) if value is not None else None 

46 )