Coverage for /usr/local/lib/python3.12/site-packages/prefect/flow_engine.py: 14%

731 statements  

coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1from __future__ import annotations 1a

2 

3import asyncio 1a

4import logging 1a

5import multiprocessing 1a

6import multiprocessing.context 1a

7import os 1a

8import time 1a

9from contextlib import ( 1a

10 AsyncExitStack, 

11 ExitStack, 

12 asynccontextmanager, 

13 contextmanager, 

14 nullcontext, 

15) 

16from dataclasses import dataclass, field 1a

17from functools import wraps 1a

18from typing import ( 1a

19 Any, 

20 AsyncGenerator, 

21 Coroutine, 

22 Dict, 

23 Generator, 

24 Generic, 

25 Iterable, 

26 Literal, 

27 Optional, 

28 Type, 

29 TypeVar, 

30 Union, 

31 cast, 

32) 

33from uuid import UUID 1a

34 

35from anyio import CancelScope 1a

36from opentelemetry import propagate, trace 1a

37from typing_extensions import ParamSpec 1a

38 

39from prefect import Task 1a

40from prefect.client.orchestration import PrefectClient, SyncPrefectClient, get_client 1a

41from prefect.client.schemas import FlowRun, TaskRun 1a

42from prefect.client.schemas.filters import FlowRunFilter 1a

43from prefect.client.schemas.sorting import FlowRunSort 1a

44from prefect.concurrency._leases import ( 1a

45 amaintain_concurrency_lease, 

46 maintain_concurrency_lease, 

47) 

48from prefect.concurrency.context import ConcurrencyContext 1a

49from prefect.concurrency.v1.context import ConcurrencyContext as ConcurrencyContextV1 1a

50from prefect.context import ( 1a

51 AsyncClientContext, 

52 FlowRunContext, 

53 SettingsContext, 

54 SyncClientContext, 

55 TagsContext, 

56 _deployment_id, 

57 _deployment_parameters, 

58 get_settings_context, 

59 hydrated_context, 

60 serialize_context, 

61) 

62from prefect.engine import handle_engine_signals 1a

63from prefect.exceptions import ( 1a

64 Abort, 

65 MissingFlowError, 

66 Pause, 

67 PrefectException, 

68 TerminationSignal, 

69 UpstreamTaskError, 

70) 

71from prefect.flows import ( 1a

72 Flow, 

73 load_flow_from_entrypoint, 

74 load_flow_from_flow_run, 

75 load_function_and_convert_to_flow, 

76) 

77from prefect.futures import PrefectFuture, resolve_futures_to_states 1a

78from prefect.logging.loggers import ( 1a

79 flow_run_logger, 

80 get_logger, 

81 get_run_logger, 

82 patch_print, 

83) 

84from prefect.results import ( 1a

85 ResultStore, 

86 get_result_store, 

87 should_persist_result, 

88) 

89from prefect.settings import PREFECT_DEBUG_MODE 1a

90from prefect.settings.context import get_current_settings 1a

91from prefect.settings.models.root import Settings 1a

92from prefect.states import ( 1a

93 Failed, 

94 Pending, 

95 Running, 

96 State, 

97 exception_to_crashed_state, 

98 exception_to_failed_state, 

99 return_value_to_state, 

100) 

101from prefect.telemetry.run_telemetry import ( 1a

102 LABELS_TRACEPARENT_KEY, 

103 TRACEPARENT_KEY, 

104 OTELSetter, 

105 RunTelemetry, 

106) 

107from prefect.types import KeyValueLabels 1a

108from prefect.utilities._engine import get_hook_name, resolve_custom_flow_run_name 1a

109from prefect.utilities.annotations import NotSet 1a

110from prefect.utilities.asyncutils import run_coro_as_sync 1a

111from prefect.utilities.callables import ( 1a

112 call_with_parameters, 

113 cloudpickle_wrapped_call, 

114 get_call_parameters, 

115 parameters_to_args_kwargs, 

116) 

117from prefect.utilities.collections import visit_collection 1a

118from prefect.utilities.engine import ( 1a

119 capture_sigterm, 

120 link_state_to_flow_run_result, 

121 propose_state, 

122 propose_state_sync, 

123 resolve_to_final_result, 

124) 

125from prefect.utilities.timeout import timeout, timeout_async 1a

126from prefect.utilities.urls import url_for 1a

127 

128P = ParamSpec("P") 1a

129R = TypeVar("R") 1a

130 

131 

132class FlowRunTimeoutError(TimeoutError): 1a

133 """Raised when a flow run exceeds its defined timeout.""" 

134 

135 

136def load_flow_run(flow_run_id: UUID) -> FlowRun: 1a

137 client = get_client(sync_client=True) 

138 flow_run = client.read_flow_run(flow_run_id) 

139 return flow_run 

140 

141 

142def load_flow(flow_run: FlowRun) -> Flow[..., Any]: 1a

143 entrypoint = os.environ.get("PREFECT__FLOW_ENTRYPOINT") 

144 

145 if entrypoint: 

146 # we should not accept a placeholder flow at runtime 

147 try: 

148 flow = load_flow_from_entrypoint(entrypoint, use_placeholder_flow=False) 

149 except MissingFlowError: 

150 flow = load_function_and_convert_to_flow(entrypoint) 

151 else: 

152 flow = run_coro_as_sync( 

153 load_flow_from_flow_run(flow_run, use_placeholder_flow=False) 

154 ) 

155 return flow 

156 

157 

158def load_flow_and_flow_run(flow_run_id: UUID) -> tuple[FlowRun, Flow[..., Any]]: 1a

159 flow_run = load_flow_run(flow_run_id) 

160 flow = load_flow(flow_run) 

161 return flow_run, flow 

162 

163 

164@dataclass 1a

165class BaseFlowRunEngine(Generic[P, R]): 1a

166 flow: Union[Flow[P, R], Flow[P, Coroutine[Any, Any, R]]] 1a

167 parameters: Optional[Dict[str, Any]] = None 1a

168 flow_run: Optional[FlowRun] = None 1a

169 flow_run_id: Optional[UUID] = None 1a

170 logger: logging.Logger = field(default_factory=lambda: get_logger("engine")) 1a

171 wait_for: Optional[Iterable[PrefectFuture[Any]]] = None 1a

172 context: Optional[dict[str, Any]] = None 1a

173 # holds the return value from the user code 

174 _return_value: Union[R, Type[NotSet]] = NotSet 1a

175 # holds the exception raised by the user code, if any 

176 _raised: Union[Exception, Type[NotSet]] = NotSet 1a

177 _is_started: bool = False 1a

178 short_circuit: bool = False 1a

179 _flow_run_name_set: bool = False 1a

180 _telemetry: RunTelemetry = field(default_factory=RunTelemetry) 1a

181 

182 def __post_init__(self) -> None: 1a

183 if self.flow is None and self.flow_run_id is None: 

184 raise ValueError("Either a flow or a flow_run_id must be provided.") 

185 

186 if self.parameters is None: 

187 self.parameters = {} 

188 

189 @property 1a

190 def state(self) -> State: 1a

191 return self.flow_run.state # type: ignore 

192 

193 def is_running(self) -> bool: 1a

194 if getattr(self, "flow_run", None) is None: 

195 return False 

196 return getattr(self, "flow_run").state.is_running() 

197 

198 def is_pending(self) -> bool: 1a

199 if getattr(self, "flow_run", None) is None: 

200 return False # TODO: handle this differently? 

201 return getattr(self, "flow_run").state.is_pending() 

202 

203 def cancel_all_tasks(self) -> None: 1a

204 if hasattr(self.flow.task_runner, "cancel_all"): 

205 self.flow.task_runner.cancel_all() # type: ignore 

206 

207 def _update_otel_labels( 1a

208 self, span: trace.Span, client: Union[SyncPrefectClient, PrefectClient] 

209 ): 

210 parent_flow_run_ctx = FlowRunContext.get() 

211 

212 if parent_flow_run_ctx and parent_flow_run_ctx.flow_run: 

213 if traceparent := parent_flow_run_ctx.flow_run.labels.get( 

214 LABELS_TRACEPARENT_KEY 

215 ): 

216 carrier: KeyValueLabels = {TRACEPARENT_KEY: traceparent} 

217 propagate.get_global_textmap().inject( 

218 carrier={TRACEPARENT_KEY: traceparent}, 

219 setter=OTELSetter(), 

220 ) 

221 

222 else: 

223 carrier: KeyValueLabels = {} 

224 propagate.get_global_textmap().inject( 

225 carrier, 

226 context=trace.set_span_in_context(span), 

227 setter=OTELSetter(), 

228 ) 

229 if carrier.get(TRACEPARENT_KEY): 

230 if self.flow_run: 

231 client.update_flow_run_labels( 

232 flow_run_id=self.flow_run.id, 

233 labels={LABELS_TRACEPARENT_KEY: carrier[TRACEPARENT_KEY]}, 

234 ) 

235 else: 

236 self.logger.info( 

237 f"Tried to set traceparent {carrier[TRACEPARENT_KEY]} for flow run, but None was found" 

238 ) 

239 

240 

241@dataclass 1a

242class FlowRunEngine(BaseFlowRunEngine[P, R]): 1a

243 _client: Optional[SyncPrefectClient] = None 1a

244 flow_run: FlowRun | None = None 1a

245 parameters: dict[str, Any] | None = None 1a

246 

247 @property 1a

248 def client(self) -> SyncPrefectClient: 1a

249 if not self._is_started or self._client is None: 

250 raise RuntimeError("Engine has not started.") 

251 return self._client 

252 

253 def _resolve_parameters(self): 1a

254 if not self.parameters: 

255 return 

256 

257 resolved_parameters = {} 

258 for parameter, value in self.parameters.items(): 

259 try: 

260 resolved_parameters[parameter] = visit_collection( 

261 value, 

262 visit_fn=resolve_to_final_result, 

263 return_data=True, 

264 max_depth=-1, 

265 remove_annotations=True, 

266 context={"parameter_name": parameter}, 

267 ) 

268 except UpstreamTaskError: 

269 raise 

270 except Exception as exc: 

271 raise PrefectException( 

272 f"Failed to resolve inputs in parameter {parameter!r}. If your" 

273 " parameter type is not supported, consider using the `quote`" 

274 " annotation to skip resolution of inputs." 

275 ) from exc 

276 

277 self.parameters = resolved_parameters 

278 

279 def _wait_for_dependencies(self): 1a

280 if not self.wait_for: 

281 return 

282 

283 visit_collection( 

284 self.wait_for, 

285 visit_fn=resolve_to_final_result, 

286 return_data=False, 

287 max_depth=-1, 

288 remove_annotations=True, 

289 context={}, 

290 ) 

291 

292 def begin_run(self) -> State: 1a

293 try: 

294 self._resolve_parameters() 

295 self._wait_for_dependencies() 

296 except UpstreamTaskError as upstream_exc: 

297 state = self.set_state( 

298 Pending( 

299 name="NotReady", 

300 message=str(upstream_exc), 

301 ), 

302 # if orchestrating a run already in a pending state, force orchestration to 

303 # update the state name 

304 force=self.state.is_pending(), 

305 ) 

306 return state 

307 

308 # validate prior to context so that context receives validated params 

309 if self.flow.should_validate_parameters: 

310 try: 

311 self.parameters = self.flow.validate_parameters(self.parameters or {}) 

312 except Exception as exc: 

313 message = "Validation of flow parameters failed with error:" 

314 self.logger.error("%s %s", message, exc) 

315 self.handle_exception( 

316 exc, 

317 msg=message, 

318 result_store=get_result_store().update_for_flow( 

319 self.flow, _sync=True 

320 ), 

321 ) 

322 self.short_circuit = True 

323 

324 new_state = Running() 

325 state = self.set_state(new_state) 

326 while state.is_pending(): 

327 time.sleep(0.2) 

328 state = self.set_state(new_state) 

329 return state 

330 

331 def set_state(self, state: State, force: bool = False) -> State: 1a

332 """ """ 

333 # prevents any state-setting activity 

334 if self.short_circuit: 

335 return self.state 

336 

337 state = propose_state_sync( 

338 self.client, state, flow_run_id=self.flow_run.id, force=force 

339 ) # type: ignore 

340 self.flow_run.state = state # type: ignore 

341 self.flow_run.state_name = state.name # type: ignore 

342 self.flow_run.state_type = state.type # type: ignore 

343 

344 self._telemetry.update_state(state) 

345 self.call_hooks(state) 

346 return state 

347 

348 def result(self, raise_on_failure: bool = True) -> "Union[R, State, None]": 1a

349 if self._return_value is not NotSet and not isinstance( 

350 self._return_value, State 

351 ): 

352 _result = self._return_value 

353 link_state_to_flow_run_result(self.state, _result) 

354 

355 if asyncio.iscoroutine(_result): 

356 # getting the value for a BaseResult may return an awaitable 

357 # depending on whether the parent frame is sync or not 

358 _result = run_coro_as_sync(_result) 

359 return _result 

360 

361 if self._raised is not NotSet: 

362 if raise_on_failure: 

363 raise self._raised 

364 return self._raised 

365 

366 # This is a fall through case which leans on the existing state result mechanics to get the 

367 # return value. This is necessary because we currently will return a State object if the 

368 # State was Prefect-created. 

369 # TODO: Remove the need to get the result from a State except in cases where the return value 

370 # is a State object. 

371 _result = self.state.result(raise_on_failure=raise_on_failure, _sync=True) # type: ignore 

372 return _result 

373 

374 def handle_success(self, result: R) -> R: 1a

375 result_store = getattr(FlowRunContext.get(), "result_store", None) 

376 if result_store is None: 

377 raise ValueError("Result store is not set") 

378 resolved_result = resolve_futures_to_states(result) 

379 terminal_state = run_coro_as_sync( 

380 return_value_to_state( 

381 resolved_result, 

382 result_store=result_store, 

383 write_result=should_persist_result(), 

384 ) 

385 ) 

386 self.set_state(terminal_state) 

387 self._return_value = resolved_result 

388 

389 link_state_to_flow_run_result(terminal_state, resolved_result) 

390 self._telemetry.end_span_on_success() 

391 

392 return result 

393 

394 def handle_exception( 1a

395 self, 

396 exc: Exception, 

397 msg: Optional[str] = None, 

398 result_store: Optional[ResultStore] = None, 

399 ) -> State: 

400 context = FlowRunContext.get() 

401 terminal_state = cast( 

402 State, 

403 run_coro_as_sync( 

404 exception_to_failed_state( 

405 exc, 

406 message=msg or "Flow run encountered an exception:", 

407 result_store=result_store or getattr(context, "result_store", None), 

408 write_result=True, 

409 ) 

410 ), 

411 ) 

412 state = self.set_state(terminal_state) 

413 if self.state.is_scheduled(): 

414 self.logger.info( 

415 ( 

416 f"Received non-final state {state.name!r} when proposing final" 

417 f" state {terminal_state.name!r} and will attempt to run again..." 

418 ), 

419 ) 

420 state = self.set_state(Running()) 

421 self._raised = exc 

422 self._telemetry.record_exception(exc) 

423 self._telemetry.end_span_on_failure(state.message) 

424 

425 return state 

426 

427 def handle_timeout(self, exc: TimeoutError) -> None: 1a

428 if isinstance(exc, FlowRunTimeoutError): 

429 message = ( 

430 f"Flow run exceeded timeout of {self.flow.timeout_seconds} second(s)" 

431 ) 

432 else: 

433 message = f"Flow run failed due to timeout: {exc!r}" 

434 self.logger.error(message) 

435 state = Failed( 

436 data=exc, 

437 message=message, 

438 name="TimedOut", 

439 ) 

440 self.set_state(state) 

441 self._raised = exc 

442 self._telemetry.record_exception(exc) 

443 self._telemetry.end_span_on_failure(message) 

444 

445 def handle_crash(self, exc: BaseException) -> None: 1a

446 state = run_coro_as_sync(exception_to_crashed_state(exc)) 

447 self.logger.error(f"Crash detected! {state.message}") 

448 self.logger.debug("Crash details:", exc_info=exc) 

449 self.set_state(state, force=True) 

450 self._raised = exc 

451 self._telemetry.record_exception(exc) 

452 self._telemetry.end_span_on_failure(state.message if state else None) 

453 

454 def load_subflow_run( 1a

455 self, 

456 parent_task_run: TaskRun, 

457 client: SyncPrefectClient, 

458 context: FlowRunContext, 

459 ) -> Union[FlowRun, None]: 

460 """ 

461 This method attempts to load an existing flow run for a subflow task 

462 run, if appropriate. 

463 

464 If the parent task run is in a final but not COMPLETED state, and not 

465 being rerun, then we attempt to load an existing flow run instead of 

466 creating a new one. This will prevent the engine from running the 

467 subflow again. 

468 

469 If no existing flow run is found, or if the subflow should be rerun, 

470 then no flow run is returned. 

471 """ 

472 

473 # check if the parent flow run is rerunning 

474 rerunning = ( 

475 context.flow_run.run_count > 1 

476 if getattr(context, "flow_run", None) 

477 and isinstance(context.flow_run, FlowRun) 

478 else False 

479 ) 

480 

481 # if the parent task run is in a final but not completed state, and 

482 # not rerunning, then retrieve the most recent flow run instead of 

483 # creating a new one. This effectively loads a cached flow run for 

484 # situations where we are confident the flow should not be run 

485 # again. 

486 assert isinstance(parent_task_run.state, State) 

487 if parent_task_run.state.is_final() and not ( 

488 rerunning and not parent_task_run.state.is_completed() 

489 ): 

490 # return the most recent flow run, if it exists 

491 flow_runs = client.read_flow_runs( 

492 flow_run_filter=FlowRunFilter( 

493 parent_task_run_id={"any_": [parent_task_run.id]} 

494 ), 

495 sort=FlowRunSort.EXPECTED_START_TIME_ASC, 

496 limit=1, 

497 ) 

498 if flow_runs: 

499 loaded_flow_run = flow_runs[-1] 

500 self._return_value = loaded_flow_run.state 

501 return loaded_flow_run 

502 

503 def create_flow_run(self, client: SyncPrefectClient) -> FlowRun: 1a

504 flow_run_ctx = FlowRunContext.get() 

505 parameters = self.parameters or {} 

506 

507 parent_task_run = None 

508 

509 # this is a subflow run 

510 if flow_run_ctx: 

511 # add a task to a parent flow run that represents the execution of a subflow run 

512 parent_task = Task( 

513 name=self.flow.name, fn=self.flow.fn, version=self.flow.version 

514 ) 

515 

516 parent_task_run = run_coro_as_sync( 

517 parent_task.create_run( 

518 flow_run_context=flow_run_ctx, 

519 parameters=self.parameters, 

520 wait_for=self.wait_for, 

521 ) 

522 ) 

523 

524 # check if there is already a flow run for this subflow 

525 if subflow_run := self.load_subflow_run( 

526 parent_task_run=parent_task_run, client=client, context=flow_run_ctx 

527 ): 

528 return subflow_run 

529 

530 return client.create_flow_run( 

531 flow=self.flow, 

532 parameters=self.flow.serialize_parameters(parameters), 

533 state=Pending(), 

534 parent_task_run_id=getattr(parent_task_run, "id", None), 

535 tags=TagsContext.get().current_tags, 

536 ) 

537 

538 def call_hooks(self, state: Optional[State] = None) -> None: 1a

539 if state is None: 

540 state = self.state 

541 flow = self.flow 

542 flow_run = self.flow_run 

543 

544 if not flow_run: 

545 raise ValueError("Flow run is not set") 

546 

547 enable_cancellation_and_crashed_hooks = ( 

548 os.environ.get( 

549 "PREFECT__ENABLE_CANCELLATION_AND_CRASHED_HOOKS", "true" 

550 ).lower() 

551 == "true" 

552 ) 

553 

554 if state.is_failed() and flow.on_failure_hooks: 

555 hooks = flow.on_failure_hooks 

556 elif state.is_completed() and flow.on_completion_hooks: 

557 hooks = flow.on_completion_hooks 

558 elif ( 

559 enable_cancellation_and_crashed_hooks 

560 and state.is_cancelling() 

561 and flow.on_cancellation_hooks 

562 ): 

563 hooks = flow.on_cancellation_hooks 

564 elif ( 

565 enable_cancellation_and_crashed_hooks 

566 and state.is_crashed() 

567 and flow.on_crashed_hooks 

568 ): 

569 hooks = flow.on_crashed_hooks 

570 elif state.is_running() and flow.on_running_hooks: 

571 hooks = flow.on_running_hooks 

572 else: 

573 hooks = None 

574 

575 for hook in hooks or []: 

576 hook_name = get_hook_name(hook) 

577 

578 try: 

579 self.logger.info( 

580 f"Running hook {hook_name!r} in response to entering state" 

581 f" {state.name!r}" 

582 ) 

583 result = hook(flow, flow_run, state) 

584 if asyncio.iscoroutine(result): 

585 run_coro_as_sync(result) 

586 except Exception: 

587 self.logger.error( 

588 f"An error was encountered while running hook {hook_name!r}", 

589 exc_info=True, 

590 ) 

591 else: 

592 self.logger.info(f"Hook {hook_name!r} finished running successfully") 

593 

594 @contextmanager 1a

595 def setup_run_context(self, client: Optional[SyncPrefectClient] = None): 1a

596 from prefect.utilities.engine import ( 

597 should_log_prints, 

598 ) 

599 

600 if client is None: 

601 client = self.client 

602 if not self.flow_run: 

603 raise ValueError("Flow run not set") 

604 

605 self.flow_run = client.read_flow_run(self.flow_run.id) 

606 log_prints = should_log_prints(self.flow) 

607 

608 with ExitStack() as stack: 

609 # TODO: Explore closing task runner before completing the flow to 

610 # wait for futures to complete 

611 stack.enter_context(capture_sigterm()) 

612 if log_prints: 

613 stack.enter_context(patch_print()) 

614 task_runner = stack.enter_context(self.flow.task_runner.duplicate()) 

615 stack.enter_context( 

616 FlowRunContext( 

617 flow=self.flow, 

618 log_prints=log_prints, 

619 flow_run=self.flow_run, 

620 parameters=self.parameters, 

621 client=client, 

622 result_store=get_result_store().update_for_flow( 

623 self.flow, _sync=True 

624 ), 

625 task_runner=task_runner, 

626 persist_result=self.flow.persist_result 

627 if self.flow.persist_result is not None 

628 else should_persist_result(), 

629 ) 

630 ) 

631 # Set deployment context vars only if this is the top-level deployment run 

632 # (nested flows will inherit via ContextVar propagation) 

633 if self.flow_run.deployment_id and not _deployment_id.get(): 

634 id_token = _deployment_id.set(self.flow_run.deployment_id) 

635 params_token = _deployment_parameters.set(self.flow_run.parameters) 

636 stack.callback(_deployment_id.reset, id_token) 

637 stack.callback(_deployment_parameters.reset, params_token) 

638 stack.enter_context(ConcurrencyContextV1()) 

639 stack.enter_context(ConcurrencyContext()) 

640 if lease_id := self.state.state_details.deployment_concurrency_lease_id: 

641 stack.enter_context( 

642 maintain_concurrency_lease( 

643 lease_id, 300, raise_on_lease_renewal_failure=True 

644 ) 

645 ) 

646 

647 # set the logger to the flow run logger 

648 

649 self.logger: "logging.Logger" = flow_run_logger( 

650 flow_run=self.flow_run, flow=self.flow 

651 ) # type: ignore 

652 

653 # update the flow run name if necessary 

654 if not self._flow_run_name_set and self.flow.flow_run_name: 

655 flow_run_name = resolve_custom_flow_run_name( 

656 flow=self.flow, parameters=self.parameters 

657 ) 

658 self.client.set_flow_run_name( 

659 flow_run_id=self.flow_run.id, name=flow_run_name 

660 ) 

661 

662 self.logger.extra["flow_run_name"] = flow_run_name 

663 self.logger.debug( 

664 f"Renamed flow run {self.flow_run.name!r} to {flow_run_name!r}" 

665 ) 

666 self.flow_run.name = flow_run_name 

667 self._flow_run_name_set = True 

668 

669 self._telemetry.update_run_name(name=flow_run_name) 

670 

671 if self.flow_run.parent_task_run_id: 

672 _logger = get_run_logger(FlowRunContext.get()) 

673 run_type = "subflow" 

674 else: 

675 _logger = self.logger 

676 run_type = "flow" 

677 

678 _logger.info( 

679 f"Beginning {run_type} run {self.flow_run.name!r} for flow {self.flow.name!r}" 

680 ) 

681 

682 if flow_run_url := url_for(self.flow_run): 

683 self.logger.info( 

684 f"View at {flow_run_url}", extra={"send_to_api": False} 

685 ) 

686 

687 yield 

688 

689 @contextmanager 1a

690 def initialize_run(self): 1a

691 """ 

692 Enters a client context and creates a flow run if needed. 

693 """ 

694 with hydrated_context(self.context): 

695 with SyncClientContext.get_or_create() as client_ctx: 

696 self._client = client_ctx.client 

697 self._is_started = True 

698 

699 if not self.flow_run: 

700 self.flow_run = self.create_flow_run(self.client) 

701 else: 

702 # Update the empirical policy to match the flow if it is not set 

703 if self.flow_run.empirical_policy.retry_delay is None: 

704 self.flow_run.empirical_policy.retry_delay = ( 

705 self.flow.retry_delay_seconds 

706 ) 

707 

708 if self.flow_run.empirical_policy.retries is None: 

709 self.flow_run.empirical_policy.retries = self.flow.retries 

710 

711 self.client.update_flow_run( 

712 flow_run_id=self.flow_run.id, 

713 flow_version=self.flow.version, 

714 empirical_policy=self.flow_run.empirical_policy, 

715 ) 

716 

717 self._telemetry.start_span( 

718 run=self.flow_run, 

719 client=self.client, 

720 parameters=self.parameters, 

721 ) 

722 

723 try: 

724 yield self 

725 

726 except TerminationSignal as exc: 

727 self.cancel_all_tasks() 

728 self.handle_crash(exc) 

729 raise 

730 except Exception: 

731 # regular exceptions are caught and re-raised to the user 

732 raise 

733 except (Abort, Pause) as exc: 

734 if getattr(exc, "state", None): 

735 # we set attribute explicitly because 

736 # internals will have already called the state change API 

737 self.flow_run.state = exc.state 

738 raise 

739 except GeneratorExit: 

740 # Do not capture generator exits as crashes 

741 raise 

742 except BaseException as exc: 

743 # We don't want to crash a flow run if the user code finished executing 

744 if self.flow_run.state and not self.flow_run.state.is_final(): 

745 # BaseExceptions are caught and handled as crashes 

746 self.handle_crash(exc) 

747 raise 

748 else: 

749 self.logger.debug( 

750 "BaseException was raised after user code finished executing", 

751 exc_info=exc, 

752 ) 

753 finally: 

754 # If debugging, use the more complete `repr` than the usual `str` description 

755 display_state = ( 

756 repr(self.state) if PREFECT_DEBUG_MODE else str(self.state) 

757 ) 

758 self.logger.log( 

759 level=logging.INFO, 

760 msg=f"Finished in state {display_state}", 

761 ) 

762 

763 self._is_started = False 

764 self._client = None 

765 

766 # -------------------------- 

767 # 

768 # The following methods compose the main flow run loop 

769 # 

770 # -------------------------- 

771 

772 @contextmanager 1a

773 def start(self) -> Generator[None, None, None]: 1a

774 with self.initialize_run(): 

775 with ( 

776 trace.use_span(self._telemetry.span) 

777 if self._telemetry.span 

778 else nullcontext() 

779 ): 

780 self.begin_run() 

781 

782 yield 

783 

784 @contextmanager 1a

785 def run_context(self): 1a

786 timeout_context = timeout_async if self.flow.isasync else timeout 

787 # reenter the run context to ensure it is up to date for every run 

788 with self.setup_run_context(): 

789 try: 

790 with timeout_context( 

791 seconds=self.flow.timeout_seconds, 

792 timeout_exc_type=FlowRunTimeoutError, 

793 ): 

794 self.logger.debug( 

795 f"Executing flow {self.flow.name!r} for flow run {self.flow_run.name!r}..." 

796 ) 

797 yield self 

798 except TimeoutError as exc: 

799 self.handle_timeout(exc) 

800 except Exception as exc: 

801 self.logger.exception("Encountered exception during execution: %r", exc) 

802 self.handle_exception(exc) 

803 

804 def call_flow_fn(self) -> Union[R, Coroutine[Any, Any, R]]: 1a

805 """ 

806 Convenience method to call the flow function. Returns a coroutine if the 

807 flow is async. 

808 """ 

809 if self.flow.isasync: 

810 

811 async def _call_flow_fn(): 

812 result = await call_with_parameters(self.flow.fn, self.parameters) 

813 self.handle_success(result) 

814 

815 return _call_flow_fn() 

816 else: 

817 result = call_with_parameters(self.flow.fn, self.parameters) 

818 self.handle_success(result) 

819 

820 

821@dataclass 1a

822class AsyncFlowRunEngine(BaseFlowRunEngine[P, R]): 1a

823 """ 

824 Async version of the flow run engine. 

825 

826 NOTE: This engine has not been fully asyncified yet, so async flows may 

827 still perform some orchestration work synchronously. 

828 """ 

829 

830 _client: Optional[PrefectClient] = None 1a

831 parameters: dict[str, Any] | None = None 1a

832 flow_run: FlowRun | None = None 1a

833 

834 @property 1a

835 def client(self) -> PrefectClient: 1a

836 if not self._is_started or self._client is None: 

837 raise RuntimeError("Engine has not started.") 

838 return self._client 

839 

840 def _resolve_parameters(self): 1a

841 if not self.parameters: 

842 return 

843 

844 resolved_parameters = {} 

845 for parameter, value in self.parameters.items(): 

846 try: 

847 resolved_parameters[parameter] = visit_collection( 

848 value, 

849 visit_fn=resolve_to_final_result, 

850 return_data=True, 

851 max_depth=-1, 

852 remove_annotations=True, 

853 context={"parameter_name": parameter}, 

854 ) 

855 except UpstreamTaskError: 

856 raise 

857 except Exception as exc: 

858 raise PrefectException( 

859 f"Failed to resolve inputs in parameter {parameter!r}. If your" 

860 " parameter type is not supported, consider using the `quote`" 

861 " annotation to skip resolution of inputs." 

862 ) from exc 

863 

864 self.parameters = resolved_parameters 

865 

866 def _wait_for_dependencies(self): 1a

867 if not self.wait_for: 

868 return 

869 

870 visit_collection( 

871 self.wait_for, 

872 visit_fn=resolve_to_final_result, 

873 return_data=False, 

874 max_depth=-1, 

875 remove_annotations=True, 

876 context={}, 

877 ) 

878 

879 async def begin_run(self) -> State: 1a

880 try: 

881 self._resolve_parameters() 

882 self._wait_for_dependencies() 

883 except UpstreamTaskError as upstream_exc: 

884 state = await self.set_state( 

885 Pending( 

886 name="NotReady", 

887 message=str(upstream_exc), 

888 ), 

889 # if orchestrating a run already in a pending state, force orchestration to 

890 # update the state name 

891 force=self.state.is_pending(), 

892 ) 

893 return state 

894 

895 # validate prior to context so that context receives validated params 

896 if self.flow.should_validate_parameters: 

897 try: 

898 self.parameters = self.flow.validate_parameters(self.parameters or {}) 

899 except Exception as exc: 

900 message = "Validation of flow parameters failed with error:" 

901 self.logger.error("%s %s", message, exc) 

902 await self.handle_exception( 

903 exc, 

904 msg=message, 

905 result_store=get_result_store().update_for_flow( 

906 self.flow, _sync=True 

907 ), 

908 ) 

909 self.short_circuit = True 

910 

911 new_state = Running() 

912 state = await self.set_state(new_state) 

913 while state.is_pending(): 

914 await asyncio.sleep(0.2) 

915 state = await self.set_state(new_state) 

916 return state 

917 

918 async def set_state(self, state: State, force: bool = False) -> State: 1a

919 """ """ 

920 # prevents any state-setting activity 

921 if self.short_circuit: 

922 return self.state 

923 

924 state = await propose_state( 

925 self.client, state, flow_run_id=self.flow_run.id, force=force 

926 ) # type: ignore 

927 self.flow_run.state = state # type: ignore 

928 self.flow_run.state_name = state.name # type: ignore 

929 self.flow_run.state_type = state.type # type: ignore 

930 

931 self._telemetry.update_state(state) 

932 await self.call_hooks(state) 

933 return state 

934 

935 async def result(self, raise_on_failure: bool = True) -> "Union[R, State, None]": 1a

936 if self._return_value is not NotSet and not isinstance( 

937 self._return_value, State 

938 ): 

939 _result = self._return_value 

940 link_state_to_flow_run_result(self.state, _result) 

941 

942 if asyncio.iscoroutine(_result): 

943 # getting the value for a BaseResult may return an awaitable 

944 # depending on whether the parent frame is sync or not 

945 _result = await _result 

946 return _result 

947 

948 if self._raised is not NotSet: 

949 if raise_on_failure: 

950 raise self._raised 

951 return self._raised 

952 

953 # This is a fall through case which leans on the existing state result mechanics to get the 

954 # return value. This is necessary because we currently will return a State object if the 

955 # State was Prefect-created. 

956 # TODO: Remove the need to get the result from a State except in cases where the return value 

957 # is a State object. 

958 return await self.state.aresult(raise_on_failure=raise_on_failure) # type: ignore 

959 

960 async def handle_success(self, result: R) -> R: 1a

961 result_store = getattr(FlowRunContext.get(), "result_store", None) 

962 if result_store is None: 

963 raise ValueError("Result store is not set") 

964 resolved_result = resolve_futures_to_states(result) 

965 terminal_state = await return_value_to_state( 

966 resolved_result, 

967 result_store=result_store, 

968 write_result=should_persist_result(), 

969 ) 

970 await self.set_state(terminal_state) 

971 self._return_value = resolved_result 

972 

973 self._telemetry.end_span_on_success() 

974 

975 return result 

976 

977 async def handle_exception( 1a

978 self, 

979 exc: Exception, 

980 msg: Optional[str] = None, 

981 result_store: Optional[ResultStore] = None, 

982 ) -> State: 

983 context = FlowRunContext.get() 

984 terminal_state = cast( 

985 State, 

986 await exception_to_failed_state( 

987 exc, 

988 message=msg or "Flow run encountered an exception:", 

989 result_store=result_store or getattr(context, "result_store", None), 

990 write_result=True, 

991 ), 

992 ) 

993 state = await self.set_state(terminal_state) 

994 if self.state.is_scheduled(): 

995 self.logger.info( 

996 ( 

997 f"Received non-final state {state.name!r} when proposing final" 

998 f" state {terminal_state.name!r} and will attempt to run again..." 

999 ), 

1000 ) 

1001 state = await self.set_state(Running()) 

1002 self._raised = exc 

1003 self._telemetry.record_exception(exc) 

1004 self._telemetry.end_span_on_failure(state.message) 

1005 

1006 return state 

1007 

1008 async def handle_timeout(self, exc: TimeoutError) -> None: 1a

1009 if isinstance(exc, FlowRunTimeoutError): 

1010 message = ( 

1011 f"Flow run exceeded timeout of {self.flow.timeout_seconds} second(s)" 

1012 ) 

1013 else: 

1014 message = f"Flow run failed due to timeout: {exc!r}" 

1015 self.logger.error(message) 

1016 state = Failed( 

1017 data=exc, 

1018 message=message, 

1019 name="TimedOut", 

1020 ) 

1021 await self.set_state(state) 

1022 self._raised = exc 

1023 

1024 self._telemetry.record_exception(exc) 

1025 self._telemetry.end_span_on_failure(message) 

1026 

1027 async def handle_crash(self, exc: BaseException) -> None: 1a

1028 # need to shield from asyncio cancellation to ensure we update the state 

1029 # on the server before exiting 

1030 with CancelScope(shield=True): 

1031 state = await exception_to_crashed_state(exc) 

1032 self.logger.error(f"Crash detected! {state.message}") 

1033 self.logger.debug("Crash details:", exc_info=exc) 

1034 await self.set_state(state, force=True) 

1035 self._raised = exc 

1036 

1037 self._telemetry.record_exception(exc) 

1038 self._telemetry.end_span_on_failure(state.message) 

1039 

1040 async def load_subflow_run( 1a

1041 self, 

1042 parent_task_run: TaskRun, 

1043 client: PrefectClient, 

1044 context: FlowRunContext, 

1045 ) -> Union[FlowRun, None]: 

1046 """ 

1047 This method attempts to load an existing flow run for a subflow task 

1048 run, if appropriate. 

1049 

1050 If the parent task run is in a final but not COMPLETED state, and not 

1051 being rerun, then we attempt to load an existing flow run instead of 

1052 creating a new one. This will prevent the engine from running the 

1053 subflow again. 

1054 

1055 If no existing flow run is found, or if the subflow should be rerun, 

1056 then no flow run is returned. 

1057 """ 

1058 

1059 # check if the parent flow run is rerunning 

1060 rerunning = ( 

1061 context.flow_run.run_count > 1 

1062 if getattr(context, "flow_run", None) 

1063 and isinstance(context.flow_run, FlowRun) 

1064 else False 

1065 ) 

1066 

1067 # if the parent task run is in a final but not completed state, and 

1068 # not rerunning, then retrieve the most recent flow run instead of 

1069 # creating a new one. This effectively loads a cached flow run for 

1070 # situations where we are confident the flow should not be run 

1071 # again. 

1072 assert isinstance(parent_task_run.state, State) 

1073 if parent_task_run.state.is_final() and not ( 

1074 rerunning and not parent_task_run.state.is_completed() 

1075 ): 

1076 # return the most recent flow run, if it exists 

1077 flow_runs = await client.read_flow_runs( 

1078 flow_run_filter=FlowRunFilter( 

1079 parent_task_run_id={"any_": [parent_task_run.id]} 

1080 ), 

1081 sort=FlowRunSort.EXPECTED_START_TIME_ASC, 

1082 limit=1, 

1083 ) 

1084 if flow_runs: 

1085 loaded_flow_run = flow_runs[-1] 

1086 self._return_value = loaded_flow_run.state 

1087 return loaded_flow_run 

1088 

1089 async def create_flow_run(self, client: PrefectClient) -> FlowRun: 1a

1090 flow_run_ctx = FlowRunContext.get() 

1091 parameters = self.parameters or {} 

1092 

1093 parent_task_run = None 

1094 

1095 # this is a subflow run 

1096 if flow_run_ctx: 

1097 # add a task to a parent flow run that represents the execution of a subflow run 

1098 parent_task = Task( 

1099 name=self.flow.name, fn=self.flow.fn, version=self.flow.version 

1100 ) 

1101 

1102 parent_task_run = await parent_task.create_run( 

1103 flow_run_context=flow_run_ctx, 

1104 parameters=self.parameters, 

1105 wait_for=self.wait_for, 

1106 ) 

1107 

1108 # check if there is already a flow run for this subflow 

1109 if subflow_run := await self.load_subflow_run( 

1110 parent_task_run=parent_task_run, client=client, context=flow_run_ctx 

1111 ): 

1112 return subflow_run 

1113 

1114 return await client.create_flow_run( 

1115 flow=self.flow, 

1116 parameters=self.flow.serialize_parameters(parameters), 

1117 state=Pending(), 

1118 parent_task_run_id=getattr(parent_task_run, "id", None), 

1119 tags=TagsContext.get().current_tags, 

1120 ) 

1121 

1122 async def call_hooks(self, state: Optional[State] = None) -> None: 1a

1123 if state is None: 

1124 state = self.state 

1125 flow = self.flow 

1126 flow_run = self.flow_run 

1127 

1128 if not flow_run: 

1129 raise ValueError("Flow run is not set") 

1130 

1131 enable_cancellation_and_crashed_hooks = ( 

1132 os.environ.get( 

1133 "PREFECT__ENABLE_CANCELLATION_AND_CRASHED_HOOKS", "true" 

1134 ).lower() 

1135 == "true" 

1136 ) 

1137 

1138 if state.is_failed() and flow.on_failure_hooks: 

1139 hooks = flow.on_failure_hooks 

1140 elif state.is_completed() and flow.on_completion_hooks: 

1141 hooks = flow.on_completion_hooks 

1142 elif ( 

1143 enable_cancellation_and_crashed_hooks 

1144 and state.is_cancelling() 

1145 and flow.on_cancellation_hooks 

1146 ): 

1147 hooks = flow.on_cancellation_hooks 

1148 elif ( 

1149 enable_cancellation_and_crashed_hooks 

1150 and state.is_crashed() 

1151 and flow.on_crashed_hooks 

1152 ): 

1153 hooks = flow.on_crashed_hooks 

1154 elif state.is_running() and flow.on_running_hooks: 

1155 hooks = flow.on_running_hooks 

1156 else: 

1157 hooks = None 

1158 

1159 for hook in hooks or []: 

1160 hook_name = get_hook_name(hook) 

1161 

1162 try: 

1163 self.logger.info( 

1164 f"Running hook {hook_name!r} in response to entering state" 

1165 f" {state.name!r}" 

1166 ) 

1167 result = hook(flow, flow_run, state) 

1168 if asyncio.iscoroutine(result): 

1169 await result 

1170 except Exception: 

1171 self.logger.error( 

1172 f"An error was encountered while running hook {hook_name!r}", 

1173 exc_info=True, 

1174 ) 

1175 else: 

1176 self.logger.info(f"Hook {hook_name!r} finished running successfully") 

1177 

1178 @asynccontextmanager 1a

1179 async def setup_run_context(self, client: Optional[PrefectClient] = None): 1a

1180 from prefect.utilities.engine import ( 

1181 should_log_prints, 

1182 ) 

1183 

1184 if client is None: 

1185 client = self.client 

1186 if not self.flow_run: 

1187 raise ValueError("Flow run not set") 

1188 

1189 self.flow_run = await client.read_flow_run(self.flow_run.id) 

1190 log_prints = should_log_prints(self.flow) 

1191 

1192 async with AsyncExitStack() as stack: 

1193 # TODO: Explore closing task runner before completing the flow to 

1194 # wait for futures to complete 

1195 stack.enter_context(capture_sigterm()) 

1196 if log_prints: 

1197 stack.enter_context(patch_print()) 

1198 task_runner = stack.enter_context(self.flow.task_runner.duplicate()) 

1199 stack.enter_context( 

1200 FlowRunContext( 

1201 flow=self.flow, 

1202 log_prints=log_prints, 

1203 flow_run=self.flow_run, 

1204 parameters=self.parameters, 

1205 client=client, 

1206 result_store=get_result_store().update_for_flow( 

1207 self.flow, _sync=True 

1208 ), 

1209 task_runner=task_runner, 

1210 persist_result=self.flow.persist_result 

1211 if self.flow.persist_result is not None 

1212 else should_persist_result(), 

1213 ) 

1214 ) 

1215 # Set deployment context vars only if this is the top-level deployment run 

1216 # (nested flows will inherit via ContextVar propagation) 

1217 if self.flow_run.deployment_id and not _deployment_id.get(): 

1218 id_token = _deployment_id.set(self.flow_run.deployment_id) 

1219 params_token = _deployment_parameters.set(self.flow_run.parameters) 

1220 stack.callback(_deployment_id.reset, id_token) 

1221 stack.callback(_deployment_parameters.reset, params_token) 

1222 stack.enter_context(ConcurrencyContextV1()) 

1223 stack.enter_context(ConcurrencyContext()) 

1224 if lease_id := self.state.state_details.deployment_concurrency_lease_id: 

1225 await stack.enter_async_context( 

1226 amaintain_concurrency_lease( 

1227 lease_id, 300, raise_on_lease_renewal_failure=True 

1228 ) 

1229 ) 

1230 

1231 # set the logger to the flow run logger 

1232 self.logger: "logging.Logger" = flow_run_logger( 

1233 flow_run=self.flow_run, flow=self.flow 

1234 ) 

1235 

1236 # update the flow run name if necessary 

1237 

1238 if not self._flow_run_name_set and self.flow.flow_run_name: 

1239 flow_run_name = resolve_custom_flow_run_name( 

1240 flow=self.flow, parameters=self.parameters 

1241 ) 

1242 await self.client.set_flow_run_name( 

1243 flow_run_id=self.flow_run.id, name=flow_run_name 

1244 ) 

1245 self.logger.extra["flow_run_name"] = flow_run_name 

1246 self.logger.debug( 

1247 f"Renamed flow run {self.flow_run.name!r} to {flow_run_name!r}" 

1248 ) 

1249 self.flow_run.name = flow_run_name 

1250 self._flow_run_name_set = True 

1251 

1252 self._telemetry.update_run_name(name=flow_run_name) 

1253 if self.flow_run.parent_task_run_id: 

1254 _logger = get_run_logger(FlowRunContext.get()) 

1255 run_type = "subflow" 

1256 else: 

1257 _logger = self.logger 

1258 run_type = "flow" 

1259 

1260 _logger.info( 

1261 f"Beginning {run_type} run {self.flow_run.name!r} for flow {self.flow.name!r}" 

1262 ) 

1263 

1264 if flow_run_url := url_for(self.flow_run): 

1265 self.logger.info( 

1266 f"View at {flow_run_url}", extra={"send_to_api": False} 

1267 ) 

1268 

1269 yield 

1270 

1271 @asynccontextmanager 1a

1272 async def initialize_run(self): 1a

1273 """ 

1274 Enters a client context and creates a flow run if needed. 

1275 """ 

1276 with hydrated_context(self.context): 

1277 async with AsyncClientContext.get_or_create() as client_ctx: 

1278 self._client = client_ctx.client 

1279 self._is_started = True 

1280 

1281 if not self.flow_run: 

1282 self.flow_run = await self.create_flow_run(self.client) 

1283 flow_run_url = url_for(self.flow_run) 

1284 

1285 if flow_run_url: 

1286 self.logger.info( 

1287 f"View at {flow_run_url}", extra={"send_to_api": False} 

1288 ) 

1289 else: 

1290 # Update the empirical policy to match the flow if it is not set 

1291 if self.flow_run.empirical_policy.retry_delay is None: 

1292 self.flow_run.empirical_policy.retry_delay = ( 

1293 self.flow.retry_delay_seconds 

1294 ) 

1295 

1296 if self.flow_run.empirical_policy.retries is None: 

1297 self.flow_run.empirical_policy.retries = self.flow.retries 

1298 

1299 await self.client.update_flow_run( 

1300 flow_run_id=self.flow_run.id, 

1301 flow_version=self.flow.version, 

1302 empirical_policy=self.flow_run.empirical_policy, 

1303 ) 

1304 

1305 await self._telemetry.async_start_span( 

1306 run=self.flow_run, 

1307 client=self.client, 

1308 parameters=self.parameters, 

1309 ) 

1310 

1311 try: 

1312 yield self 

1313 

1314 except TerminationSignal as exc: 

1315 self.cancel_all_tasks() 

1316 await self.handle_crash(exc) 

1317 raise 

1318 except Exception: 

1319 # regular exceptions are caught and re-raised to the user 

1320 raise 

1321 except (Abort, Pause) as exc: 

1322 if getattr(exc, "state", None): 

1323 # we set attribute explicitly because 

1324 # internals will have already called the state change API 

1325 self.flow_run.state = exc.state 

1326 raise 

1327 except GeneratorExit: 

1328 # Do not capture generator exits as crashes 

1329 raise 

1330 except BaseException as exc: 

1331 # We don't want to crash a flow run if the user code finished executing 

1332 if self.flow_run.state and not self.flow_run.state.is_final(): 

1333 # BaseExceptions are caught and handled as crashes 

1334 await self.handle_crash(exc) 

1335 raise 

1336 else: 

1337 self.logger.debug( 

1338 "BaseException was raised after user code finished executing", 

1339 exc_info=exc, 

1340 ) 

1341 finally: 

1342 # If debugging, use the more complete `repr` than the usual `str` description 

1343 display_state = ( 

1344 repr(self.state) if PREFECT_DEBUG_MODE else str(self.state) 

1345 ) 

1346 self.logger.log( 

1347 level=logging.INFO 

1348 if self.state.is_completed() 

1349 else logging.ERROR, 

1350 msg=f"Finished in state {display_state}", 

1351 ) 

1352 

1353 self._is_started = False 

1354 self._client = None 

1355 

1356 # -------------------------- 

1357 # 

1358 # The following methods compose the main flow run loop 

1359 # 

1360 # -------------------------- 

1361 

1362 @asynccontextmanager 1a

1363 async def start(self) -> AsyncGenerator[None, None]: 1a

1364 async with self.initialize_run(): 

1365 with ( 

1366 trace.use_span(self._telemetry.span) 

1367 if self._telemetry.span 

1368 else nullcontext() 

1369 ): 

1370 await self.begin_run() 

1371 

1372 yield 

1373 

1374 @asynccontextmanager 1a

1375 async def run_context(self): 1a

1376 timeout_context = timeout_async if self.flow.isasync else timeout 

1377 # reenter the run context to ensure it is up to date for every run 

1378 async with self.setup_run_context(): 

1379 try: 

1380 with timeout_context( 

1381 seconds=self.flow.timeout_seconds, 

1382 timeout_exc_type=FlowRunTimeoutError, 

1383 ): 

1384 self.logger.debug( 

1385 f"Executing flow {self.flow.name!r} for flow run {self.flow_run.name!r}..." 

1386 ) 

1387 yield self 

1388 except TimeoutError as exc: 

1389 await self.handle_timeout(exc) 

1390 except Exception as exc: 

1391 self.logger.exception("Encountered exception during execution: %r", exc) 

1392 await self.handle_exception(exc) 

1393 

1394 async def call_flow_fn(self) -> Coroutine[Any, Any, R]: 1a

1395 """ 

1396 Convenience method to call the flow function. Returns a coroutine if the 

1397 flow is async. 

1398 """ 

1399 assert self.flow.isasync, "Flow must be async to be run with AsyncFlowRunEngine" 

1400 

1401 result = await call_with_parameters(self.flow.fn, self.parameters) 

1402 await self.handle_success(result) 

1403 return result 

1404 

1405 

1406def run_flow_sync( 1a

1407 flow: Flow[P, R], 

1408 flow_run: Optional[FlowRun] = None, 

1409 parameters: Optional[Dict[str, Any]] = None, 

1410 wait_for: Optional[Iterable[PrefectFuture[Any]]] = None, 

1411 return_type: Literal["state", "result"] = "result", 

1412 context: Optional[dict[str, Any]] = None, 

1413) -> Union[R, State, None]: 

1414 engine = FlowRunEngine[P, R]( 

1415 flow=flow, 

1416 parameters=parameters, 

1417 flow_run=flow_run, 

1418 wait_for=wait_for, 

1419 context=context, 

1420 ) 

1421 

1422 with engine.start(): 

1423 while engine.is_running(): 

1424 with engine.run_context(): 

1425 engine.call_flow_fn() 

1426 

1427 return engine.state if return_type == "state" else engine.result() 

1428 

1429 

1430async def run_flow_async( 1a

1431 flow: Flow[P, R], 

1432 flow_run: Optional[FlowRun] = None, 

1433 parameters: Optional[Dict[str, Any]] = None, 

1434 wait_for: Optional[Iterable[PrefectFuture[Any]]] = None, 

1435 return_type: Literal["state", "result"] = "result", 

1436 context: Optional[dict[str, Any]] = None, 

1437) -> Union[R, State, None]: 

1438 engine = AsyncFlowRunEngine[P, R]( 

1439 flow=flow, 

1440 parameters=parameters, 

1441 flow_run=flow_run, 

1442 wait_for=wait_for, 

1443 context=context, 

1444 ) 

1445 

1446 async with engine.start(): 

1447 while engine.is_running(): 

1448 async with engine.run_context(): 

1449 await engine.call_flow_fn() 

1450 

1451 return engine.state if return_type == "state" else await engine.result() 

1452 

1453 

1454def run_generator_flow_sync( 1a

1455 flow: Flow[P, R], 

1456 flow_run: Optional[FlowRun] = None, 

1457 parameters: Optional[Dict[str, Any]] = None, 

1458 wait_for: Optional[Iterable[PrefectFuture[Any]]] = None, 

1459 return_type: Literal["state", "result"] = "result", 

1460 context: Optional[dict[str, Any]] = None, 

1461) -> Generator[R, None, None]: 

1462 if return_type != "result": 

1463 raise ValueError("The return_type for a generator flow must be 'result'") 

1464 

1465 engine = FlowRunEngine[P, R]( 

1466 flow=flow, 

1467 parameters=parameters, 

1468 flow_run=flow_run, 

1469 wait_for=wait_for, 

1470 context=context, 

1471 ) 

1472 

1473 with engine.start(): 

1474 while engine.is_running(): 

1475 with engine.run_context(): 

1476 call_args, call_kwargs = parameters_to_args_kwargs( 

1477 flow.fn, engine.parameters or {} 

1478 ) 

1479 gen = flow.fn(*call_args, **call_kwargs) 

1480 try: 

1481 while True: 

1482 gen_result = next(gen) 

1483 # link the current state to the result for dependency tracking 

1484 link_state_to_flow_run_result(engine.state, gen_result) 

1485 yield gen_result 

1486 except StopIteration as exc: 

1487 engine.handle_success(exc.value) 

1488 except GeneratorExit as exc: 

1489 engine.handle_success(None) 

1490 gen.throw(exc) 

1491 

1492 return engine.result() 

1493 

1494 

1495async def run_generator_flow_async( 1a

1496 flow: Flow[P, R], 

1497 flow_run: Optional[FlowRun] = None, 

1498 parameters: Optional[Dict[str, Any]] = None, 

1499 wait_for: Optional[Iterable[PrefectFuture[R]]] = None, 

1500 return_type: Literal["state", "result"] = "result", 

1501 context: Optional[dict[str, Any]] = None, 

1502) -> AsyncGenerator[R, None]: 

1503 if return_type != "result": 

1504 raise ValueError("The return_type for a generator flow must be 'result'") 

1505 

1506 engine = AsyncFlowRunEngine[P, R]( 

1507 flow=flow, 

1508 parameters=parameters, 

1509 flow_run=flow_run, 

1510 wait_for=wait_for, 

1511 context=context, 

1512 ) 

1513 

1514 async with engine.start(): 

1515 while engine.is_running(): 

1516 async with engine.run_context(): 

1517 call_args, call_kwargs = parameters_to_args_kwargs( 

1518 flow.fn, engine.parameters or {} 

1519 ) 

1520 gen = flow.fn(*call_args, **call_kwargs) 

1521 try: 

1522 while True: 

1523 # can't use anext in Python < 3.10 

1524 gen_result = await gen.__anext__() 

1525 # link the current state to the result for dependency tracking 

1526 link_state_to_flow_run_result(engine.state, gen_result) 

1527 yield gen_result 

1528 except (StopAsyncIteration, GeneratorExit) as exc: 

1529 await engine.handle_success(None) 

1530 if isinstance(exc, GeneratorExit): 

1531 gen.throw(exc) 

1532 

1533 # async generators can't return, but we can raise failures here 

1534 if engine.state.is_failed(): 

1535 await engine.result() 

1536 

1537 

1538def run_flow( 1a

1539 flow: Flow[P, R], 

1540 flow_run: Optional[FlowRun] = None, 

1541 parameters: Optional[Dict[str, Any]] = None, 

1542 wait_for: Optional[Iterable[PrefectFuture[R]]] = None, 

1543 return_type: Literal["state", "result"] = "result", 

1544 error_logger: Optional[logging.Logger] = None, 

1545 context: Optional[dict[str, Any]] = None, 

1546) -> ( 

1547 R 

1548 | State 

1549 | None 

1550 | Coroutine[Any, Any, R | State | None] 

1551 | Generator[R, None, None] 

1552 | AsyncGenerator[R, None] 

1553): 

1554 ret_val: Union[ 

1555 R, 

1556 State, 

1557 None, 

1558 Coroutine[Any, Any, R | State | None], 

1559 Generator[R, None, None], 

1560 AsyncGenerator[R, None], 

1561 ] = None 

1562 

1563 try: 

1564 kwargs: dict[str, Any] = dict( 

1565 flow=flow, 

1566 flow_run=flow_run, 

1567 parameters=_flow_parameters( 

1568 flow=flow, flow_run=flow_run, parameters=parameters 

1569 ), 

1570 wait_for=wait_for, 

1571 return_type=return_type, 

1572 context=context, 

1573 ) 

1574 

1575 if flow.isasync and flow.isgenerator: 

1576 ret_val = run_generator_flow_async(**kwargs) 

1577 elif flow.isgenerator: 

1578 ret_val = run_generator_flow_sync(**kwargs) 

1579 elif flow.isasync: 

1580 ret_val = run_flow_async(**kwargs) 

1581 else: 

1582 ret_val = run_flow_sync(**kwargs) 

1583 except (Abort, Pause): 

1584 raise 

1585 except: 

1586 if error_logger: 

1587 error_logger.error( 

1588 "Engine execution exited with unexpected exception", exc_info=True 

1589 ) 

1590 raise 

1591 return ret_val 

1592 

1593 

1594def _flow_parameters( 1a

1595 flow: Flow[P, R], flow_run: Optional[FlowRun], parameters: Optional[Dict[str, Any]] 

1596) -> Dict[str, Any]: 

1597 if parameters: 

1598 # This path is taken when a flow is being called directly with 

1599 # parameters, in that case just return the parameters as-is. 

1600 return parameters 

1601 

1602 # Otherwise the flow is being executed indirectly and we may need to grab 

1603 # the parameters from the flow run. We also need to resolve any default 

1604 # parameters that are defined on the flow function itself. 

1605 

1606 parameters = flow_run.parameters if flow_run else {} 

1607 call_args, call_kwargs = parameters_to_args_kwargs(flow.fn, parameters) 

1608 return get_call_parameters(flow.fn, call_args, call_kwargs) 

1609 

1610 

1611def run_flow_in_subprocess( 1a

1612 flow: "Flow[..., Any]", 

1613 flow_run: "FlowRun | None" = None, 

1614 parameters: dict[str, Any] | None = None, 

1615 wait_for: Iterable[PrefectFuture[Any]] | None = None, 

1616 context: dict[str, Any] | None = None, 

1617) -> multiprocessing.context.SpawnProcess: 

1618 """ 

1619 Run a flow in a subprocess. 

1620 

1621 Note the result of the flow will only be accessible if the flow is configured to 

1622 persist its result. 

1623 

1624 Args: 

1625 flow: The flow to run. 

1626 flow_run: The flow run object containing run metadata. 

1627 parameters: The parameters to use when invoking the flow. 

1628 wait_for: The futures to wait for before starting the flow. 

1629 context: A serialized context to hydrate before running the flow. If not provided, 

1630 the current context will be used. A serialized context should be provided if 

1631 this function is called in a separate memory space from the parent run (e.g. 

1632 in a subprocess or on another machine). 

1633 

1634 Returns: 

1635 A multiprocessing.context.SpawnProcess representing the process that is running the flow. 

1636 """ 

1637 from prefect.flow_engine import run_flow 

1638 

1639 @wraps(run_flow) 

1640 def run_flow_with_env( 

1641 *args: Any, 

1642 env: dict[str, str] | None = None, 

1643 **kwargs: Any, 

1644 ): 

1645 """ 

1646 Wrapper function to update environment variables and settings before running the flow. 

1647 """ 

1648 os.environ.update(env or {}) 

1649 settings_context = get_settings_context() 

1650 # Create a new settings context with a new settings object to pick up the updated 

1651 # environment variables 

1652 with SettingsContext( 

1653 profile=settings_context.profile, 

1654 settings=Settings(), 

1655 ): 

1656 with handle_engine_signals(getattr(flow_run, "id", None)): 

1657 maybe_coro = run_flow(*args, **kwargs) 

1658 if asyncio.iscoroutine(maybe_coro): 

1659 # This is running in a brand new process, so there won't be an existing 

1660 # event loop. 

1661 asyncio.run(maybe_coro) 

1662 

1663 ctx = multiprocessing.get_context("spawn") 

1664 

1665 context = context or serialize_context() 

1666 

1667 process = ctx.Process( 

1668 target=cloudpickle_wrapped_call( 

1669 run_flow_with_env, 

1670 env=get_current_settings().to_environment_variables(exclude_unset=True) 

1671 | os.environ 

1672 | { 

1673 # TODO: make this a thing we can pass into the engine 

1674 "PREFECT__ENABLE_CANCELLATION_AND_CRASHED_HOOKS": "false", 

1675 }, 

1676 flow=flow, 

1677 flow_run=flow_run, 

1678 parameters=parameters, 

1679 wait_for=wait_for, 

1680 context=context, 

1681 ), 

1682 ) 

1683 process.start() 

1684 

1685 return process
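
A minimal usage sketch (not part of flow_engine.py) of how run_flow_in_subprocess might be driven; the flow and parameters below are illustrative assumptions, and the flow's return value is only retrievable afterwards if the flow persists its result.

    from prefect import flow
    from prefect.flow_engine import run_flow_in_subprocess

    @flow(persist_result=True)  # persistence assumed so the result outlives the subprocess
    def add_one(x: int) -> int:
        return x + 1

    if __name__ == "__main__":  # guard needed because the subprocess uses the "spawn" start method
        # Spawn the subprocess and block until the flow run finishes.
        process = run_flow_in_subprocess(flow=add_one, parameters={"x": 1})
        process.join()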