Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/models/flow_run_states.py: 68%
19 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1"""
2Functions for interacting with flow run state ORM objects.
3Intended for internal use by the Prefect REST API.
4"""
6from typing import Sequence, Union 1a
7from uuid import UUID 1a
9from sqlalchemy import delete, select 1a
10from sqlalchemy.ext.asyncio import AsyncSession 1a
12from prefect.server.database import orm_models 1a
13from prefect.server.database.dependencies import db_injector 1a
14from prefect.server.database.interface import PrefectDBInterface 1a
@db_injector
async def read_flow_run_state(
    db: PrefectDBInterface,
    session: AsyncSession,
    flow_run_state_id: UUID,
) -> Union[orm_models.FlowRunState, None]:
    """
    Fetch a single flow run state by its id.

    Args:
        db: the database interface whose `FlowRunState` model is queried
            (presumably supplied by the `db_injector` decorator; confirm
            against its definition)
        session: A database session
        flow_run_state_id: id of the flow run state to look up

    Returns:
        Union[orm_models.FlowRunState, None]: whatever `session.get` yields
            for the given id
    """

    state_model = db.FlowRunState
    flow_run_state = await session.get(state_model, flow_run_state_id)
    return flow_run_state
@db_injector
async def read_flow_run_states(
    db: PrefectDBInterface,
    session: AsyncSession,
    flow_run_id: UUID,
) -> Sequence[orm_models.FlowRunState]:
    """
    Read every flow run state recorded for one flow run.

    Args:
        db: the database interface whose `FlowRunState` model is queried
            (presumably supplied by the `db_injector` decorator; confirm
            against its definition)
        session: A database session
        flow_run_id: id of the flow run whose states are wanted

    Returns:
        Sequence[orm_models.FlowRunState]: the flow run's states, ordered by
            their `timestamp` column and de-duplicated
    """

    state_model = db.FlowRunState

    # Build the statement step by step: restrict to the flow run, then sort.
    statement = select(state_model).filter_by(flow_run_id=flow_run_id)
    statement = statement.order_by(state_model.timestamp)

    rows = await session.execute(statement)
    return rows.scalars().unique().all()
@db_injector
async def delete_flow_run_state(
    db: PrefectDBInterface, session: AsyncSession, flow_run_state_id: UUID
) -> bool:
    """
    Remove the flow run state with the given id.

    The DELETE statement is executed on the provided session; this function
    does not issue a commit itself.

    Args:
        db: the database interface whose `FlowRunState` model is targeted
            (presumably supplied by the `db_injector` decorator; confirm
            against its definition)
        session: A database session
        flow_run_state_id: id of the flow run state to delete

    Returns:
        bool: True when the statement reports at least one affected row,
            False otherwise
    """

    state_model = db.FlowRunState
    statement = delete(state_model).where(state_model.id == flow_run_state_id)
    outcome = await session.execute(statement)
    return outcome.rowcount > 0