Coverage for /usr/local/lib/python3.12/site-packages/prefect/states.py: 13%

361 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1from __future__ import annotations 1a

2 

3import asyncio 1a

4import datetime 1a

5import sys 1a

6import traceback 1a

7import uuid 1a

8from collections import Counter 1a

9from types import GeneratorType, TracebackType 1a

10from typing import TYPE_CHECKING, Any, Dict, Iterable, Optional, Type 1a

11 

12import anyio 1a

13import httpx 1a

14from opentelemetry import propagate 1a

15from typing_extensions import TypeGuard 1a

16 

17from prefect._internal.compatibility.async_dispatch import async_dispatch 1a

18from prefect.client.schemas.objects import State, StateDetails, StateType 1a

19from prefect.exceptions import ( 1a

20 CancelledRun, 

21 CrashedRun, 

22 FailedRun, 

23 MissingContextError, 

24 MissingResult, 

25 PausedRun, 

26 TerminationSignal, 

27 UnfinishedRun, 

28) 

29from prefect.logging.loggers import get_logger, get_run_logger 1a

30from prefect.types._datetime import now 1a

31from prefect.utilities.annotations import BaseAnnotation 1a

32from prefect.utilities.collections import ensure_iterable 1a

33 

34if TYPE_CHECKING: 34 ↛ 35line 34 didn't jump to line 35 because the condition on line 34 was never true1a

35 import logging 

36 

37 from prefect.client.schemas.actions import StateCreate 

38 from prefect.results import ( 

39 R, 

40 ResultStore, 

41 ) 

42 

43logger: "logging.Logger" = get_logger("states") 1a

44 

45 

46def to_state_create(state: State) -> "StateCreate": 1a

47 """ 

48 Convert the state to a `StateCreate` type which can be used to set the state of 

49 a run in the API. 

50 

51 This method will drop this state's `data` if it is not a result type. Only 

52 results should be sent to the API. Other data is only available locally. 

53 """ 

54 from prefect.client.schemas.actions import StateCreate 

55 from prefect.results import ( 

56 ResultRecord, 

57 should_persist_result, 

58 ) 

59 

60 if isinstance(state.data, ResultRecord) and should_persist_result(): 

61 data = state.data.metadata # pyright: ignore[reportUnknownMemberType] unable to narrow ResultRecord type 

62 else: 

63 data = None 

64 

65 return StateCreate( 

66 type=state.type, 

67 name=state.name, 

68 message=state.message, 

69 data=data, 

70 state_details=state.state_details, 

71 ) 

72 

73 

74async def get_state_result( 1a

75 state: "State[R]", 

76 raise_on_failure: bool = True, 

77 retry_result_failure: bool = True, 

78) -> "R": 

79 """ 

80 Get the result from a state. 

81 

82 See `State.result()` 

83 """ 

84 

85 return await _get_state_result( 

86 state, 

87 raise_on_failure=raise_on_failure, 

88 retry_result_failure=retry_result_failure, 

89 ) 

90 

91 

92RESULT_READ_MAXIMUM_ATTEMPTS = 10 1a

93RESULT_READ_RETRY_DELAY = 0.25 1a

94 

95 

96async def _get_state_result_data_with_retries( 1a

97 state: "State[R]", retry_result_failure: bool = True 

98) -> "R": 

99 # Results may be written asynchronously, possibly after their corresponding 

100 # state has been written and events have been emitted, so we should give some 

101 # grace here about missing results. The exception below could come in the form 

102 # of a missing file, a short read, or other types of errors depending on the 

103 # result storage backend. 

104 from prefect._result_records import ( 

105 ResultRecordMetadata, 

106 ) 

107 from prefect.results import ResultStore 

108 

109 if retry_result_failure is False: 

110 max_attempts = 1 

111 else: 

112 max_attempts = RESULT_READ_MAXIMUM_ATTEMPTS 

113 

114 for i in range(1, max_attempts + 1): 

115 try: 

116 if isinstance(state.data, ResultRecordMetadata): 

117 record = await ResultStore._from_metadata(state.data) 

118 return record.result 

119 else: 

120 return await state.data.get() 

121 except Exception as e: 

122 if i == max_attempts: 

123 raise 

124 logger.debug( 

125 "Exception %r while reading result, retry %s/%s in %ss...", 

126 e, 

127 i, 

128 max_attempts, 

129 RESULT_READ_RETRY_DELAY, 

130 ) 

131 await asyncio.sleep(RESULT_READ_RETRY_DELAY) 

132 

133 

134async def _get_state_result( 1a

135 state: "State[R]", raise_on_failure: bool, retry_result_failure: bool = True 

136) -> "R": 

137 """ 

138 Internal implementation for `get_state_result` without async backwards compatibility 

139 """ 

140 from prefect.results import ( 

141 ResultRecord, 

142 ResultRecordMetadata, 

143 ) 

144 

145 if state.is_paused(): 

146 # Paused states are not truly terminal and do not have results associated with them 

147 raise PausedRun("Run is paused, its result is not available.", state=state) 

148 

149 if not state.is_final(): 

150 raise UnfinishedRun( 

151 f"Run is in {state.type.name} state, its result is not available." 

152 ) 

153 

154 if raise_on_failure and ( 

155 state.is_crashed() or state.is_failed() or state.is_cancelled() 

156 ): 

157 raise await aget_state_exception(state) 

158 

159 if isinstance(state.data, ResultRecordMetadata): 

160 result = await _get_state_result_data_with_retries( 

161 state, retry_result_failure=retry_result_failure 

162 ) 

163 elif isinstance(state.data, ResultRecord): 

164 result = state.data.result 

165 

166 elif state.data is None: 

167 if state.is_failed() or state.is_crashed() or state.is_cancelled(): 

168 return await aget_state_exception(state) 

169 else: 

170 raise MissingResult( 

171 "State data is missing. " 

172 "Typically, this occurs when result persistence is disabled and the " 

173 "state has been retrieved from the API." 

174 ) 

175 

176 else: 

177 # The result is attached directly 

178 result = state.data 

179 

180 return result 

181 

182 

183def format_exception(exc: BaseException, tb: TracebackType = None) -> str: 1a

184 exc_type = type(exc) 

185 if tb is not None: 

186 formatted = "".join(list(traceback.format_exception(exc_type, exc, tb=tb))) 

187 else: 

188 formatted = f"{exc_type.__name__}: {exc}" 

189 

190 # Trim `prefect` module paths from our exception types 

191 if exc_type.__module__.startswith("prefect."): 

192 formatted = formatted.replace( 

193 f"{exc_type.__module__}.{exc_type.__name__}", exc_type.__name__ 

194 ) 

195 

196 return formatted 

197 

198 

199async def exception_to_crashed_state( 1a

200 exc: BaseException, 

201 result_store: Optional["ResultStore"] = None, 

202) -> State: 

203 """ 

204 Takes an exception that occurs _outside_ of user code and converts it to a 

205 'Crash' exception with a 'Crashed' state. 

206 """ 

207 state_message = None 

208 

209 if isinstance(exc, anyio.get_cancelled_exc_class()): 

210 state_message = "Execution was cancelled by the runtime environment." 

211 

212 elif isinstance(exc, KeyboardInterrupt): 

213 state_message = "Execution was aborted by an interrupt signal." 

214 

215 elif isinstance(exc, TerminationSignal): 

216 state_message = "Execution was aborted by a termination signal." 

217 

218 elif isinstance(exc, SystemExit): 

219 state_message = "Execution was aborted by Python system exit call." 

220 

221 elif isinstance(exc, (httpx.TimeoutException, httpx.ConnectError)): 

222 try: 

223 request: httpx.Request = exc.request 

224 except RuntimeError: 

225 # The request property is not set 

226 state_message = ( 

227 "Request failed while attempting to contact the server:" 

228 f" {format_exception(exc)}" 

229 ) 

230 else: 

231 # TODO: We can check if this is actually our API url 

232 state_message = f"Request to {request.url} failed: {format_exception(exc)}." 

233 

234 else: 

235 state_message = ( 

236 "Execution was interrupted by an unexpected exception:" 

237 f" {format_exception(exc)}" 

238 ) 

239 

240 if result_store: 

241 key = uuid.uuid4().hex 

242 data = result_store.create_result_record(exc, key=key) 

243 else: 

244 # Attach the exception for local usage, will not be available when retrieved 

245 # from the API 

246 data = exc 

247 

248 return Crashed(message=state_message, data=data) 

249 

250 

251async def exception_to_failed_state( 1a

252 exc: Optional[BaseException] = None, 

253 result_store: Optional["ResultStore"] = None, 

254 write_result: bool = False, 

255 **kwargs: Any, 

256) -> State[BaseException]: 

257 """ 

258 Convenience function for creating `Failed` states from exceptions 

259 """ 

260 try: 

261 local_logger = get_run_logger() 

262 except MissingContextError: 

263 local_logger = logger 

264 

265 if not exc: 

266 _, exc, _ = sys.exc_info() 

267 if exc is None: 

268 raise ValueError( 

269 "Exception was not passed and no active exception could be found." 

270 ) 

271 else: 

272 pass 

273 

274 if result_store: 

275 key = uuid.uuid4().hex 

276 data = result_store.create_result_record(exc, key=key) 

277 if write_result: 

278 try: 

279 await result_store.apersist_result_record(data) 

280 except Exception as nested_exc: 

281 local_logger.warning( 

282 "Failed to write result: %s Execution will continue, but the result has not been written", 

283 nested_exc, 

284 ) 

285 else: 

286 # Attach the exception for local usage, will not be available when retrieved 

287 # from the API 

288 data = exc 

289 

290 existing_message = kwargs.pop("message", "") 

291 if existing_message and not existing_message.endswith(" "): 

292 existing_message += " " 

293 

294 # TODO: Consider if we want to include traceback information, it is intentionally 

295 # excluded from messages for now 

296 message = existing_message + format_exception(exc) 

297 

298 state = Failed(data=data, message=message, **kwargs) 

299 state.state_details.retriable = False 

300 

301 return state 

302 

303 

304async def return_value_to_state( 1a

305 retval: "R", 

306 result_store: "ResultStore", 

307 key: Optional[str] = None, 

308 expiration: Optional[datetime.datetime] = None, 

309 write_result: bool = False, 

310) -> "State[R]": 

311 """ 

312 Given a return value from a user's function, create a `State` the run should 

313 be placed in. 

314 

315 - If data is returned, we create a 'COMPLETED' state with the data 

316 - If a single, manually created state is returned, we use that state as given 

317 (manual creation is determined by the lack of ids) 

318 - If an upstream state or iterable of upstream states is returned, we apply the 

319 aggregate rule 

320 

321 The aggregate rule says that given multiple states we will determine the final state 

322 such that: 

323 

324 - If any states are not COMPLETED the final state is FAILED 

325 - If all of the states are COMPLETED the final state is COMPLETED 

326 - The states will be placed in the final state `data` attribute 

327 

328 Callers should resolve all futures into states before passing return values to this 

329 function. 

330 """ 

331 from prefect.results import ( 

332 ResultRecord, 

333 ResultRecordMetadata, 

334 ) 

335 

336 try: 

337 local_logger = get_run_logger() 

338 except MissingContextError: 

339 local_logger = logger 

340 

341 if ( 

342 isinstance(retval, State) 

343 # Check for manual creation 

344 and not retval.state_details.flow_run_id 

345 and not retval.state_details.task_run_id 

346 ): 

347 state = retval 

348 # Unless the user has already constructed a result explicitly, use the store 

349 # to update the data to the correct type 

350 if not isinstance(state.data, (ResultRecord, ResultRecordMetadata)): 

351 result_record = result_store.create_result_record( 

352 state.data, 

353 key=key, 

354 expiration=expiration, 

355 ) 

356 if write_result: 

357 try: 

358 await result_store.apersist_result_record(result_record) 

359 except Exception as exc: 

360 local_logger.warning( 

361 "Encountered an error while persisting result: %s Execution will continue, but the result has not been persisted", 

362 exc, 

363 ) 

364 state.data = result_record 

365 return state 

366 

367 # Determine a new state from the aggregate of contained states 

368 if isinstance(retval, State) or is_state_iterable(retval): 

369 states = StateGroup(ensure_iterable(retval)) 

370 

371 # Determine the new state type 

372 if states.all_completed(): 

373 new_state_type = StateType.COMPLETED 

374 elif states.any_cancelled(): 

375 new_state_type = StateType.CANCELLED 

376 elif states.any_paused(): 

377 new_state_type = StateType.PAUSED 

378 else: 

379 new_state_type = StateType.FAILED 

380 

381 # Generate a nice message for the aggregate 

382 if states.all_completed(): 

383 message = "All states completed." 

384 elif states.any_cancelled(): 

385 message = f"{states.cancelled_count}/{states.total_count} states cancelled." 

386 elif states.any_paused(): 

387 message = f"{states.paused_count}/{states.total_count} states paused." 

388 elif states.any_failed(): 

389 message = f"{states.fail_count}/{states.total_count} states failed." 

390 elif not states.all_final(): 

391 message = ( 

392 f"{states.not_final_count}/{states.total_count} states are not final." 

393 ) 

394 else: 

395 message = "Given states: " + states.counts_message() 

396 

397 # TODO: We may actually want to set the data to a `StateGroup` object and just 

398 # allow it to be unpacked into a tuple and such so users can interact with 

399 # it 

400 result_record = result_store.create_result_record( 

401 retval, 

402 key=key, 

403 expiration=expiration, 

404 ) 

405 if write_result: 

406 try: 

407 await result_store.apersist_result_record(result_record) 

408 except Exception as exc: 

409 local_logger.warning( 

410 "Encountered an error while persisting result: %s Execution will continue, but the result has not been persisted", 

411 exc, 

412 ) 

413 return State( 

414 type=new_state_type, 

415 message=message, 

416 data=result_record, 

417 ) 

418 

419 # Generators aren't portable, implicitly convert them to a list. 

420 if isinstance(retval, GeneratorType): 

421 data = list(retval) 

422 else: 

423 data = retval 

424 

425 # Otherwise, they just gave data and this is a completed retval 

426 if isinstance(data, ResultRecord): 

427 return Completed(data=data) 

428 else: 

429 result_record = result_store.create_result_record( 

430 data, 

431 key=key, 

432 expiration=expiration, 

433 ) 

434 if write_result: 

435 try: 

436 await result_store.apersist_result_record(result_record) 

437 except Exception as exc: 

438 local_logger.warning( 

439 "Encountered an error while persisting result: %s Execution will continue, but the result has not been persisted", 

440 exc, 

441 ) 

442 return Completed(data=result_record) 

443 

444 

445async def aget_state_exception(state: State) -> BaseException: 1a

446 """ 

447 Get the exception from a state asynchronously. 

448 

449 If not given a FAILED or CRASHED state, this raise a value error. 

450 

451 If the state result is a state, its exception will be returned. 

452 

453 If the state result is an iterable of states, the exception of the first failure 

454 will be returned. 

455 

456 If the state result is a string, a wrapper exception will be returned with the 

457 string as the message. 

458 

459 If the state result is null, a wrapper exception will be returned with the state 

460 message attached. 

461 

462 If the state result is not of a known type, a `TypeError` will be returned. 

463 

464 When a wrapper exception is returned, the type will be: 

465 - `FailedRun` if the state type is FAILED. 

466 - `CrashedRun` if the state type is CRASHED. 

467 - `CancelledRun` if the state type is CANCELLED. 

468 """ 

469 from prefect._result_records import ( 

470 ResultRecord, 

471 ResultRecordMetadata, 

472 ) 

473 from prefect.results import ResultStore 

474 

475 if state.is_failed(): 

476 wrapper = FailedRun 

477 default_message = "Run failed." 

478 elif state.is_crashed(): 

479 wrapper = CrashedRun 

480 default_message = "Run crashed." 

481 elif state.is_cancelled(): 

482 wrapper = CancelledRun 

483 default_message = "Run cancelled." 

484 else: 

485 raise ValueError(f"Expected failed or crashed state got {state!r}.") 

486 

487 if isinstance(state.data, ResultRecord): 

488 result = state.data.result 

489 elif isinstance(state.data, ResultRecordMetadata): 

490 record = await ResultStore._from_metadata(state.data) 

491 result = record.result 

492 elif state.data is None: 

493 result = None 

494 else: 

495 result = state.data 

496 

497 if result is None: 

498 return wrapper(state.message or default_message) 

499 

500 if isinstance(result, Exception): 

501 return result 

502 

503 elif isinstance(result, BaseException): 

504 return result 

505 

506 elif isinstance(result, str): 

507 return wrapper(result) 

508 

509 elif isinstance(result, State): 

510 # Return the exception from the inner state 

511 return await aget_state_exception(result) 

512 

513 elif is_state_iterable(result): 

514 # Return the first failure 

515 for state in result: 

516 if state.is_failed() or state.is_crashed() or state.is_cancelled(): 

517 return await aget_state_exception(state) 

518 

519 raise ValueError( 

520 "Failed state result was an iterable of states but none were failed." 

521 ) 

522 

523 else: 

524 raise TypeError( 

525 f"Unexpected result for failed state: {result!r} —— " 

526 f"{type(result).__name__} cannot be resolved into an exception" 

527 ) 

528 

529 

530@async_dispatch(aget_state_exception) 1a

531def get_state_exception(state: State) -> BaseException: 1a

532 """ 

533 Get the exception from a state. 

534 

535 If not given a FAILED or CRASHED state, this raise a value error. 

536 

537 If the state result is a state, its exception will be returned. 

538 

539 If the state result is an iterable of states, the exception of the first failure 

540 will be returned. 

541 

542 If the state result is a string, a wrapper exception will be returned with the 

543 string as the message. 

544 

545 If the state result is null, a wrapper exception will be returned with the state 

546 message attached. 

547 

548 If the state result is not of a known type, a `TypeError` will be returned. 

549 

550 When a wrapper exception is returned, the type will be: 

551 - `FailedRun` if the state type is FAILED. 

552 - `CrashedRun` if the state type is CRASHED. 

553 - `CancelledRun` if the state type is CANCELLED. 

554 """ 

555 from prefect._result_records import ( 

556 ResultRecord, 

557 ResultRecordMetadata, 

558 ) 

559 from prefect.results import ResultStore, resolve_result_storage 

560 

561 if state.is_failed(): 

562 wrapper = FailedRun 

563 default_message = "Run failed." 

564 elif state.is_crashed(): 

565 wrapper = CrashedRun 

566 default_message = "Run crashed." 

567 elif state.is_cancelled(): 

568 wrapper = CancelledRun 

569 default_message = "Run cancelled." 

570 else: 

571 raise ValueError(f"Expected failed or crashed state got {state!r}.") 

572 

573 if isinstance(state.data, ResultRecord): 

574 result = state.data.result 

575 elif isinstance(state.data, ResultRecordMetadata): 

576 # Inline sync version of _from_metadata 

577 metadata = state.data 

578 if metadata.storage_block_id is None: 

579 storage_block = None 

580 else: 

581 storage_block = resolve_result_storage( 

582 metadata.storage_block_id, _sync=True 

583 ) 

584 store = ResultStore( 

585 result_storage=storage_block, serializer=metadata.serializer 

586 ) 

587 if metadata.storage_key is None: 

588 raise ValueError( 

589 "storage_key is required to hydrate a result record from metadata" 

590 ) 

591 record = store.read(metadata.storage_key) 

592 result = record.result 

593 elif state.data is None: 

594 result = None 

595 else: 

596 result = state.data 

597 

598 if result is None: 

599 return wrapper(state.message or default_message) 

600 

601 if isinstance(result, Exception): 

602 return result 

603 

604 elif isinstance(result, BaseException): 

605 return result 

606 

607 elif isinstance(result, str): 

608 return wrapper(result) 

609 

610 elif isinstance(result, State): 

611 # Return the exception from the inner state 

612 return get_state_exception(result) 

613 

614 elif is_state_iterable(result): 

615 # Return the first failure 

616 for state in result: 

617 if state.is_failed() or state.is_crashed() or state.is_cancelled(): 

618 return get_state_exception(state) 

619 

620 raise ValueError( 

621 "Failed state result was an iterable of states but none were failed." 

622 ) 

623 

624 else: 

625 raise TypeError( 

626 f"Unexpected result for failed state: {result!r} —— " 

627 f"{type(result).__name__} cannot be resolved into an exception" 

628 ) 

629 

630 

631async def araise_state_exception(state: State) -> None: 1a

632 """ 

633 Given a FAILED or CRASHED state, raise the contained exception asynchronously. 

634 """ 

635 if not (state.is_failed() or state.is_crashed() or state.is_cancelled()): 

636 return None 

637 

638 raise await aget_state_exception(state) 

639 

640 

641@async_dispatch(araise_state_exception) 1a

642def raise_state_exception(state: State) -> None: 1a

643 """ 

644 Given a FAILED or CRASHED state, raise the contained exception. 

645 """ 

646 if not (state.is_failed() or state.is_crashed() or state.is_cancelled()): 

647 return None 

648 

649 raise get_state_exception(state) 

650 

651 

652def is_state_iterable(obj: Any) -> TypeGuard[Iterable[State]]: 1a

653 """ 

654 Check if a the given object is an iterable of states types 

655 

656 Supported iterables are: 

657 - set 

658 - list 

659 - tuple 

660 

661 Other iterables will return `False` even if they contain states. 

662 """ 

663 # We do not check for arbitrary iterables because this is not intended to be used 

664 # for things like dictionaries, dataframes, or pydantic models 

665 if ( 

666 not isinstance(obj, BaseAnnotation) 

667 and isinstance(obj, (list, set, tuple)) 

668 and obj 

669 ): 

670 return all([isinstance(o, State) for o in obj]) 

671 else: 

672 return False 

673 

674 

675class StateGroup: 1a

676 def __init__(self, states: list[State]) -> None: 1a

677 self.states: list[State] = states 

678 self.type_counts: dict[StateType, int] = self._get_type_counts(states) 

679 self.total_count: int = len(states) 

680 self.cancelled_count: int = self.type_counts[StateType.CANCELLED] 

681 self.final_count: int = sum(state.is_final() for state in states) 

682 self.not_final_count: int = self.total_count - self.final_count 

683 self.paused_count: int = self.type_counts[StateType.PAUSED] 

684 

685 @property 1a

686 def fail_count(self) -> int: 1a

687 return self.type_counts[StateType.FAILED] + self.type_counts[StateType.CRASHED] 

688 

689 def all_completed(self) -> bool: 1a

690 return self.type_counts[StateType.COMPLETED] == self.total_count 

691 

692 def any_cancelled(self) -> bool: 1a

693 return self.cancelled_count > 0 

694 

695 def any_failed(self) -> bool: 1a

696 return ( 

697 self.type_counts[StateType.FAILED] > 0 

698 or self.type_counts[StateType.CRASHED] > 0 

699 ) 

700 

701 def any_paused(self) -> bool: 1a

702 return self.paused_count > 0 

703 

704 def all_final(self) -> bool: 1a

705 return self.final_count == self.total_count 

706 

707 def counts_message(self) -> str: 1a

708 count_messages = [f"total={self.total_count}"] 

709 if self.not_final_count: 

710 count_messages.append(f"not_final={self.not_final_count}") 

711 

712 count_messages += [ 

713 f"{state_type.value!r}={count}" 

714 for state_type, count in self.type_counts.items() 

715 if count 

716 ] 

717 

718 return ", ".join(count_messages) 

719 

720 @staticmethod 1a

721 def _get_type_counts(states: Iterable[State]) -> Dict[StateType, int]: 1a

722 return Counter(state.type for state in states) 

723 

724 def __repr__(self) -> str: 1a

725 return f"StateGroup<{self.counts_message()}>" 

726 

727 

728def _traced(cls: Type["State[R]"], **kwargs: Any) -> "State[R]": 1a

729 state_details = StateDetails.model_validate(kwargs.pop("state_details", {})) 

730 

731 carrier = {} 

732 propagate.inject(carrier) 

733 state_details.traceparent = carrier.get("traceparent") 

734 

735 return cls(**kwargs, state_details=state_details) 

736 

737 

738def Scheduled( 1a

739 cls: Type["State[R]"] = State, 

740 scheduled_time: Optional[datetime.datetime] = None, 

741 **kwargs: Any, 

742) -> "State[R]": 

743 """Convenience function for creating `Scheduled` states. 

744 

745 Returns: 

746 State: a Scheduled state 

747 """ 

748 state_details = StateDetails.model_validate(kwargs.pop("state_details", {})) 

749 if scheduled_time is None: 

750 scheduled_time = now() 

751 elif state_details.scheduled_time: 

752 raise ValueError("An extra scheduled_time was provided in state_details") 

753 state_details.scheduled_time = scheduled_time 

754 

755 return _traced(cls, type=StateType.SCHEDULED, state_details=state_details, **kwargs) 

756 

757 

758def Completed(cls: Type["State[R]"] = State, **kwargs: Any) -> "State[R]": 1a

759 """Convenience function for creating `Completed` states. 

760 

761 Returns: 

762 State: a Completed state 

763 """ 

764 

765 return _traced(cls, type=StateType.COMPLETED, **kwargs) 

766 

767 

768def Running(cls: Type["State[R]"] = State, **kwargs: Any) -> "State[R]": 1a

769 """Convenience function for creating `Running` states. 

770 

771 Returns: 

772 State: a Running state 

773 """ 

774 return _traced(cls, type=StateType.RUNNING, **kwargs) 

775 

776 

777def Failed(cls: Type["State[R]"] = State, **kwargs: Any) -> "State[R]": 1a

778 """Convenience function for creating `Failed` states. 

779 

780 Returns: 

781 State: a Failed state 

782 """ 

783 return _traced(cls, type=StateType.FAILED, **kwargs) 

784 

785 

786def Crashed(cls: Type["State[R]"] = State, **kwargs: Any) -> "State[R]": 1a

787 """Convenience function for creating `Crashed` states. 

788 

789 Returns: 

790 State: a Crashed state 

791 """ 

792 return _traced(cls, type=StateType.CRASHED, **kwargs) 

793 

794 

795def Cancelling(cls: Type["State[R]"] = State, **kwargs: Any) -> "State[R]": 1a

796 """Convenience function for creating `Cancelling` states. 

797 

798 Returns: 

799 State: a Cancelling state 

800 """ 

801 return _traced(cls, type=StateType.CANCELLING, **kwargs) 

802 

803 

804def Cancelled(cls: Type["State[R]"] = State, **kwargs: Any) -> "State[R]": 1a

805 """Convenience function for creating `Cancelled` states. 

806 

807 Returns: 

808 State: a Cancelled state 

809 """ 

810 return _traced(cls, type=StateType.CANCELLED, **kwargs) 

811 

812 

813def Pending(cls: Type["State[R]"] = State, **kwargs: Any) -> "State[R]": 1a

814 """Convenience function for creating `Pending` states. 

815 

816 Returns: 

817 State: a Pending state 

818 """ 

819 return _traced(cls, type=StateType.PENDING, **kwargs) 

820 

821 

822def Paused( 1a

823 cls: Type["State[R]"] = State, 

824 timeout_seconds: Optional[int] = None, 

825 pause_expiration_time: Optional[datetime.datetime] = None, 

826 reschedule: bool = False, 

827 pause_key: Optional[str] = None, 

828 **kwargs: Any, 

829) -> "State[R]": 

830 """Convenience function for creating `Paused` states. 

831 

832 Returns: 

833 State: a Paused state 

834 """ 

835 state_details = StateDetails.model_validate(kwargs.pop("state_details", {})) 

836 

837 if state_details.pause_timeout: 

838 raise ValueError("An extra pause timeout was provided in state_details") 

839 

840 if pause_expiration_time is not None and timeout_seconds is not None: 

841 raise ValueError( 

842 "Cannot supply both a pause_expiration_time and timeout_seconds" 

843 ) 

844 

845 if pause_expiration_time is None and timeout_seconds is None: 

846 pass 

847 else: 

848 state_details.pause_timeout = ( 

849 pause_expiration_time 

850 if pause_expiration_time 

851 else now() + datetime.timedelta(seconds=timeout_seconds or 0) 

852 ) 

853 

854 state_details.pause_reschedule = reschedule 

855 state_details.pause_key = pause_key 

856 

857 return _traced(cls, type=StateType.PAUSED, state_details=state_details, **kwargs) 

858 

859 

860def Suspended( 1a

861 cls: Type["State[R]"] = State, 

862 timeout_seconds: Optional[int] = None, 

863 pause_expiration_time: Optional[datetime.datetime] = None, 

864 pause_key: Optional[str] = None, 

865 **kwargs: Any, 

866) -> "State[R]": 

867 """Convenience function for creating `Suspended` states. 

868 

869 Returns: 

870 State: a Suspended state 

871 """ 

872 return Paused( 

873 cls=cls, 

874 name="Suspended", 

875 reschedule=True, 

876 timeout_seconds=timeout_seconds, 

877 pause_expiration_time=pause_expiration_time, 

878 pause_key=pause_key, 

879 **kwargs, 

880 ) 

881 

882 

883def AwaitingRetry( 1a

884 cls: Type["State[R]"] = State, 

885 scheduled_time: Optional[datetime.datetime] = None, 

886 **kwargs: Any, 

887) -> "State[R]": 

888 """Convenience function for creating `AwaitingRetry` states. 

889 

890 Returns: 

891 State: an AwaitingRetry state 

892 """ 

893 return Scheduled( 

894 cls=cls, scheduled_time=scheduled_time, name="AwaitingRetry", **kwargs 

895 ) 

896 

897 

898def AwaitingConcurrencySlot( 1a

899 cls: Type["State[R]"] = State, 

900 scheduled_time: Optional[datetime.datetime] = None, 

901 **kwargs: Any, 

902) -> "State[R]": 

903 """Convenience function for creating `AwaitingConcurrencySlot` states. 

904 

905 Returns: 

906 State: an AwaitingConcurrencySlot state 

907 """ 

908 return Scheduled( 

909 cls=cls, scheduled_time=scheduled_time, name="AwaitingConcurrencySlot", **kwargs 

910 ) 

911 

912 

913def Retrying(cls: Type["State[R]"] = State, **kwargs: Any) -> "State[R]": 1a

914 """Convenience function for creating `Retrying` states. 

915 

916 Returns: 

917 State: a Retrying state 

918 """ 

919 return _traced(cls, type=StateType.RUNNING, name="Retrying", **kwargs) 

920 

921 

922def Late( 1a

923 cls: Type["State[R]"] = State, 

924 scheduled_time: Optional[datetime.datetime] = None, 

925 **kwargs: Any, 

926) -> "State[R]": 

927 """Convenience function for creating `Late` states. 

928 

929 Returns: 

930 State: a Late state 

931 """ 

932 return Scheduled(cls=cls, scheduled_time=scheduled_time, name="Late", **kwargs)