Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/ui/flow_runs.py: 72%
37 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1from __future__ import annotations 1a
3import datetime 1a
4from typing import TYPE_CHECKING, List 1a
5from uuid import UUID 1a
7import sqlalchemy as sa 1a
8from fastapi import Body, Depends 1a
9from pydantic import Field 1a
11import prefect.server.schemas as schemas 1a
12from prefect.logging import get_logger 1a
13from prefect.server import models 1a
14from prefect.server.database import PrefectDBInterface, provide_database_interface 1a
15from prefect.server.utilities.schemas.bases import PrefectBaseModel 1a
16from prefect.server.utilities.server import PrefectRouter 1a
17from prefect.types import DateTime 1a
19if TYPE_CHECKING: 19 ↛ 20line 19 didn't jump to line 20 because the condition on line 19 was never true1a
20 import logging
22logger: "logging.Logger" = get_logger("server.api.ui.flow_runs") 1a
24router: PrefectRouter = PrefectRouter(prefix="/ui/flow_runs", tags=["Flow Runs", "UI"]) 1a
27class SimpleFlowRun(PrefectBaseModel): 1a
28 id: UUID = Field(default=..., description="The flow run id.") 1a
29 state_type: schemas.states.StateType = Field( 1a
30 default=..., description="The state type."
31 )
32 timestamp: DateTime = Field( 1a
33 default=...,
34 description=(
35 "The start time of the run, or the expected start time "
36 "if it hasn't run yet."
37 ),
38 )
39 duration: datetime.timedelta = Field( 1a
40 default=..., description="The total run time of the run."
41 )
42 lateness: datetime.timedelta = Field( 1a
43 default=..., description="The delay between the expected and actual start time."
44 )
47@router.post("/history") 1a
48async def read_flow_run_history( 1a
49 sort: schemas.sorting.FlowRunSort = Body(
50 schemas.sorting.FlowRunSort.EXPECTED_START_TIME_DESC
51 ),
52 limit: int = Body(1000, le=1000),
53 offset: int = Body(0, ge=0),
54 flows: schemas.filters.FlowFilter = None,
55 flow_runs: schemas.filters.FlowRunFilter = None,
56 task_runs: schemas.filters.TaskRunFilter = None,
57 deployments: schemas.filters.DeploymentFilter = None,
58 work_pools: schemas.filters.WorkPoolFilter = None,
59 db: PrefectDBInterface = Depends(provide_database_interface),
60) -> List[SimpleFlowRun]:
61 columns = [
62 db.FlowRun.id,
63 db.FlowRun.state_type,
64 db.FlowRun.start_time,
65 db.FlowRun.expected_start_time,
66 db.FlowRun.total_run_time,
67 # Although it isn't returned, we need to select
68 # this field in order to compute `estimated_run_time`
69 db.FlowRun.state_timestamp,
70 ]
71 async with db.session_context() as session:
72 result = await models.flow_runs.read_flow_runs(
73 columns=columns,
74 flow_filter=flows,
75 flow_run_filter=flow_runs,
76 task_run_filter=task_runs,
77 deployment_filter=deployments,
78 work_pool_filter=work_pools,
79 sort=sort,
80 limit=limit,
81 offset=offset,
82 session=session,
83 )
84 return [
85 SimpleFlowRun(
86 id=r.id,
87 state_type=r.state_type,
88 timestamp=r.start_time or r.expected_start_time,
89 duration=r.estimated_run_time,
90 lateness=r.estimated_start_time_delta,
91 )
92 for r in result
93 ]
96@router.post("/count-task-runs") 1a
97async def count_task_runs_by_flow_run( 1a
98 flow_run_ids: list[UUID] = Body(default=..., embed=True, max_items=200),
99 db: PrefectDBInterface = Depends(provide_database_interface),
100) -> dict[UUID, int]:
101 """
102 Get task run counts by flow run id.
103 """
104 async with db.session_context() as session:
105 query = (
106 sa.select(
107 db.TaskRun.flow_run_id,
108 sa.func.count(db.TaskRun.id).label("task_run_count"),
109 )
110 .where(
111 sa.and_(
112 db.TaskRun.flow_run_id.in_(flow_run_ids),
113 sa.not_(db.TaskRun.subflow_run.has()),
114 )
115 )
116 .group_by(db.TaskRun.flow_run_id)
117 )
119 results = await session.execute(query)
121 task_run_counts_by_flow_run = {
122 flow_run_id: task_run_count for flow_run_id, task_run_count in results.t
123 }
125 return {
126 flow_run_id: task_run_counts_by_flow_run.get(flow_run_id, 0)
127 for flow_run_id in flow_run_ids
128 }