Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/ui/flows.py: 55%
60 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1from __future__ import annotations 1a
3from datetime import datetime 1a
4from typing import TYPE_CHECKING, Dict, List, Optional 1a
5from uuid import UUID 1a
6from zoneinfo import ZoneInfo 1a
8import sqlalchemy as sa 1a
9from fastapi import Body, Depends 1a
10from pydantic import Field, field_validator 1a
12from prefect.logging import get_logger 1a
13from prefect.server.database import PrefectDBInterface, provide_database_interface 1a
14from prefect.server.schemas.states import StateType 1a
15from prefect.server.utilities.database import UUID as UUIDTypeDecorator 1a
16from prefect.server.utilities.schemas import PrefectBaseModel 1a
17from prefect.server.utilities.server import PrefectRouter 1a
18from prefect.types import DateTime 1a
19from prefect.types._datetime import create_datetime_instance, parse_datetime 1a
21if TYPE_CHECKING: 21 ↛ 22line 21 didn't jump to line 22 because the condition on line 21 was never true1a
22 import logging
24logger: "logging.Logger" = get_logger() 1a
26router: PrefectRouter = PrefectRouter(prefix="/ui/flows", tags=["Flows", "UI"]) 1a
29class SimpleNextFlowRun(PrefectBaseModel): 1a
30 id: UUID = Field(default=..., description="The flow run id.") 1a
31 flow_id: UUID = Field(default=..., description="The flow id.") 1a
32 name: str = Field(default=..., description="The flow run name") 1a
33 state_name: str = Field(default=..., description="The state name.") 1a
34 state_type: StateType = Field(default=..., description="The state type.") 1a
35 next_scheduled_start_time: DateTime = Field( 1a
36 default=..., description="The next scheduled start time"
37 )
39 @field_validator("next_scheduled_start_time", mode="before") 1a
40 @classmethod 1a
41 def validate_next_scheduled_start_time(cls, v: DateTime | datetime) -> DateTime: 1a
42 if isinstance(v, datetime):
43 return create_datetime_instance(v)
44 return v
47@router.post("/count-deployments") 1a
48async def count_deployments_by_flow( 1a
49 flow_ids: List[UUID] = Body(default=..., embed=True, max_items=200),
50 db: PrefectDBInterface = Depends(provide_database_interface),
51) -> Dict[UUID, int]:
52 """
53 Get deployment counts by flow id.
54 """
55 async with db.session_context() as session:
56 query = (
57 sa.select(
58 db.Deployment.flow_id,
59 sa.func.count(db.Deployment.id).label("deployment_count"),
60 )
61 .where(db.Deployment.flow_id.in_(flow_ids))
62 .group_by(db.Deployment.flow_id)
63 )
65 results = await session.execute(query)
67 deployment_counts_by_flow = {
68 flow_id: deployment_count for flow_id, deployment_count in results.all()
69 }
71 return {
72 flow_id: deployment_counts_by_flow.get(flow_id, 0) for flow_id in flow_ids
73 }
76def _get_postgres_next_runs_query(flow_ids: List[UUID]): 1a
77 # Here we use the raw query because CROSS LATERAL JOINS are very
78 # difficult to express correctly in sqlalchemy.
79 raw_query = sa.text(
80 """
81 SELECT fr.id, fr.name, fr.flow_id, fr.state_name, fr.state_type, fr.state_name, fr.next_scheduled_start_time
82 FROM (
83 SELECT DISTINCT flow_id FROM flow_run
84 WHERE flow_id IN :flow_ids
85 AND state_type = 'SCHEDULED'
86 ) AS unique_flows
87 CROSS JOIN LATERAL (
88 SELECT *
89 FROM flow_run fr
90 WHERE fr.flow_id = unique_flows.flow_id
91 AND fr.state_type = 'SCHEDULED'
92 ORDER BY fr.next_scheduled_start_time ASC
93 LIMIT 1
94 ) fr;
95 """
96 )
98 bindparams = [
99 sa.bindparam(
100 "flow_ids",
101 flow_ids,
102 expanding=True,
103 type_=UUIDTypeDecorator,
104 ),
105 ]
107 query = raw_query.bindparams(*bindparams)
108 return query
111def _get_sqlite_next_runs_query(flow_ids: List[UUID]): 1a
112 raw_query = sa.text(
113 """
114 WITH min_times AS (
115 SELECT flow_id, MIN(next_scheduled_start_time) AS min_next_scheduled_start_time
116 FROM flow_run
117 WHERE flow_id IN :flow_ids
118 AND state_type = 'SCHEDULED'
119 GROUP BY flow_id
120 )
121 SELECT fr.id, fr.name, fr.flow_id, fr.state_name, fr.state_type, fr.next_scheduled_start_time
122 FROM flow_run fr
123 JOIN min_times mt ON fr.flow_id = mt.flow_id AND fr.next_scheduled_start_time = mt.min_next_scheduled_start_time
124 WHERE fr.state_type = 'SCHEDULED';
126 """
127 )
129 bindparams = [
130 sa.bindparam(
131 "flow_ids",
132 flow_ids,
133 expanding=True,
134 type_=UUIDTypeDecorator,
135 ),
136 ]
138 query = raw_query.bindparams(*bindparams)
139 return query
142@router.post("/next-runs") 1a
143async def next_runs_by_flow( 1a
144 flow_ids: List[UUID] = Body(default=..., embed=True, max_items=200),
145 db: PrefectDBInterface = Depends(provide_database_interface),
146) -> Dict[UUID, Optional[SimpleNextFlowRun]]:
147 """
148 Get the next flow run by flow id.
149 """
151 async with db.session_context() as session:
152 if db.dialect.name == "postgresql":
153 query = _get_postgres_next_runs_query(flow_ids=flow_ids)
154 else:
155 query = _get_sqlite_next_runs_query(flow_ids=flow_ids)
157 results = await session.execute(query)
159 results_by_flow_id = {
160 UUID(str(result.flow_id)): SimpleNextFlowRun(
161 id=result.id,
162 flow_id=result.flow_id,
163 name=result.name,
164 state_name=result.state_name,
165 state_type=result.state_type,
166 next_scheduled_start_time=parse_datetime(
167 result.next_scheduled_start_time
168 ).replace(tzinfo=ZoneInfo("UTC"))
169 if isinstance(result.next_scheduled_start_time, str)
170 else result.next_scheduled_start_time,
171 )
172 for result in results.all()
173 }
175 response = {
176 flow_id: results_by_flow_id.get(flow_id, None) for flow_id in flow_ids
177 }
178 return response