Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/flows.py: 97%
62 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1"""
2Routes for interacting with flow objects.
3"""
5from typing import List, Optional 1c
6from uuid import UUID 1c
8from fastapi import Depends, HTTPException, Path, Response, status 1c
9from fastapi.param_functions import Body 1c
11import prefect.server.api.dependencies as dependencies 1c
12import prefect.server.models as models 1c
13import prefect.server.schemas as schemas 1c
14from prefect.server.database import PrefectDBInterface, provide_database_interface 1c
15from prefect.server.schemas.responses import FlowPaginationResponse 1c
16from prefect.server.utilities.server import PrefectRouter 1c
17from prefect.types._datetime import now 1c
# Router that every handler in this module registers on; routes are served
# under the "/flows" prefix and tagged "Flows".
router: PrefectRouter = PrefectRouter(tags=["Flows"], prefix="/flows")
@router.post("/")
async def create_flow(
    flow: schemas.actions.FlowCreate,
    response: Response,
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.core.Flow:
    """Create a flow from the submitted schema, or return the existing one.

    If a flow with the same name already exists, that existing flow is
    returned instead of a new one being created.

    The response status is switched to 201 only when the returned record's
    ``created`` timestamp is not earlier than the instant captured just before
    the database call; otherwise the route's default status is left in place.

    For more information, see https://docs.prefect.io/v3/concepts/flows.
    """
    # Expand the create-action payload into the full core flow schema.
    hydrated_flow = schemas.core.Flow(**flow.model_dump())

    # Reference instant used below to tell a fresh record from an older one.
    started_at = now("UTC")

    async with db.session_context(begin_transaction=True) as session:
        stored_flow = await models.flows.create_flow(
            session=session, flow=hydrated_flow
        )

    if stored_flow.created >= started_at:
        response.status_code = status.HTTP_201_CREATED

    return stored_flow
@router.patch("/{id:uuid}", status_code=status.HTTP_204_NO_CONTENT)
async def update_flow(
    flow: schemas.actions.FlowUpdate,
    flow_id: UUID = Path(..., description="The flow id", alias="id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    """
    Apply the submitted changes to the flow identified by ``flow_id``.

    Responds with 204 on success.

    Raises:
        HTTPException: 404 ("Flow not found") when the model layer returns a
            falsy result for the update.
    """
    async with db.session_context(begin_transaction=True) as session:
        was_updated = await models.flows.update_flow(
            session=session,
            flow_id=flow_id,
            flow=flow,
        )

    if was_updated:
        return None

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Flow not found",
    )
@router.post("/count")
async def count_flows(
    flows: Optional[schemas.filters.FlowFilter] = None,
    flow_runs: Optional[schemas.filters.FlowRunFilter] = None,
    task_runs: Optional[schemas.filters.TaskRunFilter] = None,
    deployments: Optional[schemas.filters.DeploymentFilter] = None,
    work_pools: Optional[schemas.filters.WorkPoolFilter] = None,
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> int:
    """
    Count flows.

    Every filter is optional; each one is forwarded unchanged (``None`` when
    omitted) to ``models.flows.count_flows``.

    Args:
        flows: filter on flows.
        flow_runs: filter on flow runs.
        task_runs: filter on task runs.
        deployments: filter on deployments.
        work_pools: filter on work pools.
        db: database interface, injected by FastAPI.

    Returns:
        The count reported by the model layer.
    """
    # The filters default to None, so they are annotated Optional[...] to
    # match the defaults and the sibling `paginate_flows` signature.
    async with db.session_context() as session:
        return await models.flows.count_flows(
            session=session,
            flow_filter=flows,
            flow_run_filter=flow_runs,
            task_run_filter=task_runs,
            deployment_filter=deployments,
            work_pool_filter=work_pools,
        )
@router.get("/name/{name}")
async def read_flow_by_name(
    name: str = Path(..., description="The name of the flow"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.core.Flow:
    """
    Look up a single flow by its name.

    Raises:
        HTTPException: 404 ("Flow not found") when the lookup yields a falsy
            result.
    """
    async with db.session_context() as session:
        found = await models.flows.read_flow_by_name(name=name, session=session)

    if found:
        return found

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Flow not found",
    )
@router.get("/{id:uuid}")
async def read_flow(
    flow_id: UUID = Path(..., description="The flow id", alias="id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.core.Flow:
    """
    Look up a single flow by its id.

    Raises:
        HTTPException: 404 ("Flow not found") when the lookup yields a falsy
            result.
    """
    async with db.session_context() as session:
        found = await models.flows.read_flow(flow_id=flow_id, session=session)

    if found:
        return found

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Flow not found",
    )
@router.post("/filter")
async def read_flows(
    limit: int = dependencies.LimitBody(),
    offset: int = Body(0, ge=0),
    flows: Optional[schemas.filters.FlowFilter] = None,
    flow_runs: Optional[schemas.filters.FlowRunFilter] = None,
    task_runs: Optional[schemas.filters.TaskRunFilter] = None,
    deployments: Optional[schemas.filters.DeploymentFilter] = None,
    work_pools: Optional[schemas.filters.WorkPoolFilter] = None,
    sort: schemas.sorting.FlowSort = Body(schemas.sorting.FlowSort.NAME_ASC),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[schemas.core.Flow]:
    """
    Query for flows.

    Every filter is optional; each one is forwarded unchanged (``None`` when
    omitted) to ``models.flows.read_flows`` together with the sort order,
    offset and limit.

    Args:
        limit: maximum number of flows to return, resolved by
            ``dependencies.LimitBody()``.
        offset: number of flows to skip; must be >= 0 and defaults to 0.
        flows: filter on flows.
        flow_runs: filter on flow runs.
        task_runs: filter on task runs.
        deployments: filter on deployments.
        work_pools: filter on work pools.
        sort: sort order; defaults to ``FlowSort.NAME_ASC``.
        db: database interface, injected by FastAPI.

    Returns:
        The flows returned by the model layer.
    """
    # The filters default to None, so they are annotated Optional[...] to
    # match the defaults and the sibling `paginate_flows` signature.
    async with db.session_context() as session:
        return await models.flows.read_flows(
            session=session,
            flow_filter=flows,
            flow_run_filter=flow_runs,
            task_run_filter=task_runs,
            deployment_filter=deployments,
            work_pool_filter=work_pools,
            sort=sort,
            offset=offset,
            limit=limit,
        )
@router.delete("/{id:uuid}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_flow(
    flow_id: UUID = Path(..., description="The flow id", alias="id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    """
    Remove the flow identified by ``flow_id``.

    Responds with 204 on success.

    Raises:
        HTTPException: 404 ("Flow not found") when the model layer returns a
            falsy result for the deletion.
    """
    async with db.session_context(begin_transaction=True) as session:
        was_deleted = await models.flows.delete_flow(
            flow_id=flow_id, session=session
        )

    if was_deleted:
        return None

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Flow not found",
    )
@router.post("/paginate")
async def paginate_flows(
    limit: int = dependencies.LimitBody(),
    page: int = Body(1, ge=1),
    flows: Optional[schemas.filters.FlowFilter] = None,
    flow_runs: Optional[schemas.filters.FlowRunFilter] = None,
    task_runs: Optional[schemas.filters.TaskRunFilter] = None,
    deployments: Optional[schemas.filters.DeploymentFilter] = None,
    work_pools: Optional[schemas.filters.WorkPoolFilter] = None,
    sort: schemas.sorting.FlowSort = Body(schemas.sorting.FlowSort.NAME_ASC),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> FlowPaginationResponse:
    """
    Pagination query for flows.

    Fetches one page of flows and the total number of flows matching the same
    filters, using a single database session for both queries.

    Args:
        limit: page size, resolved by ``dependencies.LimitBody()``.
        page: 1-based page number; must be >= 1 and defaults to 1.
        flows: filter on flows.
        flow_runs: filter on flow runs.
        task_runs: filter on task runs.
        deployments: filter on deployments.
        work_pools: filter on work pools.
        sort: sort order; defaults to ``FlowSort.NAME_ASC``.
        db: database interface, injected by FastAPI.

    Returns:
        A ``FlowPaginationResponse`` with the page of results, the total
        count, the limit, the number of pages and the requested page.
    """
    # Pages are 1-based, so page 1 starts at offset 0.
    offset = (page - 1) * limit

    async with db.session_context() as session:
        results = await models.flows.read_flows(
            session=session,
            flow_filter=flows,
            flow_run_filter=flow_runs,
            task_run_filter=task_runs,
            deployment_filter=deployments,
            work_pool_filter=work_pools,
            sort=sort,
            offset=offset,
            limit=limit,
        )

        count = await models.flows.count_flows(
            session=session,
            flow_filter=flows,
            flow_run_filter=flow_runs,
            task_run_filter=task_runs,
            deployment_filter=deployments,
            work_pool_filter=work_pools,
        )

    # Ceiling division of count by limit. The validation performed by
    # LimitBody() is not visible in this module, so guard against a limit of
    # 0, which would otherwise raise ZeroDivisionError; an empty page size
    # yields zero pages.
    pages = (count + limit - 1) // limit if limit else 0

    return FlowPaginationResponse(
        results=results,
        count=count,
        limit=limit,
        pages=pages,
        page=page,
    )