Coverage for /usr/local/lib/python3.12/site-packages/prefect/client/schemas/objects.py: 79%

573 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1from __future__ import annotations 1a

2 

3import datetime 1a

4import warnings 1a

5from collections.abc import Callable, Mapping 1a

6from enum import Enum 1a

7from functools import partial 1a

8from typing import ( 1a

9 TYPE_CHECKING, 

10 Annotated, 

11 Any, 

12 ClassVar, 

13 Generic, 

14 Optional, 

15 Union, 

16 overload, 

17) 

18from uuid import UUID 1a

19 

20import orjson 1a

21from pydantic import ( 1a

22 AfterValidator, 

23 ConfigDict, 

24 Discriminator, 

25 Field, 

26 HttpUrl, 

27 IPvAnyNetwork, 

28 SerializationInfo, 

29 SerializerFunctionWrapHandler, 

30 Tag, 

31 field_validator, 

32 model_serializer, 

33 model_validator, 

34) 

35from typing_extensions import Literal, Self, TypeVar 1a

36 

37from prefect._internal.compatibility.async_dispatch import async_dispatch 1a

38from prefect._internal.compatibility.migration import getattr_migration 1a

39from prefect._internal.schemas.bases import ( 1a

40 ObjectBaseModel, 

41 PrefectBaseModel, 

42 TimeSeriesBaseModel, 

43) 

44from prefect._internal.schemas.fields import CreatedBy, UpdatedBy 1a

45from prefect._internal.schemas.validators import ( 1a

46 get_or_create_run_name, 

47 list_length_50_or_less, 

48 set_run_policy_deprecated_fields, 

49 validate_default_queue_id_not_none, 

50 validate_max_metadata_length, 

51 validate_name_present_on_nonanonymous_blocks, 

52 validate_not_negative, 

53 validate_parent_and_ref_diff, 

54) 

55from prefect._internal.uuid7 import uuid7 1a

56from prefect._result_records import ResultRecordMetadata 1a

57from prefect.client.schemas.schedules import SCHEDULE_TYPES 1a

58from prefect.settings import PREFECT_CLOUD_API_URL, PREFECT_CLOUD_UI_URL 1a

59from prefect.types import ( 1a

60 MAX_VARIABLE_NAME_LENGTH, 

61 KeyValueLabelsField, 

62 Name, 

63 NonNegativeInteger, 

64 PositiveInteger, 

65 StrictVariableValue, 

66) 

67from prefect.types._datetime import DateTime, now 1a

68from prefect.types.names import ( 1a

69 BlockDocumentName, 

70 raise_on_name_alphanumeric_dashes_only, 

71) 

72from prefect.utilities.asyncutils import run_coro_as_sync 1a

73from prefect.utilities.collections import AutoEnum, visit_collection 1a

74from prefect.utilities.names import generate_slug 1a

75from prefect.utilities.pydantic import handle_secret_render 1a

76 

# Type variable used to parameterize generic models in this module
# (e.g. `State[R]`); unparameterized uses fall back to `Any`.
R = TypeVar("R", default=Any)


# Module-level string constants, typed as `Literal` so type checkers see the
# exact value rather than a plain `str`.
DEFAULT_BLOCK_SCHEMA_VERSION: Literal["non-versioned"] = "non-versioned"
DEFAULT_AGENT_WORK_POOL_NAME: Literal["default-agent-pool"] = "default-agent-pool"

82 

83 

class RunType(AutoEnum):
    # Distinguishes the two kinds of run; values are spelled out explicitly
    # rather than generated with `AutoEnum.auto()`.
    FLOW_RUN = "flow_run"
    TASK_RUN = "task_run"

87 

88 

class StateType(AutoEnum):
    """Enumeration of state types."""

    # Member values are generated by `AutoEnum.auto()`.
    SCHEDULED = AutoEnum.auto()
    PENDING = AutoEnum.auto()
    RUNNING = AutoEnum.auto()
    COMPLETED = AutoEnum.auto()
    FAILED = AutoEnum.auto()
    CANCELLED = AutoEnum.auto()
    CRASHED = AutoEnum.auto()
    PAUSED = AutoEnum.auto()
    CANCELLING = AutoEnum.auto()

101 

102 

# State types treated as final; `State.is_final()` is a membership test
# against this set.
TERMINAL_STATES: set[StateType] = {
    StateType.COMPLETED,
    StateType.CANCELLED,
    StateType.FAILED,
    StateType.CRASHED,
}

109 

110 

class WorkPoolStatus(AutoEnum):
    """Enumeration of work pool statuses."""

    READY = AutoEnum.auto()
    NOT_READY = AutoEnum.auto()
    PAUSED = AutoEnum.auto()

    @property
    def display_name(self) -> str:
        # Human-readable form of the member name: underscores become spaces
        # and only the first character is upper-cased ("NOT_READY" -> "Not ready").
        spaced = self.name.replace("_", " ")
        return spaced.capitalize()

121 

122 

class WorkerStatus(AutoEnum):
    """Enumeration of worker statuses."""

    # Member values are generated by `AutoEnum.auto()`.
    ONLINE = AutoEnum.auto()
    OFFLINE = AutoEnum.auto()

128 

129 

class DeploymentStatus(AutoEnum):
    """Enumeration of deployment statuses."""

    # Member values are generated by `AutoEnum.auto()`.
    READY = AutoEnum.auto()
    NOT_READY = AutoEnum.auto()

135 

136 

class WorkQueueStatus(AutoEnum):
    """Enumeration of work queue statuses."""

    # Same member names as `WorkPoolStatus`, declared separately for queues.
    READY = AutoEnum.auto()
    NOT_READY = AutoEnum.auto()
    PAUSED = AutoEnum.auto()

143 

144 

class ConcurrencyLimitStrategy(AutoEnum):
    """Enumeration of concurrency limit strategies."""

    # Used as the `collision_strategy` of `ConcurrencyOptions` and
    # `ConcurrencyLimitConfig`.
    ENQUEUE = AutoEnum.auto()
    CANCEL_NEW = AutoEnum.auto()

150 

151 

class ConcurrencyOptions(PrefectBaseModel):
    """
    Class for storing the concurrency config in database.
    """

    # Required: there is no default strategy on this model.
    collision_strategy: ConcurrencyLimitStrategy

158 

159 

class ConcurrencyLimitConfig(PrefectBaseModel):
    """
    Class for storing the concurrency limit config in database.
    """

    # Required numeric limit.
    limit: int
    # Unlike `ConcurrencyOptions`, the strategy here defaults to ENQUEUE.
    collision_strategy: ConcurrencyLimitStrategy = ConcurrencyLimitStrategy.ENQUEUE

167 

168 

class ConcurrencyLeaseHolder(PrefectBaseModel):
    """Model for validating concurrency lease holder information."""

    # Unknown keys are rejected rather than ignored.
    model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")

    # Kind of object holding the lease, and that object's id.
    type: Literal["flow_run", "task_run", "deployment"]
    id: UUID

176 

177 

class StateDetails(PrefectBaseModel):
    # Supplementary details attached to a `State`. Every field is optional or
    # has a default, so `StateDetails()` with no arguments is valid (it is the
    # default factory for `State.state_details`).
    flow_run_id: Optional[UUID] = None
    task_run_id: Optional[UUID] = None
    child_flow_run_id: Optional[UUID] = None
    scheduled_time: Optional[DateTime] = None
    cache_key: Optional[str] = None
    cache_expiration: Optional[DateTime] = None
    deferred: Optional[bool] = None
    untrackable_result: bool = False
    pause_timeout: Optional[DateTime] = None
    pause_reschedule: bool = False
    pause_key: Optional[str] = None
    run_input_keyset: Optional[dict[str, str]] = None
    refresh_cache: Optional[bool] = None
    retriable: Optional[bool] = None
    transition_id: Optional[UUID] = None
    task_parameters_id: Optional[UUID] = None
    # Captures the trace_id and span_id of the span where this state was created
    traceparent: Optional[str] = None
    # The ID of the lease that is currently holding the deployment concurrency slot
    # for this run.
    deployment_concurrency_lease_id: Optional[UUID] = None

    def to_run_result(
        self, run_type: RunType
    ) -> Optional[Union[FlowRunResult, TaskRunResult]]:
        """
        Build the run-input reference matching `run_type`.

        Args:
            run_type: which kind of run to produce a reference for.

        Returns:
            A `FlowRunResult` or `TaskRunResult` wrapping the corresponding id
            stored on these details, or `None` when that id is unset.
        """
        # Compare against the enum class rather than looking members up on the
        # argument instance, so the check does not depend on `run_type` itself
        # exposing sibling members.
        if run_type == RunType.FLOW_RUN and self.flow_run_id:
            return FlowRunResult(id=self.flow_run_id)
        if run_type == RunType.TASK_RUN and self.task_run_id:
            return TaskRunResult(id=self.task_run_id)
        return None

210 

211 

def data_discriminator(x: Any) -> str:
    """
    Pick the union tag for a `State.data` payload.

    Returns "ResultRecordMetadata" for a dict that has a "storage_key" key and
    "Any" for every other value.
    """
    looks_like_metadata = isinstance(x, dict) and "storage_key" in x
    return "ResultRecordMetadata" if looks_like_metadata else "Any"

216 

217 

class State(TimeSeriesBaseModel, ObjectBaseModel, Generic[R]):
    """
    The state of a run.
    """

    type: StateType
    name: Optional[str] = Field(default=None)
    timestamp: datetime.datetime = Field(default_factory=lambda: now("UTC"))
    message: Optional[str] = Field(default=None, examples=["Run started"])
    state_details: StateDetails = Field(default_factory=StateDetails)
    # `data_discriminator` tags dicts that carry a "storage_key" as
    # `ResultRecordMetadata`; any other payload is accepted under the "Any" tag.
    data: Annotated[
        Union[
            Annotated["ResultRecordMetadata", Tag("ResultRecordMetadata")],
            Annotated[Any, Tag("Any")],
        ],
        Discriminator(data_discriminator),
    ] = Field(default=None)

    @overload
    async def aresult(
        self: "State[R]",
        raise_on_failure: Literal[True] = ...,
        retry_result_failure: bool = ...,
    ) -> R: ...

    @overload
    async def aresult(
        self: "State[R]",
        raise_on_failure: Literal[False] = False,
        retry_result_failure: bool = ...,
    ) -> Union[R, Exception]: ...

    @overload
    async def aresult(
        self: "State[R]",
        raise_on_failure: bool = ...,
        retry_result_failure: bool = ...,
    ) -> Union[R, Exception]: ...

    async def aresult(
        self,
        raise_on_failure: bool = True,
        retry_result_failure: bool = True,
    ) -> Union[R, Exception]:
        """
        Retrieve the result attached to this state.

        Async counterpart of `result`; both arguments are forwarded unchanged
        to `prefect.states.get_state_result`.
        """
        # Imported lazily inside the method rather than at module import time.
        from prefect.states import get_state_result

        return await get_state_result(
            self,
            raise_on_failure=raise_on_failure,
            retry_result_failure=retry_result_failure,
        )

    @overload
    def result(
        self: "State[R]",
        raise_on_failure: Literal[True] = ...,
        retry_result_failure: bool = ...,
    ) -> R: ...

    @overload
    def result(
        self: "State[R]",
        raise_on_failure: Literal[False] = False,
        retry_result_failure: bool = ...,
    ) -> Union[R, Exception]: ...

    @overload
    def result(
        self: "State[R]",
        raise_on_failure: bool = ...,
        retry_result_failure: bool = ...,
    ) -> Union[R, Exception]: ...

    @async_dispatch(aresult)
    def result(
        self,
        raise_on_failure: bool = True,
        retry_result_failure: bool = True,
    ) -> Union[R, Exception]:
        """
        Retrieve the result attached to this state.

        Args:
            raise_on_failure: a boolean specifying whether to raise an exception
                if the state is of type `FAILED` and the underlying data is an exception. When flow
                was run in a different memory space (using `run_deployment`), this will only raise
                if `fetch` is `True`.
            retry_result_failure: a boolean specifying whether to retry on failures to
                load the result from result storage

        Raises:
            TypeError: If the state is failed but the result is not an exception.

        Returns:
            The result of the run

        Examples:
            Get the result from a flow state

            ```python
            @flow
            def my_flow():
                return "hello"
            my_flow(return_state=True).result()
            # hello
            ```

            Get the result from a failed state

            ```python
            @flow
            def my_flow():
                raise ValueError("oh no!")
            state = my_flow(return_state=True)  # Error is wrapped in FAILED state
            state.result()  # Raises `ValueError`
            ```

            Get the result from a failed state without erroring

            ```python
            @flow
            def my_flow():
                raise ValueError("oh no!")
            state = my_flow(return_state=True)
            result = state.result(raise_on_failure=False)
            print(result)
            # ValueError("oh no!")
            ```


            Get the result from a flow state in an async context

            ```python
            @flow
            async def my_flow():
                return "hello"
            state = await my_flow(return_state=True)
            await state.result()
            # hello
            ```

            Get the result with `raise_on_failure` from a flow run in a different memory space

            ```python
            @flow
            async def my_flow():
                raise ValueError("oh no!")
            my_flow.deploy("my_deployment/my_flow")
            flow_run = run_deployment("my_deployment/my_flow")
            await flow_run.state.result(raise_on_failure=True)  # Raises `ValueError("oh no!")`
            ```
        """
        # NOTE(review): the docstring mentions a `fetch` flag, but this method
        # has no such parameter; that sentence looks stale. Confirm and reword.
        from prefect.states import get_state_result

        return run_coro_as_sync(
            get_state_result(
                self,
                raise_on_failure=raise_on_failure,
                retry_result_failure=retry_result_failure,
            )
        )

    @model_validator(mode="after")
    def default_name_from_type(self) -> Self:
        """If a name is not provided, use the type"""
        # e.g. a type value "not_ready"-style string becomes "Not Ready":
        # split on underscores, capitalize each word, join with spaces.
        if self.name is None and self.type:
            self.name = " ".join(word.capitalize() for word in self.type.split("_"))
        return self

    @model_validator(mode="after")
    def default_scheduled_start_time(self) -> Self:
        """Give SCHEDULED states without a scheduled time the current UTC time."""
        if self.type == StateType.SCHEDULED:
            if not self.state_details.scheduled_time:
                self.state_details.scheduled_time = now("UTC")  # pyright: ignore[reportAttributeAccessIssue] DateTime is split into two types depending on Python version
        return self

    @model_validator(mode="after")
    def set_unpersisted_results_to_none(self) -> Self:
        """Drop dict payloads whose "type" entry is "unpersisted"."""
        if isinstance(self.data, dict) and self.data.get("type") == "unpersisted":  # pyright: ignore[reportUnknownMemberType] unable to narrow dict type
            self.data = None
        return self

    def is_scheduled(self) -> bool:
        return self.type == StateType.SCHEDULED

    def is_pending(self) -> bool:
        return self.type == StateType.PENDING

    def is_running(self) -> bool:
        return self.type == StateType.RUNNING

    def is_completed(self) -> bool:
        return self.type == StateType.COMPLETED

    def is_failed(self) -> bool:
        return self.type == StateType.FAILED

    def is_crashed(self) -> bool:
        return self.type == StateType.CRASHED

    def is_cancelled(self) -> bool:
        return self.type == StateType.CANCELLED

    def is_cancelling(self) -> bool:
        return self.type == StateType.CANCELLING

    def is_final(self) -> bool:
        return self.type in TERMINAL_STATES

    def is_paused(self) -> bool:
        return self.type == StateType.PAUSED

    def model_copy(
        self, *, update: Optional[Mapping[str, Any]] = None, deep: bool = False
    ) -> Self:
        """
        Copying API models should return an object that could be inserted into the
        database again. The 'timestamp' is reset using the default factory.

        Args:
            update: field overrides for the copy; an explicit "timestamp" entry
                takes precedence over the regenerated one.
            deep: forwarded to the parent `model_copy`.
        """
        timestamp_field = type(self).model_fields["timestamp"]
        update = {
            # `timestamp` is declared with a `default_factory`; `get_default()`
            # yields None for such fields unless the factory call is requested
            # explicitly, which would blank the timestamp instead of resetting it.
            "timestamp": timestamp_field.get_default(call_default_factory=True),
            **(update or {}),
        }
        return super().model_copy(update=update, deep=deep)

    def fresh_copy(self, **kwargs: Any) -> Self:
        """
        Return a fresh copy of the state with a new ID.

        `created`, `updated` and `timestamp` are all set to the same current
        UTC instant. Extra keyword arguments are forwarded to `model_copy`.
        """
        # Read the clock once so the three time fields agree exactly.
        current_time = now("UTC")
        return self.model_copy(
            update={
                "id": uuid7(),
                "created": current_time,
                "updated": current_time,
                "timestamp": current_time,
            },
            **kwargs,
        )

    def __repr__(self) -> str:
        """
        Generates a complete state representation appropriate for introspection
        and debugging, including the result:

        `MyCompletedState(message="my message", type=COMPLETED, result=...)`
        """
        result = self.data

        display = dict(
            message=repr(self.message),
            type=str(self.type.value),
            result=repr(result),
        )

        return f"{self.name}({', '.join(f'{k}={v}' for k, v in display.items())})"

    def __str__(self) -> str:
        """
        Generates a simple state representation appropriate for logging:

        `MyCompletedState("my message", type=COMPLETED)`
        """

        display: list[str] = []

        if self.message:
            display.append(repr(self.message))

        # Type-checker-only narrowing; never executed at runtime.
        if TYPE_CHECKING:
            assert self.name is not None
        # The type is only shown when it differs (case-insensitively) from the name.
        if self.type.lower() != self.name.lower():
            display.append(f"type={self.type.value}")

        return f"{self.name}({', '.join(display)})"

    def __hash__(self) -> int:
        # Hash on the owning run ids, the timestamp and the type.
        return hash(
            (
                getattr(self.state_details, "flow_run_id", None),
                getattr(self.state_details, "task_run_id", None),
                self.timestamp,
                self.type,
            )
        )

508 

509 

class FlowRunPolicy(PrefectBaseModel):
    """Defines how a flow run should be orchestrated."""

    # Deprecated pair, kept for older payloads; see `retries` / `retry_delay`.
    max_retries: int = Field(
        default=0,
        description=(
            "The maximum number of retries. Field is not used. Please use `retries`"
            " instead."
        ),
        deprecated=True,
    )
    retry_delay_seconds: float = Field(
        default=0,
        description=(
            "The delay between retries. Field is not used. Please use `retry_delay`"
            " instead."
        ),
        deprecated=True,
    )
    retries: Optional[int] = Field(default=None, description="The number of retries.")
    retry_delay: Optional[int] = Field(
        default=None, description="The delay time between retries, in seconds."
    )
    pause_keys: Optional[set[str]] = Field(
        default_factory=set, description="Tracks pauses this run has observed."
    )
    resuming: Optional[bool] = Field(
        default=False, description="Indicates if this run is resuming from a pause."
    )
    retry_type: Optional[Literal["in_process", "reschedule"]] = Field(
        default=None, description="The type of retry this run is undergoing."
    )

    @model_validator(mode="before")
    @classmethod
    def populate_deprecated_fields(cls, values: Any) -> Any:
        """
        Run dict input through `set_run_policy_deprecated_fields` before
        validation; any non-dict input is passed through untouched.
        """
        if not isinstance(values, dict):
            return values
        return set_run_policy_deprecated_fields(values)  # pyright: ignore[reportUnknownVariableType, reportUnknownArgumentType] unable to narrow dict type

549 

550 

class FlowRun(TimeSeriesBaseModel, ObjectBaseModel):
    # Client-side model of a flow run. `flow_id` is the only field without a
    # default; `name` falls back to a generated slug.
    name: str = Field(
        default_factory=lambda: generate_slug(2),
        description=(
            "The name of the flow run. Defaults to a random slug if not specified."
        ),
        examples=["my-flow-run"],
    )
    flow_id: UUID = Field(default=..., description="The id of the flow being run.")
    state_id: Optional[UUID] = Field(
        default=None, description="The id of the flow run's current state."
    )
    deployment_id: Optional[UUID] = Field(
        default=None,
        description=(
            "The id of the deployment associated with this flow run, if available."
        ),
    )
    deployment_version: Optional[str] = Field(
        default=None,
        description="The version of the deployment associated with this flow run.",
        examples=["1.0"],
    )
    work_queue_name: Optional[str] = Field(
        default=None, description="The work queue that handled this flow run."
    )
    flow_version: Optional[str] = Field(
        default=None,
        description="The version of the flow executed in this flow run.",
        examples=["1.0"],
    )
    parameters: dict[str, Any] = Field(
        default_factory=dict, description="Parameters for the flow run."
    )
    idempotency_key: Optional[str] = Field(
        default=None,
        description=(
            "An optional idempotency key for the flow run. Used to ensure the same flow"
            " run is not created multiple times."
        ),
    )
    context: dict[str, Any] = Field(
        default_factory=dict,
        description="Additional context for the flow run.",
        examples=[{"my_var": "my_val"}],
    )
    empirical_policy: FlowRunPolicy = Field(
        default_factory=FlowRunPolicy,
    )
    tags: list[str] = Field(
        default_factory=list,
        description="A list of tags on the flow run",
        examples=[["tag-1", "tag-2"]],
    )
    labels: KeyValueLabelsField = Field(default_factory=dict)
    parent_task_run_id: Optional[UUID] = Field(
        default=None,
        description=(
            "If the flow run is a subflow, the id of the 'dummy' task in the parent"
            " flow used to track subflow state."
        ),
    )
    run_count: int = Field(
        default=0, description="The number of times the flow run was executed."
    )
    expected_start_time: Optional[DateTime] = Field(
        default=None,
        description="The flow run's expected start time.",
    )
    next_scheduled_start_time: Optional[DateTime] = Field(
        default=None,
        description="The next time the flow run is scheduled to start.",
    )
    start_time: Optional[DateTime] = Field(
        default=None, description="The actual start time."
    )
    end_time: Optional[DateTime] = Field(
        default=None, description="The actual end time."
    )
    total_run_time: datetime.timedelta = Field(
        default=datetime.timedelta(0),
        description=(
            "Total run time. If the flow run was executed multiple times, the time of"
            " each run will be summed."
        ),
    )
    estimated_run_time: datetime.timedelta = Field(
        default=datetime.timedelta(0),
        description="A real-time estimate of the total run time.",
    )
    estimated_start_time_delta: datetime.timedelta = Field(
        default=datetime.timedelta(0),
        description="The difference between actual and expected start time.",
    )
    auto_scheduled: bool = Field(
        default=False,
        description="Whether or not the flow run was automatically scheduled.",
    )
    infrastructure_document_id: Optional[UUID] = Field(
        default=None,
        description="The block document defining infrastructure to use this flow run.",
    )
    infrastructure_pid: Optional[str] = Field(
        default=None,
        description="The id of the flow run as returned by an infrastructure block.",
    )
    created_by: Optional[CreatedBy] = Field(
        default=None,
        description="Optional information about the creator of this flow run.",
    )
    work_queue_id: Optional[UUID] = Field(
        default=None, description="The id of the run's work pool queue."
    )

    work_pool_id: Optional[UUID] = Field(
        default=None, description="The work pool with which the queue is associated."
    )
    work_pool_name: Optional[str] = Field(
        default=None,
        description="The name of the flow run's work pool.",
        examples=["my-work-pool"],
    )
    state: Optional[State] = Field(
        default=None,
        description="The state of the flow run.",
        examples=["State(type=StateType.COMPLETED)"],
    )
    job_variables: Optional[dict[str, Any]] = Field(
        default=None,
        description="Job variables for the flow run.",
    )

    # These are server-side optimizations and should not be present on client models
    # TODO: Deprecate these fields

    state_type: Optional[StateType] = Field(
        default=None, description="The type of the current flow run state."
    )
    state_name: Optional[str] = Field(
        default=None, description="The name of the current flow run state."
    )

    def __eq__(self, other: Any) -> bool:
        """
        Check for "equality" to another flow run schema

        Estimates times are rolling and will always change with repeated queries for
        a flow run so we ignore them during equality checks.
        """
        # Anything that is not a FlowRun is compared by the parent implementation.
        if not isinstance(other, FlowRun):
            return super().__eq__(other)

        ignored = {"estimated_run_time", "estimated_start_time_delta"}
        own_dump = self.model_dump(exclude=ignored)
        other_dump = other.model_dump(exclude=ignored)
        return own_dump == other_dump

    @field_validator("name", mode="before")
    @classmethod
    def set_default_name(cls, name: Optional[str]) -> str:
        # Runs before validation; delegates to `get_or_create_run_name`.
        return get_or_create_run_name(name)

711 

712 

class TaskRunPolicy(PrefectBaseModel):
    """Defines how a task run should retry."""

    # Deprecated pair, kept for older payloads; see `retries` / `retry_delay`.
    max_retries: int = Field(
        default=0,
        description=(
            "The maximum number of retries. Field is not used. Please use `retries`"
            " instead."
        ),
        deprecated=True,
    )
    retry_delay_seconds: float = Field(
        default=0,
        description=(
            "The delay between retries. Field is not used. Please use `retry_delay`"
            " instead."
        ),
        deprecated=True,
    )
    retries: Optional[int] = Field(default=None, description="The number of retries.")
    retry_delay: Union[None, int, float, list[int], list[float]] = Field(
        default=None,
        description="A delay time or list of delay times between retries, in seconds.",
    )
    retry_jitter_factor: Optional[float] = Field(
        default=None, description="Determines the amount a retry should jitter"
    )

    @model_validator(mode="after")
    def populate_deprecated_fields(self) -> Self:
        """
        If deprecated fields are provided, populate the corresponding new fields
        to preserve orchestration behavior.
        """
        # Reading the deprecated attributes below would emit
        # DeprecationWarnings of our own making, so silence them for the
        # duration of this block only.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)

            # A falsy new-style value (None or 0) is replaced by a non-zero
            # legacy value.
            if not self.retries and self.max_retries != 0:
                self.retries = self.max_retries

            # NOTE(review): `int()` drops any fractional part of the legacy
            # delay even though `retry_delay` accepts floats; confirm intended.
            if not self.retry_delay and self.retry_delay_seconds != 0:
                self.retry_delay = int(self.retry_delay_seconds)

        return self

    @field_validator("retry_delay")
    @classmethod
    def validate_configured_retry_delays(
        cls, v: Optional[int | float | list[int] | list[float]]
    ) -> Optional[int | float | list[int] | list[float]]:
        # Delegates to the shared `list_length_50_or_less` validator.
        return list_length_50_or_less(v)

    @field_validator("retry_jitter_factor")
    @classmethod
    def validate_jitter_factor(cls, v: Optional[float]) -> Optional[float]:
        # Delegates to the shared `validate_not_negative` validator.
        return validate_not_negative(v)

771 

772 

class RunInput(PrefectBaseModel):
    """
    Base class for classes that represent inputs to task runs, which
    could include, constants, parameters, or other task runs.
    """

    # Instances are immutable once created.
    model_config: ClassVar[ConfigDict] = ConfigDict(frozen=True)

    # Declared only at runtime so type checkers do not see a plain `str` that
    # would clash with the `Literal[...]` each subclass declares.
    if not TYPE_CHECKING:
        # subclasses provide the concrete type for this field
        input_type: str

784 

785 

class TaskRunResult(RunInput):
    """Represents a task run result input to another task run."""

    # Fixed tag for this `RunInput` variant.
    input_type: Literal["task_run"] = "task_run"
    # Id of the referenced task run.
    id: UUID

791 

792 

class FlowRunResult(RunInput):
    # Flow-run counterpart of `TaskRunResult`: a fixed tag plus the id of the
    # referenced flow run.
    input_type: Literal["flow_run"] = "flow_run"
    id: UUID

796 

797 

class Parameter(RunInput):
    """Represents a parameter input to a task run."""

    # Fixed tag for this `RunInput` variant.
    input_type: Literal["parameter"] = "parameter"
    # Name of the parameter.
    name: str

803 

804 

class Constant(RunInput):
    """Represents constant input value to a task run."""

    # Fixed tag for this `RunInput` variant.
    input_type: Literal["constant"] = "constant"
    # Required string field; its exact contents are not defined in this block.
    type: str

810 

811 

class TaskRun(TimeSeriesBaseModel, ObjectBaseModel):
    # Client-side model of a task run. `task_key` and `dynamic_key` are the
    # only fields without defaults; `name` falls back to a generated slug.
    name: str = Field(
        default_factory=lambda: generate_slug(2), examples=["my-task-run"]
    )
    flow_run_id: Optional[UUID] = Field(
        default=None, description="The flow run id of the task run."
    )
    task_key: str = Field(
        default=..., description="A unique identifier for the task being run."
    )
    dynamic_key: str = Field(
        default=...,
        description=(
            "A dynamic key used to differentiate between multiple runs of the same task"
            " within the same flow run."
        ),
    )
    cache_key: Optional[str] = Field(
        default=None,
        description=(
            "An optional cache key. If a COMPLETED state associated with this cache key"
            " is found, the cached COMPLETED state will be used instead of executing"
            " the task run."
        ),
    )
    cache_expiration: Optional[DateTime] = Field(
        default=None, description="Specifies when the cached state should expire."
    )
    task_version: Optional[str] = Field(
        default=None, description="The version of the task being run."
    )
    empirical_policy: TaskRunPolicy = Field(
        default_factory=TaskRunPolicy,
    )
    tags: list[str] = Field(
        default_factory=list,
        description="A list of tags for the task run.",
        examples=[["tag-1", "tag-2"]],
    )
    labels: KeyValueLabelsField = Field(default_factory=dict)
    state_id: Optional[UUID] = Field(
        default=None, description="The id of the current task run state."
    )
    task_inputs: dict[
        str, list[Union[TaskRunResult, FlowRunResult, Parameter, Constant]]
    ] = Field(
        default_factory=dict,
        description=(
            "Tracks the source of inputs to a task run. Used for internal bookkeeping. "
            "Note the special __parents__ key, used to indicate a parent/child "
            "relationship that may or may not include an input or wait_for semantic."
        ),
    )
    state_type: Optional[StateType] = Field(
        default=None, description="The type of the current task run state."
    )
    state_name: Optional[str] = Field(
        default=None, description="The name of the current task run state."
    )
    run_count: int = Field(
        default=0, description="The number of times the task run has been executed."
    )
    flow_run_run_count: int = Field(
        default=0,
        description=(
            "If the parent flow has retried, this indicates the flow retry this run is"
            " associated with."
        ),
    )
    expected_start_time: Optional[DateTime] = Field(
        default=None,
        description="The task run's expected start time.",
    )

    # the next scheduled start time will be populated
    # whenever the run is in a scheduled state
    next_scheduled_start_time: Optional[DateTime] = Field(
        default=None,
        description="The next time the task run is scheduled to start.",
    )
    start_time: Optional[DateTime] = Field(
        default=None, description="The actual start time."
    )
    end_time: Optional[DateTime] = Field(
        default=None, description="The actual end time."
    )
    total_run_time: datetime.timedelta = Field(
        default=datetime.timedelta(0),
        description=(
            "Total run time. If the task run was executed multiple times, the time of"
            " each run will be summed."
        ),
    )
    estimated_run_time: datetime.timedelta = Field(
        default=datetime.timedelta(0),
        description="A real-time estimate of total run time.",
    )
    estimated_start_time_delta: datetime.timedelta = Field(
        default=datetime.timedelta(0),
        description="The difference between actual and expected start time.",
    )

    state: Optional[State] = Field(
        default=None,
        description="The state of the task run.",
        examples=["State(type=StateType.COMPLETED)"],
    )

    @field_validator("name", mode="before")
    @classmethod
    def set_default_name(cls, name: Optional[str]) -> Name:
        # Runs before validation; delegates to `get_or_create_run_name`.
        return get_or_create_run_name(name)

924 

925 

class Workspace(PrefectBaseModel):
    """
    A Prefect Cloud workspace.

    Expected payload for each workspace returned by the `me/workspaces` route.
    """

    account_id: UUID = Field(..., description="The account id of the workspace.")
    account_name: str = Field(..., description="The account name.")
    account_handle: str = Field(..., description="The account's unique handle.")
    workspace_id: UUID = Field(..., description="The workspace id.")
    workspace_name: str = Field(..., description="The workspace name.")
    workspace_description: str = Field(..., description="Description of the workspace.")
    workspace_handle: str = Field(..., description="The workspace's unique handle.")
    # Keys in the payload that are not declared above are dropped silently.
    model_config: ClassVar[ConfigDict] = ConfigDict(extra="ignore")

    @property
    def handle(self) -> str:
        """
        The full handle of the workspace as `account_handle` / `workspace_handle`
        """
        return f"{self.account_handle}/{self.workspace_handle}"

    def api_url(self) -> str:
        """
        Generate the API URL for accessing this workspace
        """
        base = PREFECT_CLOUD_API_URL.value()
        return f"{base}/accounts/{self.account_id}/workspaces/{self.workspace_id}"

    def ui_url(self) -> str:
        """
        Generate the UI URL for accessing this workspace
        """
        # The UI path uses singular segments ("account", "workspace"),
        # unlike the API path.
        base = PREFECT_CLOUD_UI_URL.value()
        return f"{base}/account/{self.account_id}/workspace/{self.workspace_id}"

    def __hash__(self) -> int:
        # Hash on the combined handle.
        return hash(self.handle)

971 

972 

# A single entry of an `IPAllowlist` (see `IPAllowlist.entries`).
class IPAllowlistEntry(PrefectBaseModel):
    ip_network: IPvAnyNetwork
    enabled: bool
    description: Optional[str] = Field(
        default=None,
        description="A description of the IP entry.",
    )
    last_seen: Optional[str] = Field(
        default=None,
        description="The last time this IP was seen accessing Prefect Cloud.",
    )

983 

984 

class IPAllowlist(PrefectBaseModel):
    """
    A Prefect Cloud IP allowlist.

    Expected payload for an IP allowlist from the Prefect Cloud API.
    """

    # Required: the individual allowlist entries.
    entries: list[IPAllowlistEntry]

993 

994 

class IPAllowlistMyAccessResponse(PrefectBaseModel):
    """Expected payload for an IP allowlist access response from the Prefect Cloud API."""

    # Both fields are required (no defaults).
    allowed: bool
    detail: str

1000 

1001 

class BlockType(ObjectBaseModel):
    """An ORM representation of a block type"""

    # Required identification fields.
    name: Name = Field(description="A block type's name", default=...)
    slug: str = Field(description="A block type's slug", default=...)

    # Optional presentation metadata.
    logo_url: Optional[HttpUrl] = Field(
        description="Web URL for the block type's logo",
        default=None,
    )
    documentation_url: Optional[HttpUrl] = Field(
        description="Web URL for the block type's documentation",
        default=None,
    )
    description: Optional[str] = Field(
        description="A short blurb about the corresponding block's intended use",
        default=None,
    )
    code_example: Optional[str] = Field(
        description="A code snippet demonstrating use of the corresponding block",
        default=None,
    )
    is_protected: bool = Field(
        description="Protected block types cannot be modified via API.",
        default=False,
    )

1024 

1025 

class BlockSchema(ObjectBaseModel):
    """A representation of a block schema."""

    checksum: str = Field(
        description="The block schema's unique checksum",
        default=...,
    )
    fields: dict[str, Any] = Field(
        description="The block schema's field schema",
        default_factory=dict,
    )
    # Must be supplied, but an explicit None is accepted.
    block_type_id: Optional[UUID] = Field(
        description="A block type ID",
        default=...,
    )
    block_type: Optional[BlockType] = Field(
        description="The associated block type",
        default=None,
    )
    capabilities: list[str] = Field(
        description="A list of Block capabilities",
        default_factory=list,
    )
    version: str = Field(
        description="Human readable identifier for the block schema",
        default=DEFAULT_BLOCK_SCHEMA_VERSION,
    )

1045 

1046 

class BlockDocument(ObjectBaseModel):
    """An ORM representation of a block document."""

    name: Optional[BlockDocumentName] = Field(
        description=(
            "The block document's name. Not required for anonymous block documents."
        ),
        default=None,
    )
    data: dict[str, Any] = Field(
        description="The block document's data",
        default_factory=dict,
    )
    block_schema_id: UUID = Field(description="A block schema ID", default=...)
    block_schema: Optional[BlockSchema] = Field(
        description="The associated block schema",
        default=None,
    )
    block_type_id: UUID = Field(description="A block type ID", default=...)
    block_type_name: Optional[str] = Field(
        default=None, description="A block type name"
    )
    block_type: Optional[BlockType] = Field(
        description="The associated block type",
        default=None,
    )
    block_document_references: dict[str, dict[str, Any]] = Field(
        description="Record of the block document's references",
        default_factory=dict,
    )
    is_anonymous: bool = Field(
        description=(
            "Whether the block is anonymous (anonymous blocks are usually created by"
            " Prefect automatically)"
        ),
        default=False,
    )

    @model_validator(mode="before")
    @classmethod
    def validate_name_is_present_if_not_anonymous(
        cls, values: dict[str, Any]
    ) -> dict[str, Any]:
        # Runs on the raw input before field validation; the check itself lives
        # in `validate_name_present_on_nonanonymous_blocks`.
        checked_values = validate_name_present_on_nonanonymous_blocks(values)
        return checked_values

    @model_serializer(mode="wrap")
    def serialize_data(
        self, handler: SerializerFunctionWrapHandler, info: SerializationInfo
    ) -> Any:
        # Visit every item in `data` with `handle_secret_render`, passing the
        # serialization context (an empty dict when none was supplied).
        render_secret = partial(handle_secret_render, context=info.context or {})
        # NOTE: the visited collection is assigned back onto the instance
        # before the wrapped serializer runs.
        self.data = visit_collection(
            self.data, visit_fn=render_secret, return_data=True
        )
        return handler(self)

1096 

1097 

class Flow(ObjectBaseModel):
    """An ORM representation of flow data."""

    name: Name = Field(
        description="The name of the flow",
        examples=["my-flow"],
        default=...,
    )
    tags: list[str] = Field(
        description="A list of flow tags",
        examples=[["tag-1", "tag-2"]],
        default_factory=list,
    )
    # Default and validation come from the `KeyValueLabelsField` annotation.
    labels: KeyValueLabelsField

1110 

1111 

# A schedule attached to a deployment (see `Deployment.schedules`).
class DeploymentSchedule(ObjectBaseModel):
    deployment_id: Optional[UUID] = Field(
        description="The deployment id associated with this schedule.",
        default=None,
    )
    # The only required field.
    schedule: SCHEDULE_TYPES = Field(
        description="The schedule for the deployment.",
        default=...,
    )
    active: bool = Field(
        description="Whether or not the schedule is active.",
        default=True,
    )
    max_scheduled_runs: Optional[PositiveInteger] = Field(
        description="The maximum number of scheduled runs for the schedule.",
        default=None,
    )
    parameters: dict[str, Any] = Field(
        description="Parameter overrides for the schedule.",
        default_factory=dict,
    )
    slug: Optional[str] = Field(
        description="A unique identifier for the schedule.",
        default=None,
    )

1135 

1136 

# Version description for a deployment; `extra="allow"` keeps keys beyond the
# two declared here.
class VersionInfo(PrefectBaseModel, extra="allow"):
    type: str = Field(description="The type of version info.", default=...)
    version: str = Field(
        description="The version of the deployment.", default=...
    )

1140 

1141 

# Choices for what happens to existing schedules when branching a deployment
# (used by `DeploymentBranchingOptions.schedule_handling`). Members are also
# `str` instances.
class BranchingScheduleHandling(str, Enum):
    KEEP = "keep"
    REMOVE = "remove"
    INACTIVE = "inactive"

1146 

1147 

# Options applied when branching a deployment.
class DeploymentBranchingOptions(ObjectBaseModel):
    # Existing schedules are removed unless another handling is requested.
    schedule_handling: BranchingScheduleHandling = Field(
        description="Whether to keep, remove, or set inactive the existing schedules when branching",
        default=BranchingScheduleHandling.REMOVE,
    )

1153 

1154 

class Deployment(ObjectBaseModel):
    """An ORM representation of deployment data."""

    # --- identity and versioning -------------------------------------------
    name: Name = Field(description="The name of the deployment.", default=...)
    version: Optional[str] = Field(
        description="An optional version for the deployment.",
        default=None,
    )
    version_id: Optional[UUID] = Field(
        description="The ID of the current version of the deployment.",
        default=None,
    )
    version_info: Optional[VersionInfo] = Field(
        description="A description of this version of the deployment.",
        default=None,
    )
    description: Optional[str] = Field(
        description="A description for the deployment.",
        default=None,
    )
    flow_id: UUID = Field(
        description="The flow id associated with the deployment.",
        default=...,
    )

    # --- scheduling and execution settings ---------------------------------
    paused: bool = Field(
        description="Whether or not the deployment is paused.",
        default=False,
    )
    concurrency_limit: Optional[int] = Field(
        description="The concurrency limit for the deployment.",
        default=None,
    )
    schedules: list[DeploymentSchedule] = Field(
        description="A list of schedules for the deployment.",
        default_factory=lambda: [],
    )
    job_variables: dict[str, Any] = Field(
        description="Overrides to apply to flow run infrastructure at runtime.",
        default_factory=dict,
    )
    parameters: dict[str, Any] = Field(
        description="Parameters for flow runs scheduled by the deployment.",
        default_factory=dict,
    )
    pull_steps: Optional[list[dict[str, Any]]] = Field(
        description="Pull steps for cloning and running this deployment.",
        default=None,
    )
    tags: list[str] = Field(
        description="A list of tags for the deployment",
        examples=[["tag-1", "tag-2"]],
        default_factory=list,
    )
    # Default and validation come from the `KeyValueLabelsField` annotation.
    labels: KeyValueLabelsField
    work_queue_name: Optional[str] = Field(
        description=(
            "The work queue for the deployment. If no work queue is set, work will not"
            " be scheduled."
        ),
        default=None,
    )
    last_polled: Optional[DateTime] = Field(
        description="The last time the deployment was polled for status updates.",
        default=None,
    )
    parameter_openapi_schema: Optional[dict[str, Any]] = Field(
        description="The parameter schema of the flow, including defaults.",
        default=None,
    )

    # --- code location and related documents -------------------------------
    path: Optional[str] = Field(
        description=(
            "The path to the working directory for the workflow, relative to remote"
            " storage or an absolute path."
        ),
        default=None,
    )
    entrypoint: Optional[str] = Field(
        description=(
            "The path to the entrypoint for the workflow, relative to the `path`."
        ),
        default=None,
    )
    storage_document_id: Optional[UUID] = Field(
        description="The block document defining storage used for this flow.",
        default=None,
    )
    infrastructure_document_id: Optional[UUID] = Field(
        description="The block document defining infrastructure to use for flow runs.",
        default=None,
    )

    # --- audit information and queue assignment ----------------------------
    created_by: Optional[CreatedBy] = Field(
        description="Optional information about the creator of this deployment.",
        default=None,
    )
    updated_by: Optional[UpdatedBy] = Field(
        description="Optional information about the updater of this deployment.",
        default=None,
    )
    work_queue_id: Optional[UUID] = Field(
        description=(
            "The id of the work pool queue to which this deployment is assigned."
        ),
        default=None,
    )
    enforce_parameter_schema: bool = Field(
        description=(
            "Whether or not the deployment should enforce the parameter schema."
        ),
        default=True,
    )

1258 

1259 

class ConcurrencyLimit(ObjectBaseModel):
    """An ORM representation of a concurrency limit."""

    tag: str = Field(
        description="A tag the concurrency limit is applied to.",
        default=...,
    )
    concurrency_limit: int = Field(
        description="The concurrency limit.",
        default=...,
    )
    # Each instance gets its own fresh empty list.
    active_slots: list[UUID] = Field(
        description="A list of active run ids using a concurrency slot",
        default_factory=lambda: [],
    )

1271 

1272 

class BlockSchemaReference(ObjectBaseModel):
    """An ORM representation of a block schema reference."""

    # Parent side of the reference: the id is required, the object is optional.
    parent_block_schema_id: UUID = Field(
        description="ID of block schema the reference is nested within",
        default=...,
    )
    parent_block_schema: Optional[BlockSchema] = Field(
        description="The block schema the reference is nested within",
        default=None,
    )

    # Nested side of the reference: the id is required, the object is optional.
    reference_block_schema_id: UUID = Field(
        description="ID of the nested block schema",
        default=...,
    )
    reference_block_schema: Optional[BlockSchema] = Field(
        description="The nested block schema",
        default=None,
    )

    name: str = Field(
        description="The name that the reference is nested under",
        default=...,
    )

1291 

1292 

class BlockDocumentReference(ObjectBaseModel):
    """An ORM representation of a block document reference."""

    # Parent side of the reference: the id is required, the object is optional.
    parent_block_document_id: UUID = Field(
        description="ID of block document the reference is nested within",
        default=...,
    )
    parent_block_document: Optional[BlockDocument] = Field(
        description="The block document the reference is nested within",
        default=None,
    )

    # Nested side of the reference: the id is required, the object is optional.
    reference_block_document_id: UUID = Field(
        description="ID of the nested block document",
        default=...,
    )
    reference_block_document: Optional[BlockDocument] = Field(
        description="The nested block document",
        default=None,
    )

    name: str = Field(
        description="The name that the reference is nested under",
        default=...,
    )

    @model_validator(mode="before")
    @classmethod
    def validate_parent_and_ref_are_different(cls, values: Any) -> Any:
        # Only dict input is handed to the shared check; anything else is
        # passed through untouched.
        if not isinstance(values, dict):
            return values
        return validate_parent_and_ref_diff(values)  # pyright: ignore[reportUnknownVariableType, reportUnknownArgumentType] unable to narrow dict type

1318 

1319 

class Configuration(ObjectBaseModel):
    """An ORM representation of account info."""

    # Both fields are required.
    key: str = Field(description="Account info key", default=...)
    value: dict[str, Any] = Field(description="Account info", default=...)

1325 

1326 

class SavedSearchFilter(PrefectBaseModel):
    """A filter for a saved search model. Intended for use by the Prefect UI."""

    # Every field is required.
    object: str = Field(
        description="The object over which to filter.",
        default=...,
    )
    property: str = Field(
        description="The property of the object on which to filter.",
        default=...,
    )
    type: str = Field(description="The type of the property.", default=...)
    operation: str = Field(
        description="The operator to apply to the object. For example, `equals`.",
        default=...,
    )
    value: Any = Field(
        description="A JSON-compatible value for the filter.",
        default=...,
    )

1342 

1343 

class SavedSearch(ObjectBaseModel):
    """An ORM representation of saved search data. Represents a set of filter criteria."""

    name: str = Field(description="The name of the saved search.", default=...)
    # Each instance gets its own fresh empty list.
    filters: list[SavedSearchFilter] = Field(
        description="The filter set for the saved search.",
        default_factory=lambda: [],
    )

1352 

1353 

class Log(TimeSeriesBaseModel, ObjectBaseModel):
    """An ORM representation of log data."""

    # Required log record fields.
    name: str = Field(description="The logger name.", default=...)
    level: int = Field(description="The log level.", default=...)
    message: str = Field(description="The log message.", default=...)
    timestamp: DateTime = Field(description="The log timestamp.", default=...)

    # Optional links to the run that produced the record.
    flow_run_id: Optional[UUID] = Field(
        description="The flow run ID associated with the log.",
        default=None,
    )
    task_run_id: Optional[UUID] = Field(
        description="The task run ID associated with the log.",
        default=None,
    )

1367 

1368 

class QueueFilter(PrefectBaseModel):
    """Filter criteria definition for a work queue."""

    # Both criteria default to None.
    tags: Optional[list[str]] = Field(
        description="Only include flow runs with these tags in the work queue.",
        default=None,
    )
    deployment_ids: Optional[list[UUID]] = Field(
        description="Only include flow runs from these deployments in the work queue.",
        default=None,
    )

1380 

1381 

class WorkQueue(ObjectBaseModel):
    """An ORM representation of a work queue"""

    name: Name = Field(default=..., description="The name of the work queue.")
    description: Optional[str] = Field(
        default="", description="An optional description for the work queue."
    )
    is_paused: bool = Field(
        default=False, description="Whether or not the work queue is paused."
    )
    concurrency_limit: Optional[NonNegativeInteger] = Field(
        default=None, description="An optional concurrency limit for the work queue."
    )
    priority: PositiveInteger = Field(
        default=1,
        description=(
            "The queue's priority. Lower values are higher priority (1 is the highest)."
        ),
    )
    work_pool_name: Optional[str] = Field(default=None)
    # Will be required after a future migration. Until then the field is
    # nullable, so it is given an explicit `None` default: without one,
    # pydantic v2 treats an `Optional[...]` field as required.
    work_pool_id: Optional[UUID] = Field(
        default=None,
        description="The work pool with which the queue is associated.",
    )
    filter: Optional[QueueFilter] = Field(
        default=None,
        description="DEPRECATED: Filter criteria for the work queue.",
        deprecated=True,
    )
    last_polled: Optional[DateTime] = Field(
        default=None, description="The last time an agent polled this queue for work."
    )
    status: Optional[WorkQueueStatus] = Field(
        default=None, description="The queue status."
    )

1417 

1418 

# Thresholds used to decide whether a work queue is healthy. Setting either
# threshold to None disables that check (see `evaluate_health_status`).
class WorkQueueHealthPolicy(PrefectBaseModel):
    maximum_late_runs: Optional[int] = Field(
        default=0,
        description=(
            "The maximum number of late runs in the work queue before it is deemed"
            " unhealthy. Defaults to `0`."
        ),
    )
    maximum_seconds_since_last_polled: Optional[int] = Field(
        default=60,
        description=(
            "The maximum number of time in seconds elapsed since work queue has been"
            " polled before it is deemed unhealthy. Defaults to `60`."
        ),
    )

    def evaluate_health_status(
        self, late_runs_count: int, last_polled: datetime.datetime | None = None
    ) -> bool:
        """
        Given empirical information about the state of the work queue, evaluate its health status.

        Args:
            late_runs_count: the count of late runs for the work queue.
            last_polled: the last time the work queue was polled, if available.

        Returns:
            bool: whether or not the work queue is healthy.
        """
        # Late-run check: passes when disabled (None) or within the limit.
        late_runs_ok = (
            self.maximum_late_runs is None
            or late_runs_count <= self.maximum_late_runs
        )

        # Polling check disabled: health depends on late runs alone.
        if self.maximum_seconds_since_last_polled is None:
            return late_runs_ok

        # Polling check enabled but the queue has never been polled.
        if last_polled is None:
            return False

        seconds_since_poll = (now("UTC") - last_polled).total_seconds()
        return (
            late_runs_ok
            and seconds_since_poll <= self.maximum_seconds_since_last_polled
        )

1464 

1465 

# Health details for a work queue, including the policy used to evaluate them.
class WorkQueueStatusDetail(PrefectBaseModel):
    healthy: bool = Field(
        default=..., description="Whether or not the work queue is healthy."
    )
    late_runs_count: int = Field(
        description="The number of late flow runs in the work queue.",
        default=0,
    )
    last_polled: Optional[DateTime] = Field(
        description="The last time an agent polled this queue for work.",
        default=None,
    )
    health_check_policy: WorkQueueHealthPolicy = Field(
        default=...,
        description=(
            "The policy used to determine whether or not the work queue is healthy."
        ),
    )

1480 

1481 

class Agent(ObjectBaseModel):
    """An ORM representation of an agent"""

    # When omitted, the name is produced by `generate_slug(2)`.
    name: str = Field(
        description=(
            "The name of the agent. If a name is not provided, it will be"
            " auto-generated."
        ),
        default_factory=lambda: generate_slug(2),
    )
    work_queue_id: UUID = Field(
        description="The work queue with which the agent is associated.",
        default=...,
    )
    last_activity_time: Optional[DateTime] = Field(
        description="The last time this agent polled for work.",
        default=None,
    )

1498 

1499 

class WorkPoolStorageConfiguration(PrefectBaseModel):
    """A work pool storage configuration"""

    # All settings default to None.
    bundle_upload_step: Optional[dict[str, Any]] = Field(
        description="The bundle upload step for the work pool.",
        default=None,
    )
    bundle_execution_step: Optional[dict[str, Any]] = Field(
        description="The bundle execution step for the work pool.",
        default=None,
    )
    default_result_storage_block_id: Optional[UUID] = Field(
        description="The block document ID of the default result storage block.",
        default=None,
    )

1513 

1514 

class WorkPool(ObjectBaseModel):
    """An ORM representation of a work pool"""

    name: Name = Field(description="The name of the work pool.")
    description: Optional[str] = Field(
        description="A description of the work pool.",
        default=None,
    )
    type: str = Field(description="The work pool type.")
    base_job_template: dict[str, Any] = Field(
        description="The work pool's base job template.",
        default_factory=dict,
    )
    is_paused: bool = Field(
        description="Pausing the work pool stops the delivery of all work.",
        default=False,
    )
    concurrency_limit: Optional[NonNegativeInteger] = Field(
        description="A concurrency limit for the work pool.",
        default=None,
    )
    status: Optional[WorkPoolStatus] = Field(
        description="The current status of the work pool.",
        default=None,
    )

    storage_configuration: WorkPoolStorageConfiguration = Field(
        description="The storage configuration for the work pool.",
        default_factory=WorkPoolStorageConfiguration,
    )

    # this required field has a default of None so that the custom validator
    # below will be called and produce a more helpful error message. Because
    # the field metadata is attached via an annotation, the default is hidden
    # from type checkers.
    default_queue_id: Annotated[
        UUID, Field(default=None, description="The id of the pool's default queue.")
    ]

    @property
    def is_push_pool(self) -> bool:
        # True when the type string carries the ":push" suffix.
        pool_type = self.type
        return pool_type.endswith(":push")

    @property
    def is_managed_pool(self) -> bool:
        # True when the type string carries the ":managed" suffix.
        pool_type = self.type
        return pool_type.endswith(":managed")

    @field_validator("default_queue_id")
    @classmethod
    def helpful_error_for_missing_default_queue_id(cls, v: Optional[UUID]) -> UUID:
        # The actual check lives in `validate_default_queue_id_not_none`.
        queue_id = validate_default_queue_id_not_none(v)
        return queue_id

1564 

1565 

class Worker(ObjectBaseModel):
    """An ORM representation of a worker"""

    name: str = Field(description="The name of the worker.")
    # The description previously said "queue", copied from `WorkQueue`; this
    # field belongs to a worker.
    work_pool_id: UUID = Field(
        description="The work pool with which the worker is associated."
    )
    last_heartbeat_time: Optional[datetime.datetime] = Field(
        default=None, description="The last time the worker process sent a heartbeat."
    )
    heartbeat_interval_seconds: Optional[int] = Field(
        default=None,
        description=(
            "The number of seconds to expect between heartbeats sent by the worker."
        ),
    )
    # A worker is OFFLINE unless a status is supplied.
    status: WorkerStatus = Field(
        default=WorkerStatus.OFFLINE,
        description="Current status of the worker.",
    )

1586 

1587 

1588Flow.model_rebuild() 1a

1589# FlowRun.model_rebuild() 

1590 

1591 

# An artifact record, optionally linked to a flow run and/or a task run.
class Artifact(ObjectBaseModel):
    key: Optional[str] = Field(
        description="An optional unique reference key for this artifact.",
        default=None,
    )
    type: Optional[str] = Field(
        description=(
            "An identifier that describes the shape of the data field. e.g. 'result',"
            " 'table', 'markdown'"
        ),
        default=None,
    )
    description: Optional[str] = Field(
        description="A markdown-enabled description of the artifact.",
        default=None,
    )
    # data will eventually be typed as `Optional[Union[Result, Any]]`
    data: Optional[Union[dict[str, Any], Any]] = Field(
        description=(
            "Data associated with the artifact, e.g. a result.; structure depends on"
            " the artifact type."
        ),
        default=None,
    )
    metadata_: Optional[dict[str, str]] = Field(
        description=(
            "User-defined artifact metadata. Content must be string key and value"
            " pairs."
        ),
        default=None,
    )
    flow_run_id: Optional[UUID] = Field(
        description="The flow run associated with the artifact.",
        default=None,
    )
    task_run_id: Optional[UUID] = Field(
        description="The task run associated with the artifact.",
        default=None,
    )

    @field_validator("metadata_")
    @classmethod
    def validate_metadata_length(
        cls, v: Optional[dict[str, str]]
    ) -> Optional[dict[str, str]]:
        # The actual check lives in `validate_max_metadata_length`.
        checked_metadata = validate_max_metadata_length(v)
        return checked_metadata

1634 

1635 

# Artifact data stored per key, with the id of the latest artifact for it.
class ArtifactCollection(ObjectBaseModel):
    # `key` and `latest_id` are required; everything else defaults to None.
    key: str = Field(
        description="An optional unique reference key for this artifact."
    )
    latest_id: UUID = Field(
        description="The latest artifact ID associated with the key."
    )
    type: Optional[str] = Field(
        description=(
            "An identifier that describes the shape of the data field. e.g. 'result',"
            " 'table', 'markdown'"
        ),
        default=None,
    )
    description: Optional[str] = Field(
        description="A markdown-enabled description of the artifact.",
        default=None,
    )
    data: Optional[Union[dict[str, Any], Any]] = Field(
        description=(
            "Data associated with the artifact, e.g. a result.; structure depends on"
            " the artifact type."
        ),
        default=None,
    )
    metadata_: Optional[dict[str, str]] = Field(
        description=(
            "User-defined artifact metadata. Content must be string key and value"
            " pairs."
        ),
        default=None,
    )
    flow_run_id: Optional[UUID] = Field(
        description="The flow run associated with the artifact.",
        default=None,
    )
    task_run_id: Optional[UUID] = Field(
        description="The task run associated with the artifact.",
        default=None,
    )

1671 

1672 

# A named variable with a value and optional tags.
class Variable(ObjectBaseModel):
    # Name length is capped at MAX_VARIABLE_NAME_LENGTH.
    name: str = Field(
        description="The name of the variable",
        examples=["my_variable"],
        max_length=MAX_VARIABLE_NAME_LENGTH,
        default=...,
    )
    value: StrictVariableValue = Field(
        description="The value of the variable",
        examples=["my_value"],
        default=...,
    )
    tags: list[str] = Field(
        description="A list of variable tags",
        examples=[["tag-1", "tag-2"]],
        default_factory=list,
    )

1690 

1691 

# An input attached to a flow run; `value` holds a string that
# `decoded_value` parses with `orjson.loads`.
class FlowRunInput(ObjectBaseModel):
    flow_run_id: UUID = Field(
        description="The flow run ID associated with the input."
    )
    # The key is additionally checked by `raise_on_name_alphanumeric_dashes_only`.
    key: Annotated[str, AfterValidator(raise_on_name_alphanumeric_dashes_only)] = Field(
        description="The key of the input."
    )
    value: str = Field(description="The value of the input.")
    sender: Optional[str] = Field(
        description="The sender of the input.",
        default=None,
    )

    @property
    def decoded_value(self) -> Any:
        """
        Decode the value of the input.

        Returns:
            Any: the decoded value
        """
        raw_value = self.value
        return orjson.loads(raw_value)

1709 

1710 

class GlobalConcurrencyLimit(ObjectBaseModel):
    """An ORM representation of a global concurrency limit"""

    # `name` and `limit` are required.
    name: str = Field(description="The name of the global concurrency limit.")
    limit: int = Field(
        description=(
            "The maximum number of slots that can be occupied on this concurrency"
            " limit."
        ),
    )
    active: Optional[bool] = Field(
        description="Whether or not the concurrency limit is in an active state.",
        default=True,
    )
    active_slots: Optional[int] = Field(
        description="Number of tasks currently using a concurrency slot.",
        default=0,
    )
    slot_decay_per_second: Optional[float] = Field(
        description=(
            "Controls the rate at which slots are released when the concurrency limit"
            " is used as a rate limit."
        ),
        default=0.0,
    )

1736 

1737 

# A CSRF token together with its client id and expiration; all required.
class CsrfToken(ObjectBaseModel):
    token: str = Field(description="The CSRF token", default=...)
    client: str = Field(
        description="The client id associated with the CSRF token",
        default=...,
    )
    expiration: datetime.datetime = Field(
        description="The expiration time of the CSRF token",
        default=...,
    )

1749 

1750 

1751__getattr__: Callable[[str], Any] = getattr_migration(__name__) 1a

1752 

1753 

class Integration(PrefectBaseModel):
    """A representation of an installed Prefect integration."""

    # Both fields are required.
    name: str = Field(
        description="The name of the Prefect integration.",
    )
    version: str = Field(
        description="The version of the Prefect integration.",
    )

1759 

1760 

1761class WorkerMetadata(PrefectBaseModel): 1a

1762 """ 

1763 Worker metadata. 

1764 

1765 We depend on the structure of `integrations`, but otherwise, worker classes 

1766 should support flexible metadata. 

1767 """ 

1768 

1769 integrations: list[Integration] = Field( 1a

1770 default=..., description="Prefect integrations installed in the worker." 

1771 ) 

1772 model_config: ClassVar[ConfigDict] = ConfigDict(extra="allow") 1a