Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/schemas/states.py: 51%

188 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1""" 

2State schemas. 

3""" 

4 

5import warnings 1a

6from datetime import timedelta 1a

7from typing import ( 1a

8 TYPE_CHECKING, 

9 Any, 

10 ClassVar, 

11 Literal, 

12 Optional, 

13 TypeVar, 

14 Union, 

15 overload, 

16) 

17from uuid import UUID 1a

18 

19from pydantic import ConfigDict, Field, ValidationInfo, field_validator, model_validator 1a

20from typing_extensions import Self 1a

21 

22from prefect._internal.uuid7 import uuid7 1a

23from prefect.client.schemas import objects 1a

24from prefect.server.utilities.schemas.bases import ( 1a

25 PrefectBaseModel, 

26 TimeSeriesBaseModel, 

27) 

28from prefect.types._datetime import DateTime, now 1a

29from prefect.utilities.collections import AutoEnum 1a

30 

31if TYPE_CHECKING: 31 ↛ 32line 31 didn't jump to line 32 because the condition on line 31 was never true1a

32 from prefect.server.database.orm_models import ORMFlowRunState, ORMTaskRunState 

33 from prefect.server.schemas.actions import StateCreate 

34 

35 

# Unconstrained generic type variable.
R = TypeVar("R")

# Type variable limited to `State` and its subclasses; the state factory
# functions in this module use it so their return type follows the `cls`
# argument they are given.
_State = TypeVar("_State", bound="State")

38 

39 

class StateType(AutoEnum):
    """All state types a run can be in."""

    # Member order is kept exactly as originally declared.
    SCHEDULED = AutoEnum.auto()
    PENDING = AutoEnum.auto()
    RUNNING = AutoEnum.auto()
    COMPLETED = AutoEnum.auto()
    FAILED = AutoEnum.auto()
    CANCELLED = AutoEnum.auto()
    CRASHED = AutoEnum.auto()
    PAUSED = AutoEnum.auto()
    CANCELLING = AutoEnum.auto()

52 

53 

class CountByState(PrefectBaseModel):
    """Integer counters, one per `StateType` member name, each defaulting to 0."""

    COMPLETED: int = Field(default=0)
    PENDING: int = Field(default=0)
    RUNNING: int = Field(default=0)
    FAILED: int = Field(default=0)
    CANCELLED: int = Field(default=0)
    CRASHED: int = Field(default=0)
    PAUSED: int = Field(default=0)
    CANCELLING: int = Field(default=0)
    SCHEDULED: int = Field(default=0)

    @field_validator("*")
    @classmethod
    def check_key(cls, value: Optional[Any], info: ValidationInfo) -> Optional[Any]:
        """Reject any field whose name is not a member of `StateType`.

        Applied to every field of the model; the value itself is returned
        untouched when the field name is a known state type.

        Raises:
            ValueError: if the field name is not in `StateType.__members__`.
        """
        field_name = info.field_name
        if field_name in StateType.__members__:
            return value
        raise ValueError(f"{field_name} is not a valid StateType")

71 

72 

# State types treated as final; `State.is_final()` tests membership in this set.
TERMINAL_STATES = {
    StateType.COMPLETED,
    StateType.CANCELLED,
    StateType.FAILED,
    StateType.CRASHED,
}

79 

80 

class StateDetails(PrefectBaseModel):
    """Auxiliary details carried by a `State` in its `state_details` field."""

    # --- identifiers of the run(s) this state refers to ---
    flow_run_id: Optional[UUID] = None
    task_run_id: Optional[UUID] = None
    # for task runs that represent subflows, the subflow's run ID
    child_flow_run_id: Optional[UUID] = None

    # --- scheduling and caching ---
    scheduled_time: Optional[DateTime] = None
    cache_key: Optional[str] = None
    cache_expiration: Optional[DateTime] = None
    deferred: Optional[bool] = False
    untrackable_result: bool = False

    # --- pause handling ---
    pause_timeout: Optional[DateTime] = None
    pause_reschedule: bool = False
    pause_key: Optional[str] = None
    run_input_keyset: Optional[dict[str, str]] = None

    # --- miscellaneous ---
    refresh_cache: Optional[bool] = None
    retriable: Optional[bool] = None
    transition_id: Optional[UUID] = None
    task_parameters_id: Optional[UUID] = None
    # Captures the trace_id and span_id of the span where this state was created
    traceparent: Optional[str] = None
    # The ID of the lease that is currently holding the deployment concurrency slot
    # for this run.
    deployment_concurrency_lease_id: Optional[UUID] = None

104 

105 

class StateBaseModel(TimeSeriesBaseModel):
    """Base model for state schemas; adds an ORM-oriented dump helper."""

    def orm_dict(self, *args: Any, **kwargs: Any) -> dict[str, Any]:
        """
        Dump the model into a dictionary suitable for building an ORM state.

        This is a convenience for constructing fixtures: build a `State` schema
        object first, then convert it into an ORM-compatible format. Because the
        `data` field is not writable on ORM states, it is left out of the result
        entirely. If state data is required, an artifact must be created
        separately.

        Args:
            *args: positional arguments forwarded to `model_dump`.
            **kwargs: keyword arguments forwarded to `model_dump`.

        Returns:
            The dumped fields without the `data` key.
        """
        dumped = self.model_dump(*args, **kwargs)
        # drop `data` (if present) so the result can construct a state ORM model
        return {key: value for key, value in dumped.items() if key != "data"}

120 

121 

class State(StateBaseModel):
    """Represents the state of a run."""

    model_config: ClassVar[ConfigDict] = ConfigDict(from_attributes=True)

    type: StateType
    name: Optional[str] = Field(default=None)
    timestamp: DateTime = Field(default_factory=lambda: now("UTC"))
    message: Optional[str] = Field(default=None, examples=["Run started"])
    data: Optional[Any] = Field(
        default=None,
        description=(
            "Data associated with the state, e.g. a result. "
            "Content must be storable as JSON."
        ),
    )
    state_details: StateDetails = Field(default_factory=StateDetails)

    @classmethod
    def from_orm_without_result(
        cls,
        orm_state: Union["ORMFlowRunState", "ORMTaskRunState"],
        with_data: Optional[Any] = None,
    ) -> Self:
        """
        During orchestration, ORM states can be instantiated prior to inserting results
        into the artifact table and the `data` field will not be eagerly loaded. In
        these cases, sqlalchemy will attempt to lazily load the relationship, which
        will fail when called within a synchronous pydantic method.

        This method will construct a `State` object from an ORM model without a loaded
        artifact and attach data passed using the `with_data` argument to the `data`
        field.

        Args:
            orm_state: the ORM state to read field values from.
            with_data: value to place in the `data` field of the new state.

        Returns:
            A new instance of `cls`.
        """
        # Copy every declared field except `data`, which is never read from the
        # ORM object here; attributes missing on the ORM object become None.
        state_data: dict[str, Any] = {
            field: getattr(orm_state, field, None)
            for field in cls.model_fields.keys()
            if field != "data"
        }
        state_data["data"] = with_data
        return cls(**state_data)

    @model_validator(mode="after")
    def default_name_from_type(self) -> Self:
        """If a name is not provided, use the type"""
        # The name is derived from the type's value: words separated by
        # underscores are capitalized and joined with spaces.
        if self.name is None and self.type:
            self.name = " ".join([v.capitalize() for v in self.type.value.split("_")])
        return self

    @model_validator(mode="after")
    def default_scheduled_start_time(self) -> Self:
        """Give SCHEDULED states a `scheduled_time` of now (UTC) if none is set."""
        # `StateType` is defined in this module, so no local import is needed.
        if self.type == StateType.SCHEDULED:
            if not self.state_details.scheduled_time:
                self.state_details.scheduled_time = now("UTC")

        return self

    def is_scheduled(self) -> bool:
        """Return True if the state type is SCHEDULED."""
        return self.type == StateType.SCHEDULED

    def is_pending(self) -> bool:
        """Return True if the state type is PENDING."""
        return self.type == StateType.PENDING

    def is_running(self) -> bool:
        """Return True if the state type is RUNNING."""
        return self.type == StateType.RUNNING

    def is_completed(self) -> bool:
        """Return True if the state type is COMPLETED."""
        return self.type == StateType.COMPLETED

    def is_failed(self) -> bool:
        """Return True if the state type is FAILED."""
        return self.type == StateType.FAILED

    def is_crashed(self) -> bool:
        """Return True if the state type is CRASHED."""
        return self.type == StateType.CRASHED

    def is_cancelled(self) -> bool:
        """Return True if the state type is CANCELLED."""
        return self.type == StateType.CANCELLED

    def is_cancelling(self) -> bool:
        """Return True if the state type is CANCELLING."""
        return self.type == StateType.CANCELLING

    def is_final(self) -> bool:
        """Return True if the state type is one of `TERMINAL_STATES`."""
        return self.type in TERMINAL_STATES

    def is_paused(self) -> bool:
        """Return True if the state type is PAUSED."""
        return self.type == StateType.PAUSED

    def fresh_copy(self, **kwargs: Any) -> Self:
        """
        Return a fresh copy of the state with a new ID.

        The copy is deep and has its `id`, `created`, `updated` and `timestamp`
        replaced. Extra keyword arguments are forwarded to `model_copy`.
        """
        # Read the clock once so the three time fields of the copy agree exactly.
        current_time = now("UTC")
        return self.model_copy(
            update={
                "id": uuid7(),
                "created": current_time,
                "updated": current_time,
                "timestamp": current_time,
            },
            deep=True,
            **kwargs,
        )

    @overload
    def result(self, raise_on_failure: Literal[True] = ...) -> Any: ...

    @overload
    def result(
        self, raise_on_failure: Literal[False] = False
    ) -> Union[Any, Exception]: ...

    @overload
    def result(self, raise_on_failure: bool = ...) -> Union[Any, Exception]: ...

    def result(self, raise_on_failure: bool = True) -> Union[Any, Exception]:
        """
        Deprecated: retrieve the result through the client-side state schema.

        Emits a `DeprecationWarning`, converts this state with
        `objects.State.model_validate` and delegates to that object's `result`.

        Args:
            raise_on_failure: passed through to the delegated `result` call.
        """
        # Backwards compatible `result` handling on the server-side schema
        from prefect.states import State

        warnings.warn(
            (
                "`result` is no longer supported by"
                " `prefect.server.schemas.states.State` and will be removed in a future"
                " release. When result retrieval is needed, use `prefect.states.State`."
            ),
            DeprecationWarning,
            stacklevel=2,
        )

        state: State[Any] = objects.State.model_validate(self)
        return state.result(raise_on_failure=raise_on_failure)

    def to_state_create(self) -> "StateCreate":
        """Build a `StateCreate` from this state's type, name, message, data and details."""
        # Imported locally; at module level the name is only imported under
        # TYPE_CHECKING.
        from prefect.server.schemas.actions import StateCreate

        return StateCreate(
            type=self.type,
            name=self.name,
            message=self.message,
            data=self.data,
            state_details=self.state_details,
        )

    def __repr__(self) -> str:
        """
        Generates a complete state representation appropriate for introspection
        and debugging, including the result:

        `MyCompletedState(message="my message", type=COMPLETED, result=...)`
        """
        result = self.data

        display = dict(
            message=repr(self.message),
            type=str(self.type.value),
            result=repr(result),
        )

        return f"{self.name}({', '.join(f'{k}={v}' for k, v in display.items())})"

    def __str__(self) -> str:
        """
        Generates a simple state representation appropriate for logging:

        `MyCompletedState("my message", type=COMPLETED)`
        """

        display: list[str] = []

        if self.message:
            display.append(repr(self.message))

        # Only show the type when it differs (case-insensitively) from the name.
        if self.type.value.lower() != (self.name or "").lower():
            display.append(f"type={self.type.value}")

        return f"{self.name}({', '.join(display)})"

    def __hash__(self) -> int:
        """Hash on the flow run ID, task run ID, timestamp and type."""
        return hash(
            (
                getattr(self.state_details, "flow_run_id", None),
                getattr(self.state_details, "task_run_id", None),
                self.timestamp,
                self.type,
            )
        )

313 

314 

def Scheduled(
    scheduled_time: Optional[DateTime] = None,
    cls: type[_State] = State,
    **kwargs: Any,
) -> _State:
    """Convenience function for creating `Scheduled` states.

    Args:
        scheduled_time: when the run is scheduled for; the current UTC time
            is used when omitted.
        cls: state class to instantiate.
        **kwargs: forwarded to the `cls` constructor. A `state_details` entry,
            if present, is validated into a `StateDetails` first.

    Returns:
        State: a Scheduled state

    Raises:
        ValueError: if `scheduled_time` is given and `state_details` also
            carries a scheduled time.
    """
    # NOTE: `scheduled_time` must come first for backwards compatibility

    details = StateDetails.model_validate(kwargs.pop("state_details", {}))
    if scheduled_time is not None:
        if details.scheduled_time:
            raise ValueError("An extra scheduled_time was provided in state_details")
        details.scheduled_time = scheduled_time
    else:
        # NOTE(review): a scheduled_time already present in `state_details` is
        # replaced by the current time on this path - confirm this is intended.
        details.scheduled_time = now("UTC")

    return cls(type=StateType.SCHEDULED, state_details=details, **kwargs)

335 

336 

def Completed(cls: type[_State] = State, **kwargs: Any) -> _State:
    """Build a state of type `COMPLETED`.

    Args:
        cls: state class to instantiate.
        **kwargs: forwarded unchanged to the `cls` constructor.

    Returns:
        State: a Completed state
    """
    completed_state = cls(type=StateType.COMPLETED, **kwargs)
    return completed_state

344 

345 

def Running(cls: type[_State] = State, **kwargs: Any) -> _State:
    """Build a state of type `RUNNING`.

    Args:
        cls: state class to instantiate.
        **kwargs: forwarded unchanged to the `cls` constructor.

    Returns:
        State: a Running state
    """
    running_state = cls(type=StateType.RUNNING, **kwargs)
    return running_state

353 

354 

def Failed(cls: type[_State] = State, **kwargs: Any) -> _State:
    """Build a state of type `FAILED`.

    Args:
        cls: state class to instantiate.
        **kwargs: forwarded unchanged to the `cls` constructor.

    Returns:
        State: a Failed state
    """
    failed_state = cls(type=StateType.FAILED, **kwargs)
    return failed_state

362 

363 

def Crashed(cls: type[_State] = State, **kwargs: Any) -> _State:
    """Build a state of type `CRASHED`.

    Args:
        cls: state class to instantiate.
        **kwargs: forwarded unchanged to the `cls` constructor.

    Returns:
        State: a Crashed state
    """
    crashed_state = cls(type=StateType.CRASHED, **kwargs)
    return crashed_state

371 

372 

def Cancelling(cls: type[_State] = State, **kwargs: Any) -> _State:
    """Build a state of type `CANCELLING`.

    Args:
        cls: state class to instantiate.
        **kwargs: forwarded unchanged to the `cls` constructor.

    Returns:
        State: a Cancelling state
    """
    cancelling_state = cls(type=StateType.CANCELLING, **kwargs)
    return cancelling_state

380 

381 

def Cancelled(cls: type[_State] = State, **kwargs: Any) -> _State:
    """Build a state of type `CANCELLED`.

    Args:
        cls: state class to instantiate.
        **kwargs: forwarded unchanged to the `cls` constructor.

    Returns:
        State: a Cancelled state
    """
    cancelled_state = cls(type=StateType.CANCELLED, **kwargs)
    return cancelled_state

389 

390 

def Pending(cls: type[_State] = State, **kwargs: Any) -> _State:
    """Build a state of type `PENDING`.

    Args:
        cls: state class to instantiate.
        **kwargs: forwarded unchanged to the `cls` constructor.

    Returns:
        State: a Pending state
    """
    pending_state = cls(type=StateType.PENDING, **kwargs)
    return pending_state

398 

399 

def Paused(
    cls: type[_State] = State,
    timeout_seconds: Optional[int] = None,
    pause_expiration_time: Optional[DateTime] = None,
    reschedule: bool = False,
    pause_key: Optional[str] = None,
    **kwargs: Any,
) -> _State:
    """Convenience function for creating `Paused` states.

    Args:
        cls: state class to instantiate.
        timeout_seconds: pause timeout expressed as seconds from now (UTC).
        pause_expiration_time: pause timeout expressed as an absolute time.
        reschedule: stored as `state_details.pause_reschedule`.
        pause_key: stored as `state_details.pause_key`.
        **kwargs: forwarded to the `cls` constructor. A `state_details` entry,
            if present, is validated into a `StateDetails` first.

    Returns:
        State: a Paused state

    Raises:
        ValueError: if `state_details` already carries a pause timeout, or if
            both `pause_expiration_time` and `timeout_seconds` are given.
    """
    details = StateDetails.model_validate(kwargs.pop("state_details", {}))

    # The timeout may only come from this function's own arguments.
    if details.pause_timeout:
        raise ValueError("An extra pause timeout was provided in state_details")

    # The two ways of expressing the timeout are mutually exclusive.
    if not (pause_expiration_time is None or timeout_seconds is None):
        raise ValueError(
            "Cannot supply both a pause_expiration_time and timeout_seconds"
        )

    if pause_expiration_time:
        details.pause_timeout = pause_expiration_time
    elif timeout_seconds is not None:
        details.pause_timeout = now("UTC") + timedelta(seconds=timeout_seconds)

    details.pause_reschedule = reschedule
    details.pause_key = pause_key

    return cls(type=StateType.PAUSED, state_details=details, **kwargs)

432 

433 

def Suspended(
    cls: type[_State] = State,
    timeout_seconds: Optional[int] = None,
    pause_expiration_time: Optional[DateTime] = None,
    pause_key: Optional[str] = None,
    **kwargs: Any,
) -> _State:
    """Convenience function for creating `Suspended` states.

    Delegates to `Paused` with the name "Suspended" and `reschedule=True`.

    Args:
        cls: state class to instantiate.
        timeout_seconds: passed through to `Paused`.
        pause_expiration_time: passed through to `Paused`.
        pause_key: passed through to `Paused`.
        **kwargs: passed through to `Paused`.

    Returns:
        State: a Suspended state
    """
    suspended_state = Paused(
        cls=cls,
        name="Suspended",
        reschedule=True,
        timeout_seconds=timeout_seconds,
        pause_expiration_time=pause_expiration_time,
        pause_key=pause_key,
        **kwargs,
    )
    return suspended_state

455 

456 

def AwaitingRetry(
    cls: type[_State] = State,
    scheduled_time: Optional[DateTime] = None,
    **kwargs: Any,
) -> _State:
    """Convenience function for creating `AwaitingRetry` states.

    Delegates to `Scheduled` with the name "AwaitingRetry".

    Args:
        cls: state class to instantiate.
        scheduled_time: passed through to `Scheduled`.
        **kwargs: passed through to `Scheduled`.

    Returns:
        State: an AwaitingRetry state
    """
    awaiting_retry = Scheduled(
        cls=cls,
        scheduled_time=scheduled_time,
        name="AwaitingRetry",
        **kwargs,
    )
    return awaiting_retry

470 

471 

def AwaitingConcurrencySlot(
    cls: type[_State] = State,
    scheduled_time: Optional[DateTime] = None,
    **kwargs: Any,
) -> _State:
    """Convenience function for creating `AwaitingConcurrencySlot` states.

    Delegates to `Scheduled` with the name "AwaitingConcurrencySlot".

    Args:
        cls: state class to instantiate.
        scheduled_time: passed through to `Scheduled`.
        **kwargs: passed through to `Scheduled`.

    Returns:
        State: an AwaitingConcurrencySlot state
    """
    awaiting_slot = Scheduled(
        cls=cls,
        scheduled_time=scheduled_time,
        name="AwaitingConcurrencySlot",
        **kwargs,
    )
    return awaiting_slot

488 

489 

def Retrying(cls: type[_State] = State, **kwargs: Any) -> _State:
    """Build a state of type `RUNNING` named "Retrying".

    Args:
        cls: state class to instantiate.
        **kwargs: forwarded unchanged to the `cls` constructor.

    Returns:
        State: a Retrying state
    """
    retrying_state = cls(type=StateType.RUNNING, name="Retrying", **kwargs)
    return retrying_state

497 

498 

def Late(
    cls: type[_State] = State,
    scheduled_time: Optional[DateTime] = None,
    **kwargs: Any,
) -> _State:
    """Convenience function for creating `Late` states.

    Delegates to `Scheduled` with the name "Late".

    Args:
        cls: state class to instantiate.
        scheduled_time: passed through to `Scheduled`.
        **kwargs: passed through to `Scheduled`.

    Returns:
        State: a Late state
    """
    late_state = Scheduled(
        cls=cls,
        scheduled_time=scheduled_time,
        name="Late",
        **kwargs,
    )
    return late_state