Coverage for /usr/local/lib/python3.12/site-packages/prefect/client/schemas/responses.py: 82%

153 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1import datetime 1a

2from typing import TYPE_CHECKING, Any, ClassVar, Generic, Optional, TypeVar, Union 1a

3from uuid import UUID 1a

4 

5from pydantic import ConfigDict, Field 1a

6from typing_extensions import Literal 1a

7 

8import prefect.client.schemas.objects as objects 1a

9from prefect._internal.schemas.bases import ObjectBaseModel, PrefectBaseModel 1a

10from prefect._internal.schemas.fields import CreatedBy, UpdatedBy 1a

11from prefect.types import DateTime, KeyValueLabelsField 1a

12from prefect.utilities.collections import AutoEnum 1a

13from prefect.utilities.names import generate_slug 1a

14 

15if TYPE_CHECKING: 15 ↛ 16line 15 didn't jump to line 16 because the condition on line 15 was never true1a

16 from prefect.events.schemas.events import RelatedResource 

17 

18T = TypeVar("T") 1a

19 

20 

21class SetStateStatus(AutoEnum): 1a

22 """Enumerates return statuses for setting run states.""" 

23 

24 ACCEPT = AutoEnum.auto() 1a

25 REJECT = AutoEnum.auto() 1a

26 ABORT = AutoEnum.auto() 1a

27 WAIT = AutoEnum.auto() 1a

28 

29 

30class StateAcceptDetails(PrefectBaseModel): 1a

31 """Details associated with an ACCEPT state transition.""" 

32 

33 type: Literal["accept_details"] = Field( 1a

34 default="accept_details", 

35 description=( 

36 "The type of state transition detail. Used to ensure pydantic does not" 

37 " coerce into a different type." 

38 ), 

39 ) 

40 

41 

42class StateRejectDetails(PrefectBaseModel): 1a

43 """Details associated with a REJECT state transition.""" 

44 

45 type: Literal["reject_details"] = Field( 1a

46 default="reject_details", 

47 description=( 

48 "The type of state transition detail. Used to ensure pydantic does not" 

49 " coerce into a different type." 

50 ), 

51 ) 

52 reason: Optional[str] = Field( 1a

53 default=None, description="The reason why the state transition was rejected." 

54 ) 

55 

56 

57class StateAbortDetails(PrefectBaseModel): 1a

58 """Details associated with an ABORT state transition.""" 

59 

60 type: Literal["abort_details"] = Field( 1a

61 default="abort_details", 

62 description=( 

63 "The type of state transition detail. Used to ensure pydantic does not" 

64 " coerce into a different type." 

65 ), 

66 ) 

67 reason: Optional[str] = Field( 1a

68 default=None, description="The reason why the state transition was aborted." 

69 ) 

70 

71 

72class StateWaitDetails(PrefectBaseModel): 1a

73 """Details associated with a WAIT state transition.""" 

74 

75 type: Literal["wait_details"] = Field( 1a

76 default="wait_details", 

77 description=( 

78 "The type of state transition detail. Used to ensure pydantic does not" 

79 " coerce into a different type." 

80 ), 

81 ) 

82 delay_seconds: int = Field( 1a

83 default=..., 

84 description=( 

85 "The length of time in seconds the client should wait before transitioning" 

86 " states." 

87 ), 

88 ) 

89 reason: Optional[str] = Field( 1a

90 default=None, description="The reason why the state transition should wait." 

91 ) 

92 

93 

94class HistoryResponseState(PrefectBaseModel): 1a

95 """Represents a single state's history over an interval.""" 

96 

97 state_type: objects.StateType = Field(default=..., description="The state type.") 1a

98 state_name: str = Field(default=..., description="The state name.") 1a

99 count_runs: int = Field( 1a

100 default=..., 

101 description="The number of runs in the specified state during the interval.", 

102 ) 

103 sum_estimated_run_time: datetime.timedelta = Field( 1a

104 default=..., 

105 description="The total estimated run time of all runs during the interval.", 

106 ) 

107 sum_estimated_lateness: datetime.timedelta = Field( 1a

108 default=..., 

109 description=( 

110 "The sum of differences between actual and expected start time during the" 

111 " interval." 

112 ), 

113 ) 

114 

115 

116class HistoryResponse(PrefectBaseModel): 1a

117 """Represents a history of aggregation states over an interval""" 

118 

119 interval_start: DateTime = Field( 1a

120 default=..., description="The start date of the interval." 

121 ) 

122 interval_end: DateTime = Field( 1a

123 default=..., description="The end date of the interval." 

124 ) 

125 states: list[HistoryResponseState] = Field( 1a

126 default=..., description="A list of state histories during the interval." 

127 ) 

128 

129 

130StateResponseDetails = Union[ 1a

131 StateAcceptDetails, StateWaitDetails, StateRejectDetails, StateAbortDetails 

132] 

133 

134 

135class OrchestrationResult(PrefectBaseModel, Generic[T]): 1a

136 """ 

137 A container for the output of state orchestration. 

138 """ 

139 

140 state: Optional[objects.State[T]] 1a

141 status: SetStateStatus 1a

142 details: StateResponseDetails 1a

143 

144 

145class WorkerFlowRunResponse(PrefectBaseModel): 1a

146 model_config: ClassVar[ConfigDict] = ConfigDict(arbitrary_types_allowed=True) 1a

147 

148 work_pool_id: UUID 1a

149 work_queue_id: UUID 1a

150 flow_run: objects.FlowRun 1a

151 

152 

153class FlowRunResponse(ObjectBaseModel): 1a

154 name: str = Field( 1a

155 default_factory=lambda: generate_slug(2), 

156 description=( 

157 "The name of the flow run. Defaults to a random slug if not specified." 

158 ), 

159 examples=["my-flow-run"], 

160 ) 

161 flow_id: UUID = Field(default=..., description="The id of the flow being run.") 1a

162 state_id: Optional[UUID] = Field( 1a

163 default=None, description="The id of the flow run's current state." 

164 ) 

165 deployment_id: Optional[UUID] = Field( 1a

166 default=None, 

167 description=( 

168 "The id of the deployment associated with this flow run, if available." 

169 ), 

170 ) 

171 deployment_version: Optional[str] = Field( 1a

172 default=None, 

173 description="The version of the deployment associated with this flow run.", 

174 examples=["1.0"], 

175 ) 

176 work_queue_name: Optional[str] = Field( 1a

177 default=None, description="The work queue that handled this flow run." 

178 ) 

179 flow_version: Optional[str] = Field( 1a

180 default=None, 

181 description="The version of the flow executed in this flow run.", 

182 examples=["1.0"], 

183 ) 

184 parameters: dict[str, Any] = Field( 1a

185 default_factory=dict, description="Parameters for the flow run." 

186 ) 

187 idempotency_key: Optional[str] = Field( 1a

188 default=None, 

189 description=( 

190 "An optional idempotency key for the flow run. Used to ensure the same flow" 

191 " run is not created multiple times." 

192 ), 

193 ) 

194 context: dict[str, Any] = Field( 1a

195 default_factory=dict, 

196 description="Additional context for the flow run.", 

197 examples=[{"my_var": "my_val"}], 

198 ) 

199 empirical_policy: objects.FlowRunPolicy = Field( 1a

200 default_factory=objects.FlowRunPolicy, 

201 ) 

202 tags: list[str] = Field( 1a

203 default_factory=list, 

204 description="A list of tags on the flow run", 

205 examples=[["tag-1", "tag-2"]], 

206 ) 

207 labels: KeyValueLabelsField 1a

208 parent_task_run_id: Optional[UUID] = Field( 1a

209 default=None, 

210 description=( 

211 "If the flow run is a subflow, the id of the 'dummy' task in the parent" 

212 " flow used to track subflow state." 

213 ), 

214 ) 

215 run_count: int = Field( 1a

216 default=0, description="The number of times the flow run was executed." 

217 ) 

218 expected_start_time: Optional[DateTime] = Field( 1a

219 default=None, 

220 description="The flow run's expected start time.", 

221 ) 

222 next_scheduled_start_time: Optional[DateTime] = Field( 1a

223 default=None, 

224 description="The next time the flow run is scheduled to start.", 

225 ) 

226 start_time: Optional[DateTime] = Field( 1a

227 default=None, description="The actual start time." 

228 ) 

229 end_time: Optional[DateTime] = Field( 1a

230 default=None, description="The actual end time." 

231 ) 

232 total_run_time: datetime.timedelta = Field( 1a

233 default=datetime.timedelta(0), 

234 description=( 

235 "Total run time. If the flow run was executed multiple times, the time of" 

236 " each run will be summed." 

237 ), 

238 ) 

239 estimated_run_time: datetime.timedelta = Field( 1a

240 default=datetime.timedelta(0), 

241 description="A real-time estimate of the total run time.", 

242 ) 

243 estimated_start_time_delta: datetime.timedelta = Field( 1a

244 default=datetime.timedelta(0), 

245 description="The difference between actual and expected start time.", 

246 ) 

247 auto_scheduled: bool = Field( 1a

248 default=False, 

249 description="Whether or not the flow run was automatically scheduled.", 

250 ) 

251 infrastructure_document_id: Optional[UUID] = Field( 1a

252 default=None, 

253 description="The block document defining infrastructure to use this flow run.", 

254 ) 

255 infrastructure_pid: Optional[str] = Field( 1a

256 default=None, 

257 description="The id of the flow run as returned by an infrastructure block.", 

258 ) 

259 created_by: Optional[CreatedBy] = Field( 1a

260 default=None, 

261 description="Optional information about the creator of this flow run.", 

262 ) 

263 work_queue_id: Optional[UUID] = Field( 1a

264 default=None, description="The id of the run's work pool queue." 

265 ) 

266 

267 work_pool_id: Optional[UUID] = Field( 1a

268 description="The work pool with which the queue is associated." 

269 ) 

270 work_pool_name: Optional[str] = Field( 1a

271 default=None, 

272 description="The name of the flow run's work pool.", 

273 examples=["my-work-pool"], 

274 ) 

275 state: Optional[objects.State] = Field( 1a

276 default=None, 

277 description="The state of the flow run.", 

278 examples=["objects.State(type=objects.StateType.COMPLETED)"], 

279 ) 

280 job_variables: Optional[dict[str, Any]] = Field( 1a

281 default=None, description="Job variables for the flow run." 

282 ) 

283 

284 # These are server-side optimizations and should not be present on client models 

285 # TODO: Deprecate these fields 

286 

287 state_type: Optional[objects.StateType] = Field( 1a

288 default=None, description="The type of the current flow run state." 

289 ) 

290 state_name: Optional[str] = Field( 1a

291 default=None, description="The name of the current flow run state." 

292 ) 

293 

294 def __eq__(self, other: Any) -> bool: 1a

295 """ 

296 Check for "equality" to another flow run schema 

297 

298 Estimates times are rolling and will always change with repeated queries for 

299 a flow run so we ignore them during equality checks. 

300 """ 

301 if isinstance(other, objects.FlowRun): 

302 exclude_fields = {"estimated_run_time", "estimated_start_time_delta"} 

303 return self.model_dump(exclude=exclude_fields) == other.model_dump( 

304 exclude=exclude_fields 

305 ) 

306 return super().__eq__(other) 

307 

308 

309class DeploymentResponse(ObjectBaseModel): 1a

310 name: str = Field(default=..., description="The name of the deployment.") 1a

311 

312 # Versionining 

313 version: Optional[str] = Field( 1a

314 default=None, description="An optional version for the deployment." 

315 ) 

316 version_id: Optional[UUID] = Field( 1a

317 default=None, description="The ID of the current version of the deployment." 

318 ) 

319 version_info: Optional[objects.VersionInfo] = Field( 1a

320 default=None, description="A description of this version of the deployment." 

321 ) 

322 

323 # Branching 

324 branch: Optional[str] = Field( 1a

325 default=None, description="The branch of the deployment." 

326 ) 

327 base: Optional[UUID] = Field( 1a

328 default=None, description="The base deployment of the deployment." 

329 ) 

330 root: Optional[UUID] = Field( 1a

331 default=None, description="The root deployment of the deployment." 

332 ) 

333 

334 description: Optional[str] = Field( 1a

335 default=None, description="A description for the deployment." 

336 ) 

337 flow_id: UUID = Field( 1a

338 default=..., description="The flow id associated with the deployment." 

339 ) 

340 concurrency_limit: Optional[int] = Field( 1a

341 default=None, 

342 description="DEPRECATED: Prefer `global_concurrency_limit`. Will always be None for backwards compatibility. Will be removed after December 2024.", 

343 deprecated=True, 

344 ) 

345 global_concurrency_limit: Optional["GlobalConcurrencyLimitResponse"] = Field( 1a

346 default=None, 

347 description="The global concurrency limit object for enforcing the maximum number of flow runs that can be active at once.", 

348 ) 

349 concurrency_options: Optional[objects.ConcurrencyOptions] = Field( 1a

350 default=None, 

351 description="The concurrency options for the deployment.", 

352 ) 

353 paused: bool = Field( 1a

354 default=False, description="Whether or not the deployment is paused." 

355 ) 

356 concurrency_options: Optional[objects.ConcurrencyOptions] = Field( 1a

357 default=None, 

358 description="The concurrency options for the deployment.", 

359 ) 

360 schedules: list[objects.DeploymentSchedule] = Field( 1a

361 default_factory=list, description="A list of schedules for the deployment." 

362 ) 

363 job_variables: dict[str, Any] = Field( 1a

364 default_factory=dict, 

365 description="Overrides to apply to flow run infrastructure at runtime.", 

366 ) 

367 parameters: dict[str, Any] = Field( 1a

368 default_factory=dict, 

369 description="Parameters for flow runs scheduled by the deployment.", 

370 ) 

371 pull_steps: Optional[list[dict[str, Any]]] = Field( 1a

372 default=None, 

373 description="Pull steps for cloning and running this deployment.", 

374 ) 

375 tags: list[str] = Field( 1a

376 default_factory=list, 

377 description="A list of tags for the deployment", 

378 examples=[["tag-1", "tag-2"]], 

379 ) 

380 labels: KeyValueLabelsField 1a

381 work_queue_name: Optional[str] = Field( 1a

382 default=None, 

383 description=( 

384 "The work queue for the deployment. If no work queue is set, work will not" 

385 " be scheduled." 

386 ), 

387 ) 

388 last_polled: Optional[DateTime] = Field( 1a

389 default=None, 

390 description="The last time the deployment was polled for status updates.", 

391 ) 

392 parameter_openapi_schema: Optional[dict[str, Any]] = Field( 1a

393 default=None, 

394 description="The parameter schema of the flow, including defaults.", 

395 ) 

396 path: Optional[str] = Field( 1a

397 default=None, 

398 description=( 

399 "The path to the working directory for the workflow, relative to remote" 

400 " storage or an absolute path." 

401 ), 

402 ) 

403 entrypoint: Optional[str] = Field( 1a

404 default=None, 

405 description=( 

406 "The path to the entrypoint for the workflow, relative to the `path`." 

407 ), 

408 ) 

409 storage_document_id: Optional[UUID] = Field( 1a

410 default=None, 

411 description="The block document defining storage used for this flow.", 

412 ) 

413 infrastructure_document_id: Optional[UUID] = Field( 1a

414 default=None, 

415 description="The block document defining infrastructure to use for flow runs.", 

416 ) 

417 created_by: Optional[CreatedBy] = Field( 1a

418 default=None, 

419 description="Optional information about the creator of this deployment.", 

420 ) 

421 updated_by: Optional[UpdatedBy] = Field( 1a

422 default=None, 

423 description="Optional information about the updater of this deployment.", 

424 ) 

425 work_queue_id: Optional[UUID] = Field( 1a

426 default=None, 

427 description=( 

428 "The id of the work pool queue to which this deployment is assigned." 

429 ), 

430 ) 

431 enforce_parameter_schema: bool = Field( 1a

432 default=True, 

433 description=( 

434 "Whether or not the deployment should enforce the parameter schema." 

435 ), 

436 ) 

437 work_pool_name: Optional[str] = Field( 1a

438 default=None, 

439 description="The name of the deployment's work pool.", 

440 ) 

441 status: Optional[objects.DeploymentStatus] = Field( 1a

442 default=None, 

443 description="Current status of the deployment.", 

444 ) 

445 

446 def as_related_resource(self, role: str = "deployment") -> "RelatedResource": 1a

447 from prefect.events.schemas.events import RelatedResource 

448 

449 labels = { 

450 "prefect.resource.id": f"prefect.deployment.{self.id}", 

451 "prefect.resource.role": role, 

452 "prefect.resource.name": self.name, 

453 } 

454 

455 if self.branch: 

456 labels["prefect.deployment.branch"] = self.branch 

457 

458 if self.base: 

459 labels["prefect.deployment.base"] = f"prefect.deployment.{self.base}" 

460 

461 if self.root: 

462 labels["prefect.deployment.root"] = f"prefect.deployment.{self.root}" 

463 

464 if self.version_id and self.version_info: 

465 labels["prefect.deployment.version-id"] = str(self.version_id) 

466 labels["prefect.deployment.version-type"] = self.version_info.type 

467 labels["prefect.deployment.version"] = self.version_info.version 

468 

469 return RelatedResource(labels) 

470 

471 

472class MinimalConcurrencyLimitResponse(PrefectBaseModel): 1a

473 model_config: ClassVar[ConfigDict] = ConfigDict(extra="ignore") 1a

474 

475 id: UUID 1a

476 name: str 1a

477 limit: int 1a

478 

479 

480class ConcurrencyLimitWithLeaseResponse(PrefectBaseModel): 1a

481 model_config: ClassVar[ConfigDict] = ConfigDict(extra="ignore") 1a

482 

483 lease_id: UUID 1a

484 limits: list[MinimalConcurrencyLimitResponse] 1a

485 

486 

487class GlobalConcurrencyLimitResponse(ObjectBaseModel): 1a

488 """ 

489 A response object for global concurrency limits. 

490 """ 

491 

492 active: bool = Field( 1a

493 default=True, description="Whether the global concurrency limit is active." 

494 ) 

495 name: str = Field( 1a

496 default=..., description="The name of the global concurrency limit." 

497 ) 

498 limit: int = Field(default=..., description="The concurrency limit.") 1a

499 active_slots: int = Field(default=..., description="The number of active slots.") 1a

500 slot_decay_per_second: float = Field( 1a

501 default=2.0, 

502 description="The decay rate for active slots when used as a rate limit.", 

503 )