Coverage for /usr/local/lib/python3.12/site-packages/prefect/client/orchestration/_flows/client.py: 25%

75 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1from __future__ import annotations 1a

2 

3from typing import TYPE_CHECKING, Any 1a

4 

5from httpx import HTTPStatusError, RequestError 1a

6 

7from prefect.client.orchestration.base import BaseAsyncClient, BaseClient 1a

8from prefect.exceptions import ObjectNotFound 1a

9 

10if TYPE_CHECKING: 10 ↛ 11line 10 didn't jump to line 11 because the condition on line 10 was never true1a

11 from uuid import UUID 

12 

13 from prefect.client.schemas.filters import ( 

14 DeploymentFilter, 

15 FlowFilter, 

16 FlowRunFilter, 

17 TaskRunFilter, 

18 WorkPoolFilter, 

19 WorkQueueFilter, 

20 ) 

21 from prefect.client.schemas.objects import ( 

22 Flow, 

23 ) 

24 from prefect.client.schemas.sorting import FlowSort 

25 from prefect.flows import Flow as FlowObject 

26 

27 

28class FlowClient(BaseClient): 1a

29 def create_flow(self, flow: "FlowObject[Any, Any]") -> "UUID": 1a

30 """ 

31 Create a flow in the Prefect API. 

32 

33 Args: 

34 flow: a `Flow` object 

35 

36 Raises: 

37 httpx.RequestError: if a flow was not created for any reason 

38 

39 Returns: 

40 the ID of the flow in the backend 

41 """ 

42 return self.create_flow_from_name(flow.name) 

43 

44 def create_flow_from_name(self, flow_name: str) -> "UUID": 1a

45 """ 

46 Create a flow in the Prefect API. 

47 

48 Args: 

49 flow_name: the name of the new flow 

50 

51 Raises: 

52 httpx.RequestError: if a flow was not created for any reason 

53 

54 Returns: 

55 the ID of the flow in the backend 

56 """ 

57 from prefect.client.schemas.actions import FlowCreate 

58 

59 flow_data = FlowCreate(name=flow_name) 

60 response = self.request( 

61 "POST", "/flows/", json=flow_data.model_dump(mode="json") 

62 ) 

63 

64 flow_id = response.json().get("id") 

65 if not flow_id: 

66 raise RequestError(f"Malformed response: {response}") 

67 

68 # Return the id of the created flow 

69 from uuid import UUID 

70 

71 return UUID(flow_id) 

72 

73 def read_flow(self, flow_id: "UUID") -> "Flow": 1a

74 """ 

75 Query the Prefect API for a flow by id. 

76 

77 Args: 

78 flow_id: the flow ID of interest 

79 

80 Returns: 

81 a Flow model representation of the flow 

82 """ 

83 response = self.request("GET", "/flows/{id}", path_params={"id": flow_id}) 

84 from prefect.client.schemas.objects import Flow 

85 

86 return Flow.model_validate(response.json()) 

87 

88 def delete_flow(self, flow_id: "UUID") -> None: 1a

89 """ 

90 Delete a flow by UUID. 

91 

92 Args: 

93 flow_id: ID of the flow to be deleted 

94 Raises: 

95 prefect.exceptions.ObjectNotFound: If request returns 404 

96 httpx.RequestError: If requests fail 

97 """ 

98 try: 

99 self.request("DELETE", "/flows/{id}", path_params={"id": flow_id}) 

100 except HTTPStatusError as e: 

101 if e.response.status_code == 404: 

102 raise ObjectNotFound(http_exc=e) from e 

103 else: 

104 raise 

105 

106 def read_flows( 1a

107 self, 

108 *, 

109 flow_filter: "FlowFilter | None" = None, 

110 flow_run_filter: "FlowRunFilter | None" = None, 

111 task_run_filter: "TaskRunFilter | None" = None, 

112 deployment_filter: "DeploymentFilter | None" = None, 

113 work_pool_filter: "WorkPoolFilter | None" = None, 

114 work_queue_filter: "WorkQueueFilter | None" = None, 

115 sort: "FlowSort | None" = None, 

116 limit: int | None = None, 

117 offset: int = 0, 

118 ) -> list["Flow"]: 

119 """ 

120 Query the Prefect API for flows. Only flows matching all criteria will 

121 be returned. 

122 

123 Args: 

124 flow_filter: filter criteria for flows 

125 flow_run_filter: filter criteria for flow runs 

126 task_run_filter: filter criteria for task runs 

127 deployment_filter: filter criteria for deployments 

128 work_pool_filter: filter criteria for work pools 

129 work_queue_filter: filter criteria for work pool queues 

130 sort: sort criteria for the flows 

131 limit: limit for the flow query 

132 offset: offset for the flow query 

133 

134 Returns: 

135 a list of Flow model representations of the flows 

136 """ 

137 body: dict[str, Any] = { 

138 "flows": flow_filter.model_dump(mode="json") if flow_filter else None, 

139 "flow_runs": ( 

140 flow_run_filter.model_dump(mode="json", exclude_unset=True) 

141 if flow_run_filter 

142 else None 

143 ), 

144 "task_runs": ( 

145 task_run_filter.model_dump(mode="json") if task_run_filter else None 

146 ), 

147 "deployments": ( 

148 deployment_filter.model_dump(mode="json") if deployment_filter else None 

149 ), 

150 "work_pools": ( 

151 work_pool_filter.model_dump(mode="json") if work_pool_filter else None 

152 ), 

153 "work_queues": ( 

154 work_queue_filter.model_dump(mode="json") if work_queue_filter else None 

155 ), 

156 "sort": sort, 

157 "limit": limit, 

158 "offset": offset, 

159 } 

160 

161 response = self.request("POST", "/flows/filter", json=body) 

162 from prefect.client.schemas.objects import Flow 

163 

164 return Flow.model_validate_list(response.json()) 

165 

166 def read_flow_by_name( 1a

167 self, 

168 flow_name: str, 

169 ) -> "Flow": 

170 """ 

171 Query the Prefect API for a flow by name. 

172 

173 Args: 

174 flow_name: the name of a flow 

175 

176 Returns: 

177 a fully hydrated Flow model 

178 """ 

179 response = self.request( 

180 "GET", "/flows/name/{name}", path_params={"name": flow_name} 

181 ) 

182 from prefect.client.schemas.objects import Flow 

183 

184 return Flow.model_validate(response.json()) 

185 

186 

187class FlowAsyncClient(BaseAsyncClient): 1a

188 async def create_flow(self, flow: "FlowObject[Any, Any]") -> "UUID": 1a

189 """ 

190 Create a flow in the Prefect API. 

191 

192 Args: 

193 flow: a `Flow` object 

194 

195 Raises: 

196 httpx.RequestError: if a flow was not created for any reason 

197 

198 Returns: 

199 the ID of the flow in the backend 

200 """ 

201 return await self.create_flow_from_name(flow.name) 

202 

203 async def create_flow_from_name(self, flow_name: str) -> "UUID": 1a

204 """ 

205 Create a flow in the Prefect API. 

206 

207 Args: 

208 flow_name: the name of the new flow 

209 

210 Raises: 

211 httpx.RequestError: if a flow was not created for any reason 

212 

213 Returns: 

214 the ID of the flow in the backend 

215 """ 

216 from prefect.client.schemas.actions import FlowCreate 

217 

218 flow_data = FlowCreate(name=flow_name) 

219 response = await self.request( 

220 "POST", "/flows/", json=flow_data.model_dump(mode="json") 

221 ) 

222 

223 flow_id = response.json().get("id") 

224 if not flow_id: 

225 raise RequestError(f"Malformed response: {response}") 

226 

227 # Return the id of the created flow 

228 from uuid import UUID 

229 

230 return UUID(flow_id) 

231 

232 async def read_flow(self, flow_id: "UUID") -> "Flow": 1a

233 """ 

234 Query the Prefect API for a flow by id. 

235 

236 Args: 

237 flow_id: the flow ID of interest 

238 

239 Returns: 

240 a Flow model representation of the flow 

241 """ 

242 response = await self.request("GET", "/flows/{id}", path_params={"id": flow_id}) 

243 from prefect.client.schemas.objects import Flow 

244 

245 return Flow.model_validate(response.json()) 

246 

247 async def delete_flow(self, flow_id: "UUID") -> None: 1a

248 """ 

249 Delete a flow by UUID. 

250 

251 Args: 

252 flow_id: ID of the flow to be deleted 

253 Raises: 

254 prefect.exceptions.ObjectNotFound: If request returns 404 

255 httpx.RequestError: If requests fail 

256 """ 

257 try: 

258 await self.request("DELETE", "/flows/{id}", path_params={"id": flow_id}) 

259 except HTTPStatusError as e: 

260 if e.response.status_code == 404: 

261 raise ObjectNotFound(http_exc=e) from e 

262 else: 

263 raise 

264 

265 async def read_flows( 1a

266 self, 

267 *, 

268 flow_filter: "FlowFilter | None" = None, 

269 flow_run_filter: "FlowRunFilter | None" = None, 

270 task_run_filter: "TaskRunFilter | None" = None, 

271 deployment_filter: "DeploymentFilter | None" = None, 

272 work_pool_filter: "WorkPoolFilter | None" = None, 

273 work_queue_filter: "WorkQueueFilter | None" = None, 

274 sort: "FlowSort | None" = None, 

275 limit: int | None = None, 

276 offset: int = 0, 

277 ) -> list["Flow"]: 

278 """ 

279 Query the Prefect API for flows. Only flows matching all criteria will 

280 be returned. 

281 

282 Args: 

283 flow_filter: filter criteria for flows 

284 flow_run_filter: filter criteria for flow runs 

285 task_run_filter: filter criteria for task runs 

286 deployment_filter: filter criteria for deployments 

287 work_pool_filter: filter criteria for work pools 

288 work_queue_filter: filter criteria for work pool queues 

289 sort: sort criteria for the flows 

290 limit: limit for the flow query 

291 offset: offset for the flow query 

292 

293 Returns: 

294 a list of Flow model representations of the flows 

295 """ 

296 body: dict[str, Any] = { 

297 "flows": flow_filter.model_dump(mode="json") if flow_filter else None, 

298 "flow_runs": ( 

299 flow_run_filter.model_dump(mode="json", exclude_unset=True) 

300 if flow_run_filter 

301 else None 

302 ), 

303 "task_runs": ( 

304 task_run_filter.model_dump(mode="json") if task_run_filter else None 

305 ), 

306 "deployments": ( 

307 deployment_filter.model_dump(mode="json") if deployment_filter else None 

308 ), 

309 "work_pools": ( 

310 work_pool_filter.model_dump(mode="json") if work_pool_filter else None 

311 ), 

312 "work_queues": ( 

313 work_queue_filter.model_dump(mode="json") if work_queue_filter else None 

314 ), 

315 "sort": sort, 

316 "limit": limit, 

317 "offset": offset, 

318 } 

319 

320 response = await self.request("POST", "/flows/filter", json=body) 

321 from prefect.client.schemas.objects import Flow 

322 

323 return Flow.model_validate_list(response.json()) 

324 

325 async def read_flow_by_name( 1a

326 self, 

327 flow_name: str, 

328 ) -> "Flow": 

329 """ 

330 Query the Prefect API for a flow by name. 

331 

332 Args: 

333 flow_name: the name of a flow 

334 

335 Returns: 

336 a fully hydrated Flow model 

337 """ 

338 response = await self.request( 

339 "GET", "/flows/name/{name}", path_params={"name": flow_name} 

340 ) 

341 from prefect.client.schemas.objects import Flow 

342 

343 return Flow.model_validate(response.json())