Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/orchestration/rules.py: 33%

276 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1""" 

2Prefect's flow and task-run orchestration machinery. 

3 

4This module contains all the core concepts necessary to implement Prefect's state 

5orchestration engine. These states correspond to intuitive descriptions of all the 

6points that a Prefect flow or task can observe executing user code and intervene, if 

7necessary. A detailed description of states can be found in our concept 

8[documentation](https://docs.prefect.io/v3/concepts/states). 

9 

10Prefect's orchestration engine operates under the assumption that no governed user code 

11will execute without first requesting Prefect REST API validate a change in state and record 

12metadata about the run. With all attempts to run user code being checked against a 

13Prefect instance, the Prefect REST API database becomes the unambiguous source of truth for managing 

14the execution of complex interacting workflows. Orchestration rules can be implemented 

15as discrete units of logic that operate against each state transition and can be fully 

16observable, extensible, and customizable -- all without needing to store or parse a 

17single line of user code. 

18""" 

19 

20from __future__ import annotations 1a

21 

22import contextlib 1a

23from types import TracebackType 1a

24from typing import ( 1a

25 TYPE_CHECKING, 

26 Any, 

27 ClassVar, 

28 Generic, 

29 Iterable, 

30 Optional, 

31 TypeVar, 

32 Union, 

33) 

34 

35from pydantic import ConfigDict, Field 1a

36from sqlalchemy.ext.asyncio import AsyncSession 1a

37from typing_extensions import Self 1a

38 

39from prefect.logging import get_logger 1a

40from prefect.server.database import PrefectDBInterface, inject_db, orm_models 1a

41from prefect.server.database.dependencies import db_injector 1a

42from prefect.server.exceptions import OrchestrationError 1a

43from prefect.server.models import artifacts, flow_runs 1a

44from prefect.server.schemas import core, states 1a

45from prefect.server.schemas.responses import ( 1a

46 SetStateStatus, 

47 StateAbortDetails, 

48 StateAcceptDetails, 

49 StateRejectDetails, 

50 StateResponseDetails, 

51 StateWaitDetails, 

52) 

53from prefect.server.utilities.schemas import PrefectBaseModel 1a

54 

55if TYPE_CHECKING: 55 ↛ 56line 55 didn't jump to line 56 because the condition on line 55 was never true1a

56 from logging import Logger 

57 

58 

# Every state type that can sit on either side of a task- or flow-run
# transition, plus `None` for the "no state" case.
ALL_ORCHESTRATION_STATES: set[states.StateType | None] = set(states.StateType) | {
    None
}

# All terminal state types, materialized as a set.
TERMINAL_STATES = {*states.TERMINAL_STATES}

# Module-wide logger shared by the contexts and rules below.
logger: "Logger" = get_logger("server")

# T: the ORM run model a context wraps; RP: the matching run policy type.
T = TypeVar("T", bound=orm_models.Run)
RP = TypeVar("RP", bound=Union[core.FlowRunPolicy, core.TaskRunPolicy])

69 

70 

class OrchestrationContext(PrefectBaseModel, Generic[T, RP]):
    """
    Holds everything known about one attempted state transition.

    Note:
        Do not instantiate `OrchestrationContext` directly; use the flow- or
        task-specific subclasses, `FlowOrchestrationContext` and
        `TaskOrchestrationContext`.

    Whenever a flow or task run asks to change state, the Prefect REST API gets
    a chance to decide whether the transition may proceed. The information
    needed for that decision lives on this context, which is then governed by
    nested orchestration rules built on `BaseOrchestrationRule`.

    A state may be `None` in the context of a transition. The initial state is
    `None` when a run is setting a state for the first time. The proposed state
    may become `None` when a rule decides that no state change should occur and
    nothing should be written to the database.

    Attributes:
        session: a SQLAlchemy database session
        initial_state: the initial state of a run
        proposed_state: the proposed state a run is transitioning into
        validated_state: a proposed state that has been committed to the database
        rule_signature: a record of rules that have fired on entry into a
            managed context, currently only used for debugging purposes
        finalization_signature: a record of rules that have fired on exit from a
            managed context, currently only used for debugging purposes
        response_status: a SetStateStatus object used to build the API response
        response_details: a StateResponseDetails object used to build the API
            response

    Args:
        session: a SQLAlchemy database session
        initial_state: the initial state of a run
        proposed_state: the proposed state a run is transitioning into
    """

    # The session and the ORM run are not pydantic types.
    model_config: ClassVar[ConfigDict] = ConfigDict(arbitrary_types_allowed=True)

    session: AsyncSession
    initial_state: Optional[states.State] = None
    proposed_state: Optional[states.State] = None
    validated_state: Optional[states.State] = Field(default=None)
    rule_signature: list[str] = Field(default_factory=list)
    finalization_signature: list[str] = Field(default_factory=list)
    response_status: SetStateStatus = Field(default=SetStateStatus.ACCEPT)
    response_details: StateResponseDetails = Field(default_factory=StateAcceptDetails)
    orchestration_error: Optional[Exception] = Field(default=None)
    parameters: dict[Any, Any] = Field(default_factory=dict)
    client_version: Optional[str] = None
    run: T

    @property
    def initial_state_type(self) -> Optional[states.StateType]:
        """Type of `self.initial_state`, or `None` when there is no initial state."""
        if self.initial_state:
            return self.initial_state.type
        return None

    @property
    def proposed_state_type(self) -> Optional[states.StateType]:
        """Type of `self.proposed_state`, or `None` when nothing is proposed."""
        if self.proposed_state:
            return self.proposed_state.type
        return None

    @property
    def validated_state_type(self) -> Optional[states.StateType]:
        """Type of `self.validated_state`, or `None` when nothing was validated."""
        if self.validated_state:
            return self.validated_state.type
        return None

    @property
    def run_settings(self) -> RP:
        """Run-level settings used to orchestrate the state transition.

        Raises:
            NotImplementedError: always, on this base class; the flow and task
                subclasses provide the real implementation.
        """
        raise NotImplementedError(
            "Run-level settings are not supported for this context"
        )

    def safe_copy(self) -> Self:
        """
        Build a mostly-mutation-safe duplicate for use in orchestration rules.

        Rules read transition details from the context, and mutating those
        objects in place can have unintended side-effects. The duplicate
        carries its own copies of the three states and of `parameters`, so a
        rule can modify them freely.

        Returns:
            A mutation-safe copy of the `OrchestrationContext`
        """
        duplicate = self.model_copy()

        # Give the duplicate its own copy of each state that is set.
        for field_name in ("initial_state", "proposed_state", "validated_state"):
            state = getattr(self, field_name)
            setattr(duplicate, field_name, state.model_copy() if state else None)

        duplicate.parameters = self.parameters.copy()
        return duplicate

    def entry_context(
        self,
    ) -> tuple[Optional[states.State], Optional[states.State], Self]:
        """
        Produce the arguments passed to hooks that fire before validation.

        Returns:
            `(initial_state, proposed_state, context)`, all taken from one safe
            copy of this context.
        """
        snapshot = self.safe_copy()
        return snapshot.initial_state, snapshot.proposed_state, snapshot

    def exit_context(
        self,
    ) -> tuple[Optional[states.State], Optional[states.State], Self]:
        """
        Produce the arguments passed to hooks that fire after validation.

        Returns:
            `(initial_state, validated_state, context)`, all taken from one safe
            copy of this context.
        """
        snapshot = self.safe_copy()
        return snapshot.initial_state, snapshot.validated_state, snapshot

    async def flow_run(self) -> orm_models.FlowRun | None:
        """Not available on the base context; subclasses override this.

        Raises:
            NotImplementedError: always, on this base class.
        """
        raise NotImplementedError("Flow run is not supported for this context")

209 

210 

class FlowOrchestrationContext(
    OrchestrationContext[orm_models.FlowRun, core.FlowRunPolicy]
):
    """
    Holds a flow run state transition while orchestration rules govern it.

    Whenever a flow run asks to change state, the Prefect REST API gets a
    chance to decide whether the transition may proceed. The information
    needed for that decision lives on this context, which is then governed by
    nested orchestration rules built on `BaseOrchestrationRule`.

    A state may be `None` in the context of a transition. The initial state is
    `None` when a run is setting a state for the first time. The proposed state
    may become `None` when a rule decides that no state change should occur and
    nothing should be written to the database.

    Attributes:
        session: a SQLAlchemy database session
        run: the flow run attempting to change state
        initial_state: the initial state of the run
        proposed_state: the proposed state the run is transitioning into
        validated_state: a proposed state that has been committed to the database
        rule_signature: a record of rules that have fired on entry into a
            managed context, currently only used for debugging purposes
        finalization_signature: a record of rules that have fired on exit from a
            managed context, currently only used for debugging purposes
        response_status: a SetStateStatus object used to build the API response
        response_details: a StateResponseDetails object used to build the API
            response

    Args:
        session: a SQLAlchemy database session
        run: the flow run attempting to change state
        initial_state: the initial state of a run
        proposed_state: the proposed state a run is transitioning into
    """

    run: orm_models.FlowRun

    @inject_db
    async def validate_proposed_state(
        self,
        db: PrefectDBInterface,
    ):
        """
        Validate the proposed state by writing it to the database.

        Once the rules have governed this context, the proposed state is added
        to the current SQLAlchemy session and flushed. `self.validated_state`
        is set to the flushed state, and the run's state is updated to match.

        If the proposed state is `None`, no state is written and
        `self.validated_state` is set to the run's current state.

        Exceptions that `is_client_retryable_exception` accepts are re-raised.
        Any other exception is logged and turned into an ABORT response. In
        both cases `self.proposed_state` is cleared first.

        Returns:
            None
        """
        # (circular import)
        from prefect.server.api.server import is_client_retryable_exception

        try:
            await self._validate_proposed_state()
        except Exception as exc:
            logger.exception("Encountered error during state validation")
            self.proposed_state = None

            if is_client_retryable_exception(exc):
                # Do not capture retryable database exceptions, this exception
                # will be raised as a 503 in the API layer
                raise

            self.response_status = SetStateStatus.ABORT
            self.response_details = StateAbortDetails(
                reason=f"Error validating state: {exc!r}"
            )

    @db_injector
    async def _validate_proposed_state(
        self,
        db: PrefectDBInterface,
    ):
        """Write the proposed state (if any) and set `self.validated_state`."""
        proposal = self.proposed_state

        if proposal is None:
            # Nothing new to write: fall back to the state the run already has.
            validated_orm_state = self.run.state
            state_data = None
            if (
                validated_orm_state is not None
                and validated_orm_state.result_artifact_id is not None
            ):
                # We cannot access `self.run.state.data` directly for unknown reasons
                stored_artifact = await artifacts.read_artifact(
                    self.session, validated_orm_state.result_artifact_id
                )
                state_data = stored_artifact.data if stored_artifact else None
        else:
            state_payload = proposal.model_dump_for_orm()
            state_data = state_payload.pop("data", None)

            unpersisted = (
                isinstance(state_data, dict) and state_data.get("type") == "unpersisted"
            )
            if state_data is not None and not unpersisted:
                # Store the result as an artifact and link it from the state row.
                result_artifact = core.Artifact.from_result(state_data)
                result_artifact.flow_run_id = self.run.id
                await artifacts.create_artifact(self.session, result_artifact)
                state_payload["result_artifact_id"] = result_artifact.id

            validated_orm_state = db.FlowRunState(
                flow_run_id=self.run.id,
                **state_payload,
            )

        if not validated_orm_state:
            self.validated_state = None
            return

        self.session.add(validated_orm_state)
        self.run.set_state(validated_orm_state)

        await self.session.flush()
        self.validated_state = states.State.from_orm_without_result(
            validated_orm_state, with_data=state_data
        )

    def safe_copy(self) -> Self:
        """
        Build a mostly-mutation-safe duplicate for use in orchestration rules.

        Delegates to the base implementation, which copies the states and the
        parameters so rules can modify them without touching this context.

        Note:
            `self.run` is an ORM model, and even when copied is unsafe to mutate

        Returns:
            A mutation-safe copy of `FlowOrchestrationContext`
        """
        duplicate = super().safe_copy()
        return duplicate

    @property
    def run_settings(self) -> core.FlowRunPolicy:
        """Run-level settings used to orchestrate the state transition."""
        policy = self.run.empirical_policy
        return policy

    async def task_run(self) -> None:
        """Always `None` for a flow run context."""
        return None

    async def flow_run(self) -> orm_models.FlowRun:
        """Return the flow run this context governs."""
        return self.run

366 

367 

class TaskOrchestrationContext(
    OrchestrationContext[orm_models.TaskRun, core.TaskRunPolicy]
):
    """
    Holds a task run state transition while orchestration rules govern it.

    Whenever a task run asks to change state, the Prefect REST API gets a
    chance to decide whether the transition may proceed. The information
    needed for that decision lives on this context, which is then governed by
    nested orchestration rules built on `BaseOrchestrationRule`.

    A state may be `None` in the context of a transition. The initial state is
    `None` when a run is setting a state for the first time. The proposed state
    may become `None` when a rule decides that no state change should occur and
    nothing should be written to the database.

    Attributes:
        session: a SQLAlchemy database session
        run: the task run attempting to change state
        initial_state: the initial state of the run
        proposed_state: the proposed state the run is transitioning into
        validated_state: a proposed state that has been committed to the database
        rule_signature: a record of rules that have fired on entry into a
            managed context, currently only used for debugging purposes
        finalization_signature: a record of rules that have fired on exit from a
            managed context, currently only used for debugging purposes
        response_status: a SetStateStatus object used to build the API response
        response_details: a StateResponseDetails object used to build the API
            response

    Args:
        session: a SQLAlchemy database session
        run: the task run attempting to change state
        initial_state: the initial state of a run
        proposed_state: the proposed state a run is transitioning into
    """

    run: orm_models.TaskRun

    @inject_db
    async def validate_proposed_state(
        self,
        db: PrefectDBInterface,
    ):
        """
        Validate the proposed state by writing it to the database.

        Once the rules have governed this context, the proposed state is added
        to the current SQLAlchemy session and flushed. `self.validated_state`
        is set to the flushed state, and the run's state is updated to match.

        If the proposed state is `None`, no state is written and
        `self.validated_state` is set to the run's current state.

        Exceptions that `is_client_retryable_exception` accepts are re-raised.
        Any other exception is logged and turned into an ABORT response. In
        both cases `self.proposed_state` is cleared first.

        Returns:
            None
        """
        # (circular import)
        from prefect.server.api.server import is_client_retryable_exception

        try:
            await self._validate_proposed_state()
        except Exception as exc:
            logger.exception("Encountered error during state validation")
            self.proposed_state = None

            if is_client_retryable_exception(exc):
                # Do not capture retryable database exceptions, this exception
                # will be raised as a 503 in the API layer
                raise

            self.response_status = SetStateStatus.ABORT
            self.response_details = StateAbortDetails(
                reason=f"Error validating state: {exc!r}"
            )

    @db_injector
    async def _validate_proposed_state(
        self,
        db: PrefectDBInterface,
    ):
        """Write the proposed state (if any) and set `self.validated_state`."""
        proposal = self.proposed_state

        if proposal is None:
            # Nothing new to write: fall back to the state the run already has.
            validated_orm_state = self.run.state
            state_data = None
            if (
                validated_orm_state is not None
                and validated_orm_state.result_artifact_id is not None
            ):
                # We cannot access `self.run.state.data` directly for unknown reasons
                stored_artifact = await artifacts.read_artifact(
                    self.session, validated_orm_state.result_artifact_id
                )
                state_data = stored_artifact.data if stored_artifact else None
        else:
            state_payload = proposal.model_dump_for_orm()
            state_data = state_payload.pop("data", None)

            unpersisted = (
                isinstance(state_data, dict) and state_data.get("type") == "unpersisted"
            )
            if state_data is not None and not unpersisted:
                # Store the result as an artifact and link it from the state row.
                result_artifact = core.Artifact.from_result(state_data)
                result_artifact.task_run_id = self.run.id

                # Tag the artifact with the flow run too, when the task has one.
                parent_flow_run_id = self.run.flow_run_id
                if parent_flow_run_id is not None:
                    result_artifact.flow_run_id = parent_flow_run_id

                await artifacts.create_artifact(self.session, result_artifact)
                state_payload["result_artifact_id"] = result_artifact.id

            validated_orm_state = db.TaskRunState(
                task_run_id=self.run.id,
                **state_payload,
            )

        if not validated_orm_state:
            self.validated_state = None
            return

        self.session.add(validated_orm_state)
        self.run.set_state(validated_orm_state)

        await self.session.flush()
        self.validated_state = states.State.from_orm_without_result(
            validated_orm_state, with_data=state_data
        )

    def safe_copy(self) -> Self:
        """
        Build a mostly-mutation-safe duplicate for use in orchestration rules.

        Delegates to the base implementation, which copies the states and the
        parameters so rules can modify them without touching this context.

        Note:
            `self.run` is an ORM model, and even when copied is unsafe to mutate

        Returns:
            A mutation-safe copy of `TaskOrchestrationContext`
        """
        duplicate = super().safe_copy()
        return duplicate

    @property
    def run_settings(self) -> core.TaskRunPolicy:
        """Run-level settings used to orchestrate the state transition."""
        policy = self.run.empirical_policy
        return policy

    async def task_run(self) -> orm_models.TaskRun:
        """Return the task run this context governs."""
        return self.run

    async def flow_run(self) -> orm_models.FlowRun | None:
        """Read the flow run that owns this task run, or `None` if it has none."""
        parent_flow_run_id = self.run.flow_run_id
        if parent_flow_run_id is None:
            return None
        return await flow_runs.read_flow_run(
            session=self.session,
            flow_run_id=parent_flow_run_id,
        )

532 

533 

534class BaseOrchestrationRule( 1a

535 contextlib.AbstractAsyncContextManager[OrchestrationContext[T, RP]] 

536): 

537 """ 

538 An abstract base class used to implement a discrete piece of orchestration logic. 

539 

540 An `OrchestrationRule` is a stateful context manager that directly governs a state 

541 transition. Complex orchestration is achieved by nesting multiple rules. 

542 Each rule runs against an `OrchestrationContext` that contains the transition 

543 details; this context is then passed to subsequent rules. The context can be 

544 modified by hooks that fire before and after a new state is validated and committed 

545 to the database. These hooks will fire as long as the state transition is 

546 considered "valid" and govern a transition by either modifying the proposed state 

547 before it is validated or by producing a side-effect. 

548 

549 A state transition occurs whenever a flow- or task- run changes state, prompting 

550 Prefect REST API to decide whether or not this transition can proceed. The current state of 

551 the run is referred to as the "initial state", and the state a run is 

552 attempting to transition into is the "proposed state". Together, the initial state 

553 transitioning into the proposed state is the intended transition that is governed 

554 by these orchestration rules. After using rules to enter a runtime context, the 

555 `OrchestrationContext` will contain a proposed state that has been governed by 

556 each rule, and at that point can validate the proposed state and commit it to 

557 the database. The validated state will be set on the context as 

558 `context.validated_state`, and rules will call the `self.after_transition` hook 

559 upon exiting the managed context. 

560 

561 Examples: 

562 

563 Create a rule: 

564 

565 ```python 

566 class BasicRule(BaseOrchestrationRule): 

567 # allowed initial state types 

568 FROM_STATES = [StateType.RUNNING] 

569 # allowed proposed state types 

570 TO_STATES = [StateType.COMPLETED, StateType.FAILED] 

571 

572 async def before_transition(initial_state, proposed_state, ctx): 

573 # side effects and proposed state mutation can happen here 

574 ... 

575 

576 async def after_transition(initial_state, validated_state, ctx): 

577 # operations on states that have been validated can happen here 

578 ... 

579 

580 async def cleanup(initial_state, validated_state, ctx): 

581 # reverts side effects generated by `before_transition` if necessary 

582 ... 

583 ``` 

584 

585 Use a rule: 

586 

587 ```python 

588 intended_transition = (StateType.RUNNING, StateType.COMPLETED) 

589 async with BasicRule(context, *intended_transition): 

590 # context.proposed_state has been governed by BasicRule 

591 ... 

592 ``` 

593 

594 Use multiple rules: 

595 

596 ```python 

597 rules = [BasicRule, BasicRule] 

598 intended_transition = (StateType.RUNNING, StateType.COMPLETED) 

599 async with contextlib.AsyncExitStack() as stack: 

600 for rule in rules: 

601 await stack.enter_async_context(rule(context, *intended_transition)) 

602 

603 # context.proposed_state has been governed by all rules 

604 ... 

605 ``` 

606 

607 Attributes: 

608 FROM_STATES: list of valid initial state types this rule governs 

609 TO_STATES: list of valid proposed state types this rule governs 

610 context: the orchestration context 

611 from_state_type: the state type a run is currently in 

612 to_state_type: the intended proposed state type prior to any orchestration 

613 

614 Args: 

615 context: A `FlowOrchestrationContext` or `TaskOrchestrationContext` that is 

616 passed between rules 

617 from_state_type: The state type of the initial state of a run, if this 

618 state type is not contained in `FROM_STATES`, no hooks will fire 

619 to_state_type: The state type of the proposed state before orchestration, if 

620 this state type is not contained in `TO_STATES`, no hooks will fire 

621 """ 

622 

623 FROM_STATES: set[states.StateType | None] = set() 1a

624 TO_STATES: set[states.StateType | None] = set() 1a

625 

def __init__(
    self,
    context: OrchestrationContext[T, RP],
    from_state_type: Optional[states.StateType],
    to_state_type: Optional[states.StateType],
):
    """
    Bind the rule to a context and to the transition it is asked to govern.

    Args:
        context: the orchestration context passed between rules
        from_state_type: state type of the run's initial state
        to_state_type: state type of the proposed state before orchestration
    """
    # `None` means "not decided yet"; `invalid()` fills this in on first use.
    self._invalid_on_entry = None

    self.to_state_type = to_state_type
    self.from_state_type = from_state_type
    self.context = context

636 

async def __aenter__(self) -> OrchestrationContext[T, RP]:
    """
    Enter an async runtime context governed by this rule.

    The governed `OrchestrationContext` is what the `as` clause of the `with`
    statement receives. If the transition is invalid for this rule on entry,
    nothing happens. Otherwise `self.before_transition` fires with a safe copy
    of the context. If that hook raises, the error is logged, the proposed
    state is cleared, and the response is switched to ABORT.
    """
    if await self.invalid():
        # This rule does not govern the transition: hand the context back as is.
        return self.context

    try:
        await self.before_transition(*self.context.entry_context())
        self.context.rule_signature.append(str(self.__class__))
    except Exception as before_transition_error:
        logger.exception(
            f"Error running before-transition hook in rule {self.__class__!r}:"
            f" !{before_transition_error!r}"
        )
        reason = (
            f"Aborting orchestration due to error in {self.__class__!r}:"
            f" !{before_transition_error!r}"
        )

        # A failing hook aborts the whole transition instead of propagating.
        self.context.proposed_state = None
        self.context.response_status = SetStateStatus.ABORT
        self.context.response_details = StateAbortDetails(reason=reason)
        self.context.orchestration_error = before_transition_error

    return self.context

669 

async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """
    Exit the async runtime context governed by this rule.

    One of three things happens, depending on the state of the rule:

    - The rule was invalid on entry: nothing happens.
    - The rule was valid on entry and is still valid: `self.after_transition`
      fires and the rule is recorded in `context.finalization_signature`.
    - The rule was valid on entry but is invalid now: the rule has "fizzled",
      and `self.cleanup` fires to revert side-effects of
      `self.before_transition`.

    Args:
        exc_type: exception type raised inside the managed block, if any (unused)
        exc_val: exception instance raised inside the managed block, if any (unused)
        exc_tb: traceback of that exception, if any (unused)

    Returns:
        None, so an exception raised inside the managed block is not suppressed.
    """
    if await self.invalid():
        return

    # Snapshot the context only when a hook will consume it, so rules that do
    # not apply to this transition skip the copy.
    exit_context = self.context.exit_context()

    if await self.fizzled():
        await self.cleanup(*exit_context)
    else:
        await self.after_transition(*exit_context)
        self.context.finalization_signature.append(str(self.__class__))

695 

async def before_transition(
    self,
    initial_state: Optional[states.State],
    proposed_state: Optional[states.State],
    context: OrchestrationContext[T, RP],
) -> None:
    """
    Hook that can fire before a state is committed to the database.

    The default implementation does nothing. Subclasses override it to produce
    side-effects or to change the proposed state through one of four methods:
    `self.reject_transition`, `self.delay_transition`,
    `self.abort_transition`, and `self.rename_state`.

    Note:
        This hook is not perfectly isolated from the transition. It is a
        regular instance method, so it can reach `self.context`, which should
        never be modified directly. `context.run` is an ORM model, and mutating
        it can also cause unintended writes to the database.

    Args:
        initial_state: The initial state of a transition
        proposed_state: The proposed state of a transition
        context: A safe copy of the `OrchestrationContext`. Apart from
            `context.run`, mutating it has no effect on the broader
            orchestration environment.

    Returns:
        None
    """

726 

async def after_transition(
    self,
    initial_state: Optional[states.State],
    validated_state: Optional[states.State],
    context: OrchestrationContext[T, RP],
) -> None:
    """
    Hook that can fire after a state is committed to the database.

    The default implementation does nothing; subclasses override it.

    Args:
        initial_state: The initial state of a transition
        validated_state: The governed state that has been committed to the database
        context: A safe copy of the `OrchestrationContext`. Apart from
            `context.run`, mutating it has no effect on the broader
            orchestration environment.

    Returns:
        None
    """

746 

async def cleanup(
    self,
    initial_state: Optional[states.State],
    validated_state: Optional[states.State],
    context: OrchestrationContext[T, RP],
) -> None:
    """
    Hook that can fire after a state is committed to the database.

    It is meant for reverting side-effects produced by
    `self.before_transition` when the transition turns out to be invalid on
    exit. This lets several rules run in sequence without any of them tracking
    which other rules govern the transition. The default implementation does
    nothing.

    Args:
        initial_state: The initial state of a transition
        validated_state: The governed state that has been committed to the database
        context: A safe copy of the `OrchestrationContext`. Apart from
            `context.run`, mutating it has no effect on the broader
            orchestration environment.

    Returns:
        None
    """

771 

async def invalid(self) -> bool:
    """
    Decide whether this rule is invalid for the transition.

    An invalid rule does nothing: no hooks fire on entering or exiting the
    governed context. A rule is invalid when the transition's state types are
    not contained in `self.FROM_STATES` and `self.TO_STATES`, or when the
    context proposes a transition that differs from the one the rule was
    instantiated with.

    Returns:
        True if the rule is invalid, False otherwise.
    """
    # Invalid and fizzled are mutually exclusive; `_invalid_on_entry` stores
    # the verdict so later calls reuse it.
    outside_governed_states = (
        self.from_state_type not in self.FROM_STATES
        or self.to_state_type not in self.TO_STATES
    )

    if outside_governed_states:
        self._invalid_on_entry = True
    elif self._invalid_on_entry is None:
        self._invalid_on_entry = await self.invalid_transition()

    return self._invalid_on_entry

794 

async def fizzled(self) -> bool:
    """
    Reports whether this rule has fizzled and its side-effects need reverting.

    A rule fizzles when its transition was valid on entry (thus firing
    `self.before_transition`) but is invalid upon exiting the governed context,
    most likely caused by another rule mutating the transition. A rule that was
    already invalid on entry never fizzles.

    Returns:
        True if the rule is fizzled, False otherwise.
    """
    if not self._invalid_on_entry:
        # valid on entry: fizzled only if the transition no longer matches
        return await self.invalid_transition()
    return False

810 

async def invalid_transition(self) -> bool:
    """
    Checks the rule's transition against the one the `OrchestrationContext` holds.

    The transition is considered invalid when the context's initial or proposed
    state type differs from the `from_state_type` / `to_state_type` this rule
    currently carries. Depending on when the check is made, a rule with an
    invalid transition is either "invalid" or "fizzled".

    Returns:
        True if the transition is invalid, False otherwise.
    """
    observed_from = self.context.initial_state_type
    observed_to = self.context.proposed_state_type

    if self.from_state_type != observed_from:
        return True
    return self.to_state_type != observed_to

829 

async def reject_transition(
    self, state: Optional[states.State], reason: str
) -> None:
    """
    Rejects a proposed transition before the transition is validated.

    The context's proposed state is replaced with the provided `state`, and the
    response status and details on the `OrchestrationContext` are set to a
    rejection carrying `reason`. The rule's own `to_state_type` is updated to
    match, so a rule that rejects the transition will not fizzle despite the
    proposed state type changing.

    Args:
        state: The new proposed state. If `None`, the current run state will be
            returned in the result instead.
        reason: The reason for rejecting the transition

    Raises:
        RuntimeError: if the context already holds a validated state
        OrchestrationError: if `state` is `None` and the rule has no
            `from_state_type` to fall back on
    """
    # don't run if the transition is already validated
    if self.context.validated_state:
        raise RuntimeError("The transition is already validated")

    # the current state will be used if a new one is not provided, which
    # requires that the run actually has a current state
    if state is None and self.from_state_type is None:
        raise OrchestrationError(
            "The current run has no state; this transition cannot be "
            "rejected without providing a new state."
        )

    # a rule that mutates state should not fizzle itself
    self.to_state_type = None if state is None else state.type
    self.context.proposed_state = state

    self.context.response_status = SetStateStatus.REJECT
    self.context.response_details = StateRejectDetails(reason=reason)

867 

async def delay_transition(
    self,
    delay_seconds: int,
    reason: str,
) -> None:
    """
    Delays a proposed transition before the transition is validated.

    The proposed state is set to `None`, signaling to the
    `OrchestrationContext` that no state should be written to the database, and
    the response status and details are set to a wait carrying `delay_seconds`
    and `reason`. The rule's own `to_state_type` is cleared as well, so a rule
    that delays the transition will not fizzle despite the proposed state type
    changing.

    Args:
        delay_seconds: The number of seconds the transition should be delayed
        reason: The reason for delaying the transition

    Raises:
        RuntimeError: if the context already holds a validated state
    """
    governed = self.context

    # don't run if the transition is already validated
    if governed.validated_state:
        raise RuntimeError("The transition is already validated")

    # a rule that mutates state should not fizzle itself
    self.to_state_type = None
    governed.proposed_state = None
    governed.response_status = SetStateStatus.WAIT
    governed.response_details = StateWaitDetails(
        delay_seconds=delay_seconds, reason=reason
    )

899 

async def abort_transition(self, reason: str) -> None:
    """
    Aborts a proposed transition before the transition is validated.

    Aborting expects no further action to occur for this run. The proposed
    state is set to `None`, signaling to the `OrchestrationContext` that no
    state should be written to the database, and the response status and
    details are set to an abort carrying `reason`. The rule's own
    `to_state_type` is cleared as well, so a rule that aborts the transition
    will not fizzle despite the proposed state type changing.

    Args:
        reason: The reason for aborting the transition

    Raises:
        RuntimeError: if the context already holds a validated state
    """
    governed = self.context

    # don't run if the transition is already validated
    if governed.validated_state:
        raise RuntimeError("The transition is already validated")

    # a rule that mutates state should not fizzle itself
    self.to_state_type = None
    governed.proposed_state = None
    governed.response_status = SetStateStatus.ABORT
    governed.response_details = StateAbortDetails(reason=reason)

923 

async def rename_state(self, state_name: str) -> None:
    """
    Sets the "name" attribute on a proposed state.

    The name of a state is an annotation intended to provide rich, human-readable
    context for how a run is progressing. Only the name is touched -- the
    canonical state TYPE is left alone -- so calling this will not fizzle or
    invalidate any other rules that might govern this state transition. When the
    context has no proposed state, nothing happens.

    Args:
        state_name: The name to assign to the proposed state
    """
    proposed = self.context.proposed_state
    if proposed is None:
        return
    proposed.name = state_name

935 

async def update_context_parameters(self, key: str, value: Any) -> None:
    """
    Stores a key-value pair in the context's "parameters" dictionary.

    This mechanism streamlines the process of passing messages and information
    between orchestration rules if necessary and is simpler and more ephemeral than
    message-passing via the database or some other side-effect. It can be used
    to break up large rules for ease of testing or comprehension, but note that
    any rules coupled this way (or any other way) are no longer independent and
    the order in which they appear in the orchestration policy priority will matter.

    Args:
        key: The parameter name to set
        value: The value to store under `key`, replacing any existing entry
    """
    addition = {key: value}
    self.context.parameters.update(addition)

949 

950 

class FlowRunOrchestrationRule(
    BaseOrchestrationRule[orm_models.FlowRun, core.FlowRunPolicy]
):
    """
    `BaseOrchestrationRule` parameterized with `orm_models.FlowRun` and
    `core.FlowRunPolicy`. Adds no behavior of its own.
    """

955 

956 

class TaskRunOrchestrationRule(
    BaseOrchestrationRule[orm_models.TaskRun, core.TaskRunPolicy]
):
    """
    `BaseOrchestrationRule` parameterized with `orm_models.TaskRun` and
    `core.TaskRunPolicy`. Adds no behavior of its own.
    """

961 

962 

class GenericOrchestrationRule(
    BaseOrchestrationRule[orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy]]
):
    """
    `BaseOrchestrationRule` parameterized with `orm_models.Run` and the union
    of `core.FlowRunPolicy` and `core.TaskRunPolicy`. Adds no behavior of its
    own.
    """

967 

968 

class BaseUniversalTransform(
    contextlib.AbstractAsyncContextManager[OrchestrationContext[T, RP]]
):
    """
    An abstract base class used to implement privileged bookkeeping logic.

    Warning:
        In almost all cases, use the `BaseOrchestrationRule` base class instead.

    Unlike the orchestration rules implemented with the `BaseOrchestrationRule`
    ABC, universal transforms are not stateful. As implemented here,
    `before_transition` fires every time the transform's context is entered --
    including when the proposed state is `None` -- and `after_transition` fires
    on exit unless the context has recorded an orchestration error. Hooks that
    must ignore nullified transitions can consult `nullified_transition()`
    themselves. Because there are no guardrails in place to prevent directly
    mutating state or other parts of the orchestration context, universal
    transforms should only be used with care.

    Attributes:
        FROM_STATES: for compatibility with `BaseOrchestrationPolicy`
        TO_STATES: for compatibility with `BaseOrchestrationPolicy`
        context: the orchestration context
        from_state_type: the state type a run is currently in
        to_state_type: the intended proposed state type prior to any orchestration

    Args:
        context: A `FlowOrchestrationContext` or `TaskOrchestrationContext` that is
            passed between transforms
    """

    # Universal transforms declare every orchestration state on both sides of
    # the transition; the attributes exist for `BaseOrchestrationPolicy`.
    FROM_STATES: Iterable[states.StateType | None] = ALL_ORCHESTRATION_STATES
    TO_STATES: Iterable[states.StateType | None] = ALL_ORCHESTRATION_STATES

    def __init__(
        self,
        context: OrchestrationContext[T, RP],
        from_state_type: Optional[states.StateType],
        to_state_type: Optional[states.StateType],
    ):
        self.context = context
        self.from_state_type = from_state_type
        self.to_state_type = to_state_type

    async def __aenter__(self) -> OrchestrationContext[T, RP]:
        """
        Enter an async runtime context governed by this transform.

        Fires `self.before_transition` with the governed `OrchestrationContext`,
        records this transform's class in `context.rule_signature`, and returns
        the context so the `with` statement can bind it to the target of its
        `as` clause. No check for a nullified transition is made here.
        """
        governed = self.context
        await self.before_transition(governed)
        governed.rule_signature.append(str(self.__class__))
        return governed

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """
        Exit the async runtime context governed by this transform.

        If the transition has errored upon exiting this transform's context,
        nothing happens. Otherwise `self.after_transition` fires and this
        transform's class is recorded in `context.finalization_signature`.
        """
        if self.exception_in_transition():
            return

        await self.after_transition(self.context)
        self.context.finalization_signature.append(str(self.__class__))

    async def before_transition(self, context: OrchestrationContext[T, RP]) -> None:
        """
        Implements a hook that fires before a state is committed to the database.

        The default implementation does nothing.

        Args:
            context: the `OrchestrationContext` that contains transition details

        Returns:
            None
        """

    async def after_transition(self, context: OrchestrationContext[T, RP]) -> None:
        """
        Implements a hook that can fire after a state is committed to the database.

        The default implementation does nothing.

        Args:
            context: the `OrchestrationContext` that contains transition details

        Returns:
            None
        """

    def nullified_transition(self) -> bool:
        """
        Determines if the transition has been nullified.

        Transitions are nullified if the proposed state is `None`, indicating that
        nothing should be written to the database.

        Returns:
            True if the transition is nullified, False otherwise.
        """
        proposed = self.context.proposed_state
        return proposed is None

    def exception_in_transition(self) -> bool:
        """
        Determines if the transition has encountered an exception.

        Returns:
            True if the context has recorded an orchestration error, False
            otherwise.
        """
        recorded_error = self.context.orchestration_error
        return recorded_error is not None

1088 

1089 

class TaskRunUniversalTransform(
    BaseUniversalTransform[orm_models.TaskRun, core.TaskRunPolicy]
):
    """
    `BaseUniversalTransform` parameterized with `orm_models.TaskRun` and
    `core.TaskRunPolicy`. Adds no behavior of its own.
    """

1094 

1095 

class FlowRunUniversalTransform(
    BaseUniversalTransform[orm_models.FlowRun, core.FlowRunPolicy]
):
    """
    `BaseUniversalTransform` parameterized with `orm_models.FlowRun` and
    `core.FlowRunPolicy`. Adds no behavior of its own.
    """

1100 

1101 

class GenericUniversalTransform(
    BaseUniversalTransform[
        orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy]
    ]
):
    """
    `BaseUniversalTransform` parameterized with `orm_models.Run` and the union
    of `core.FlowRunPolicy` and `core.TaskRunPolicy`. Adds no behavior of its
    own.
    """

1108 

1109 

# Alias for an `OrchestrationContext` parameterized with `orm_models.Run` and
# the union of the flow-run and task-run policy types.
GenericOrchestrationContext = OrchestrationContext[
    orm_models.Run,
    Union[core.FlowRunPolicy, core.TaskRunPolicy],
]