Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/schemas/responses.py: 84%

223 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1""" 

2Schemas for special responses from the Prefect REST API. 

3""" 

4 

5import datetime 1a

6from typing import Any, ClassVar, Dict, List, Optional, Type, Union 1a

7from uuid import UUID 1a

8 

9from pydantic import BaseModel, ConfigDict, Field, model_validator 1a

10from typing_extensions import Literal, Self 1a

11 

12import prefect.server.schemas as schemas 1a

13from prefect.server.schemas.core import ( 1a

14 CreatedBy, 

15 FlowRunPolicy, 

16 UpdatedBy, 

17 WorkQueueStatusDetail, 

18) 

19from prefect.server.utilities.schemas.bases import ORMBaseModel, PrefectBaseModel 1a

20from prefect.types import DateTime, KeyValueLabelsField 1a

21from prefect.types._datetime import create_datetime_instance 1a

22from prefect.utilities.collections import AutoEnum 1a

23from prefect.utilities.names import generate_slug 1a

24 

25 

class SetStateStatus(AutoEnum):
    """Enumerates return statuses for setting run states."""

    # Each member's value is produced by AutoEnum.auto(); presumably it is
    # derived from the member name -- confirm in prefect.utilities.collections.
    ACCEPT = AutoEnum.auto()
    REJECT = AutoEnum.auto()
    ABORT = AutoEnum.auto()
    WAIT = AutoEnum.auto()

33 

34 

class StateAcceptDetails(PrefectBaseModel):
    """Details associated with an ACCEPT state transition."""

    # Fixed literal tag; it is the only field on this model.
    type: Literal["accept_details"] = Field(
        default="accept_details",
        description="The type of state transition detail. Used to ensure pydantic does not coerce into a different type.",
    )

45 

46 

class StateRejectDetails(PrefectBaseModel):
    """Details associated with a REJECT state transition."""

    # Fixed literal tag identifying this detail model.
    type: Literal["reject_details"] = Field(
        default="reject_details",
        description="The type of state transition detail. Used to ensure pydantic does not coerce into a different type.",
    )
    # Optional free-text explanation; defaults to None.
    reason: Optional[str] = Field(
        default=None,
        description="The reason why the state transition was rejected.",
    )

60 

61 

class StateAbortDetails(PrefectBaseModel):
    """Details associated with an ABORT state transition."""

    # Fixed literal tag identifying this detail model.
    type: Literal["abort_details"] = Field(
        default="abort_details",
        description="The type of state transition detail. Used to ensure pydantic does not coerce into a different type.",
    )
    # Optional free-text explanation; defaults to None.
    reason: Optional[str] = Field(
        default=None,
        description="The reason why the state transition was aborted.",
    )

75 

76 

class StateWaitDetails(PrefectBaseModel):
    """Details associated with a WAIT state transition."""

    # Fixed literal tag identifying this detail model.
    type: Literal["wait_details"] = Field(
        default="wait_details",
        description="The type of state transition detail. Used to ensure pydantic does not coerce into a different type.",
    )
    # Required: `...` as the default marks the field as having no default.
    delay_seconds: int = Field(
        default=...,
        description="The length of time in seconds the client should wait before transitioning states.",
    )
    # Optional free-text explanation; defaults to None.
    reason: Optional[str] = Field(
        default=None,
        description="The reason why the state transition should wait.",
    )

97 

98 

class HistoryResponseState(PrefectBaseModel):
    """Represents a single state's history over an interval."""

    # All five fields are required (each uses `...` as its default).
    state_type: schemas.states.StateType = Field(
        default=...,
        description="The state type.",
    )
    state_name: str = Field(
        default=...,
        description="The state name.",
    )
    count_runs: int = Field(
        default=...,
        description="The number of runs in the specified state during the interval.",
    )
    sum_estimated_run_time: datetime.timedelta = Field(
        default=...,
        description="The total estimated run time of all runs during the interval.",
    )
    sum_estimated_lateness: datetime.timedelta = Field(
        default=...,
        description="The sum of differences between actual and expected start time during the interval.",
    )

121 

122 

class HistoryResponse(PrefectBaseModel):
    """Represents a history of aggregation states over an interval"""

    # All three fields are required (each uses `...` as its default).
    interval_start: DateTime = Field(
        default=...,
        description="The start date of the interval.",
    )
    interval_end: DateTime = Field(
        default=...,
        description="The end date of the interval.",
    )
    states: List[HistoryResponseState] = Field(
        default=...,
        description="A list of state histories during the interval.",
    )

    @model_validator(mode="before")
    @classmethod
    def validate_timestamps(
        cls, values: dict
    ) -> dict:  # TODO: remove this, handle with ORM
        """Normalize the interval bounds before field validation runs.

        Any bound that is a ``datetime.datetime`` is passed through
        ``create_datetime_instance``; any other value (including a missing
        key, which becomes ``None``) is carried over unchanged.

        Args:
            values: The raw input mapping handed to the model.

        Returns:
            A new dict containing every entry of ``values`` with
            ``interval_start`` and ``interval_end`` replaced by their
            normalized counterparts.
        """
        normalized: dict = {}
        for bound in ("interval_start", "interval_end"):
            raw = values.get(bound)
            normalized[bound] = (
                create_datetime_instance(raw)
                if isinstance(raw, datetime.datetime)
                else raw
            )

        # The normalized bounds win over whatever was in the input.
        return {**values, **normalized}

150 

151 

# Union of the per-status detail models; the member order is kept exactly as
# originally declared.
StateResponseDetails = Union[
    StateAcceptDetails,
    StateWaitDetails,
    StateRejectDetails,
    StateAbortDetails,
]

155 

156 

class OrchestrationResult(PrefectBaseModel):
    """
    A container for the output of state orchestration.
    """

    # None of the fields declares a default, so all must be supplied;
    # `state` may be supplied as None.
    state: Optional[schemas.states.State]
    status: SetStateStatus
    details: StateResponseDetails

165 

166 

# Pairs a flow run with the ids of its work pool and work queue.
class WorkerFlowRunResponse(PrefectBaseModel):
    # Lets pydantic accept field types it has no built-in schema for.
    model_config: ClassVar[ConfigDict] = ConfigDict(arbitrary_types_allowed=True)

    # All three fields are required (no defaults declared).
    work_pool_id: UUID
    work_queue_id: UUID
    flow_run: schemas.core.FlowRun

173 

174 

# Response schema for a flow run. On top of the declared fields,
# `model_validate` copies work queue / work pool identifiers from the source
# object when validating from attributes, and `__eq__` ignores the rolling
# estimate fields.
#
# NOTE: no class docstring is added on purpose -- pydantic presumably exposes
# a model's docstring as its schema description, which would change the
# generated API schema.
class FlowRunResponse(ORMBaseModel):
    name: str = Field(
        default_factory=lambda: generate_slug(2),
        description=(
            "The name of the flow run. Defaults to a random slug if not specified."
        ),
        examples=["my-flow-run"],
    )
    flow_id: UUID = Field(default=..., description="The id of the flow being run.")
    state_id: Optional[UUID] = Field(
        default=None, description="The id of the flow run's current state."
    )
    deployment_id: Optional[UUID] = Field(
        default=None,
        description=(
            "The id of the deployment associated with this flow run, if available."
        ),
    )
    deployment_version: Optional[str] = Field(
        default=None,
        description="The version of the deployment associated with this flow run.",
        examples=["1.0"],
    )
    work_queue_id: Optional[UUID] = Field(
        default=None, description="The id of the run's work pool queue."
    )
    work_queue_name: Optional[str] = Field(
        default=None, description="The work queue that handled this flow run."
    )
    flow_version: Optional[str] = Field(
        default=None,
        description="The version of the flow executed in this flow run.",
        examples=["1.0"],
    )
    parameters: Dict[str, Any] = Field(
        default_factory=dict, description="Parameters for the flow run."
    )
    idempotency_key: Optional[str] = Field(
        default=None,
        description=(
            "An optional idempotency key for the flow run. Used to ensure the same flow"
            " run is not created multiple times."
        ),
    )
    context: Dict[str, Any] = Field(
        default_factory=dict,
        description="Additional context for the flow run.",
        examples=[{"my_var": "my_val"}],
    )
    empirical_policy: FlowRunPolicy = Field(
        default_factory=FlowRunPolicy,
    )
    tags: List[str] = Field(
        default_factory=list,
        description="A list of tags on the flow run",
        examples=[["tag-1", "tag-2"]],
    )
    # No default is declared here; whether KeyValueLabelsField supplies one
    # is not visible from this module.
    labels: KeyValueLabelsField
    parent_task_run_id: Optional[UUID] = Field(
        default=None,
        description=(
            "If the flow run is a subflow, the id of the 'dummy' task in the parent"
            " flow used to track subflow state."
        ),
    )
    state_type: Optional[schemas.states.StateType] = Field(
        default=None, description="The type of the current flow run state."
    )
    state_name: Optional[str] = Field(
        default=None, description="The name of the current flow run state."
    )
    run_count: int = Field(
        default=0, description="The number of times the flow run was executed."
    )
    expected_start_time: Optional[DateTime] = Field(
        default=None,
        description="The flow run's expected start time.",
    )
    next_scheduled_start_time: Optional[DateTime] = Field(
        default=None,
        description="The next time the flow run is scheduled to start.",
    )
    start_time: Optional[DateTime] = Field(
        default=None, description="The actual start time."
    )
    end_time: Optional[DateTime] = Field(
        default=None, description="The actual end time."
    )
    total_run_time: datetime.timedelta = Field(
        default=datetime.timedelta(0),
        description=(
            "Total run time. If the flow run was executed multiple times, the time of"
            " each run will be summed."
        ),
    )
    estimated_run_time: datetime.timedelta = Field(
        default=datetime.timedelta(0),
        description="A real-time estimate of the total run time.",
    )
    estimated_start_time_delta: datetime.timedelta = Field(
        default=datetime.timedelta(0),
        description="The difference between actual and expected start time.",
    )
    auto_scheduled: bool = Field(
        default=False,
        description="Whether or not the flow run was automatically scheduled.",
    )
    infrastructure_document_id: Optional[UUID] = Field(
        default=None,
        description="The block document defining infrastructure to use this flow run.",
    )
    infrastructure_pid: Optional[str] = Field(
        default=None,
        description="The id of the flow run as returned by an infrastructure block.",
    )
    created_by: Optional[CreatedBy] = Field(
        default=None,
        description="Optional information about the creator of this flow run.",
    )
    work_pool_id: Optional[UUID] = Field(
        default=None,
        description="The id of the flow run's work pool.",
    )
    work_pool_name: Optional[str] = Field(
        default=None,
        description="The name of the flow run's work pool.",
        examples=["my-work-pool"],
    )
    state: Optional[schemas.states.State] = Field(
        default=None, description="The current state of the flow run."
    )
    job_variables: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Variables used as overrides in the base job template",
    )

    @classmethod
    def model_validate(
        cls: Type[Self],
        obj: Any,
        *,
        strict: Optional[bool] = None,
        from_attributes: Optional[bool] = None,
        context: Optional[dict[str, Any]] = None,
    ) -> Self:
        """Validate ``obj`` and fill in work queue / work pool details.

        Args:
            obj: The object to validate.
            strict: Forwarded unchanged to the parent ``model_validate``.
            from_attributes: Forwarded unchanged to the parent
                ``model_validate``. When truthy, ``obj`` is additionally
                expected to expose a ``work_queue`` attribute.
            context: Forwarded unchanged to the parent ``model_validate``.

        Returns:
            The validated response. When ``from_attributes`` is truthy and
            ``obj.work_queue`` is truthy, ``work_queue_id`` and
            ``work_queue_name`` are taken from it; if that queue also has a
            truthy ``work_pool``, ``work_pool_id`` and ``work_pool_name`` are
            taken from the pool.
        """
        # Forward every validation option to the parent so that `strict` and
        # `context` supplied by the caller are honoured, in line with the
        # other response models in this module.
        response = super().model_validate(
            obj, strict=strict, from_attributes=from_attributes, context=context
        )

        if from_attributes:
            if obj.work_queue:
                response.work_queue_id = obj.work_queue.id
                response.work_queue_name = obj.work_queue.name
                if obj.work_queue.work_pool:
                    response.work_pool_id = obj.work_queue.work_pool.id
                    response.work_pool_name = obj.work_queue.work_pool.name

        return response

    def __eq__(self, other: Any) -> bool:
        """
        Check for "equality" to another flow run schema.

        Estimated times are rolling and will always change with repeated
        queries for a flow run, so they are ignored during equality checks.
        Comparison with anything that is not a ``FlowRunResponse`` is
        delegated to the parent class.
        """
        if isinstance(other, FlowRunResponse):
            exclude_fields = {"estimated_run_time", "estimated_start_time_delta"}
            return self.model_dump(exclude=exclude_fields) == other.model_dump(
                exclude=exclude_fields
            )
        return super().__eq__(other)

345 

346 

# Response schema for a task run. Only `task_key` is required; every other
# field declares a default or a default factory.
class TaskRunResponse(ORMBaseModel):
    name: str = Field(
        default_factory=lambda: generate_slug(2),
        description="The name of the task run. Defaults to a random slug if not specified.",
        examples=["my-task-run"],
    )
    flow_run_id: Optional[UUID] = Field(
        default=None,
        description="The id of the flow run this task run belongs to.",
    )
    task_key: str = Field(
        default=...,
        description="The key of the task this run represents.",
    )
    state_id: Optional[UUID] = Field(
        default=None,
        description="The id of the task run's current state.",
    )
    state: Optional[schemas.states.State] = Field(
        default=None,
        description="The current state of the task run.",
    )
    task_version: Optional[str] = Field(
        default=None,
        description="The version of the task executed in this task run.",
        examples=["1.0"],
    )
    parameters: dict[str, Any] = Field(
        default_factory=dict,
        description="Parameters for the task run.",
    )
    # Maps a string key to a list of input descriptors of four possible kinds.
    task_inputs: dict[
        str,
        list[
            Union[
                schemas.core.TaskRunResult,
                schemas.core.FlowRunResult,
                schemas.core.Parameter,
                schemas.core.Constant,
            ]
        ],
    ] = Field(
        default_factory=dict,
        description="Inputs provided to the task run.",
    )
    context: dict[str, Any] = Field(
        default_factory=dict,
        description="Additional context for the task run.",
        examples=[{"my_var": "my_val"}],
    )
    empirical_policy: schemas.core.TaskRunPolicy = Field(
        default_factory=schemas.core.TaskRunPolicy,
        description="The task run's empirical retry policy.",
    )
    tags: list[str] = Field(
        default_factory=list,
        description="A list of tags for the task run.",
        examples=[["tag-1", "tag-2"]],
    )

400 

401 

# Response schema for a deployment. `model_validate` additionally copies the
# work queue id/name and the work pool name from the source object when
# validating from attributes.
class DeploymentResponse(ORMBaseModel):
    name: str = Field(
        default=...,
        description="The name of the deployment.",
    )
    version: Optional[str] = Field(
        default=None,
        description="An optional version for the deployment.",
    )
    description: Optional[str] = Field(
        default=None,
        description="A description for the deployment.",
    )
    flow_id: UUID = Field(
        default=...,
        description="The flow id associated with the deployment.",
    )
    paused: bool = Field(
        default=False,
        description="Whether or not the deployment is paused.",
    )
    schedules: List[schemas.core.DeploymentSchedule] = Field(
        default_factory=list,
        description="A list of schedules for the deployment.",
    )
    concurrency_limit: Optional[int] = Field(
        default=None,
        description="DEPRECATED: Prefer `global_concurrency_limit`. Will always be None for backwards compatibility. Will be removed after December 2024.",
        deprecated=True,
    )
    # Forward reference: GlobalConcurrencyLimitResponse is declared later in
    # this module.
    global_concurrency_limit: Optional["GlobalConcurrencyLimitResponse"] = Field(
        default=None,
        description="The global concurrency limit object for enforcing the maximum number of flow runs that can be active at once.",
    )
    concurrency_options: Optional[schemas.core.ConcurrencyOptions] = Field(
        default=None,
        description="The concurrency options for the deployment.",
    )
    job_variables: Dict[str, Any] = Field(
        default_factory=dict,
        description="Overrides to apply to the base infrastructure block at runtime.",
    )
    parameters: Dict[str, Any] = Field(
        default_factory=dict,
        description="Parameters for flow runs scheduled by the deployment.",
    )
    tags: List[str] = Field(
        default_factory=list,
        description="A list of tags for the deployment",
        examples=[["tag-1", "tag-2"]],
    )
    # No default is declared here; whether KeyValueLabelsField supplies one
    # is not visible from this module.
    labels: KeyValueLabelsField
    work_queue_name: Optional[str] = Field(
        default=None,
        description="The work queue for the deployment. If no work queue is set, work will not be scheduled.",
    )
    work_queue_id: Optional[UUID] = Field(
        default=None,
        description="The id of the work pool queue to which this deployment is assigned.",
    )
    last_polled: Optional[DateTime] = Field(
        default=None,
        description="The last time the deployment was polled for status updates.",
    )
    parameter_openapi_schema: Optional[Dict[str, Any]] = Field(
        default=None,
        description="The parameter schema of the flow, including defaults.",
        json_schema_extra={"additionalProperties": True},
    )
    path: Optional[str] = Field(
        default=None,
        description="The path to the working directory for the workflow, relative to remote storage or an absolute path.",
    )
    pull_steps: Optional[list[dict[str, Any]]] = Field(
        default=None,
        description="Pull steps for cloning and running this deployment.",
    )
    entrypoint: Optional[str] = Field(
        default=None,
        description="The path to the entrypoint for the workflow, relative to the `path`.",
    )
    storage_document_id: Optional[UUID] = Field(
        default=None,
        description="The block document defining storage used for this flow.",
    )
    infrastructure_document_id: Optional[UUID] = Field(
        default=None,
        description="The block document defining infrastructure to use for flow runs.",
    )
    created_by: Optional[CreatedBy] = Field(
        default=None,
        description="Optional information about the creator of this deployment.",
    )
    updated_by: Optional[UpdatedBy] = Field(
        default=None,
        description="Optional information about the updater of this deployment.",
    )
    work_pool_name: Optional[str] = Field(
        default=None,
        description="The name of the deployment's work pool.",
    )
    status: Optional[schemas.statuses.DeploymentStatus] = Field(
        default=schemas.statuses.DeploymentStatus.NOT_READY,
        description="Whether the deployment is ready to run flows.",
    )
    enforce_parameter_schema: bool = Field(
        default=True,
        description="Whether or not the deployment should enforce the parameter schema.",
    )

    @classmethod
    def model_validate(
        cls: Type[Self],
        obj: Any,
        *,
        strict: Optional[bool] = None,
        from_attributes: Optional[bool] = None,
        context: Optional[dict[str, Any]] = None,
    ) -> Self:
        """Validate ``obj`` and fill in work queue / work pool details.

        All keyword options are forwarded to the parent ``model_validate``.
        When ``from_attributes`` is truthy and ``obj.work_queue`` is truthy,
        ``work_queue_id`` and ``work_queue_name`` are copied from it, and
        ``work_pool_name`` is copied from the queue's ``work_pool`` when that
        is truthy too.
        """
        validated = super().model_validate(
            obj, strict=strict, from_attributes=from_attributes, context=context
        )

        # Nothing to copy unless we are reading from an attribute-bearing
        # object that actually has a work queue.
        if not from_attributes or not obj.work_queue:
            return validated

        validated.work_queue_id = obj.work_queue.id
        validated.work_queue_name = obj.work_queue.name
        if obj.work_queue.work_pool:
            validated.work_pool_name = obj.work_queue.work_pool.name

        return validated

533 

534 

# Work queue schema extended with the owning work pool's name and a queue
# status. `model_validate` copies the pool name from the source object when
# validating from attributes.
class WorkQueueResponse(schemas.core.WorkQueue):
    work_pool_name: Optional[str] = Field(
        default=None,
        # Fixed wording: the original said the work *pool* resides within
        # the work pool.
        description="The name of the work pool the work queue resides within.",
    )
    status: Optional[schemas.statuses.WorkQueueStatus] = Field(
        default=None, description="The queue status."
    )

    @classmethod
    def model_validate(
        cls: Type[Self],
        obj: Any,
        *,
        strict: Optional[bool] = None,
        from_attributes: Optional[bool] = None,
        context: Optional[dict[str, Any]] = None,
    ) -> Self:
        """Validate ``obj`` and fill in the work pool name.

        Args:
            obj: The object to validate.
            strict: Forwarded unchanged to the parent ``model_validate``.
            from_attributes: Forwarded unchanged to the parent
                ``model_validate``. When truthy, ``obj`` is additionally
                expected to expose a ``work_pool`` attribute.
            context: Forwarded unchanged to the parent ``model_validate``.

        Returns:
            The validated response, with ``work_pool_name`` set from
            ``obj.work_pool.name`` when ``from_attributes`` and
            ``obj.work_pool`` are both truthy.
        """
        response = super().model_validate(
            obj, strict=strict, from_attributes=from_attributes, context=context
        )

        if from_attributes:
            if obj.work_pool:
                response.work_pool_name = obj.work_pool.name

        return response

562 

563 

class WorkQueueWithStatus(WorkQueueResponse, WorkQueueStatusDetail):
    """Combines a work queue and its status details into a single object"""

    # No members of its own: everything comes from the two base classes.

566 

567 

# Heartbeat interval, in seconds, used when a worker's own
# `heartbeat_interval_seconds` is falsy.
DEFAULT_HEARTBEAT_INTERVAL_SECONDS = 30
# Multiplier applied to the heartbeat interval to get the window after which
# a worker without a heartbeat is reported as offline.
INACTIVITY_HEARTBEAT_MULTIPLE = 3

570 

571 

# Worker schema extended with an ONLINE/OFFLINE status that `model_validate`
# derives from the worker's last heartbeat when validating from attributes.
class WorkerResponse(schemas.core.Worker):
    status: schemas.statuses.WorkerStatus = Field(
        default=schemas.statuses.WorkerStatus.OFFLINE,
        description="Current status of the worker.",
    )

    @classmethod
    def model_validate(
        cls: Type[Self],
        obj: Any,
        *,
        strict: Optional[bool] = None,
        from_attributes: Optional[bool] = None,
        context: Optional[dict[str, Any]] = None,
    ) -> Self:
        """Validate ``obj`` and derive the worker's status.

        All keyword options are forwarded to the parent ``model_validate``.
        When ``from_attributes`` is truthy, the status becomes ONLINE if the
        last heartbeat is more recent than the current UTC time minus
        ``INACTIVITY_HEARTBEAT_MULTIPLE`` heartbeat intervals, and OFFLINE
        otherwise. A falsy ``heartbeat_interval_seconds`` falls back to
        ``DEFAULT_HEARTBEAT_INTERVAL_SECONDS``.
        """
        worker = super().model_validate(
            obj, strict=strict, from_attributes=from_attributes, context=context
        )

        if not from_attributes:
            return worker

        utc_now = datetime.datetime.now(tz=datetime.timezone.utc)
        interval_seconds = (
            worker.heartbeat_interval_seconds or DEFAULT_HEARTBEAT_INTERVAL_SECONDS
        )
        allowed_silence = datetime.timedelta(
            seconds=interval_seconds * INACTIVITY_HEARTBEAT_MULTIPLE
        )
        offline_horizon = utc_now - allowed_silence

        worker.status = (
            schemas.statuses.WorkerStatus.ONLINE
            if worker.last_heartbeat_time > offline_horizon
            else schemas.statuses.WorkerStatus.OFFLINE
        )

        return worker

607 

608 

class GlobalConcurrencyLimitResponse(ORMBaseModel):
    """
    A response object for global concurrency limits.
    """

    active: bool = Field(
        default=True,
        description="Whether the global concurrency limit is active.",
    )
    # `name`, `limit` and `active_slots` are required (`...` as default).
    name: str = Field(
        default=...,
        description="The name of the global concurrency limit.",
    )
    limit: int = Field(
        default=...,
        description="The concurrency limit.",
    )
    active_slots: int = Field(
        default=...,
        description="The number of active slots.",
    )
    slot_decay_per_second: float = Field(
        default=2.0,
        description="The decay rate for active slots when used as a rate limit.",
    )

626 

627 

# Paginated list of flows. All fields are required (no defaults declared).
class FlowPaginationResponse(BaseModel):
    # The flows carried by this response.
    results: list[schemas.core.Flow]
    # Integer pagination fields; their exact semantics are not defined in
    # this module.
    count: int
    limit: int
    pages: int
    page: int

634 

635 

# Paginated list of flow runs. All fields are required (no defaults declared).
class FlowRunPaginationResponse(BaseModel):
    # The flow runs carried by this response.
    results: list[FlowRunResponse]
    # Integer pagination fields; their exact semantics are not defined in
    # this module.
    count: int
    limit: int
    pages: int
    page: int

642 

643 

# Paginated list of task runs. All fields are required (no defaults declared).
class TaskRunPaginationResponse(BaseModel):
    # The task runs carried by this response.
    results: list[TaskRunResponse]
    # Integer pagination fields; their exact semantics are not defined in
    # this module.
    count: int
    limit: int
    pages: int
    page: int

650 

651 

# Paginated list of deployments. All fields are required (no defaults
# declared).
class DeploymentPaginationResponse(BaseModel):
    # The deployments carried by this response.
    results: list[DeploymentResponse]
    # Integer pagination fields; their exact semantics are not defined in
    # this module.
    count: int
    limit: int
    pages: int
    page: int

658 

659 

# Validation errors attached to a named property.
class SchemaValuePropertyError(BaseModel):
    property: str
    # Forward reference: the SchemaValueError alias is declared further down
    # in this module, so errors can nest.
    errors: List["SchemaValueError"]

663 

664 

# Validation errors attached to an integer index.
class SchemaValueIndexError(BaseModel):
    index: int
    # Forward reference: the SchemaValueError alias is declared further down
    # in this module, so errors can nest.
    errors: List["SchemaValueError"]

668 

669 

# A single validation error: either a plain string or one of the two
# structured error models. Member order is kept as originally declared.
SchemaValueError = Union[
    str,
    SchemaValuePropertyError,
    SchemaValueIndexError,
]

671 

672 

# Result of validating values against a schema. Both fields are required
# (no defaults declared).
class SchemaValuesValidationResponse(BaseModel):
    # The collected errors; each is a string or a structured error model.
    errors: List[SchemaValueError]
    # Validity flag supplied by the caller; nothing in this class derives it
    # from `errors`.
    valid: bool