Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/models/flow_run_input.py: 45%

29 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1import uuid 1a

2from typing import List, Optional 1a

3 

4import sqlalchemy as sa 1a

5from sqlalchemy.ext.asyncio import AsyncSession 1a

6 

7import prefect.server.schemas as schemas 1a

8from prefect.server.database import PrefectDBInterface, db_injector 1a

9 

10 

@db_injector
async def create_flow_run_input(
    db: PrefectDBInterface,
    session: AsyncSession,
    flow_run_input: schemas.core.FlowRunInput,
) -> schemas.core.FlowRunInput:
    """Add a flow run input row to the session and return it as a schema.

    Args:
        db: Database interface supplied by ``db_injector``; provides the
            ``FlowRunInput`` ORM model.
        session: Async database session the new row is added to.
        flow_run_input: Schema object whose dumped fields populate the row.

    Returns:
        A ``schemas.core.FlowRunInput`` validated from the ORM instance
        after the session has been flushed.
    """
    field_values = flow_run_input.model_dump()
    orm_row = db.FlowRunInput(**field_values)

    session.add(orm_row)
    # Flush before converting so the returned schema is built from the
    # instance as it stands after the pending insert has been emitted.
    await session.flush()

    return schemas.core.FlowRunInput.model_validate(orm_row, from_attributes=True)

22 

23 

@db_injector
async def filter_flow_run_input(
    db: PrefectDBInterface,
    session: AsyncSession,
    flow_run_id: uuid.UUID,
    prefix: str,
    limit: int,
    exclude_keys: List[str],
) -> List[schemas.core.FlowRunInput]:
    """Return a flow run's inputs whose keys start with ``prefix``.

    Args:
        db: Database interface supplied by ``db_injector``; provides the
            ``FlowRunInput`` ORM model.
        session: Async database session used to execute the query.
        flow_run_id: Only inputs belonging to this flow run are returned.
        prefix: Literal key prefix to match. An empty string matches every
            key.
        limit: Maximum number of rows to return.
        exclude_keys: Keys that must not appear in the result.

    Returns:
        Matching inputs as ``schemas.core.FlowRunInput`` objects, ordered by
        their ``created`` column.
    """
    query = (
        sa.select(db.FlowRunInput)
        .where(
            sa.and_(
                db.FlowRunInput.flow_run_id == flow_run_id,
                # ``autoescape=True`` makes "%" and "_" inside ``prefix``
                # match literally. Building the pattern by hand with
                # ``like(prefix + "%")`` would treat them as wildcards and
                # return keys that do not actually start with the prefix.
                db.FlowRunInput.key.startswith(prefix, autoescape=True),
                db.FlowRunInput.key.not_in(exclude_keys),
            )
        )
        .order_by(db.FlowRunInput.created)
        .limit(limit)
    )

    result = await session.execute(query)
    return [
        schemas.core.FlowRunInput.model_validate(model, from_attributes=True)
        for model in result.scalars().all()
    ]

51 

52 

@db_injector
async def read_flow_run_input(
    db: PrefectDBInterface,
    session: AsyncSession,
    flow_run_id: uuid.UUID,
    key: str,
) -> Optional[schemas.core.FlowRunInput]:
    """Look up a single flow run input by flow run ID and key.

    Args:
        db: Database interface supplied by ``db_injector``; provides the
            ``FlowRunInput`` ORM model.
        session: Async database session used to execute the query.
        flow_run_id: ID of the flow run the input belongs to.
        key: Exact key of the input to read.

    Returns:
        The input as a ``schemas.core.FlowRunInput``, or ``None`` when the
        query yields no row.
    """
    # Multiple criteria passed to ``where`` are joined with AND.
    statement = sa.select(db.FlowRunInput).where(
        db.FlowRunInput.flow_run_id == flow_run_id,
        db.FlowRunInput.key == key,
    )

    found = (await session.execute(statement)).scalar()
    if not found:
        return None

    return schemas.core.FlowRunInput.model_validate(found, from_attributes=True)

73 

74 

@db_injector
async def delete_flow_run_input(
    db: PrefectDBInterface,
    session: AsyncSession,
    flow_run_id: uuid.UUID,
    key: str,
) -> bool:
    """Delete the flow run input identified by flow run ID and key.

    Args:
        db: Database interface supplied by ``db_injector``; provides the
            ``FlowRunInput`` ORM model.
        session: Async database session used to execute the delete.
        flow_run_id: ID of the flow run the input belongs to.
        key: Exact key of the input to delete.

    Returns:
        ``True`` if the statement reported at least one affected row,
        otherwise ``False``.
    """
    # Multiple criteria passed to ``where`` are joined with AND.
    statement = sa.delete(db.FlowRunInput).where(
        db.FlowRunInput.flow_run_id == flow_run_id,
        db.FlowRunInput.key == key,
    )

    outcome = await session.execute(statement)
    affected_rows = outcome.rowcount
    return affected_rows > 0