Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/ui/flow_runs.py: 72%

37 statements  

coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

from __future__ import annotations

import datetime
from typing import TYPE_CHECKING, List
from uuid import UUID

import sqlalchemy as sa
from fastapi import Body, Depends
from pydantic import Field

import prefect.server.schemas as schemas
from prefect.logging import get_logger
from prefect.server import models
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.utilities.schemas.bases import PrefectBaseModel
from prefect.server.utilities.server import PrefectRouter
from prefect.types import DateTime

if TYPE_CHECKING:
    import logging

logger: "logging.Logger" = get_logger("server.api.ui.flow_runs")

router: PrefectRouter = PrefectRouter(prefix="/ui/flow_runs", tags=["Flow Runs", "UI"])
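The routes below are declared relative to the router's `/ui/flow_runs` prefix. As a minimal sketch of how such a router could be mounted (the `/api` root prefix and the app wiring here are assumptions for illustration, not taken from this file):

import fastapi

# Hypothetical wiring: treat PrefectRouter like a FastAPI APIRouter and
# mount it under an assumed "/api" root prefix.
app = fastapi.FastAPI()
app.include_router(router, prefix="/api")
# With that wiring, the endpoints below would be reachable at
# POST /api/ui/flow_runs/history and POST /api/ui/flow_runs/count-task-runs.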

class SimpleFlowRun(PrefectBaseModel):
    id: UUID = Field(default=..., description="The flow run id.")
    state_type: schemas.states.StateType = Field(
        default=..., description="The state type."
    )
    timestamp: DateTime = Field(
        default=...,
        description=(
            "The start time of the run, or the expected start time "
            "if it hasn't run yet."
        ),
    )
    duration: datetime.timedelta = Field(
        default=..., description="The total run time of the run."
    )
    lateness: datetime.timedelta = Field(
        default=..., description="The delay between the expected and actual start time."
    )
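`SimpleFlowRun` is a plain Pydantic model, so it can be constructed and serialized independently of the endpoint. A small sketch, assuming Pydantic v2-style serialization; the field values are made up:

import datetime
from uuid import uuid4

import prefect.server.schemas as schemas

# Hypothetical values purely for illustration.
run = SimpleFlowRun(
    id=uuid4(),
    state_type=schemas.states.StateType.COMPLETED,
    timestamp=datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc),
    duration=datetime.timedelta(seconds=42),
    lateness=datetime.timedelta(seconds=3),
)
print(run.model_dump_json())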


@router.post("/history")
async def read_flow_run_history(
    sort: schemas.sorting.FlowRunSort = Body(
        schemas.sorting.FlowRunSort.EXPECTED_START_TIME_DESC
    ),
    limit: int = Body(1000, le=1000),
    offset: int = Body(0, ge=0),
    flows: schemas.filters.FlowFilter = None,
    flow_runs: schemas.filters.FlowRunFilter = None,
    task_runs: schemas.filters.TaskRunFilter = None,
    deployments: schemas.filters.DeploymentFilter = None,
    work_pools: schemas.filters.WorkPoolFilter = None,
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[SimpleFlowRun]:
    columns = [
        db.FlowRun.id,
        db.FlowRun.state_type,
        db.FlowRun.start_time,
        db.FlowRun.expected_start_time,
        db.FlowRun.total_run_time,
        # Although it isn't returned, we need to select
        # this field in order to compute `estimated_run_time`
        db.FlowRun.state_timestamp,
    ]
    async with db.session_context() as session:
        result = await models.flow_runs.read_flow_runs(
            columns=columns,
            flow_filter=flows,
            flow_run_filter=flow_runs,
            task_run_filter=task_runs,
            deployment_filter=deployments,
            work_pool_filter=work_pools,
            sort=sort,
            limit=limit,
            offset=offset,
            session=session,
        )
        return [
            SimpleFlowRun(
                id=r.id,
                state_type=r.state_type,
                timestamp=r.start_time or r.expected_start_time,
                duration=r.estimated_run_time,
                lateness=r.estimated_start_time_delta,
            )
            for r in result
        ]
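A client-side sketch of what a request to this route could look like. The host, port, and `/api` root are assumptions that depend on how the server is deployed, the optional filter bodies are omitted for brevity, and the sort value shown assumes the enum serializes to its member name:

import httpx

# Hypothetical server address; adjust to the deployment being queried.
response = httpx.post(
    "http://127.0.0.1:4200/api/ui/flow_runs/history",
    json={
        "sort": "EXPECTED_START_TIME_DESC",
        "limit": 200,
        "offset": 0,
    },
)
response.raise_for_status()
for run in response.json():
    # Each entry mirrors SimpleFlowRun: id, state_type, timestamp, duration, lateness.
    print(run["id"], run["state_type"], run["duration"])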


@router.post("/count-task-runs")
async def count_task_runs_by_flow_run(
    flow_run_ids: list[UUID] = Body(default=..., embed=True, max_items=200),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> dict[UUID, int]:
    """
    Get task run counts by flow run id.
    """
    async with db.session_context() as session:
        query = (
            sa.select(
                db.TaskRun.flow_run_id,
                sa.func.count(db.TaskRun.id).label("task_run_count"),
            )
            .where(
                sa.and_(
                    db.TaskRun.flow_run_id.in_(flow_run_ids),
                    sa.not_(db.TaskRun.subflow_run.has()),
                )
            )
            .group_by(db.TaskRun.flow_run_id)
        )

        results = await session.execute(query)

        task_run_counts_by_flow_run = {
            flow_run_id: task_run_count for flow_run_id, task_run_count in results.t
        }

        return {
            flow_run_id: task_run_counts_by_flow_run.get(flow_run_id, 0)
            for flow_run_id in flow_run_ids
        }