1""" 

2Routes for interacting with flow run objects. 

3""" 

4 

5import csv 1a

6import datetime 1a

7import io 1a

8from typing import TYPE_CHECKING, Any, Dict, List, Optional 1a

9from uuid import UUID 1a

10 

11import orjson 1a

12import sqlalchemy as sa 1a

13from docket import Depends as DocketDepends 1a

14from docket import Retry 1a

15from fastapi import ( 1a

16 Body, 

17 Depends, 

18 HTTPException, 

19 Path, 

20 Query, 

21 Response, 

22) 

23from fastapi.encoders import jsonable_encoder 1a

24from fastapi.responses import ORJSONResponse, PlainTextResponse, StreamingResponse 1a

25from sqlalchemy.exc import IntegrityError 1a

26 

27import prefect.server.api.dependencies as dependencies 1a

28import prefect.server.models as models 1a

29import prefect.server.schemas as schemas 1a

30from prefect._internal.compatibility.starlette import status 1a

31from prefect.logging import get_logger 1a

32from prefect.server.api.run_history import run_history 1a

33from prefect.server.api.validation import validate_job_variables_for_deployment_flow_run 1a

34from prefect.server.api.workers import WorkerLookups 1a

35from prefect.server.database import PrefectDBInterface, provide_database_interface 1a

36from prefect.server.exceptions import FlowRunGraphTooLarge 1a

37from prefect.server.models.flow_runs import ( 1a

38 DependencyResult, 

39 read_flow_run_graph, 

40) 

41from prefect.server.orchestration import dependencies as orchestration_dependencies 1a

42from prefect.server.orchestration.policies import ( 1a

43 FlowRunOrchestrationPolicy, 

44 TaskRunOrchestrationPolicy, 

45) 

46from prefect.server.schemas.graph import Graph 1a

47from prefect.server.schemas.responses import ( 1a

48 FlowRunPaginationResponse, 

49 OrchestrationResult, 

50) 

51from prefect.server.utilities.server import PrefectRouter 1a

52from prefect.types import DateTime 1a

53from prefect.types._datetime import earliest_possible_datetime, now 1a

54from prefect.utilities import schema_tools 1a

55 

56if TYPE_CHECKING: 56 ↛ 57line 56 didn't jump to line 57 because the condition on line 56 was never true1a

57 import logging 

58 

59logger: "logging.Logger" = get_logger("server.api") 1a

60 

61router: PrefectRouter = PrefectRouter(prefix="/flow_runs", tags=["Flow Runs"]) 1a

62 

63 

64@router.post("/") 1a

65async def create_flow_run( 1a

66 flow_run: schemas.actions.FlowRunCreate, 

67 db: PrefectDBInterface = Depends(provide_database_interface), 

68 response: Response = None, # type: ignore 

69 created_by: Optional[schemas.core.CreatedBy] = Depends(dependencies.get_created_by), 

70 orchestration_parameters: Dict[str, Any] = Depends( 

71 orchestration_dependencies.provide_flow_orchestration_parameters 

72 ), 

73 api_version: str = Depends(dependencies.provide_request_api_version), 

74 worker_lookups: WorkerLookups = Depends(WorkerLookups), 

75) -> schemas.responses.FlowRunResponse: 

76 """ 

77 Create a flow run. If a flow run with the same flow_id and 

78 idempotency key already exists, the existing flow run will be returned. 

79 

80 If no state is provided, the flow run will be created in a PENDING state. 

81 

82 For more information, see https://docs.prefect.io/v3/concepts/flows. 

83 """ 

84 # hydrate the input model into a full flow run / state model 

85 flow_run_object = schemas.core.FlowRun( 

86 **flow_run.model_dump(), created_by=created_by 

87 ) 

88 

89 # pass the request version to the orchestration engine to support compatibility code 

90 orchestration_parameters.update({"api-version": api_version}) 

91 

92 if not flow_run_object.state: 

93 flow_run_object.state = schemas.states.Pending() 

94 

95 right_now = now("UTC") 

96 

97 async with db.session_context(begin_transaction=True) as session: 

98 if flow_run.work_pool_name: 

99 if flow_run.work_queue_name: 

100 work_queue_id = await worker_lookups._get_work_queue_id_from_name( 

101 session=session, 

102 work_pool_name=flow_run.work_pool_name, 

103 work_queue_name=flow_run.work_queue_name, 

104 ) 

105 else: 

106 work_queue_id = ( 

107 await worker_lookups._get_default_work_queue_id_from_work_pool_name( 

108 session=session, 

109 work_pool_name=flow_run.work_pool_name, 

110 ) 

111 ) 

112 else: 

113 work_queue_id = None 

114 

115 flow_run_object.work_queue_id = work_queue_id 

116 

117 model = await models.flow_runs.create_flow_run( 

118 session=session, 

119 flow_run=flow_run_object, 

120 orchestration_parameters=orchestration_parameters, 

121 ) 

122 if model.created >= right_now: 

123 response.status_code = status.HTTP_201_CREATED 

124 

125 return schemas.responses.FlowRunResponse.model_validate( 

126 model, from_attributes=True 

127 ) 

128 

129 
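# Example (sketch, not part of this module): creating a flow run via this
# route with httpx. The base URL and flow id are illustrative assumptions;
# "idempotency_key" makes repeat requests return the existing run, per the
# docstring above.
#
#     import httpx
#
#     resp = httpx.post(
#         "http://127.0.0.1:4200/api/flow_runs/",
#         json={
#             "flow_id": "11111111-1111-1111-1111-111111111111",
#             "idempotency_key": "my-key",
#         },
#     )
#     resp.raise_for_status()
#     print(resp.status_code)  # 201 if newly created, 200 if it already existed
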

@router.patch("/{id:uuid}", status_code=status.HTTP_204_NO_CONTENT)
async def update_flow_run(
    flow_run: schemas.actions.FlowRunUpdate,
    flow_run_id: UUID = Path(..., description="The flow run id", alias="id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    """
    Updates a flow run.
    """
    async with db.session_context(begin_transaction=True) as session:
        if flow_run.job_variables is not None:
            this_run = await models.flow_runs.read_flow_run(
                session, flow_run_id=flow_run_id
            )
            if this_run is None:
                raise HTTPException(
                    status.HTTP_404_NOT_FOUND, detail="Flow run not found"
                )
            if not this_run.state:
                raise HTTPException(
                    status.HTTP_400_BAD_REQUEST,
                    detail="Flow run state is required to update job variables but none exists",
                )
            if this_run.state.type != schemas.states.StateType.SCHEDULED:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=f"Job variables for a flow run in state {this_run.state.type.name} cannot be updated",
                )
            if this_run.deployment_id is None:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail="A deployment for the flow run could not be found",
                )

            deployment = await models.deployments.read_deployment(
                session=session, deployment_id=this_run.deployment_id
            )
            if deployment is None:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail="A deployment for the flow run could not be found",
                )

            await validate_job_variables_for_deployment_flow_run(
                session, deployment, flow_run
            )

        result = await models.flow_runs.update_flow_run(
            session=session, flow_run=flow_run, flow_run_id=flow_run_id
        )
        if not result:
            raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Flow run not found")

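# Example (sketch): updating job variables on a flow run with httpx. Per the
# checks above this only succeeds while the run is SCHEDULED and belongs to a
# deployment; the URL and id are illustrative assumptions.
#
#     import httpx
#
#     resp = httpx.patch(
#         "http://127.0.0.1:4200/api/flow_runs/<flow-run-id>",
#         json={"job_variables": {"env": {"LOG_LEVEL": "DEBUG"}}},
#     )
#     assert resp.status_code == 204  # success returns no content
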

@router.post("/count")
async def count_flow_runs(
    flows: Optional[schemas.filters.FlowFilter] = None,
    flow_runs: Optional[schemas.filters.FlowRunFilter] = None,
    task_runs: Optional[schemas.filters.TaskRunFilter] = None,
    deployments: Optional[schemas.filters.DeploymentFilter] = None,
    work_pools: Optional[schemas.filters.WorkPoolFilter] = None,
    work_pool_queues: Optional[schemas.filters.WorkQueueFilter] = None,
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> int:
    """
    Count flow runs matching the given filters.
    """
    async with db.session_context() as session:
        return await models.flow_runs.count_flow_runs(
            session=session,
            flow_filter=flows,
            flow_run_filter=flow_runs,
            task_run_filter=task_runs,
            deployment_filter=deployments,
            work_pool_filter=work_pools,
            work_queue_filter=work_pool_queues,
        )

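# Example (sketch): counting flow runs with a filter body. The filter shape
# follows schemas.filters.FlowRunFilter; the URL and state value are
# illustrative assumptions.
#
#     import httpx
#
#     resp = httpx.post(
#         "http://127.0.0.1:4200/api/flow_runs/count",
#         json={"flow_runs": {"state": {"type": {"any_": ["RUNNING"]}}}},
#     )
#     print(resp.json())  # a plain integer
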

@router.post("/lateness")
async def average_flow_run_lateness(
    flows: Optional[schemas.filters.FlowFilter] = None,
    flow_runs: Optional[schemas.filters.FlowRunFilter] = None,
    task_runs: Optional[schemas.filters.TaskRunFilter] = None,
    deployments: Optional[schemas.filters.DeploymentFilter] = None,
    work_pools: Optional[schemas.filters.WorkPoolFilter] = None,
    work_pool_queues: Optional[schemas.filters.WorkQueueFilter] = None,
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> Optional[float]:
    """
    Query for average flow-run lateness in seconds.
    """
    async with db.session_context() as session:
        if db.dialect.name == "sqlite":
            # Since we want an _average_ of the lateness we're unable to use
            # the existing FlowRun.estimated_start_time_delta property as it
            # returns a timedelta and SQLite is unable to properly deal with it
            # and always returns 1970.0 as the average. This copies the same
            # logic but ensures that it returns the number of seconds instead
            # so it's compatible with SQLite.
            base_query = sa.case(
                (
                    db.FlowRun.start_time > db.FlowRun.expected_start_time,
                    sa.func.strftime("%s", db.FlowRun.start_time)
                    - sa.func.strftime("%s", db.FlowRun.expected_start_time),
                ),
                (
                    db.FlowRun.start_time.is_(None)
                    & db.FlowRun.state_type.notin_(schemas.states.TERMINAL_STATES)
                    & (db.FlowRun.expected_start_time < sa.func.datetime("now")),
                    sa.func.strftime("%s", sa.func.datetime("now"))
                    - sa.func.strftime("%s", db.FlowRun.expected_start_time),
                ),
                else_=0,
            )
        else:
            base_query = db.FlowRun.estimated_start_time_delta

        query = await models.flow_runs._apply_flow_run_filters(
            db,
            sa.select(sa.func.avg(base_query)),
            flow_filter=flows,
            flow_run_filter=flow_runs,
            task_run_filter=task_runs,
            deployment_filter=deployments,
            work_pool_filter=work_pools,
            work_queue_filter=work_pool_queues,
        )
        result = await session.execute(query)

        avg_lateness = result.scalar()

        if avg_lateness is None:
            return None
        elif isinstance(avg_lateness, datetime.timedelta):
            return avg_lateness.total_seconds()
        else:
            return avg_lateness

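# The SQLite branch above works because strftime('%s', t) yields Unix epoch
# seconds, so each case arm computes lateness as an integer difference. A
# minimal Python sketch of the same per-run rule (names assumed for
# illustration only):
#
#     def lateness_seconds(start, expected, state_type, current_time):
#         if start is not None and start > expected:
#             # started late: actual start minus expected start
#             return (start - expected).total_seconds()
#         if start is None and state_type not in TERMINAL_STATES and expected < current_time:
#             # overdue and still pending: time elapsed past the expected start
#             return (current_time - expected).total_seconds()
#         return 0.0
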

@router.post("/history")
async def flow_run_history(
    history_start: DateTime = Body(..., description="The history's start time."),
    history_end: DateTime = Body(..., description="The history's end time."),
    # Workaround for the fact that FastAPI does not let us configure ser_json_timedelta
    # to represent timedeltas as floats in JSON.
    history_interval: float = Body(
        ...,
        description=(
            "The size of each history interval, in seconds. Must be at least 1 second."
        ),
        json_schema_extra={"format": "time-delta"},
        alias="history_interval_seconds",
    ),
    flows: Optional[schemas.filters.FlowFilter] = None,
    flow_runs: Optional[schemas.filters.FlowRunFilter] = None,
    task_runs: Optional[schemas.filters.TaskRunFilter] = None,
    deployments: Optional[schemas.filters.DeploymentFilter] = None,
    work_pools: Optional[schemas.filters.WorkPoolFilter] = None,
    work_queues: Optional[schemas.filters.WorkQueueFilter] = None,
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[schemas.responses.HistoryResponse]:
    """
    Query for flow run history data across a given range and interval.
    """
    if isinstance(history_interval, float):
        history_interval = datetime.timedelta(seconds=history_interval)

    assert isinstance(history_interval, datetime.timedelta)
    if history_interval < datetime.timedelta(seconds=1):
        raise HTTPException(
            status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="History interval must not be less than 1 second.",
        )

    async with db.session_context() as session:
        return await run_history(
            session=session,
            run_type="flow_run",
            history_start=history_start,
            history_end=history_end,
            history_interval=history_interval,
            flows=flows,
            flow_runs=flow_runs,
            task_runs=task_runs,
            deployments=deployments,
            work_pools=work_pools,
            work_queues=work_queues,
        )

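# Example (sketch): requesting hourly history for one day. Note the interval
# is sent under the "history_interval_seconds" alias declared above; the
# timestamps and URL are illustrative assumptions.
#
#     import httpx
#
#     resp = httpx.post(
#         "http://127.0.0.1:4200/api/flow_runs/history",
#         json={
#             "history_start": "2024-01-01T00:00:00Z",
#             "history_end": "2024-01-02T00:00:00Z",
#             "history_interval_seconds": 3600,
#         },
#     )
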

@router.get("/{id:uuid}")
async def read_flow_run(
    flow_run_id: UUID = Path(..., description="The flow run id", alias="id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.responses.FlowRunResponse:
    """
    Get a flow run by id.
    """
    async with db.session_context() as session:
        flow_run = await models.flow_runs.read_flow_run(
            session=session, flow_run_id=flow_run_id
        )
        if not flow_run:
            raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Flow run not found")
        return schemas.responses.FlowRunResponse.model_validate(
            flow_run, from_attributes=True
        )


@router.get("/{id:uuid}/graph", tags=["Flow Run Graph"])
async def read_flow_run_graph_v1(
    flow_run_id: UUID = Path(..., description="The flow run id", alias="id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[DependencyResult]:
    """
    Get a task run dependency map for a given flow run.
    """
    async with db.session_context() as session:
        return await models.flow_runs.read_task_run_dependencies(
            session=session, flow_run_id=flow_run_id
        )


@router.get("/{id:uuid}/graph-v2", tags=["Flow Run Graph"])
async def read_flow_run_graph_v2(
    flow_run_id: UUID = Path(..., description="The flow run id", alias="id"),
    since: datetime.datetime = Query(
        default=jsonable_encoder(earliest_possible_datetime()),
        description="Only include runs that start or end after this time.",
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> Graph:
    """
    Get a graph of the tasks and subflow runs for the given flow run.
    """
    async with db.session_context() as session:
        try:
            return await read_flow_run_graph(
                session=session,
                flow_run_id=flow_run_id,
                since=since,
            )
        except FlowRunGraphTooLarge as e:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=str(e),
            )


@router.post("/{id:uuid}/resume")
async def resume_flow_run(
    response: Response,
    flow_run_id: UUID = Path(..., description="The flow run id", alias="id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
    run_input: Optional[dict[str, Any]] = Body(default=None, embed=True),
    flow_policy: type[FlowRunOrchestrationPolicy] = Depends(
        orchestration_dependencies.provide_flow_policy
    ),
    task_policy: type[TaskRunOrchestrationPolicy] = Depends(
        orchestration_dependencies.provide_task_policy
    ),
    orchestration_parameters: Dict[str, Any] = Depends(
        orchestration_dependencies.provide_flow_orchestration_parameters
    ),
    api_version: str = Depends(dependencies.provide_request_api_version),
    client_version: Optional[str] = Depends(dependencies.get_prefect_client_version),
) -> OrchestrationResult:
    """
    Resume a paused flow run.
    """
    right_now = now("UTC")

    async with db.session_context(begin_transaction=True) as session:
        flow_run = await models.flow_runs.read_flow_run(session, flow_run_id)
        state = flow_run.state

        if state is None or state.type != schemas.states.StateType.PAUSED:
            result = OrchestrationResult(
                state=None,
                status=schemas.responses.SetStateStatus.ABORT,
                details=schemas.responses.StateAbortDetails(
                    reason="Cannot resume a flow run that is not paused."
                ),
            )
            return result

        orchestration_parameters.update({"api-version": api_version})

        keyset = state.state_details.run_input_keyset

        if keyset:
            run_input = run_input or {}

            try:
                hydration_context = await schema_tools.HydrationContext.build(
                    session=session,
                    raise_on_error=True,
                    render_jinja=True,
                    render_workspace_variables=True,
                )
                run_input = schema_tools.hydrate(run_input, hydration_context) or {}
            except schema_tools.HydrationError as exc:
                return OrchestrationResult(
                    state=state,
                    status=schemas.responses.SetStateStatus.REJECT,
                    details=schemas.responses.StateAbortDetails(
                        reason=f"Error hydrating run input: {exc}",
                    ),
                )

            schema_json = await models.flow_run_input.read_flow_run_input(
                session=session, flow_run_id=flow_run.id, key=keyset["schema"]
            )

            if schema_json is None:
                return OrchestrationResult(
                    state=state,
                    status=schemas.responses.SetStateStatus.REJECT,
                    details=schemas.responses.StateAbortDetails(
                        reason="Run input schema not found."
                    ),
                )

            try:
                schema = orjson.loads(schema_json.value)
            except orjson.JSONDecodeError:
                return OrchestrationResult(
                    state=state,
                    status=schemas.responses.SetStateStatus.REJECT,
                    details=schemas.responses.StateAbortDetails(
                        reason="Run input schema is not valid JSON."
                    ),
                )

            try:
                schema_tools.validate(run_input, schema, raise_on_error=True)
            except schema_tools.ValidationError as exc:
                return OrchestrationResult(
                    state=state,
                    status=schemas.responses.SetStateStatus.REJECT,
                    details=schemas.responses.StateAbortDetails(
                        reason=f"Reason: {exc}"
                    ),
                )
            except schema_tools.CircularSchemaRefError:
                return OrchestrationResult(
                    state=state,
                    status=schemas.responses.SetStateStatus.REJECT,
                    details=schemas.responses.StateAbortDetails(
                        reason="Invalid schema: Unable to validate schema with circular references.",
                    ),
                )

        if state.state_details.pause_reschedule:
            orchestration_result = await models.flow_runs.set_flow_run_state(
                session=session,
                flow_run_id=flow_run_id,
                state=schemas.states.Scheduled(
                    name="Resuming", scheduled_time=now("UTC")
                ),
                flow_policy=flow_policy,
                orchestration_parameters=orchestration_parameters,
                client_version=client_version,
            )
        else:
            orchestration_result = await models.flow_runs.set_flow_run_state(
                session=session,
                flow_run_id=flow_run_id,
                state=schemas.states.Running(),
                flow_policy=flow_policy,
                orchestration_parameters=orchestration_parameters,
                client_version=client_version,
            )

        if (
            keyset
            and run_input
            and orchestration_result.status == schemas.responses.SetStateStatus.ACCEPT
        ):
            # The state change is accepted, go ahead and store the validated
            # run input.
            await models.flow_run_input.create_flow_run_input(
                session=session,
                flow_run_input=schemas.core.FlowRunInput(
                    flow_run_id=flow_run_id,
                    key=keyset["response"],
                    value=orjson.dumps(run_input).decode("utf-8"),
                ),
            )

        # set the 201 if a new state was created
        if (
            orchestration_result.state
            and orchestration_result.state.timestamp >= right_now
        ):
            response.status_code = status.HTTP_201_CREATED
        else:
            response.status_code = status.HTTP_200_OK

        return orchestration_result

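# Example (sketch): resuming a paused flow run, optionally supplying run
# input. Because run_input is declared with embed=True, it nests under a
# "run_input" key; the URL, id, and input payload are illustrative
# assumptions.
#
#     import httpx
#
#     resp = httpx.post(
#         "http://127.0.0.1:4200/api/flow_runs/<flow-run-id>/resume",
#         json={"run_input": {"approved": True}},
#     )
#     print(resp.json()["status"])  # e.g. ACCEPT, REJECT, or ABORT
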

@router.post("/filter", response_class=ORJSONResponse)
async def read_flow_runs(
    sort: schemas.sorting.FlowRunSort = Body(schemas.sorting.FlowRunSort.ID_DESC),
    limit: int = dependencies.LimitBody(),
    offset: int = Body(0, ge=0),
    flows: Optional[schemas.filters.FlowFilter] = None,
    flow_runs: Optional[schemas.filters.FlowRunFilter] = None,
    task_runs: Optional[schemas.filters.TaskRunFilter] = None,
    deployments: Optional[schemas.filters.DeploymentFilter] = None,
    work_pools: Optional[schemas.filters.WorkPoolFilter] = None,
    work_pool_queues: Optional[schemas.filters.WorkQueueFilter] = None,
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[schemas.responses.FlowRunResponse]:
    """
    Query for flow runs.
    """
    async with db.session_context() as session:
        db_flow_runs = await models.flow_runs.read_flow_runs(
            session=session,
            flow_filter=flows,
            flow_run_filter=flow_runs,
            task_run_filter=task_runs,
            deployment_filter=deployments,
            work_pool_filter=work_pools,
            work_queue_filter=work_pool_queues,
            offset=offset,
            limit=limit,
            sort=sort,
        )

        # Instead of relying on fastapi.encoders.jsonable_encoder to convert the
        # response to JSON, we do so more efficiently ourselves.
        # In particular, the FastAPI encoder is very slow for large, nested objects.
        # See: https://github.com/tiangolo/fastapi/issues/1224
        encoded = [
            schemas.responses.FlowRunResponse.model_validate(
                fr, from_attributes=True
            ).model_dump(mode="json")
            for fr in db_flow_runs
        ]
        return ORJSONResponse(content=encoded)

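# Example (sketch): querying flow runs with explicit paging and sorting. The
# sort value is a schemas.sorting.FlowRunSort member; the URL is an
# illustrative assumption.
#
#     import httpx
#
#     resp = httpx.post(
#         "http://127.0.0.1:4200/api/flow_runs/filter",
#         json={"limit": 50, "offset": 0, "sort": "ID_DESC"},
#     )
#     runs = resp.json()  # a JSON array of flow run objects
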

@router.delete("/{id:uuid}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_flow_run(
    docket: dependencies.Docket,
    flow_run_id: UUID = Path(..., description="The flow run id", alias="id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    """
    Delete a flow run by id.
    """
    async with db.session_context(begin_transaction=True) as session:
        result = await models.flow_runs.delete_flow_run(
            session=session, flow_run_id=flow_run_id
        )
        if not result:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="Flow run not found"
            )
        await docket.add(delete_flow_run_logs)(flow_run_id=flow_run_id)


async def delete_flow_run_logs(
    *,
    db: PrefectDBInterface = DocketDepends(provide_database_interface),
    flow_run_id: UUID,
    retry: Retry = Retry(attempts=5, delay=datetime.timedelta(seconds=0.5)),
) -> None:
    async with db.session_context(begin_transaction=True) as session:
        await models.logs.delete_logs(
            session=session,
            log_filter=schemas.filters.LogFilter(
                flow_run_id=schemas.filters.LogFilterFlowRunId(any_=[flow_run_id])
            ),
        )


@router.post("/{id:uuid}/set_state")
async def set_flow_run_state(
    response: Response,
    flow_run_id: UUID = Path(..., description="The flow run id", alias="id"),
    state: schemas.actions.StateCreate = Body(..., description="The intended state."),
    force: bool = Body(
        False,
        description=(
            "If False, orchestration rules will be applied that may alter or prevent"
            " the state transition. If True, orchestration rules are not applied."
        ),
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
    flow_policy: type[FlowRunOrchestrationPolicy] = Depends(
        orchestration_dependencies.provide_flow_policy
    ),
    orchestration_parameters: Dict[str, Any] = Depends(
        orchestration_dependencies.provide_flow_orchestration_parameters
    ),
    api_version: str = Depends(dependencies.provide_request_api_version),
    client_version: Optional[str] = Depends(dependencies.get_prefect_client_version),
) -> OrchestrationResult:
    """Set a flow run state, invoking any orchestration rules."""

    # pass the request version to the orchestration engine to support compatibility code
    orchestration_parameters.update({"api-version": api_version})

    right_now = now("UTC")

    # create the state
    async with db.session_context(
        begin_transaction=True, with_for_update=True
    ) as session:
        orchestration_result = await models.flow_runs.set_flow_run_state(
            session=session,
            flow_run_id=flow_run_id,
            # convert to a full State object
            state=schemas.states.State.model_validate(state),
            force=force,
            flow_policy=flow_policy,
            orchestration_parameters=orchestration_parameters,
            client_version=client_version,
        )

    # set the 201 if a new state was created
    if orchestration_result.state and orchestration_result.state.timestamp >= right_now:
        response.status_code = status.HTTP_201_CREATED
    else:
        response.status_code = status.HTTP_200_OK

    return orchestration_result

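# Example (sketch): moving a flow run to a CANCELLED state. With force=True
# the orchestration rules are bypassed, as described in the parameter docs
# above; the URL and id are illustrative assumptions.
#
#     import httpx
#
#     resp = httpx.post(
#         "http://127.0.0.1:4200/api/flow_runs/<flow-run-id>/set_state",
#         json={"state": {"type": "CANCELLED"}, "force": True},
#     )
#     print(resp.json()["status"])
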

@router.post("/{id:uuid}/input", status_code=status.HTTP_201_CREATED)
async def create_flow_run_input(
    flow_run_id: UUID = Path(..., description="The flow run id", alias="id"),
    key: str = Body(..., description="The input key"),
    value: bytes = Body(..., description="The value of the input"),
    sender: Optional[str] = Body(None, description="The sender of the input"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    """
    Create a key/value input for a flow run.
    """
    async with db.session_context() as session:
        try:
            await models.flow_run_input.create_flow_run_input(
                session=session,
                flow_run_input=schemas.core.FlowRunInput(
                    flow_run_id=flow_run_id,
                    key=key,
                    sender=sender,
                    value=value.decode(),
                ),
            )
            await session.commit()
        except IntegrityError as exc:
            if "unique constraint" in str(exc).lower():
                raise HTTPException(
                    status_code=status.HTTP_409_CONFLICT,
                    detail="A flow run input with this key already exists.",
                )
            else:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND, detail="Flow run not found"
                )

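# Example (sketch): storing a key/value input for a flow run. The value is
# decoded and persisted as text, and a duplicate key yields a 409 per the
# handler above; the URL and payload are illustrative assumptions.
#
#     import httpx
#
#     resp = httpx.post(
#         "http://127.0.0.1:4200/api/flow_runs/<flow-run-id>/input",
#         json={"key": "answer", "value": "42"},
#     )
#     assert resp.status_code == 201
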

@router.post("/{id:uuid}/input/filter")
async def filter_flow_run_input(
    flow_run_id: UUID = Path(..., description="The flow run id", alias="id"),
    prefix: str = Body(..., description="The input key prefix", embed=True),
    limit: int = Body(
        1, description="The maximum number of results to return", embed=True
    ),
    exclude_keys: List[str] = Body(
        [], description="Exclude inputs with these keys", embed=True
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[schemas.core.FlowRunInput]:
    """
    Filter flow run inputs by key prefix.
    """
    async with db.session_context() as session:
        return await models.flow_run_input.filter_flow_run_input(
            session=session,
            flow_run_id=flow_run_id,
            prefix=prefix,
            limit=limit,
            exclude_keys=exclude_keys,
        )


@router.get("/{id:uuid}/input/{key}")
async def read_flow_run_input(
    flow_run_id: UUID = Path(..., description="The flow run id", alias="id"),
    key: str = Path(..., description="The input key", alias="key"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> PlainTextResponse:
    """
    Read the value of a flow run input by key.
    """
    async with db.session_context() as session:
        flow_run_input = await models.flow_run_input.read_flow_run_input(
            session=session, flow_run_id=flow_run_id, key=key
        )

    if flow_run_input:
        return PlainTextResponse(flow_run_input.value)
    else:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Flow run input not found"
        )


@router.delete("/{id:uuid}/input/{key}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_flow_run_input(
    flow_run_id: UUID = Path(..., description="The flow run id", alias="id"),
    key: str = Path(..., description="The input key", alias="key"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    """
    Delete a flow run input.
    """
    async with db.session_context() as session:
        deleted = await models.flow_run_input.delete_flow_run_input(
            session=session, flow_run_id=flow_run_id, key=key
        )
        await session.commit()

    if not deleted:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Flow run input not found"
        )


@router.post("/paginate", response_class=ORJSONResponse)
async def paginate_flow_runs(
    sort: schemas.sorting.FlowRunSort = Body(schemas.sorting.FlowRunSort.ID_DESC),
    limit: int = dependencies.LimitBody(),
    page: int = Body(1, ge=1),
    flows: Optional[schemas.filters.FlowFilter] = None,
    flow_runs: Optional[schemas.filters.FlowRunFilter] = None,
    task_runs: Optional[schemas.filters.TaskRunFilter] = None,
    deployments: Optional[schemas.filters.DeploymentFilter] = None,
    work_pools: Optional[schemas.filters.WorkPoolFilter] = None,
    work_pool_queues: Optional[schemas.filters.WorkQueueFilter] = None,
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> FlowRunPaginationResponse:
    """
    Pagination query for flow runs.
    """
    offset = (page - 1) * limit

    async with db.session_context() as session:
        runs = await models.flow_runs.read_flow_runs(
            session=session,
            flow_filter=flows,
            flow_run_filter=flow_runs,
            task_run_filter=task_runs,
            deployment_filter=deployments,
            work_pool_filter=work_pools,
            work_queue_filter=work_pool_queues,
            offset=offset,
            limit=limit,
            sort=sort,
        )

        count = await models.flow_runs.count_flow_runs(
            session=session,
            flow_filter=flows,
            flow_run_filter=flow_runs,
            task_run_filter=task_runs,
            deployment_filter=deployments,
            work_pool_filter=work_pools,
            work_queue_filter=work_pool_queues,
        )

        # Instead of relying on fastapi.encoders.jsonable_encoder to convert the
        # response to JSON, we do so more efficiently ourselves.
        # In particular, the FastAPI encoder is very slow for large, nested objects.
        # See: https://github.com/tiangolo/fastapi/issues/1224
        results = [
            schemas.responses.FlowRunResponse.model_validate(
                run, from_attributes=True
            ).model_dump(mode="json")
            for run in runs
        ]

        response = FlowRunPaginationResponse(
            results=results,
            count=count,
            limit=limit,
            pages=(count + limit - 1) // limit,
            page=page,
        ).model_dump(mode="json")

        return ORJSONResponse(content=response)

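# A quick check of the page arithmetic above: pages = ceil(count / limit) is
# written in integers as (count + limit - 1) // limit. For example, count=101
# and limit=50 gives (101 + 49) // 50 == 3 pages, and page 3 reads rows at
# offset (3 - 1) * 50 == 100.
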

FLOW_RUN_LOGS_DOWNLOAD_PAGE_LIMIT = 1000


@router.get("/{id:uuid}/logs/download")
async def download_logs(
    flow_run_id: UUID = Path(..., description="The flow run id", alias="id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> StreamingResponse:
    """
    Download all flow run logs as a CSV file, collecting all logs until there
    are no more logs to retrieve.
    """
    async with db.session_context() as session:
        flow_run = await models.flow_runs.read_flow_run(
            session=session, flow_run_id=flow_run_id
        )

        if not flow_run:
            raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Flow run not found")

        async def generate():
            data = io.StringIO()
            csv_writer = csv.writer(data)
            csv_writer.writerow(
                ["timestamp", "level", "flow_run_id", "task_run_id", "message"]
            )

            offset = 0
            limit = FLOW_RUN_LOGS_DOWNLOAD_PAGE_LIMIT

            while True:
                results = await models.logs.read_logs(
                    session=session,
                    log_filter=schemas.filters.LogFilter(
                        flow_run_id={"any_": [flow_run_id]}
                    ),
                    offset=offset,
                    limit=limit,
                    sort=schemas.sorting.LogSort.TIMESTAMP_ASC,
                )

                if not results:
                    break

                offset += limit

                for log in results:
                    csv_writer.writerow(
                        [
                            log.timestamp,
                            log.level,
                            log.flow_run_id,
                            log.task_run_id,
                            log.message,
                        ]
                    )
                data.seek(0)
                yield data.read()
                data.seek(0)
                data.truncate(0)

        return StreamingResponse(
            generate(),
            media_type="text/csv",
            headers={
                "Content-Disposition": f"attachment; filename={flow_run.name}-logs.csv"
            },
        )

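# Example (sketch): streaming the CSV log download to a file. The route pages
# through logs FLOW_RUN_LOGS_DOWNLOAD_PAGE_LIMIT rows at a time; the URL and
# filename are illustrative assumptions.
#
#     import httpx
#
#     url = "http://127.0.0.1:4200/api/flow_runs/<flow-run-id>/logs/download"
#     with httpx.stream("GET", url) as resp, open("flow-run-logs.csv", "wb") as f:
#         for chunk in resp.iter_bytes():
#             f.write(chunk)
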

@router.patch("/{id:uuid}/labels", status_code=status.HTTP_204_NO_CONTENT)
async def update_flow_run_labels(
    flow_run_id: UUID = Path(..., description="The flow run id", alias="id"),
    labels: Dict[str, Any] = Body(..., description="The labels to update"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    """
    Update the labels of a flow run.
    """
    async with db.session_context(begin_transaction=True) as session:
        await models.flow_runs.update_flow_run_labels(
            session=session, flow_run_id=flow_run_id, labels=labels
        )