Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/flows.py: 39%

62 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1""" 

2Routes for interacting with flow objects. 

3""" 

4 

5from typing import List, Optional 1a

6from uuid import UUID 1a

7 

8from fastapi import Depends, HTTPException, Path, Response, status 1a

9from fastapi.param_functions import Body 1a

10 

11import prefect.server.api.dependencies as dependencies 1a

12import prefect.server.models as models 1a

13import prefect.server.schemas as schemas 1a

14from prefect.server.database import PrefectDBInterface, provide_database_interface 1a

15from prefect.server.schemas.responses import FlowPaginationResponse 1a

16from prefect.server.utilities.server import PrefectRouter 1a

17from prefect.types._datetime import now 1a

18 

19router: PrefectRouter = PrefectRouter(prefix="/flows", tags=["Flows"]) 1a

20 

21 

22@router.post("/") 1a

23async def create_flow( 1a

24 flow: schemas.actions.FlowCreate, 

25 response: Response, 

26 db: PrefectDBInterface = Depends(provide_database_interface), 

27) -> schemas.core.Flow: 

28 """Creates a new flow from the provided schema. If a flow with the 

29 same name already exists, the existing flow is returned. 

30 

31 For more information, see https://docs.prefect.io/v3/concepts/flows. 

32 """ 

33 # hydrate the input model into a full flow model 

34 flow = schemas.core.Flow(**flow.model_dump()) 

35 

36 right_now = now("UTC") 

37 

38 async with db.session_context(begin_transaction=True) as session: 

39 model = await models.flows.create_flow(session=session, flow=flow) 

40 

41 if model.created >= right_now: 

42 response.status_code = status.HTTP_201_CREATED 

43 return model 

44 

45 

46@router.patch("/{id:uuid}", status_code=status.HTTP_204_NO_CONTENT) 1a

47async def update_flow( 1a

48 flow: schemas.actions.FlowUpdate, 

49 flow_id: UUID = Path(..., description="The flow id", alias="id"), 

50 db: PrefectDBInterface = Depends(provide_database_interface), 

51) -> None: 

52 """ 

53 Updates a flow. 

54 """ 

55 async with db.session_context(begin_transaction=True) as session: 

56 result = await models.flows.update_flow( 

57 session=session, flow=flow, flow_id=flow_id 

58 ) 

59 if not result: 

60 raise HTTPException( 

61 status_code=status.HTTP_404_NOT_FOUND, detail="Flow not found" 

62 ) 

63 

64 

65@router.post("/count") 1a

66async def count_flows( 1a

67 flows: schemas.filters.FlowFilter = None, 

68 flow_runs: schemas.filters.FlowRunFilter = None, 

69 task_runs: schemas.filters.TaskRunFilter = None, 

70 deployments: schemas.filters.DeploymentFilter = None, 

71 work_pools: schemas.filters.WorkPoolFilter = None, 

72 db: PrefectDBInterface = Depends(provide_database_interface), 

73) -> int: 

74 """ 

75 Count flows. 

76 """ 

77 async with db.session_context() as session: 

78 return await models.flows.count_flows( 

79 session=session, 

80 flow_filter=flows, 

81 flow_run_filter=flow_runs, 

82 task_run_filter=task_runs, 

83 deployment_filter=deployments, 

84 work_pool_filter=work_pools, 

85 ) 

86 

87 

88@router.get("/name/{name}") 1a

89async def read_flow_by_name( 1a

90 name: str = Path(..., description="The name of the flow"), 

91 db: PrefectDBInterface = Depends(provide_database_interface), 

92) -> schemas.core.Flow: 

93 """ 

94 Get a flow by name. 

95 """ 

96 async with db.session_context() as session: 

97 flow = await models.flows.read_flow_by_name(session=session, name=name) 

98 if not flow: 

99 raise HTTPException( 

100 status_code=status.HTTP_404_NOT_FOUND, detail="Flow not found" 

101 ) 

102 return flow 

103 

104 

105@router.get("/{id:uuid}") 1a

106async def read_flow( 1a

107 flow_id: UUID = Path(..., description="The flow id", alias="id"), 

108 db: PrefectDBInterface = Depends(provide_database_interface), 

109) -> schemas.core.Flow: 

110 """ 

111 Get a flow by id. 

112 """ 

113 async with db.session_context() as session: 

114 flow = await models.flows.read_flow(session=session, flow_id=flow_id) 

115 if not flow: 

116 raise HTTPException( 

117 status_code=status.HTTP_404_NOT_FOUND, detail="Flow not found" 

118 ) 

119 return flow 

120 

121 

122@router.post("/filter") 1a

123async def read_flows( 1a

124 limit: int = dependencies.LimitBody(), 

125 offset: int = Body(0, ge=0), 

126 flows: schemas.filters.FlowFilter = None, 

127 flow_runs: schemas.filters.FlowRunFilter = None, 

128 task_runs: schemas.filters.TaskRunFilter = None, 

129 deployments: schemas.filters.DeploymentFilter = None, 

130 work_pools: schemas.filters.WorkPoolFilter = None, 

131 sort: schemas.sorting.FlowSort = Body(schemas.sorting.FlowSort.NAME_ASC), 

132 db: PrefectDBInterface = Depends(provide_database_interface), 

133) -> List[schemas.core.Flow]: 

134 """ 

135 Query for flows. 

136 """ 

137 async with db.session_context() as session: 

138 return await models.flows.read_flows( 

139 session=session, 

140 flow_filter=flows, 

141 flow_run_filter=flow_runs, 

142 task_run_filter=task_runs, 

143 deployment_filter=deployments, 

144 work_pool_filter=work_pools, 

145 sort=sort, 

146 offset=offset, 

147 limit=limit, 

148 ) 

149 

150 

151@router.delete("/{id:uuid}", status_code=status.HTTP_204_NO_CONTENT) 1a

152async def delete_flow( 1a

153 flow_id: UUID = Path(..., description="The flow id", alias="id"), 

154 db: PrefectDBInterface = Depends(provide_database_interface), 

155) -> None: 

156 """ 

157 Delete a flow by id. 

158 """ 

159 async with db.session_context(begin_transaction=True) as session: 

160 result = await models.flows.delete_flow(session=session, flow_id=flow_id) 

161 if not result: 

162 raise HTTPException( 

163 status_code=status.HTTP_404_NOT_FOUND, detail="Flow not found" 

164 ) 

165 

166 

167@router.post("/paginate") 1a

168async def paginate_flows( 1a

169 limit: int = dependencies.LimitBody(), 

170 page: int = Body(1, ge=1), 

171 flows: Optional[schemas.filters.FlowFilter] = None, 

172 flow_runs: Optional[schemas.filters.FlowRunFilter] = None, 

173 task_runs: Optional[schemas.filters.TaskRunFilter] = None, 

174 deployments: Optional[schemas.filters.DeploymentFilter] = None, 

175 work_pools: Optional[schemas.filters.WorkPoolFilter] = None, 

176 sort: schemas.sorting.FlowSort = Body(schemas.sorting.FlowSort.NAME_ASC), 

177 db: PrefectDBInterface = Depends(provide_database_interface), 

178) -> FlowPaginationResponse: 

179 """ 

180 Pagination query for flows. 

181 """ 

182 offset = (page - 1) * limit 

183 

184 async with db.session_context() as session: 

185 results = await models.flows.read_flows( 

186 session=session, 

187 flow_filter=flows, 

188 flow_run_filter=flow_runs, 

189 task_run_filter=task_runs, 

190 deployment_filter=deployments, 

191 work_pool_filter=work_pools, 

192 sort=sort, 

193 offset=offset, 

194 limit=limit, 

195 ) 

196 

197 count = await models.flows.count_flows( 

198 session=session, 

199 flow_filter=flows, 

200 flow_run_filter=flow_runs, 

201 task_run_filter=task_runs, 

202 deployment_filter=deployments, 

203 work_pool_filter=work_pools, 

204 ) 

205 

206 return FlowPaginationResponse( 

207 results=results, 

208 count=count, 

209 limit=limit, 

210 pages=(count + limit - 1) // limit, 

211 page=page, 

212 )