Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/flows.py: 39%
62 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1"""
2Routes for interacting with flow objects.
3"""
5from typing import List, Optional 1a
6from uuid import UUID 1a
8from fastapi import Depends, HTTPException, Path, Response, status 1a
9from fastapi.param_functions import Body 1a
11import prefect.server.api.dependencies as dependencies 1a
12import prefect.server.models as models 1a
13import prefect.server.schemas as schemas 1a
14from prefect.server.database import PrefectDBInterface, provide_database_interface 1a
15from prefect.server.schemas.responses import FlowPaginationResponse 1a
16from prefect.server.utilities.server import PrefectRouter 1a
17from prefect.types._datetime import now 1a
19router: PrefectRouter = PrefectRouter(prefix="/flows", tags=["Flows"]) 1a
22@router.post("/") 1a
23async def create_flow( 1a
24 flow: schemas.actions.FlowCreate,
25 response: Response,
26 db: PrefectDBInterface = Depends(provide_database_interface),
27) -> schemas.core.Flow:
28 """Creates a new flow from the provided schema. If a flow with the
29 same name already exists, the existing flow is returned.
31 For more information, see https://docs.prefect.io/v3/concepts/flows.
32 """
33 # hydrate the input model into a full flow model
34 flow = schemas.core.Flow(**flow.model_dump())
36 right_now = now("UTC")
38 async with db.session_context(begin_transaction=True) as session:
39 model = await models.flows.create_flow(session=session, flow=flow)
41 if model.created >= right_now:
42 response.status_code = status.HTTP_201_CREATED
43 return model
46@router.patch("/{id:uuid}", status_code=status.HTTP_204_NO_CONTENT) 1a
47async def update_flow( 1a
48 flow: schemas.actions.FlowUpdate,
49 flow_id: UUID = Path(..., description="The flow id", alias="id"),
50 db: PrefectDBInterface = Depends(provide_database_interface),
51) -> None:
52 """
53 Updates a flow.
54 """
55 async with db.session_context(begin_transaction=True) as session:
56 result = await models.flows.update_flow(
57 session=session, flow=flow, flow_id=flow_id
58 )
59 if not result:
60 raise HTTPException(
61 status_code=status.HTTP_404_NOT_FOUND, detail="Flow not found"
62 )
65@router.post("/count") 1a
66async def count_flows( 1a
67 flows: schemas.filters.FlowFilter = None,
68 flow_runs: schemas.filters.FlowRunFilter = None,
69 task_runs: schemas.filters.TaskRunFilter = None,
70 deployments: schemas.filters.DeploymentFilter = None,
71 work_pools: schemas.filters.WorkPoolFilter = None,
72 db: PrefectDBInterface = Depends(provide_database_interface),
73) -> int:
74 """
75 Count flows.
76 """
77 async with db.session_context() as session:
78 return await models.flows.count_flows(
79 session=session,
80 flow_filter=flows,
81 flow_run_filter=flow_runs,
82 task_run_filter=task_runs,
83 deployment_filter=deployments,
84 work_pool_filter=work_pools,
85 )
88@router.get("/name/{name}") 1a
89async def read_flow_by_name( 1a
90 name: str = Path(..., description="The name of the flow"),
91 db: PrefectDBInterface = Depends(provide_database_interface),
92) -> schemas.core.Flow:
93 """
94 Get a flow by name.
95 """
96 async with db.session_context() as session:
97 flow = await models.flows.read_flow_by_name(session=session, name=name)
98 if not flow:
99 raise HTTPException(
100 status_code=status.HTTP_404_NOT_FOUND, detail="Flow not found"
101 )
102 return flow
105@router.get("/{id:uuid}") 1a
106async def read_flow( 1a
107 flow_id: UUID = Path(..., description="The flow id", alias="id"),
108 db: PrefectDBInterface = Depends(provide_database_interface),
109) -> schemas.core.Flow:
110 """
111 Get a flow by id.
112 """
113 async with db.session_context() as session:
114 flow = await models.flows.read_flow(session=session, flow_id=flow_id)
115 if not flow:
116 raise HTTPException(
117 status_code=status.HTTP_404_NOT_FOUND, detail="Flow not found"
118 )
119 return flow
122@router.post("/filter") 1a
123async def read_flows( 1a
124 limit: int = dependencies.LimitBody(),
125 offset: int = Body(0, ge=0),
126 flows: schemas.filters.FlowFilter = None,
127 flow_runs: schemas.filters.FlowRunFilter = None,
128 task_runs: schemas.filters.TaskRunFilter = None,
129 deployments: schemas.filters.DeploymentFilter = None,
130 work_pools: schemas.filters.WorkPoolFilter = None,
131 sort: schemas.sorting.FlowSort = Body(schemas.sorting.FlowSort.NAME_ASC),
132 db: PrefectDBInterface = Depends(provide_database_interface),
133) -> List[schemas.core.Flow]:
134 """
135 Query for flows.
136 """
137 async with db.session_context() as session:
138 return await models.flows.read_flows(
139 session=session,
140 flow_filter=flows,
141 flow_run_filter=flow_runs,
142 task_run_filter=task_runs,
143 deployment_filter=deployments,
144 work_pool_filter=work_pools,
145 sort=sort,
146 offset=offset,
147 limit=limit,
148 )
151@router.delete("/{id:uuid}", status_code=status.HTTP_204_NO_CONTENT) 1a
152async def delete_flow( 1a
153 flow_id: UUID = Path(..., description="The flow id", alias="id"),
154 db: PrefectDBInterface = Depends(provide_database_interface),
155) -> None:
156 """
157 Delete a flow by id.
158 """
159 async with db.session_context(begin_transaction=True) as session:
160 result = await models.flows.delete_flow(session=session, flow_id=flow_id)
161 if not result:
162 raise HTTPException(
163 status_code=status.HTTP_404_NOT_FOUND, detail="Flow not found"
164 )
167@router.post("/paginate") 1a
168async def paginate_flows( 1a
169 limit: int = dependencies.LimitBody(),
170 page: int = Body(1, ge=1),
171 flows: Optional[schemas.filters.FlowFilter] = None,
172 flow_runs: Optional[schemas.filters.FlowRunFilter] = None,
173 task_runs: Optional[schemas.filters.TaskRunFilter] = None,
174 deployments: Optional[schemas.filters.DeploymentFilter] = None,
175 work_pools: Optional[schemas.filters.WorkPoolFilter] = None,
176 sort: schemas.sorting.FlowSort = Body(schemas.sorting.FlowSort.NAME_ASC),
177 db: PrefectDBInterface = Depends(provide_database_interface),
178) -> FlowPaginationResponse:
179 """
180 Pagination query for flows.
181 """
182 offset = (page - 1) * limit
184 async with db.session_context() as session:
185 results = await models.flows.read_flows(
186 session=session,
187 flow_filter=flows,
188 flow_run_filter=flow_runs,
189 task_run_filter=task_runs,
190 deployment_filter=deployments,
191 work_pool_filter=work_pools,
192 sort=sort,
193 offset=offset,
194 limit=limit,
195 )
197 count = await models.flows.count_flows(
198 session=session,
199 flow_filter=flows,
200 flow_run_filter=flow_runs,
201 task_run_filter=task_runs,
202 deployment_filter=deployments,
203 work_pool_filter=work_pools,
204 )
206 return FlowPaginationResponse(
207 results=results,
208 count=count,
209 limit=limit,
210 pages=(count + limit - 1) // limit,
211 page=page,
212 )