Coverage for /usr/local/lib/python3.12/site-packages/prefect/client/orchestration/_flow_runs/client.py: 14%

257 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1from __future__ import annotations 1a

2 

3from collections.abc import Iterable 1a

4from typing import TYPE_CHECKING, Any 1a

5 

6import httpx 1a

7from typing_extensions import TypeVar 1a

8 

9from prefect.client.orchestration.base import BaseAsyncClient, BaseClient 1a

10from prefect.exceptions import ObjectNotFound 1a

11 

12T = TypeVar("T") 1a

13R = TypeVar("R", infer_variance=True) 1a

14 

15if TYPE_CHECKING: 15 ↛ 16line 15 didn't jump to line 16 because the condition on line 15 was never true1a

16 from uuid import UUID 

17 

18 from prefect.client.schemas import FlowRun, OrchestrationResult 

19 from prefect.client.schemas.filters import ( 

20 DeploymentFilter, 

21 FlowFilter, 

22 FlowRunFilter, 

23 TaskRunFilter, 

24 WorkPoolFilter, 

25 WorkQueueFilter, 

26 ) 

27 from prefect.client.schemas.objects import ( 

28 FlowRunInput, 

29 FlowRunPolicy, 

30 ) 

31 from prefect.client.schemas.sorting import ( 

32 FlowRunSort, 

33 ) 

34 from prefect.flows import Flow as FlowObject 

35 from prefect.states import State 

36 from prefect.types import KeyValueLabelsField 

37 

38 

39class FlowRunClient(BaseClient): 1a

40 def create_flow_run( 1a

41 self, 

42 flow: "FlowObject[Any, R]", 

43 name: str | None = None, 

44 parameters: dict[str, Any] | None = None, 

45 context: dict[str, Any] | None = None, 

46 tags: "Iterable[str] | None" = None, 

47 parent_task_run_id: "UUID | None" = None, 

48 state: "State[R] | None" = None, 

49 work_pool_name: str | None = None, 

50 work_queue_name: str | None = None, 

51 job_variables: dict[str, Any] | None = None, 

52 ) -> "FlowRun": 

53 """ 

54 Create a flow run for a flow. 

55 

56 Args: 

57 flow: The flow model to create the flow run for 

58 name: An optional name for the flow run 

59 parameters: Parameter overrides for this flow run. 

60 context: Optional run context data 

61 tags: a list of tags to apply to this flow run 

62 parent_task_run_id: if a subflow run is being created, the placeholder task 

63 run identifier in the parent flow 

64 state: The initial state for the run. If not provided, defaults to 

65 `Pending`. 

66 work_pool_name: The name of the work pool to run the flow run in. 

67 work_queue_name: The name of the work queue to place the flow run in. 

68 job_variables: The job variables to use when setting up flow run infrastructure. 

69 

70 Raises: 

71 httpx.RequestError: if the Prefect API does not successfully create a run for any reason 

72 

73 Returns: 

74 The flow run model 

75 """ 

76 from prefect.client.schemas.actions import FlowCreate, FlowRunCreate 

77 from prefect.client.schemas.objects import Flow, FlowRun, FlowRunPolicy 

78 from prefect.states import Pending, to_state_create 

79 

80 parameters = parameters or {} 

81 context = context or {} 

82 

83 if state is None: 

84 state = Pending() 

85 

86 # Retrieve the flow id 

87 

88 flow_data = FlowCreate(name=flow.name) 

89 response = self.request( 

90 "POST", "/flows/", json=flow_data.model_dump(mode="json") 

91 ) 

92 flow_id = Flow.model_validate(response.json()).id 

93 

94 flow_run_create = FlowRunCreate( 

95 flow_id=flow_id, 

96 flow_version=flow.version, 

97 name=name, 

98 parameters=parameters, 

99 context=context, 

100 tags=list(tags or []), 

101 parent_task_run_id=parent_task_run_id, 

102 state=to_state_create(state), 

103 empirical_policy=FlowRunPolicy( 

104 retries=flow.retries, 

105 retry_delay=int(flow.retry_delay_seconds or 0), 

106 ), 

107 ) 

108 

109 if work_pool_name is not None: 

110 flow_run_create.work_pool_name = work_pool_name 

111 if work_queue_name is not None: 

112 flow_run_create.work_queue_name = work_queue_name 

113 if job_variables is not None: 

114 flow_run_create.job_variables = job_variables 

115 

116 flow_run_create_json = flow_run_create.model_dump( 

117 mode="json", exclude_unset=True 

118 ) 

119 response = self.request("POST", "/flow_runs/", json=flow_run_create_json) 

120 

121 flow_run = FlowRun.model_validate(response.json()) 

122 

123 # Restore the parameters to the local objects to retain expectations about 

124 # Python objects 

125 flow_run.parameters = parameters 

126 

127 return flow_run 

128 

129 def update_flow_run( 1a

130 self, 

131 flow_run_id: "UUID", 

132 flow_version: str | None = None, 

133 parameters: dict[str, Any] | None = None, 

134 name: str | None = None, 

135 tags: "Iterable[str] | None" = None, 

136 empirical_policy: "FlowRunPolicy | None" = None, 

137 infrastructure_pid: str | None = None, 

138 job_variables: dict[str, Any] | None = None, 

139 ) -> httpx.Response: 

140 """ 

141 Update a flow run's details. 

142 

143 Args: 

144 flow_run_id: The identifier for the flow run to update. 

145 flow_version: A new version string for the flow run. 

146 parameters: A dictionary of parameter values for the flow run. This will not 

147 be merged with any existing parameters. 

148 name: A new name for the flow run. 

149 empirical_policy: A new flow run orchestration policy. This will not be 

150 merged with any existing policy. 

151 tags: An iterable of new tags for the flow run. These will not be merged with 

152 any existing tags. 

153 infrastructure_pid: The id of flow run as returned by an 

154 infrastructure block. 

155 

156 Returns: 

157 an `httpx.Response` object from the PATCH request 

158 """ 

159 params: dict[str, Any] = {} 

160 if flow_version is not None: 

161 params["flow_version"] = flow_version 

162 if parameters is not None: 

163 params["parameters"] = parameters 

164 if name is not None: 

165 params["name"] = name 

166 if tags is not None: 

167 params["tags"] = tags 

168 if empirical_policy is not None: 

169 params["empirical_policy"] = empirical_policy 

170 if infrastructure_pid: 

171 params["infrastructure_pid"] = infrastructure_pid 

172 if job_variables is not None: 

173 params["job_variables"] = job_variables 

174 

175 from prefect.client.schemas.actions import FlowRunUpdate 

176 

177 flow_run_data = FlowRunUpdate(**params) 

178 

179 return self.request( 

180 "PATCH", 

181 "/flow_runs/{id}", 

182 path_params={"id": flow_run_id}, 

183 json=flow_run_data.model_dump(mode="json", exclude_unset=True), 

184 ) 

185 

186 def delete_flow_run( 1a

187 self, 

188 flow_run_id: "UUID", 

189 ) -> None: 

190 """ 

191 Delete a flow run by UUID. 

192 

193 Args: 

194 flow_run_id: The flow run UUID of interest. 

195 Raises: 

196 ObjectNotFound: If request returns 404 

197 httpx.RequestError: If requests fails 

198 """ 

199 try: 

200 self.request("DELETE", "/flow_runs/{id}", path_params={"id": flow_run_id}) 

201 except httpx.HTTPStatusError as e: 

202 if e.response.status_code == 404: 

203 raise ObjectNotFound(http_exc=e) from e 

204 else: 

205 raise 

206 

207 def read_flow_run(self, flow_run_id: "UUID") -> "FlowRun": 1a

208 """ 

209 Query the Prefect API for a flow run by id. 

210 

211 Args: 

212 flow_run_id: the flow run ID of interest 

213 

214 Returns: 

215 a Flow Run model representation of the flow run 

216 """ 

217 try: 

218 response = self.request( 

219 "GET", "/flow_runs/{id}", path_params={"id": flow_run_id} 

220 ) 

221 except httpx.HTTPStatusError as e: 

222 if e.response.status_code == 404: 

223 raise ObjectNotFound(http_exc=e) from e 

224 else: 

225 raise 

226 from prefect.client.schemas.objects import FlowRun 

227 

228 return FlowRun.model_validate(response.json()) 

229 

230 def resume_flow_run( 1a

231 self, flow_run_id: "UUID", run_input: dict[str, Any] | None = None 

232 ) -> "OrchestrationResult[Any]": 

233 """ 

234 Resumes a paused flow run. 

235 

236 Args: 

237 flow_run_id: the flow run ID of interest 

238 run_input: the input to resume the flow run with 

239 

240 Returns: 

241 an OrchestrationResult model representation of state orchestration output 

242 """ 

243 try: 

244 response = self.request( 

245 "POST", 

246 "/flow_runs/{id}/resume", 

247 path_params={"id": flow_run_id}, 

248 json={"run_input": run_input}, 

249 ) 

250 except httpx.HTTPStatusError: 

251 raise 

252 from prefect.client.schemas import OrchestrationResult 

253 

254 result: OrchestrationResult[Any] = OrchestrationResult.model_validate( 

255 response.json() 

256 ) 

257 return result 

258 

259 def read_flow_runs( 1a

260 self, 

261 *, 

262 flow_filter: "FlowFilter | None" = None, 

263 flow_run_filter: "FlowRunFilter | None" = None, 

264 task_run_filter: "TaskRunFilter | None" = None, 

265 deployment_filter: "DeploymentFilter | None" = None, 

266 work_pool_filter: "WorkPoolFilter | None" = None, 

267 work_queue_filter: "WorkQueueFilter | None" = None, 

268 sort: "FlowRunSort | None" = None, 

269 limit: int | None = None, 

270 offset: int = 0, 

271 ) -> "list[FlowRun]": 

272 """ 

273 Query the Prefect API for flow runs. Only flow runs matching all criteria will 

274 be returned. 

275 

276 Args: 

277 flow_filter: filter criteria for flows 

278 flow_run_filter: filter criteria for flow runs 

279 task_run_filter: filter criteria for task runs 

280 deployment_filter: filter criteria for deployments 

281 work_pool_filter: filter criteria for work pools 

282 work_queue_filter: filter criteria for work pool queues 

283 sort: sort criteria for the flow runs 

284 limit: limit for the flow run query 

285 offset: offset for the flow run query 

286 

287 Returns: 

288 a list of Flow Run model representations 

289 of the flow runs 

290 """ 

291 body: dict[str, Any] = { 

292 "flows": flow_filter.model_dump(mode="json") if flow_filter else None, 

293 "flow_runs": ( 

294 flow_run_filter.model_dump(mode="json", exclude_unset=True) 

295 if flow_run_filter 

296 else None 

297 ), 

298 "task_runs": ( 

299 task_run_filter.model_dump(mode="json") if task_run_filter else None 

300 ), 

301 "deployments": ( 

302 deployment_filter.model_dump(mode="json") if deployment_filter else None 

303 ), 

304 "work_pools": ( 

305 work_pool_filter.model_dump(mode="json") if work_pool_filter else None 

306 ), 

307 "work_pool_queues": ( 

308 work_queue_filter.model_dump(mode="json") if work_queue_filter else None 

309 ), 

310 "sort": sort, 

311 "limit": limit, 

312 "offset": offset, 

313 } 

314 

315 response = self.request("POST", "/flow_runs/filter", json=body) 

316 from prefect.client.schemas.objects import FlowRun 

317 

318 return FlowRun.model_validate_list(response.json()) 

319 

320 def count_flow_runs( 1a

321 self, 

322 *, 

323 flow_filter: "FlowFilter | None" = None, 

324 flow_run_filter: "FlowRunFilter | None" = None, 

325 task_run_filter: "TaskRunFilter | None" = None, 

326 deployment_filter: "DeploymentFilter | None" = None, 

327 work_pool_filter: "WorkPoolFilter | None" = None, 

328 work_queue_filter: "WorkQueueFilter | None" = None, 

329 ) -> int: 

330 """ 

331 Returns the count of flow runs matching all criteria for flow runs. 

332 

333 Args: 

334 flow_filter: filter criteria for flows 

335 flow_run_filter: filter criteria for flow runs 

336 task_run_filter: filter criteria for task runs 

337 deployment_filter: filter criteria for deployments 

338 work_pool_filter: filter criteria for work pools 

339 work_queue_filter: filter criteria for work pool queues 

340 

341 Returns: 

342 count of flow runs 

343 """ 

344 body: dict[str, Any] = { 

345 "flows": flow_filter.model_dump(mode="json") if flow_filter else None, 

346 "flow_runs": ( 

347 flow_run_filter.model_dump(mode="json", exclude_unset=True) 

348 if flow_run_filter 

349 else None 

350 ), 

351 "task_runs": ( 

352 task_run_filter.model_dump(mode="json") if task_run_filter else None 

353 ), 

354 "deployments": ( 

355 deployment_filter.model_dump(mode="json") if deployment_filter else None 

356 ), 

357 "work_pools": ( 

358 work_pool_filter.model_dump(mode="json") if work_pool_filter else None 

359 ), 

360 "work_pool_queues": ( 

361 work_queue_filter.model_dump(mode="json") if work_queue_filter else None 

362 ), 

363 } 

364 

365 response = self.request("POST", "/flow_runs/count", json=body) 

366 return response.json() 

367 

368 def set_flow_run_state( 1a

369 self, 

370 flow_run_id: "UUID | str", 

371 state: "State[T]", 

372 force: bool = False, 

373 ) -> "OrchestrationResult[T]": 

374 """ 

375 Set the state of a flow run. 

376 

377 Args: 

378 flow_run_id: the id of the flow run 

379 state: the state to set 

380 force: if True, disregard orchestration logic when setting the state, 

381 forcing the Prefect API to accept the state 

382 

383 Returns: 

384 an OrchestrationResult model representation of state orchestration output 

385 """ 

386 from uuid import UUID, uuid4 

387 

388 from prefect.states import to_state_create 

389 

390 flow_run_id = ( 

391 flow_run_id if isinstance(flow_run_id, UUID) else UUID(flow_run_id) 

392 ) 

393 state_create = to_state_create(state) 

394 state_create.state_details.flow_run_id = flow_run_id 

395 state_create.state_details.transition_id = uuid4() 

396 try: 

397 response = self.request( 

398 "POST", 

399 "/flow_runs/{id}/set_state", 

400 path_params={"id": flow_run_id}, 

401 json=dict( 

402 state=state_create.model_dump(mode="json", serialize_as_any=True), 

403 force=force, 

404 ), 

405 ) 

406 except httpx.HTTPStatusError as e: 

407 if e.response.status_code == 404: 

408 raise ObjectNotFound(http_exc=e) from e 

409 else: 

410 raise 

411 from prefect.client.schemas import OrchestrationResult 

412 

413 result: OrchestrationResult[T] = OrchestrationResult.model_validate( 

414 response.json() 

415 ) 

416 return result 

417 

418 def read_flow_run_states(self, flow_run_id: "UUID") -> "list[State]": 1a

419 """ 

420 Query for the states of a flow run 

421 

422 Args: 

423 flow_run_id: the id of the flow run 

424 

425 Returns: 

426 a list of State model representations 

427 of the flow run states 

428 """ 

429 response = self.request( 

430 "GET", "/flow_run_states/", params=dict(flow_run_id=str(flow_run_id)) 

431 ) 

432 from prefect.states import State 

433 

434 return State.model_validate_list(response.json()) 

435 

436 def set_flow_run_name(self, flow_run_id: "UUID", name: str) -> httpx.Response: 1a

437 from prefect.client.schemas.actions import FlowRunUpdate 

438 

439 flow_run_data = FlowRunUpdate(name=name) 

440 return self.request( 

441 "PATCH", 

442 "/flow_runs/{id}", 

443 path_params={"id": flow_run_id}, 

444 json=flow_run_data.model_dump(mode="json", exclude_unset=True), 

445 ) 

446 

447 def create_flow_run_input( 1a

448 self, flow_run_id: "UUID", key: str, value: str, sender: str | None = None 

449 ) -> None: 

450 """ 

451 Creates a flow run input. 

452 

453 Args: 

454 flow_run_id: The flow run id. 

455 key: The input key. 

456 value: The input value. 

457 sender: The sender of the input. 

458 """ 

459 

460 # Initialize the input to ensure that the key is valid. 

461 FlowRunInput(flow_run_id=flow_run_id, key=key, value=value) 

462 

463 response = self.request( 

464 "POST", 

465 "/flow_runs/{id}/input", 

466 path_params={"id": flow_run_id}, 

467 json={"key": key, "value": value, "sender": sender}, 

468 ) 

469 response.raise_for_status() 

470 

471 def filter_flow_run_input( 1a

472 self, flow_run_id: "UUID", key_prefix: str, limit: int, exclude_keys: "set[str]" 

473 ) -> "list[FlowRunInput]": 

474 response = self.request( 

475 "POST", 

476 "/flow_runs/{id}/input/filter", 

477 path_params={"id": flow_run_id}, 

478 json={ 

479 "prefix": key_prefix, 

480 "limit": limit, 

481 "exclude_keys": list(exclude_keys), 

482 }, 

483 ) 

484 response.raise_for_status() 

485 from prefect.client.schemas.objects import FlowRunInput 

486 

487 return FlowRunInput.model_validate_list(response.json()) 

488 

489 def read_flow_run_input(self, flow_run_id: "UUID", key: str) -> str: 1a

490 """ 

491 Reads a flow run input. 

492 

493 Args: 

494 flow_run_id: The flow run id. 

495 key: The input key. 

496 """ 

497 response = self.request( 

498 "GET", 

499 "/flow_runs/{id}/input/{key}", 

500 path_params={"id": flow_run_id, "key": key}, 

501 ) 

502 response.raise_for_status() 

503 return response.content.decode() 

504 

505 def delete_flow_run_input(self, flow_run_id: "UUID", key: str) -> None: 1a

506 """ 

507 Deletes a flow run input. 

508 

509 Args: 

510 flow_run_id: The flow run id. 

511 key: The input key. 

512 """ 

513 response = self.request( 

514 "DELETE", 

515 "/flow_runs/{id}/input/{key}", 

516 path_params={"id": flow_run_id, "key": key}, 

517 ) 

518 response.raise_for_status() 

519 

520 def update_flow_run_labels( 1a

521 self, flow_run_id: "UUID", labels: "KeyValueLabelsField" 

522 ) -> None: 

523 """ 

524 Updates the labels of a flow run. 

525 """ 

526 

527 response = self.request( 

528 "PATCH", 

529 "/flow_runs/{id}/labels", 

530 path_params={"id": flow_run_id}, 

531 json=labels, 

532 ) 

533 response.raise_for_status() 

534 

535 

536class FlowRunAsyncClient(BaseAsyncClient): 1a

537 async def create_flow_run( 1a

538 self, 

539 flow: "FlowObject[Any, R]", 

540 name: str | None = None, 

541 parameters: dict[str, Any] | None = None, 

542 context: dict[str, Any] | None = None, 

543 tags: "Iterable[str] | None" = None, 

544 parent_task_run_id: "UUID | None" = None, 

545 state: "State[R] | None" = None, 

546 work_pool_name: str | None = None, 

547 work_queue_name: str | None = None, 

548 job_variables: dict[str, Any] | None = None, 

549 ) -> "FlowRun": 

550 """ 

551 Create a flow run for a flow. 

552 

553 Args: 

554 flow: The flow model to create the flow run for 

555 name: An optional name for the flow run 

556 parameters: Parameter overrides for this flow run. 

557 context: Optional run context data 

558 tags: a list of tags to apply to this flow run 

559 parent_task_run_id: if a subflow run is being created, the placeholder task 

560 run identifier in the parent flow 

561 state: The initial state for the run. If not provided, defaults to 

562 `Pending`. 

563 work_pool_name: The name of the work pool to run the flow run in. 

564 work_queue_name: The name of the work queue to place the flow run in. 

565 job_variables: The job variables to use when setting up flow run infrastructure. 

566 

567 Raises: 

568 httpx.RequestError: if the Prefect API does not successfully create a run for any reason 

569 

570 Returns: 

571 The flow run model 

572 """ 

573 from prefect.client.schemas.actions import FlowCreate, FlowRunCreate 

574 from prefect.client.schemas.objects import Flow, FlowRun, FlowRunPolicy 

575 from prefect.states import Pending, to_state_create 

576 

577 parameters = parameters or {} 

578 context = context or {} 

579 

580 if state is None: 

581 state = Pending() 

582 

583 # Retrieve the flow id 

584 

585 flow_data = FlowCreate(name=flow.name) 

586 response = await self.request( 

587 "POST", "/flows/", json=flow_data.model_dump(mode="json") 

588 ) 

589 flow_id = Flow.model_validate(response.json()).id 

590 

591 flow_run_create = FlowRunCreate( 

592 flow_id=flow_id, 

593 flow_version=flow.version, 

594 name=name, 

595 parameters=parameters, 

596 context=context, 

597 tags=list(tags or []), 

598 parent_task_run_id=parent_task_run_id, 

599 state=to_state_create(state), 

600 empirical_policy=FlowRunPolicy( 

601 retries=flow.retries, 

602 retry_delay=int(flow.retry_delay_seconds or 0), 

603 ), 

604 ) 

605 

606 if work_pool_name is not None: 

607 flow_run_create.work_pool_name = work_pool_name 

608 if work_queue_name is not None: 

609 flow_run_create.work_queue_name = work_queue_name 

610 if job_variables is not None: 

611 flow_run_create.job_variables = job_variables 

612 

613 flow_run_create_json = flow_run_create.model_dump( 

614 mode="json", exclude_unset=True 

615 ) 

616 response = await self.request("POST", "/flow_runs/", json=flow_run_create_json) 

617 

618 flow_run = FlowRun.model_validate(response.json()) 

619 

620 # Restore the parameters to the local objects to retain expectations about 

621 # Python objects 

622 flow_run.parameters = parameters 

623 

624 return flow_run 

625 

626 async def update_flow_run( 1a

627 self, 

628 flow_run_id: "UUID", 

629 flow_version: str | None = None, 

630 parameters: dict[str, Any] | None = None, 

631 name: str | None = None, 

632 tags: "Iterable[str] | None" = None, 

633 empirical_policy: "FlowRunPolicy | None" = None, 

634 infrastructure_pid: str | None = None, 

635 job_variables: dict[str, Any] | None = None, 

636 ) -> httpx.Response: 

637 """ 

638 Update a flow run's details. 

639 

640 Args: 

641 flow_run_id: The identifier for the flow run to update. 

642 flow_version: A new version string for the flow run. 

643 parameters: A dictionary of parameter values for the flow run. This will not 

644 be merged with any existing parameters. 

645 name: A new name for the flow run. 

646 empirical_policy: A new flow run orchestration policy. This will not be 

647 merged with any existing policy. 

648 tags: An iterable of new tags for the flow run. These will not be merged with 

649 any existing tags. 

650 infrastructure_pid: The id of flow run as returned by an 

651 infrastructure block. 

652 

653 Returns: 

654 an `httpx.Response` object from the PATCH request 

655 """ 

656 params: dict[str, Any] = {} 

657 if flow_version is not None: 

658 params["flow_version"] = flow_version 

659 if parameters is not None: 

660 params["parameters"] = parameters 

661 if name is not None: 

662 params["name"] = name 

663 if tags is not None: 

664 params["tags"] = tags 

665 if empirical_policy is not None: 

666 params["empirical_policy"] = empirical_policy 

667 if infrastructure_pid: 

668 params["infrastructure_pid"] = infrastructure_pid 

669 if job_variables is not None: 

670 params["job_variables"] = job_variables 

671 from prefect.client.schemas.actions import FlowRunUpdate 

672 

673 flow_run_data = FlowRunUpdate(**params) 

674 

675 return await self.request( 

676 "PATCH", 

677 "/flow_runs/{id}", 

678 path_params={"id": flow_run_id}, 

679 json=flow_run_data.model_dump(mode="json", exclude_unset=True), 

680 ) 

681 

682 async def delete_flow_run( 1a

683 self, 

684 flow_run_id: "UUID", 

685 ) -> None: 

686 """ 

687 Delete a flow run by UUID. 

688 

689 Args: 

690 flow_run_id: The flow run UUID of interest. 

691 Raises: 

692 ObjectNotFound: If request returns 404 

693 httpx.RequestError: If requests fails 

694 """ 

695 try: 

696 await self.request( 

697 "DELETE", "/flow_runs/{id}", path_params={"id": flow_run_id} 

698 ) 

699 except httpx.HTTPStatusError as e: 

700 if e.response.status_code == 404: 

701 raise ObjectNotFound(http_exc=e) from e 

702 else: 

703 raise 

704 

705 async def read_flow_run(self, flow_run_id: "UUID") -> "FlowRun": 1a

706 """ 

707 Query the Prefect API for a flow run by id. 

708 

709 Args: 

710 flow_run_id: the flow run ID of interest 

711 

712 Returns: 

713 a Flow Run model representation of the flow run 

714 """ 

715 try: 

716 response = await self.request( 

717 "GET", "/flow_runs/{id}", path_params={"id": flow_run_id} 

718 ) 

719 except httpx.HTTPStatusError as e: 

720 if e.response.status_code == 404: 

721 raise ObjectNotFound(http_exc=e) from e 

722 else: 

723 raise 

724 from prefect.client.schemas.objects import FlowRun 

725 

726 return FlowRun.model_validate(response.json()) 

727 

728 async def resume_flow_run( 1a

729 self, flow_run_id: "UUID", run_input: dict[str, Any] | None = None 

730 ) -> "OrchestrationResult[Any]": 

731 """ 

732 Resumes a paused flow run. 

733 

734 Args: 

735 flow_run_id: the flow run ID of interest 

736 run_input: the input to resume the flow run with 

737 

738 Returns: 

739 an OrchestrationResult model representation of state orchestration output 

740 """ 

741 try: 

742 response = await self.request( 

743 "POST", 

744 "/flow_runs/{id}/resume", 

745 path_params={"id": flow_run_id}, 

746 json={"run_input": run_input}, 

747 ) 

748 except httpx.HTTPStatusError: 

749 raise 

750 from prefect.client.schemas import OrchestrationResult 

751 

752 result: OrchestrationResult[Any] = OrchestrationResult.model_validate( 

753 response.json() 

754 ) 

755 return result 

756 

757 async def read_flow_runs( 1a

758 self, 

759 *, 

760 flow_filter: "FlowFilter | None" = None, 

761 flow_run_filter: "FlowRunFilter | None" = None, 

762 task_run_filter: "TaskRunFilter | None" = None, 

763 deployment_filter: "DeploymentFilter | None" = None, 

764 work_pool_filter: "WorkPoolFilter | None" = None, 

765 work_queue_filter: "WorkQueueFilter | None" = None, 

766 sort: "FlowRunSort | None" = None, 

767 limit: int | None = None, 

768 offset: int = 0, 

769 ) -> "list[FlowRun]": 

770 """ 

771 Query the Prefect API for flow runs. Only flow runs matching all criteria will 

772 be returned. 

773 

774 Args: 

775 flow_filter: filter criteria for flows 

776 flow_run_filter: filter criteria for flow runs 

777 task_run_filter: filter criteria for task runs 

778 deployment_filter: filter criteria for deployments 

779 work_pool_filter: filter criteria for work pools 

780 work_queue_filter: filter criteria for work pool queues 

781 sort: sort criteria for the flow runs 

782 limit: limit for the flow run query 

783 offset: offset for the flow run query 

784 

785 Returns: 

786 a list of Flow Run model representations 

787 of the flow runs 

788 """ 

789 body: dict[str, Any] = { 

790 "flows": flow_filter.model_dump(mode="json") if flow_filter else None, 

791 "flow_runs": ( 

792 flow_run_filter.model_dump(mode="json", exclude_unset=True) 

793 if flow_run_filter 

794 else None 

795 ), 

796 "task_runs": ( 

797 task_run_filter.model_dump(mode="json") if task_run_filter else None 

798 ), 

799 "deployments": ( 

800 deployment_filter.model_dump(mode="json") if deployment_filter else None 

801 ), 

802 "work_pools": ( 

803 work_pool_filter.model_dump(mode="json") if work_pool_filter else None 

804 ), 

805 "work_pool_queues": ( 

806 work_queue_filter.model_dump(mode="json") if work_queue_filter else None 

807 ), 

808 "sort": sort, 

809 "limit": limit, 

810 "offset": offset, 

811 } 

812 

813 response = await self.request("POST", "/flow_runs/filter", json=body) 

814 from prefect.client.schemas.objects import FlowRun 

815 

816 return FlowRun.model_validate_list(response.json()) 

817 

818 async def count_flow_runs( 1a

819 self, 

820 *, 

821 flow_filter: "FlowFilter | None" = None, 

822 flow_run_filter: "FlowRunFilter | None" = None, 

823 task_run_filter: "TaskRunFilter | None" = None, 

824 deployment_filter: "DeploymentFilter | None" = None, 

825 work_pool_filter: "WorkPoolFilter | None" = None, 

826 work_queue_filter: "WorkQueueFilter | None" = None, 

827 ) -> int: 

828 """ 

829 Returns the count of flow runs matching all criteria for flow runs. 

830 

831 Args: 

832 flow_filter: filter criteria for flows 

833 flow_run_filter: filter criteria for flow runs 

834 task_run_filter: filter criteria for task runs 

835 deployment_filter: filter criteria for deployments 

836 work_pool_filter: filter criteria for work pools 

837 work_queue_filter: filter criteria for work pool queues 

838 

839 Returns: 

840 count of flow runs 

841 """ 

842 body: dict[str, Any] = { 

843 "flows": flow_filter.model_dump(mode="json") if flow_filter else None, 

844 "flow_runs": ( 

845 flow_run_filter.model_dump(mode="json", exclude_unset=True) 

846 if flow_run_filter 

847 else None 

848 ), 

849 "task_runs": ( 

850 task_run_filter.model_dump(mode="json") if task_run_filter else None 

851 ), 

852 "deployments": ( 

853 deployment_filter.model_dump(mode="json") if deployment_filter else None 

854 ), 

855 "work_pools": ( 

856 work_pool_filter.model_dump(mode="json") if work_pool_filter else None 

857 ), 

858 "work_pool_queues": ( 

859 work_queue_filter.model_dump(mode="json") if work_queue_filter else None 

860 ), 

861 } 

862 

863 response = await self.request("POST", "/flow_runs/count", json=body) 

864 return response.json() 

865 

866 async def set_flow_run_state( 1a

867 self, 

868 flow_run_id: "UUID | str", 

869 state: "State[T]", 

870 force: bool = False, 

871 ) -> "OrchestrationResult[T]": 

872 """ 

873 Set the state of a flow run. 

874 

875 Args: 

876 flow_run_id: the id of the flow run 

877 state: the state to set 

878 force: if True, disregard orchestration logic when setting the state, 

879 forcing the Prefect API to accept the state 

880 

881 Returns: 

882 an OrchestrationResult model representation of state orchestration output 

883 """ 

884 from uuid import UUID, uuid4 

885 

886 from prefect.states import to_state_create 

887 

888 flow_run_id = ( 

889 flow_run_id if isinstance(flow_run_id, UUID) else UUID(flow_run_id) 

890 ) 

891 state_create = to_state_create(state) 

892 state_create.state_details.flow_run_id = flow_run_id 

893 state_create.state_details.transition_id = uuid4() 

894 try: 

895 response = await self.request( 

896 "POST", 

897 "/flow_runs/{id}/set_state", 

898 path_params={"id": flow_run_id}, 

899 json=dict( 

900 state=state_create.model_dump(mode="json", serialize_as_any=True), 

901 force=force, 

902 ), 

903 ) 

904 except httpx.HTTPStatusError as e: 

905 if e.response.status_code == 404: 

906 raise ObjectNotFound(http_exc=e) from e 

907 else: 

908 raise 

909 from prefect.client.schemas import OrchestrationResult 

910 

911 result: OrchestrationResult[T] = OrchestrationResult.model_validate( 

912 response.json() 

913 ) 

914 return result 

915 

916 async def read_flow_run_states(self, flow_run_id: "UUID") -> "list[State]": 1a

917 """ 

918 Query for the states of a flow run 

919 

920 Args: 

921 flow_run_id: the id of the flow run 

922 

923 Returns: 

924 a list of State model representations 

925 of the flow run states 

926 """ 

927 response = await self.request( 

928 "GET", "/flow_run_states/", params=dict(flow_run_id=str(flow_run_id)) 

929 ) 

930 from prefect.states import State 

931 

932 return State.model_validate_list(response.json()) 

933 

934 async def set_flow_run_name(self, flow_run_id: "UUID", name: str) -> httpx.Response: 1a

935 from prefect.client.schemas.actions import FlowRunUpdate 

936 

937 flow_run_data = FlowRunUpdate(name=name) 

938 return await self.request( 

939 "PATCH", 

940 "/flow_runs/{id}", 

941 path_params={"id": flow_run_id}, 

942 json=flow_run_data.model_dump(mode="json", exclude_unset=True), 

943 ) 

944 

945 async def create_flow_run_input( 1a

946 self, flow_run_id: "UUID", key: str, value: str, sender: str | None = None 

947 ) -> None: 

948 """ 

949 Creates a flow run input. 

950 

951 Args: 

952 flow_run_id: The flow run id. 

953 key: The input key. 

954 value: The input value. 

955 sender: The sender of the input. 

956 """ 

957 

958 # Initialize the input to ensure that the key is valid. 

959 from prefect.client.schemas.objects import FlowRunInput 

960 

961 FlowRunInput(flow_run_id=flow_run_id, key=key, value=value) 

962 

963 response = await self.request( 

964 "POST", 

965 "/flow_runs/{id}/input", 

966 path_params={"id": flow_run_id}, 

967 json={"key": key, "value": value, "sender": sender}, 

968 ) 

969 response.raise_for_status() 

970 

971 async def filter_flow_run_input( 1a

972 self, flow_run_id: "UUID", key_prefix: str, limit: int, exclude_keys: "set[str]" 

973 ) -> "list[FlowRunInput]": 

974 response = await self.request( 

975 "POST", 

976 "/flow_runs/{id}/input/filter", 

977 path_params={"id": flow_run_id}, 

978 json={ 

979 "prefix": key_prefix, 

980 "limit": limit, 

981 "exclude_keys": list(exclude_keys), 

982 }, 

983 ) 

984 response.raise_for_status() 

985 from prefect.client.schemas.objects import FlowRunInput 

986 

987 return FlowRunInput.model_validate_list(response.json()) 

988 

989 async def read_flow_run_input(self, flow_run_id: "UUID", key: str) -> str: 1a

990 """ 

991 Reads a flow run input. 

992 

993 Args: 

994 flow_run_id: The flow run id. 

995 key: The input key. 

996 """ 

997 response = await self.request( 

998 "GET", 

999 "/flow_runs/{id}/input/{key}", 

1000 path_params={"id": flow_run_id, "key": key}, 

1001 ) 

1002 response.raise_for_status() 

1003 return response.content.decode() 

1004 

1005 async def delete_flow_run_input(self, flow_run_id: "UUID", key: str) -> None: 1a

1006 """ 

1007 Deletes a flow run input. 

1008 

1009 Args: 

1010 flow_run_id: The flow run id. 

1011 key: The input key. 

1012 """ 

1013 response = await self.request( 

1014 "DELETE", 

1015 "/flow_runs/{id}/input/{key}", 

1016 path_params={"id": flow_run_id, "key": key}, 

1017 ) 

1018 response.raise_for_status() 

1019 

1020 async def update_flow_run_labels( 1a

1021 self, flow_run_id: "UUID", labels: "KeyValueLabelsField" 

1022 ) -> None: 

1023 """ 

1024 Updates the labels of a flow run. 

1025 """ 

1026 

1027 response = await self.request( 

1028 "PATCH", 

1029 "/flow_runs/{id}/labels", 

1030 path_params={"id": flow_run_id}, 

1031 json=labels, 

1032 ) 

1033 response.raise_for_status()