Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/models/flow_run_states.py: 84%

19 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1""" 

2Functions for interacting with flow run state ORM objects. 

3Intended for internal use by the Prefect REST API. 

4""" 

5 

6from typing import Sequence, Union 1a

7from uuid import UUID 1a

8 

9from sqlalchemy import delete, select 1a

10from sqlalchemy.ext.asyncio import AsyncSession 1a

11 

12from prefect.server.database import orm_models 1a

13from prefect.server.database.dependencies import db_injector 1a

14from prefect.server.database.interface import PrefectDBInterface 1a

15 

16 

17@db_injector 1a

18async def read_flow_run_state( 1a

19 db: PrefectDBInterface, session: AsyncSession, flow_run_state_id: UUID 

20) -> Union[orm_models.FlowRunState, None]: 

21 """ 

22 Reads a flow run state by id. 

23 

24 Args: 

25 session: A database session 

26 flow_run_state_id: a flow run state id 

27 

28 Returns: 

29 orm_models.FlowRunState: the flow state 

30 """ 

31 

32 return await session.get(db.FlowRunState, flow_run_state_id) 1b

33 

34 

35@db_injector 1a

36async def read_flow_run_states( 1a

37 db: PrefectDBInterface, session: AsyncSession, flow_run_id: UUID 

38) -> Sequence[orm_models.FlowRunState]: 

39 """ 

40 Reads flow runs states for a flow run. 

41 

42 Args: 

43 session: A database session 

44 flow_run_id: the flow run id 

45 

46 Returns: 

47 List[orm_models.FlowRunState]: the flow run states 

48 """ 

49 

50 query = ( 1bc

51 select(db.FlowRunState) 

52 .filter_by(flow_run_id=flow_run_id) 

53 .order_by(db.FlowRunState.timestamp) 

54 ) 

55 result = await session.execute(query) 1bc

56 return result.scalars().unique().all() 

57 

58 

59@db_injector 1a

60async def delete_flow_run_state( 1a

61 db: PrefectDBInterface, 

62 session: AsyncSession, 

63 flow_run_state_id: UUID, 

64) -> bool: 

65 """ 

66 Delete a flow run state by id. 

67 

68 Args: 

69 session: A database session 

70 flow_run_state_id: a flow run state id 

71 

72 Returns: 

73 bool: whether or not the flow run state was deleted 

74 """ 

75 

76 result = await session.execute( 

77 delete(db.FlowRunState).where(db.FlowRunState.id == flow_run_state_id) 

78 ) 

79 return result.rowcount > 0