Coverage for /usr/local/lib/python3.12/site-packages/prefect/flow_engine.py: 14%
731 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1from __future__ import annotations 1a
3import asyncio 1a
4import logging 1a
5import multiprocessing 1a
6import multiprocessing.context 1a
7import os 1a
8import time 1a
9from contextlib import ( 1a
10 AsyncExitStack,
11 ExitStack,
12 asynccontextmanager,
13 contextmanager,
14 nullcontext,
15)
16from dataclasses import dataclass, field 1a
17from functools import wraps 1a
18from typing import ( 1a
19 Any,
20 AsyncGenerator,
21 Coroutine,
22 Dict,
23 Generator,
24 Generic,
25 Iterable,
26 Literal,
27 Optional,
28 Type,
29 TypeVar,
30 Union,
31 cast,
32)
33from uuid import UUID 1a
35from anyio import CancelScope 1a
36from opentelemetry import propagate, trace 1a
37from typing_extensions import ParamSpec 1a
39from prefect import Task 1a
40from prefect.client.orchestration import PrefectClient, SyncPrefectClient, get_client 1a
41from prefect.client.schemas import FlowRun, TaskRun 1a
42from prefect.client.schemas.filters import FlowRunFilter 1a
43from prefect.client.schemas.sorting import FlowRunSort 1a
44from prefect.concurrency._leases import ( 1a
45 amaintain_concurrency_lease,
46 maintain_concurrency_lease,
47)
48from prefect.concurrency.context import ConcurrencyContext 1a
49from prefect.concurrency.v1.context import ConcurrencyContext as ConcurrencyContextV1 1a
50from prefect.context import ( 1a
51 AsyncClientContext,
52 FlowRunContext,
53 SettingsContext,
54 SyncClientContext,
55 TagsContext,
56 _deployment_id,
57 _deployment_parameters,
58 get_settings_context,
59 hydrated_context,
60 serialize_context,
61)
62from prefect.engine import handle_engine_signals 1a
63from prefect.exceptions import ( 1a
64 Abort,
65 MissingFlowError,
66 Pause,
67 PrefectException,
68 TerminationSignal,
69 UpstreamTaskError,
70)
71from prefect.flows import ( 1a
72 Flow,
73 load_flow_from_entrypoint,
74 load_flow_from_flow_run,
75 load_function_and_convert_to_flow,
76)
77from prefect.futures import PrefectFuture, resolve_futures_to_states 1a
78from prefect.logging.loggers import ( 1a
79 flow_run_logger,
80 get_logger,
81 get_run_logger,
82 patch_print,
83)
84from prefect.results import ( 1a
85 ResultStore,
86 get_result_store,
87 should_persist_result,
88)
89from prefect.settings import PREFECT_DEBUG_MODE 1a
90from prefect.settings.context import get_current_settings 1a
91from prefect.settings.models.root import Settings 1a
92from prefect.states import ( 1a
93 Failed,
94 Pending,
95 Running,
96 State,
97 exception_to_crashed_state,
98 exception_to_failed_state,
99 return_value_to_state,
100)
101from prefect.telemetry.run_telemetry import ( 1a
102 LABELS_TRACEPARENT_KEY,
103 TRACEPARENT_KEY,
104 OTELSetter,
105 RunTelemetry,
106)
107from prefect.types import KeyValueLabels 1a
108from prefect.utilities._engine import get_hook_name, resolve_custom_flow_run_name 1a
109from prefect.utilities.annotations import NotSet 1a
110from prefect.utilities.asyncutils import run_coro_as_sync 1a
111from prefect.utilities.callables import ( 1a
112 call_with_parameters,
113 cloudpickle_wrapped_call,
114 get_call_parameters,
115 parameters_to_args_kwargs,
116)
117from prefect.utilities.collections import visit_collection 1a
118from prefect.utilities.engine import ( 1a
119 capture_sigterm,
120 link_state_to_flow_run_result,
121 propose_state,
122 propose_state_sync,
123 resolve_to_final_result,
124)
125from prefect.utilities.timeout import timeout, timeout_async 1a
126from prefect.utilities.urls import url_for 1a
# Type variables shared by the engine classes below:
#   P — the parameter spec of the user's flow function
#   R — the flow function's return type
P = ParamSpec("P")
R = TypeVar("R")
class FlowRunTimeoutError(TimeoutError):
    """Raised when a flow run exceeds its defined timeout.

    Subclasses ``TimeoutError`` so generic timeout handlers still catch it,
    while letting the engine distinguish flow-level timeouts (see
    ``handle_timeout``) from other timeout sources.
    """
def load_flow_run(flow_run_id: UUID) -> FlowRun:
    """Fetch a flow run from the API by its ID using a synchronous client."""
    sync_client = get_client(sync_client=True)
    return sync_client.read_flow_run(flow_run_id)
def load_flow(flow_run: FlowRun) -> Flow[..., Any]:
    """Load the flow object for a flow run.

    Prefers the ``PREFECT__FLOW_ENTRYPOINT`` environment variable when set;
    otherwise resolves the flow from the flow run's deployment. Placeholder
    flows are never accepted at runtime.
    """
    entrypoint = os.environ.get("PREFECT__FLOW_ENTRYPOINT")

    if not entrypoint:
        # no entrypoint override — resolve the flow from the flow run itself
        return run_coro_as_sync(
            load_flow_from_flow_run(flow_run, use_placeholder_flow=False)
        )

    # we should not accept a placeholder flow at runtime
    try:
        return load_flow_from_entrypoint(entrypoint, use_placeholder_flow=False)
    except MissingFlowError:
        # the entrypoint points at a plain function — wrap it as a flow
        return load_function_and_convert_to_flow(entrypoint)
def load_flow_and_flow_run(flow_run_id: UUID) -> tuple[FlowRun, Flow[..., Any]]:
    """Load both the flow run record and its flow object, by flow run ID."""
    run = load_flow_run(flow_run_id)
    return run, load_flow(run)
@dataclass
class BaseFlowRunEngine(Generic[P, R]):
    """Shared state and helpers for the sync and async flow run engines."""

    flow: Union[Flow[P, R], Flow[P, Coroutine[Any, Any, R]]]
    parameters: Optional[Dict[str, Any]] = None
    flow_run: Optional[FlowRun] = None
    flow_run_id: Optional[UUID] = None
    logger: logging.Logger = field(default_factory=lambda: get_logger("engine"))
    wait_for: Optional[Iterable[PrefectFuture[Any]]] = None
    context: Optional[dict[str, Any]] = None
    # holds the return value from the user code
    _return_value: Union[R, Type[NotSet]] = NotSet
    # holds the exception raised by the user code, if any
    _raised: Union[Exception, Type[NotSet]] = NotSet
    # True while the engine is inside initialize_run(); gates `client` access
    _is_started: bool = False
    # when True, set_state becomes a no-op (used after parameter validation fails)
    short_circuit: bool = False
    _flow_run_name_set: bool = False
    _telemetry: RunTelemetry = field(default_factory=RunTelemetry)

    def __post_init__(self) -> None:
        # An engine must be able to locate work: either a flow object or an
        # existing flow run ID.
        if self.flow is None and self.flow_run_id is None:
            raise ValueError("Either a flow or a flow_run_id must be provided.")

        if self.parameters is None:
            self.parameters = {}

    @property
    def state(self) -> State:
        # Convenience accessor; assumes flow_run is set by the time it is used.
        return self.flow_run.state  # type: ignore

    def is_running(self) -> bool:
        """Return True if the flow run exists and is in a RUNNING state."""
        if getattr(self, "flow_run", None) is None:
            return False
        return getattr(self, "flow_run").state.is_running()

    def is_pending(self) -> bool:
        """Return True if the flow run exists and is in a PENDING state."""
        if getattr(self, "flow_run", None) is None:
            return False  # TODO: handle this differently?
        return getattr(self, "flow_run").state.is_pending()

    def cancel_all_tasks(self) -> None:
        # Best-effort: only some task runners expose `cancel_all`.
        if hasattr(self.flow.task_runner, "cancel_all"):
            self.flow.task_runner.cancel_all()  # type: ignore

    def _update_otel_labels(
        self, span: trace.Span, client: Union[SyncPrefectClient, PrefectClient]
    ):
        """Propagate an OTel traceparent onto this flow run's labels.

        Inherits the traceparent from a parent flow run when one exists;
        otherwise derives one from the given span.
        """
        parent_flow_run_ctx = FlowRunContext.get()

        if parent_flow_run_ctx and parent_flow_run_ctx.flow_run:
            if traceparent := parent_flow_run_ctx.flow_run.labels.get(
                LABELS_TRACEPARENT_KEY
            ):
                # Parent already has a traceparent — reuse it for this run.
                carrier: KeyValueLabels = {TRACEPARENT_KEY: traceparent}
                propagate.get_global_textmap().inject(
                    carrier={TRACEPARENT_KEY: traceparent},
                    setter=OTELSetter(),
                )
            else:
                # No inherited traceparent — inject one from the current span.
                carrier: KeyValueLabels = {}
                propagate.get_global_textmap().inject(
                    carrier,
                    context=trace.set_span_in_context(span),
                    setter=OTELSetter(),
                )
            if carrier.get(TRACEPARENT_KEY):
                if self.flow_run:
                    client.update_flow_run_labels(
                        flow_run_id=self.flow_run.id,
                        labels={LABELS_TRACEPARENT_KEY: carrier[TRACEPARENT_KEY]},
                    )
                else:
                    self.logger.info(
                        f"Tried to set traceparent {carrier[TRACEPARENT_KEY]} for flow run, but None was found"
                    )
@dataclass
class FlowRunEngine(BaseFlowRunEngine[P, R]):
    """Synchronous flow run engine: orchestrates one flow run end to end."""

    _client: Optional[SyncPrefectClient] = None
    flow_run: FlowRun | None = None
    parameters: dict[str, Any] | None = None

    @property
    def client(self) -> SyncPrefectClient:
        """The sync API client; only valid between initialize_run() enter/exit."""
        if not self._is_started or self._client is None:
            raise RuntimeError("Engine has not started.")
        return self._client
253 def _resolve_parameters(self): 1a
254 if not self.parameters:
255 return
257 resolved_parameters = {}
258 for parameter, value in self.parameters.items():
259 try:
260 resolved_parameters[parameter] = visit_collection(
261 value,
262 visit_fn=resolve_to_final_result,
263 return_data=True,
264 max_depth=-1,
265 remove_annotations=True,
266 context={"parameter_name": parameter},
267 )
268 except UpstreamTaskError:
269 raise
270 except Exception as exc:
271 raise PrefectException(
272 f"Failed to resolve inputs in parameter {parameter!r}. If your"
273 " parameter type is not supported, consider using the `quote`"
274 " annotation to skip resolution of inputs."
275 ) from exc
277 self.parameters = resolved_parameters
279 def _wait_for_dependencies(self): 1a
280 if not self.wait_for:
281 return
283 visit_collection(
284 self.wait_for,
285 visit_fn=resolve_to_final_result,
286 return_data=False,
287 max_depth=-1,
288 remove_annotations=True,
289 context={},
290 )
    def begin_run(self) -> State:
        """Resolve inputs, validate parameters, and move the run to RUNNING.

        Returns the state the server accepted, which may not be RUNNING if
        orchestration rejects the transition.
        """
        try:
            self._resolve_parameters()
            self._wait_for_dependencies()
        except UpstreamTaskError as upstream_exc:
            state = self.set_state(
                Pending(
                    name="NotReady",
                    message=str(upstream_exc),
                ),
                # if orchestrating a run already in a pending state, force orchestration to
                # update the state name
                force=self.state.is_pending(),
            )
            return state

        # validate prior to context so that context receives validated params
        if self.flow.should_validate_parameters:
            try:
                self.parameters = self.flow.validate_parameters(self.parameters or {})
            except Exception as exc:
                message = "Validation of flow parameters failed with error:"
                self.logger.error("%s %s", message, exc)
                self.handle_exception(
                    exc,
                    msg=message,
                    result_store=get_result_store().update_for_flow(
                        self.flow, _sync=True
                    ),
                )
                # short_circuit makes the set_state calls below no-ops, so the
                # Failed state recorded by handle_exception is preserved
                self.short_circuit = True

        new_state = Running()
        state = self.set_state(new_state)
        # the server may hold the run PENDING (e.g. concurrency limits); poll
        # until the transition to RUNNING is accepted
        while state.is_pending():
            time.sleep(0.2)
            state = self.set_state(new_state)
        return state
    def set_state(self, state: State, force: bool = False) -> State:
        """Propose ``state`` to the server and mirror the accepted state locally.

        Args:
            state: the desired state.
            force: bypass orchestration rules when proposing.

        Returns the state the server accepted (a no-op returning the current
        state when ``short_circuit`` is set). Also updates telemetry and runs
        any state-change hooks.
        """
        # prevents any state-setting activity
        if self.short_circuit:
            return self.state

        state = propose_state_sync(
            self.client, state, flow_run_id=self.flow_run.id, force=force
        )  # type: ignore
        # keep the cached flow run in sync with the accepted state
        self.flow_run.state = state  # type: ignore
        self.flow_run.state_name = state.name  # type: ignore
        self.flow_run.state_type = state.type  # type: ignore

        self._telemetry.update_state(state)
        self.call_hooks(state)
        return state
    def result(self, raise_on_failure: bool = True) -> "Union[R, State, None]":
        """Return the flow run's result (or raise its exception).

        Resolution order: the captured return value, then the captured
        exception, then falling back to the state's stored result.
        """
        if self._return_value is not NotSet and not isinstance(
            self._return_value, State
        ):
            _result = self._return_value
            link_state_to_flow_run_result(self.state, _result)

            if asyncio.iscoroutine(_result):
                # getting the value for a BaseResult may return an awaitable
                # depending on whether the parent frame is sync or not
                _result = run_coro_as_sync(_result)
            return _result

        if self._raised is not NotSet:
            if raise_on_failure:
                raise self._raised
            return self._raised

        # This is a fall through case which leans on the existing state result mechanics to get the
        # return value. This is necessary because we currently will return a State object if the
        # the State was Prefect-created.
        # TODO: Remove the need to get the result from a State except in cases where the return value
        # is a State object.
        _result = self.state.result(raise_on_failure=raise_on_failure, _sync=True)  # type: ignore
        return _result
    def handle_success(self, result: R) -> R:
        """Record a successful run: persist the result and set a terminal state.

        Returns ``result`` unchanged so callers can pass it through.
        """
        result_store = getattr(FlowRunContext.get(), "result_store", None)
        if result_store is None:
            raise ValueError("Result store is not set")
        # any PrefectFutures in the return value are resolved to their states
        resolved_result = resolve_futures_to_states(result)
        terminal_state = run_coro_as_sync(
            return_value_to_state(
                resolved_result,
                result_store=result_store,
                write_result=should_persist_result(),
            )
        )
        self.set_state(terminal_state)
        self._return_value = resolved_result

        link_state_to_flow_run_result(terminal_state, resolved_result)
        self._telemetry.end_span_on_success()

        return result
    def handle_exception(
        self,
        exc: Exception,
        msg: Optional[str] = None,
        result_store: Optional[ResultStore] = None,
    ) -> State:
        """Convert ``exc`` into a Failed state and propose it to the server.

        Args:
            exc: the exception raised by user code or validation.
            msg: optional message overriding the default failure message.
            result_store: store for persisting the exception; defaults to the
                current flow run context's store.

        Returns the state accepted by the server.
        """
        context = FlowRunContext.get()
        terminal_state = cast(
            State,
            run_coro_as_sync(
                exception_to_failed_state(
                    exc,
                    message=msg or "Flow run encountered an exception:",
                    result_store=result_store or getattr(context, "result_store", None),
                    write_result=True,
                )
            ),
        )
        state = self.set_state(terminal_state)
        # the server may respond with a retry by re-scheduling the run
        if self.state.is_scheduled():
            self.logger.info(
                (
                    f"Received non-final state {state.name!r} when proposing final"
                    f" state {terminal_state.name!r} and will attempt to run again..."
                ),
            )
            state = self.set_state(Running())
        self._raised = exc
        self._telemetry.record_exception(exc)
        self._telemetry.end_span_on_failure(state.message)

        return state
427 def handle_timeout(self, exc: TimeoutError) -> None: 1a
428 if isinstance(exc, FlowRunTimeoutError):
429 message = (
430 f"Flow run exceeded timeout of {self.flow.timeout_seconds} second(s)"
431 )
432 else:
433 message = f"Flow run failed due to timeout: {exc!r}"
434 self.logger.error(message)
435 state = Failed(
436 data=exc,
437 message=message,
438 name="TimedOut",
439 )
440 self.set_state(state)
441 self._raised = exc
442 self._telemetry.record_exception(exc)
443 self._telemetry.end_span_on_failure(message)
    def handle_crash(self, exc: BaseException) -> None:
        """Record an infrastructure-level failure (BaseException) as a crash.

        Forces the state transition so a crash is recorded even if orchestration
        would otherwise reject it.
        """
        state = run_coro_as_sync(exception_to_crashed_state(exc))
        self.logger.error(f"Crash detected! {state.message}")
        self.logger.debug("Crash details:", exc_info=exc)
        self.set_state(state, force=True)
        self._raised = exc
        self._telemetry.record_exception(exc)
        self._telemetry.end_span_on_failure(state.message if state else None)
    def load_subflow_run(
        self,
        parent_task_run: TaskRun,
        client: SyncPrefectClient,
        context: FlowRunContext,
    ) -> Union[FlowRun, None]:
        """
        This method attempts to load an existing flow run for a subflow task
        run, if appropriate.

        If the parent task run is in a final but not COMPLETED state, and not
        being rerun, then we attempt to load an existing flow run instead of
        creating a new one. This will prevent the engine from running the
        subflow again.

        If no existing flow run is found, or if the subflow should be rerun,
        then no flow run is returned.
        """

        # check if the parent flow run is rerunning
        rerunning = (
            context.flow_run.run_count > 1
            if getattr(context, "flow_run", None)
            and isinstance(context.flow_run, FlowRun)
            else False
        )

        # if the parent task run is in a final but not completed state, and
        # not rerunning, then retrieve the most recent flow run instead of
        # creating a new one. This effectively loads a cached flow run for
        # situations where we are confident the flow should not be run
        # again.
        assert isinstance(parent_task_run.state, State)
        if parent_task_run.state.is_final() and not (
            rerunning and not parent_task_run.state.is_completed()
        ):
            # return the most recent flow run, if it exists
            flow_runs = client.read_flow_runs(
                flow_run_filter=FlowRunFilter(
                    parent_task_run_id={"any_": [parent_task_run.id]}
                ),
                sort=FlowRunSort.EXPECTED_START_TIME_ASC,
                limit=1,
            )
            if flow_runs:
                loaded_flow_run = flow_runs[-1]
                # surface the cached run's state as this engine's return value
                self._return_value = loaded_flow_run.state
                return loaded_flow_run
        # implicit None: no cached run applies, caller should create a new one
    def create_flow_run(self, client: SyncPrefectClient) -> FlowRun:
        """Create (or reuse) the flow run this engine will execute.

        When called inside an existing flow run context, this is a subflow:
        a parent task run is created to represent it, and a cached subflow
        run is reused when available.
        """
        flow_run_ctx = FlowRunContext.get()
        parameters = self.parameters or {}

        parent_task_run = None

        # this is a subflow run
        if flow_run_ctx:
            # add a task to a parent flow run that represents the execution of a subflow run
            parent_task = Task(
                name=self.flow.name, fn=self.flow.fn, version=self.flow.version
            )

            parent_task_run = run_coro_as_sync(
                parent_task.create_run(
                    flow_run_context=flow_run_ctx,
                    parameters=self.parameters,
                    wait_for=self.wait_for,
                )
            )

            # check if there is already a flow run for this subflow
            if subflow_run := self.load_subflow_run(
                parent_task_run=parent_task_run, client=client, context=flow_run_ctx
            ):
                return subflow_run

        return client.create_flow_run(
            flow=self.flow,
            parameters=self.flow.serialize_parameters(parameters),
            state=Pending(),
            parent_task_run_id=getattr(parent_task_run, "id", None),
            tags=TagsContext.get().current_tags,
        )
    def call_hooks(self, state: Optional[State] = None) -> None:
        """Run the flow's hooks that match ``state`` (defaults to current state).

        Hook errors are logged, never raised — a failing hook must not affect
        the run's outcome.
        """
        if state is None:
            state = self.state
        flow = self.flow
        flow_run = self.flow_run

        if not flow_run:
            raise ValueError("Flow run is not set")

        # escape hatch to disable cancellation/crashed hooks via env var
        enable_cancellation_and_crashed_hooks = (
            os.environ.get(
                "PREFECT__ENABLE_CANCELLATION_AND_CRASHED_HOOKS", "true"
            ).lower()
            == "true"
        )

        # select the hook list matching the state; first match wins
        if state.is_failed() and flow.on_failure_hooks:
            hooks = flow.on_failure_hooks
        elif state.is_completed() and flow.on_completion_hooks:
            hooks = flow.on_completion_hooks
        elif (
            enable_cancellation_and_crashed_hooks
            and state.is_cancelling()
            and flow.on_cancellation_hooks
        ):
            hooks = flow.on_cancellation_hooks
        elif (
            enable_cancellation_and_crashed_hooks
            and state.is_crashed()
            and flow.on_crashed_hooks
        ):
            hooks = flow.on_crashed_hooks
        elif state.is_running() and flow.on_running_hooks:
            hooks = flow.on_running_hooks
        else:
            hooks = None

        for hook in hooks or []:
            hook_name = get_hook_name(hook)

            try:
                self.logger.info(
                    f"Running hook {hook_name!r} in response to entering state"
                    f" {state.name!r}"
                )
                result = hook(flow, flow_run, state)
                # hooks may be async; run them to completion synchronously
                if asyncio.iscoroutine(result):
                    run_coro_as_sync(result)
            except Exception:
                self.logger.error(
                    f"An error was encountered while running hook {hook_name!r}",
                    exc_info=True,
                )
            else:
                self.logger.info(f"Hook {hook_name!r} finished running successfully")
    @contextmanager
    def setup_run_context(self, client: Optional[SyncPrefectClient] = None):
        """Enter the full execution context for this flow run.

        Layers (via ExitStack): SIGTERM capture, optional print patching, a
        duplicated task runner, the FlowRunContext, deployment context vars,
        concurrency contexts, and an optional concurrency lease keeper.
        Re-enterable: refreshes the flow run from the API on each entry.
        """
        from prefect.utilities.engine import (
            should_log_prints,
        )

        if client is None:
            client = self.client
        if not self.flow_run:
            raise ValueError("Flow run not set")

        # refresh the flow run so the context reflects the latest server state
        self.flow_run = client.read_flow_run(self.flow_run.id)
        log_prints = should_log_prints(self.flow)

        with ExitStack() as stack:
            # TODO: Explore closing task runner before completing the flow to
            # wait for futures to complete
            stack.enter_context(capture_sigterm())
            if log_prints:
                stack.enter_context(patch_print())
            task_runner = stack.enter_context(self.flow.task_runner.duplicate())
            stack.enter_context(
                FlowRunContext(
                    flow=self.flow,
                    log_prints=log_prints,
                    flow_run=self.flow_run,
                    parameters=self.parameters,
                    client=client,
                    result_store=get_result_store().update_for_flow(
                        self.flow, _sync=True
                    ),
                    task_runner=task_runner,
                    persist_result=self.flow.persist_result
                    if self.flow.persist_result is not None
                    else should_persist_result(),
                )
            )
            # Set deployment context vars only if this is the top-level deployment run
            # (nested flows will inherit via ContextVar propagation)
            if self.flow_run.deployment_id and not _deployment_id.get():
                id_token = _deployment_id.set(self.flow_run.deployment_id)
                params_token = _deployment_parameters.set(self.flow_run.parameters)
                stack.callback(_deployment_id.reset, id_token)
                stack.callback(_deployment_parameters.reset, params_token)
            stack.enter_context(ConcurrencyContextV1())
            stack.enter_context(ConcurrencyContext())
            # keep the deployment concurrency lease alive while the run executes
            if lease_id := self.state.state_details.deployment_concurrency_lease_id:
                stack.enter_context(
                    maintain_concurrency_lease(
                        lease_id, 300, raise_on_lease_renewal_failure=True
                    )
                )

            # set the logger to the flow run logger
            self.logger: "logging.Logger" = flow_run_logger(
                flow_run=self.flow_run, flow=self.flow
            )  # type: ignore

            # update the flow run name if necessary
            if not self._flow_run_name_set and self.flow.flow_run_name:
                flow_run_name = resolve_custom_flow_run_name(
                    flow=self.flow, parameters=self.parameters
                )
                self.client.set_flow_run_name(
                    flow_run_id=self.flow_run.id, name=flow_run_name
                )

                self.logger.extra["flow_run_name"] = flow_run_name
                self.logger.debug(
                    f"Renamed flow run {self.flow_run.name!r} to {flow_run_name!r}"
                )
                self.flow_run.name = flow_run_name
                self._flow_run_name_set = True

                self._telemetry.update_run_name(name=flow_run_name)

            # subflow starts are logged against the parent run's logger
            if self.flow_run.parent_task_run_id:
                _logger = get_run_logger(FlowRunContext.get())
                run_type = "subflow"
            else:
                _logger = self.logger
                run_type = "flow"

            _logger.info(
                f"Beginning {run_type} run {self.flow_run.name!r} for flow {self.flow.name!r}"
            )

            if flow_run_url := url_for(self.flow_run):
                self.logger.info(
                    f"View at {flow_run_url}", extra={"send_to_api": False}
                )

            yield
    @contextmanager
    def initialize_run(self):
        """
        Enters a client context and creates a flow run if needed.

        Also starts the telemetry span and translates exceptions escaping the
        run into the appropriate crash/abort handling.
        """
        with hydrated_context(self.context):
            with SyncClientContext.get_or_create() as client_ctx:
                self._client = client_ctx.client
                self._is_started = True

                if not self.flow_run:
                    self.flow_run = self.create_flow_run(self.client)
                else:
                    # Update the empirical policy to match the flow if it is not set
                    if self.flow_run.empirical_policy.retry_delay is None:
                        self.flow_run.empirical_policy.retry_delay = (
                            self.flow.retry_delay_seconds
                        )

                    if self.flow_run.empirical_policy.retries is None:
                        self.flow_run.empirical_policy.retries = self.flow.retries

                    self.client.update_flow_run(
                        flow_run_id=self.flow_run.id,
                        flow_version=self.flow.version,
                        empirical_policy=self.flow_run.empirical_policy,
                    )

                self._telemetry.start_span(
                    run=self.flow_run,
                    client=self.client,
                    parameters=self.parameters,
                )

                try:
                    yield self

                except TerminationSignal as exc:
                    self.cancel_all_tasks()
                    self.handle_crash(exc)
                    raise
                except Exception:
                    # regular exceptions are caught and re-raised to the user
                    raise
                except (Abort, Pause) as exc:
                    # NOTE: Abort/Pause are BaseException-derived signals, so
                    # they are not swallowed by the `except Exception` above
                    if getattr(exc, "state", None):
                        # we set attribute explicitly because
                        # internals will have already called the state change API
                        self.flow_run.state = exc.state
                    raise
                except GeneratorExit:
                    # Do not capture generator exits as crashes
                    raise
                except BaseException as exc:
                    # We don't want to crash a flow run if the user code finished executing
                    if self.flow_run.state and not self.flow_run.state.is_final():
                        # BaseExceptions are caught and handled as crashes
                        self.handle_crash(exc)
                        raise
                    else:
                        self.logger.debug(
                            "BaseException was raised after user code finished executing",
                            exc_info=exc,
                        )
                finally:
                    # If debugging, use the more complete `repr` than the usual `str` description
                    display_state = (
                        repr(self.state) if PREFECT_DEBUG_MODE else str(self.state)
                    )
                    self.logger.log(
                        level=logging.INFO,
                        msg=f"Finished in state {display_state}",
                    )

                    self._is_started = False
                    self._client = None
    # --------------------------
    #
    # The following methods compose the main flow run loop
    #
    # --------------------------

    @contextmanager
    def start(self) -> Generator[None, None, None]:
        """Initialize the run, activate its telemetry span, and begin the run."""
        with self.initialize_run():
            # activate the OTel span only when telemetry actually created one
            with (
                trace.use_span(self._telemetry.span)
                if self._telemetry.span
                else nullcontext()
            ):
                self.begin_run()

                yield
    @contextmanager
    def run_context(self):
        """Context for one execution attempt: run context plus timeout handling.

        Yields the engine itself; timeouts and exceptions raised inside the
        body are converted to terminal states rather than propagated.
        """
        # async flows need the async-aware timeout implementation
        timeout_context = timeout_async if self.flow.isasync else timeout
        # reenter the run context to ensure it is up to date for every run
        with self.setup_run_context():
            try:
                with timeout_context(
                    seconds=self.flow.timeout_seconds,
                    timeout_exc_type=FlowRunTimeoutError,
                ):
                    self.logger.debug(
                        f"Executing flow {self.flow.name!r} for flow run {self.flow_run.name!r}..."
                    )
                    yield self
            except TimeoutError as exc:
                self.handle_timeout(exc)
            except Exception as exc:
                self.logger.exception("Encountered exception during execution: %r", exc)
                self.handle_exception(exc)
    def call_flow_fn(self) -> Union[R, Coroutine[Any, Any, R]]:
        """
        Convenience method to call the flow function. Returns a coroutine if the
        flow is async.
        """
        if self.flow.isasync:

            async def _call_flow_fn():
                result = await call_with_parameters(self.flow.fn, self.parameters)
                self.handle_success(result)

            return _call_flow_fn()
        else:
            result = call_with_parameters(self.flow.fn, self.parameters)
            self.handle_success(result)
            # NOTE(review): the sync path returns None; callers appear to read
            # the outcome via engine state / self.result() — confirm
@dataclass
class AsyncFlowRunEngine(BaseFlowRunEngine[P, R]):
    """
    Async version of the flow run engine.

    NOTE: This has not been fully asyncified yet which may lead to async flows
    not being fully asyncified.
    """

    _client: Optional[PrefectClient] = None
    parameters: dict[str, Any] | None = None
    flow_run: FlowRun | None = None

    @property
    def client(self) -> PrefectClient:
        """The async API client; only valid between initialize_run() enter/exit."""
        if not self._is_started or self._client is None:
            raise RuntimeError("Engine has not started.")
        return self._client
840 def _resolve_parameters(self): 1a
841 if not self.parameters:
842 return
844 resolved_parameters = {}
845 for parameter, value in self.parameters.items():
846 try:
847 resolved_parameters[parameter] = visit_collection(
848 value,
849 visit_fn=resolve_to_final_result,
850 return_data=True,
851 max_depth=-1,
852 remove_annotations=True,
853 context={"parameter_name": parameter},
854 )
855 except UpstreamTaskError:
856 raise
857 except Exception as exc:
858 raise PrefectException(
859 f"Failed to resolve inputs in parameter {parameter!r}. If your"
860 " parameter type is not supported, consider using the `quote`"
861 " annotation to skip resolution of inputs."
862 ) from exc
864 self.parameters = resolved_parameters
866 def _wait_for_dependencies(self): 1a
867 if not self.wait_for:
868 return
870 visit_collection(
871 self.wait_for,
872 visit_fn=resolve_to_final_result,
873 return_data=False,
874 max_depth=-1,
875 remove_annotations=True,
876 context={},
877 )
    async def begin_run(self) -> State:
        """Resolve inputs, validate parameters, and move the run to RUNNING.

        Returns the state the server accepted, which may not be RUNNING if
        orchestration rejects the transition.
        """
        try:
            self._resolve_parameters()
            self._wait_for_dependencies()
        except UpstreamTaskError as upstream_exc:
            state = await self.set_state(
                Pending(
                    name="NotReady",
                    message=str(upstream_exc),
                ),
                # if orchestrating a run already in a pending state, force orchestration to
                # update the state name
                force=self.state.is_pending(),
            )
            return state

        # validate prior to context so that context receives validated params
        if self.flow.should_validate_parameters:
            try:
                self.parameters = self.flow.validate_parameters(self.parameters or {})
            except Exception as exc:
                message = "Validation of flow parameters failed with error:"
                self.logger.error("%s %s", message, exc)
                await self.handle_exception(
                    exc,
                    msg=message,
                    result_store=get_result_store().update_for_flow(
                        self.flow, _sync=True
                    ),
                )
                # short_circuit makes the set_state calls below no-ops, so the
                # Failed state recorded by handle_exception is preserved
                self.short_circuit = True

        new_state = Running()
        state = await self.set_state(new_state)
        # the server may hold the run PENDING (e.g. concurrency limits); poll
        # until the transition to RUNNING is accepted
        while state.is_pending():
            await asyncio.sleep(0.2)
            state = await self.set_state(new_state)
        return state
    async def set_state(self, state: State, force: bool = False) -> State:
        """Propose ``state`` to the server and mirror the accepted state locally.

        Args:
            state: the desired state.
            force: bypass orchestration rules when proposing.

        Returns the state the server accepted (a no-op returning the current
        state when ``short_circuit`` is set). Also updates telemetry and runs
        any state-change hooks.
        """
        # prevents any state-setting activity
        if self.short_circuit:
            return self.state

        state = await propose_state(
            self.client, state, flow_run_id=self.flow_run.id, force=force
        )  # type: ignore
        # keep the cached flow run in sync with the accepted state
        self.flow_run.state = state  # type: ignore
        self.flow_run.state_name = state.name  # type: ignore
        self.flow_run.state_type = state.type  # type: ignore

        self._telemetry.update_state(state)
        await self.call_hooks(state)
        return state
    async def result(self, raise_on_failure: bool = True) -> "Union[R, State, None]":
        """Return the flow run's result (or raise its exception).

        Resolution order: the captured return value, then the captured
        exception, then falling back to the state's stored result.
        """
        if self._return_value is not NotSet and not isinstance(
            self._return_value, State
        ):
            _result = self._return_value
            link_state_to_flow_run_result(self.state, _result)

            if asyncio.iscoroutine(_result):
                # getting the value for a BaseResult may return an awaitable
                # depending on whether the parent frame is sync or not
                _result = await _result
            return _result

        if self._raised is not NotSet:
            if raise_on_failure:
                raise self._raised
            return self._raised

        # This is a fall through case which leans on the existing state result mechanics to get the
        # return value. This is necessary because we currently will return a State object if the
        # the State was Prefect-created.
        # TODO: Remove the need to get the result from a State except in cases where the return value
        # is a State object.
        return await self.state.aresult(raise_on_failure=raise_on_failure)  # type: ignore
    async def handle_success(self, result: R) -> R:
        """Record a successful run: persist the result and set a terminal state.

        Returns ``result`` unchanged so callers can pass it through.
        """
        result_store = getattr(FlowRunContext.get(), "result_store", None)
        if result_store is None:
            raise ValueError("Result store is not set")
        # any PrefectFutures in the return value are resolved to their states
        resolved_result = resolve_futures_to_states(result)
        terminal_state = await return_value_to_state(
            resolved_result,
            result_store=result_store,
            write_result=should_persist_result(),
        )
        await self.set_state(terminal_state)
        self._return_value = resolved_result

        self._telemetry.end_span_on_success()

        return result
    async def handle_exception(
        self,
        exc: Exception,
        msg: Optional[str] = None,
        result_store: Optional[ResultStore] = None,
    ) -> State:
        """Convert ``exc`` into a Failed state and propose it to the server.

        Args:
            exc: the exception raised by user code or validation.
            msg: optional message overriding the default failure message.
            result_store: store for persisting the exception; defaults to the
                current flow run context's store.

        Returns the state accepted by the server.
        """
        context = FlowRunContext.get()
        terminal_state = cast(
            State,
            await exception_to_failed_state(
                exc,
                message=msg or "Flow run encountered an exception:",
                result_store=result_store or getattr(context, "result_store", None),
                write_result=True,
            ),
        )
        state = await self.set_state(terminal_state)
        # the server may respond with a retry by re-scheduling the run
        if self.state.is_scheduled():
            self.logger.info(
                (
                    f"Received non-final state {state.name!r} when proposing final"
                    f" state {terminal_state.name!r} and will attempt to run again..."
                ),
            )
            state = await self.set_state(Running())
        self._raised = exc
        self._telemetry.record_exception(exc)
        self._telemetry.end_span_on_failure(state.message)

        return state
1008 async def handle_timeout(self, exc: TimeoutError) -> None: 1a
1009 if isinstance(exc, FlowRunTimeoutError):
1010 message = (
1011 f"Flow run exceeded timeout of {self.flow.timeout_seconds} second(s)"
1012 )
1013 else:
1014 message = f"Flow run failed due to timeout: {exc!r}"
1015 self.logger.error(message)
1016 state = Failed(
1017 data=exc,
1018 message=message,
1019 name="TimedOut",
1020 )
1021 await self.set_state(state)
1022 self._raised = exc
1024 self._telemetry.record_exception(exc)
1025 self._telemetry.end_span_on_failure(message)
    async def handle_crash(self, exc: BaseException) -> None:
        """Record an infrastructure-level failure (BaseException) as a crash.

        Forces the state transition so a crash is recorded even if orchestration
        would otherwise reject it.
        """
        # need to shield from asyncio cancellation to ensure we update the state
        # on the server before exiting
        with CancelScope(shield=True):
            state = await exception_to_crashed_state(exc)
            self.logger.error(f"Crash detected! {state.message}")
            self.logger.debug("Crash details:", exc_info=exc)
            await self.set_state(state, force=True)
            self._raised = exc

            self._telemetry.record_exception(exc)
            self._telemetry.end_span_on_failure(state.message)
    async def load_subflow_run(
        self,
        parent_task_run: TaskRun,
        client: PrefectClient,
        context: FlowRunContext,
    ) -> Union[FlowRun, None]:
        """
        This method attempts to load an existing flow run for a subflow task
        run, if appropriate.

        If the parent task run is in a final but not COMPLETED state, and not
        being rerun, then we attempt to load an existing flow run instead of
        creating a new one. This will prevent the engine from running the
        subflow again.

        If no existing flow run is found, or if the subflow should be rerun,
        then no flow run is returned.
        """

        # check if the parent flow run is rerunning
        rerunning = (
            context.flow_run.run_count > 1
            if getattr(context, "flow_run", None)
            and isinstance(context.flow_run, FlowRun)
            else False
        )

        # if the parent task run is in a final but not completed state, and
        # not rerunning, then retrieve the most recent flow run instead of
        # creating a new one. This effectively loads a cached flow run for
        # situations where we are confident the flow should not be run
        # again.
        assert isinstance(parent_task_run.state, State)
        if parent_task_run.state.is_final() and not (
            rerunning and not parent_task_run.state.is_completed()
        ):
            # return the most recent flow run, if it exists
            flow_runs = await client.read_flow_runs(
                flow_run_filter=FlowRunFilter(
                    parent_task_run_id={"any_": [parent_task_run.id]}
                ),
                sort=FlowRunSort.EXPECTED_START_TIME_ASC,
                limit=1,
            )
            if flow_runs:
                loaded_flow_run = flow_runs[-1]
                # surface the cached run's state as this engine's return value
                self._return_value = loaded_flow_run.state
                return loaded_flow_run
        # implicit None: no cached run applies, caller should create a new one
1089 async def create_flow_run(self, client: PrefectClient) -> FlowRun: 1a
1090 flow_run_ctx = FlowRunContext.get()
1091 parameters = self.parameters or {}
1093 parent_task_run = None
1095 # this is a subflow run
1096 if flow_run_ctx:
1097 # add a task to a parent flow run that represents the execution of a subflow run
1098 parent_task = Task(
1099 name=self.flow.name, fn=self.flow.fn, version=self.flow.version
1100 )
1102 parent_task_run = await parent_task.create_run(
1103 flow_run_context=flow_run_ctx,
1104 parameters=self.parameters,
1105 wait_for=self.wait_for,
1106 )
1108 # check if there is already a flow run for this subflow
1109 if subflow_run := await self.load_subflow_run(
1110 parent_task_run=parent_task_run, client=client, context=flow_run_ctx
1111 ):
1112 return subflow_run
1114 return await client.create_flow_run(
1115 flow=self.flow,
1116 parameters=self.flow.serialize_parameters(parameters),
1117 state=Pending(),
1118 parent_task_run_id=getattr(parent_task_run, "id", None),
1119 tags=TagsContext.get().current_tags,
1120 )
1122 async def call_hooks(self, state: Optional[State] = None) -> None: 1a
1123 if state is None:
1124 state = self.state
1125 flow = self.flow
1126 flow_run = self.flow_run
1128 if not flow_run:
1129 raise ValueError("Flow run is not set")
1131 enable_cancellation_and_crashed_hooks = (
1132 os.environ.get(
1133 "PREFECT__ENABLE_CANCELLATION_AND_CRASHED_HOOKS", "true"
1134 ).lower()
1135 == "true"
1136 )
1138 if state.is_failed() and flow.on_failure_hooks:
1139 hooks = flow.on_failure_hooks
1140 elif state.is_completed() and flow.on_completion_hooks:
1141 hooks = flow.on_completion_hooks
1142 elif (
1143 enable_cancellation_and_crashed_hooks
1144 and state.is_cancelling()
1145 and flow.on_cancellation_hooks
1146 ):
1147 hooks = flow.on_cancellation_hooks
1148 elif (
1149 enable_cancellation_and_crashed_hooks
1150 and state.is_crashed()
1151 and flow.on_crashed_hooks
1152 ):
1153 hooks = flow.on_crashed_hooks
1154 elif state.is_running() and flow.on_running_hooks:
1155 hooks = flow.on_running_hooks
1156 else:
1157 hooks = None
1159 for hook in hooks or []:
1160 hook_name = get_hook_name(hook)
1162 try:
1163 self.logger.info(
1164 f"Running hook {hook_name!r} in response to entering state"
1165 f" {state.name!r}"
1166 )
1167 result = hook(flow, flow_run, state)
1168 if asyncio.iscoroutine(result):
1169 await result
1170 except Exception:
1171 self.logger.error(
1172 f"An error was encountered while running hook {hook_name!r}",
1173 exc_info=True,
1174 )
1175 else:
1176 self.logger.info(f"Hook {hook_name!r} finished running successfully")
    @asynccontextmanager
    async def setup_run_context(self, client: Optional[PrefectClient] = None):
        """Assemble the full execution context for one run attempt.

        Refreshes the flow run from the API, then layers (in order) signal
        capture, optional print patching, a duplicated task runner, the
        ``FlowRunContext``, deployment context vars, concurrency contexts,
        and (when present) a deployment concurrency lease maintainer onto an
        ``AsyncExitStack`` so they unwind in reverse order on exit. Also
        binds the flow run logger, applies a custom flow run name once, and
        emits the "Beginning ... run" log lines before yielding control.

        Args:
            client: Client to use for API calls; defaults to the engine's
                own client.
        """
        from prefect.utilities.engine import (
            should_log_prints,
        )

        if client is None:
            client = self.client
        if not self.flow_run:
            raise ValueError("Flow run not set")

        # Re-read the flow run so this context reflects the latest server state.
        self.flow_run = await client.read_flow_run(self.flow_run.id)
        log_prints = should_log_prints(self.flow)

        async with AsyncExitStack() as stack:
            # TODO: Explore closing task runner before completing the flow to
            # wait for futures to complete
            stack.enter_context(capture_sigterm())
            if log_prints:
                stack.enter_context(patch_print())
            # Each run attempt gets its own duplicated task runner instance.
            task_runner = stack.enter_context(self.flow.task_runner.duplicate())
            stack.enter_context(
                FlowRunContext(
                    flow=self.flow,
                    log_prints=log_prints,
                    flow_run=self.flow_run,
                    parameters=self.parameters,
                    client=client,
                    result_store=get_result_store().update_for_flow(
                        self.flow, _sync=True
                    ),
                    task_runner=task_runner,
                    # Flow-level setting wins; otherwise fall back to the
                    # global default.
                    persist_result=self.flow.persist_result
                    if self.flow.persist_result is not None
                    else should_persist_result(),
                )
            )
            # Set deployment context vars only if this is the top-level deployment run
            # (nested flows will inherit via ContextVar propagation)
            if self.flow_run.deployment_id and not _deployment_id.get():
                id_token = _deployment_id.set(self.flow_run.deployment_id)
                params_token = _deployment_parameters.set(self.flow_run.parameters)
                stack.callback(_deployment_id.reset, id_token)
                stack.callback(_deployment_parameters.reset, params_token)
            stack.enter_context(ConcurrencyContextV1())
            stack.enter_context(ConcurrencyContext())
            # Keep the deployment concurrency lease alive for the duration of
            # the run; renewal failure is fatal (raise_on_lease_renewal_failure).
            if lease_id := self.state.state_details.deployment_concurrency_lease_id:
                await stack.enter_async_context(
                    amaintain_concurrency_lease(
                        lease_id, 300, raise_on_lease_renewal_failure=True
                    )
                )

            # set the logger to the flow run logger
            self.logger: "logging.Logger" = flow_run_logger(
                flow_run=self.flow_run, flow=self.flow
            )

            # update the flow run name if necessary
            # (guarded so the rename API call happens at most once per engine)
            if not self._flow_run_name_set and self.flow.flow_run_name:
                flow_run_name = resolve_custom_flow_run_name(
                    flow=self.flow, parameters=self.parameters
                )
                await self.client.set_flow_run_name(
                    flow_run_id=self.flow_run.id, name=flow_run_name
                )
                self.logger.extra["flow_run_name"] = flow_run_name
                self.logger.debug(
                    f"Renamed flow run {self.flow_run.name!r} to {flow_run_name!r}"
                )
                self.flow_run.name = flow_run_name
                self._flow_run_name_set = True

                self._telemetry.update_run_name(name=flow_run_name)
            # Subflows log their start line through the parent run's logger.
            if self.flow_run.parent_task_run_id:
                _logger = get_run_logger(FlowRunContext.get())
                run_type = "subflow"
            else:
                _logger = self.logger
                run_type = "flow"

            _logger.info(
                f"Beginning {run_type} run {self.flow_run.name!r} for flow {self.flow.name!r}"
            )

            if flow_run_url := url_for(self.flow_run):
                self.logger.info(
                    f"View at {flow_run_url}", extra={"send_to_api": False}
                )

            yield
    @asynccontextmanager
    async def initialize_run(self):
        """
        Enters a client context and creates a flow run if needed.

        When a flow run already exists, its empirical policy (retries and
        retry delay) is backfilled from the flow and pushed to the API
        instead. A telemetry span is started before yielding the engine.

        Exception handling on exit is ordered deliberately:
        ``TerminationSignal`` cancels tasks and records a crash; ordinary
        ``Exception``s propagate to the user; ``Abort``/``Pause`` carry the
        already-set server state; ``GeneratorExit`` is never treated as a
        crash; any other ``BaseException`` is recorded as a crash only if the
        run has not already reached a final state. The final state is always
        logged and the engine's started/client fields are cleared.
        """
        with hydrated_context(self.context):
            async with AsyncClientContext.get_or_create() as client_ctx:
                self._client = client_ctx.client
                self._is_started = True

                if not self.flow_run:
                    self.flow_run = await self.create_flow_run(self.client)
                    flow_run_url = url_for(self.flow_run)

                    if flow_run_url:
                        self.logger.info(
                            f"View at {flow_run_url}", extra={"send_to_api": False}
                        )
                else:
                    # Update the empirical policy to match the flow if it is not set
                    if self.flow_run.empirical_policy.retry_delay is None:
                        self.flow_run.empirical_policy.retry_delay = (
                            self.flow.retry_delay_seconds
                        )

                    if self.flow_run.empirical_policy.retries is None:
                        self.flow_run.empirical_policy.retries = self.flow.retries

                    await self.client.update_flow_run(
                        flow_run_id=self.flow_run.id,
                        flow_version=self.flow.version,
                        empirical_policy=self.flow_run.empirical_policy,
                    )

                await self._telemetry.async_start_span(
                    run=self.flow_run,
                    client=self.client,
                    parameters=self.parameters,
                )

                try:
                    yield self

                except TerminationSignal as exc:
                    self.cancel_all_tasks()
                    await self.handle_crash(exc)
                    raise
                except Exception:
                    # regular exceptions are caught and re-raised to the user
                    raise
                except (Abort, Pause) as exc:
                    if getattr(exc, "state", None):
                        # we set attribute explicitly because
                        # internals will have already called the state change API
                        self.flow_run.state = exc.state
                    raise
                except GeneratorExit:
                    # Do not capture generator exits as crashes
                    raise
                except BaseException as exc:
                    # We don't want to crash a flow run if the user code finished executing
                    if self.flow_run.state and not self.flow_run.state.is_final():
                        # BaseExceptions are caught and handled as crashes
                        await self.handle_crash(exc)
                        raise
                    else:
                        self.logger.debug(
                            "BaseException was raised after user code finished executing",
                            exc_info=exc,
                        )
                finally:
                    # If debugging, use the more complete `repr` than the usual `str` description
                    display_state = (
                        repr(self.state) if PREFECT_DEBUG_MODE else str(self.state)
                    )
                    self.logger.log(
                        level=logging.INFO
                        if self.state.is_completed()
                        else logging.ERROR,
                        msg=f"Finished in state {display_state}",
                    )

                    self._is_started = False
                    self._client = None
1356 # --------------------------
1357 #
1358 # The following methods compose the main task run loop
1359 #
1360 # --------------------------
1362 @asynccontextmanager 1a
1363 async def start(self) -> AsyncGenerator[None, None]: 1a
1364 async with self.initialize_run():
1365 with (
1366 trace.use_span(self._telemetry.span)
1367 if self._telemetry.span
1368 else nullcontext()
1369 ):
1370 await self.begin_run()
1372 yield
1374 @asynccontextmanager 1a
1375 async def run_context(self): 1a
1376 timeout_context = timeout_async if self.flow.isasync else timeout
1377 # reenter the run context to ensure it is up to date for every run
1378 async with self.setup_run_context():
1379 try:
1380 with timeout_context(
1381 seconds=self.flow.timeout_seconds,
1382 timeout_exc_type=FlowRunTimeoutError,
1383 ):
1384 self.logger.debug(
1385 f"Executing flow {self.flow.name!r} for flow run {self.flow_run.name!r}..."
1386 )
1387 yield self
1388 except TimeoutError as exc:
1389 await self.handle_timeout(exc)
1390 except Exception as exc:
1391 self.logger.exception("Encountered exception during execution: %r", exc)
1392 await self.handle_exception(exc)
1394 async def call_flow_fn(self) -> Coroutine[Any, Any, R]: 1a
1395 """
1396 Convenience method to call the flow function. Returns a coroutine if the
1397 flow is async.
1398 """
1399 assert self.flow.isasync, "Flow must be async to be run with AsyncFlowRunEngine"
1401 result = await call_with_parameters(self.flow.fn, self.parameters)
1402 await self.handle_success(result)
1403 return result
def run_flow_sync(
    flow: Flow[P, R],
    flow_run: Optional[FlowRun] = None,
    parameters: Optional[Dict[str, Any]] = None,
    wait_for: Optional[Iterable[PrefectFuture[Any]]] = None,
    return_type: Literal["state", "result"] = "result",
    context: Optional[dict[str, Any]] = None,
) -> Union[R, State, None]:
    """Drive a synchronous flow to completion with the sync engine.

    Repeatedly calls the flow function while the engine reports a running
    state (retries). Returns the final ``State`` when ``return_type`` is
    ``"state"``, otherwise the flow's result.
    """
    engine = FlowRunEngine[P, R](
        flow=flow,
        parameters=parameters,
        flow_run=flow_run,
        wait_for=wait_for,
        context=context,
    )

    with engine.start():
        while engine.is_running():
            with engine.run_context():
                engine.call_flow_fn()

    if return_type == "state":
        return engine.state
    return engine.result()
async def run_flow_async(
    flow: Flow[P, R],
    flow_run: Optional[FlowRun] = None,
    parameters: Optional[Dict[str, Any]] = None,
    wait_for: Optional[Iterable[PrefectFuture[Any]]] = None,
    return_type: Literal["state", "result"] = "result",
    context: Optional[dict[str, Any]] = None,
) -> Union[R, State, None]:
    """Drive an asynchronous flow to completion with the async engine.

    Repeatedly awaits the flow function while the engine reports a running
    state (retries). Returns the final ``State`` when ``return_type`` is
    ``"state"``, otherwise the flow's result.
    """
    engine = AsyncFlowRunEngine[P, R](
        flow=flow,
        parameters=parameters,
        flow_run=flow_run,
        wait_for=wait_for,
        context=context,
    )

    async with engine.start():
        while engine.is_running():
            async with engine.run_context():
                await engine.call_flow_fn()

    if return_type == "state":
        return engine.state
    return await engine.result()
def run_generator_flow_sync(
    flow: Flow[P, R],
    flow_run: Optional[FlowRun] = None,
    parameters: Optional[Dict[str, Any]] = None,
    wait_for: Optional[Iterable[PrefectFuture[Any]]] = None,
    return_type: Literal["state", "result"] = "result",
    context: Optional[dict[str, Any]] = None,
) -> Generator[R, None, None]:
    """
    Run a synchronous generator flow, yielding each value the flow yields.

    Only ``return_type="result"`` is supported since values are streamed to
    the caller as they are produced. The flow's return value (carried by
    ``StopIteration``) is recorded as the run's successful result; if the
    consumer closes this generator early (``GeneratorExit``), the run is
    marked successful with ``None`` and the exit is forwarded into the
    user's generator so its cleanup code runs.
    """
    if return_type != "result":
        raise ValueError("The return_type for a generator flow must be 'result'")

    engine = FlowRunEngine[P, R](
        flow=flow,
        parameters=parameters,
        flow_run=flow_run,
        wait_for=wait_for,
        context=context,
    )

    with engine.start():
        while engine.is_running():
            with engine.run_context():
                call_args, call_kwargs = parameters_to_args_kwargs(
                    flow.fn, engine.parameters or {}
                )
                gen = flow.fn(*call_args, **call_kwargs)
                try:
                    while True:
                        gen_result = next(gen)
                        # link the current state to the result for dependency tracking
                        link_state_to_flow_run_result(engine.state, gen_result)
                        yield gen_result
                except StopIteration as exc:
                    # The generator returned: its return value is the run result.
                    engine.handle_success(exc.value)
                except GeneratorExit as exc:
                    # Consumer closed us early; still a successful run.
                    engine.handle_success(None)
                    gen.throw(exc)

    return engine.result()
async def run_generator_flow_async(
    flow: Flow[P, R],
    flow_run: Optional[FlowRun] = None,
    parameters: Optional[Dict[str, Any]] = None,
    wait_for: Optional[Iterable[PrefectFuture[R]]] = None,
    return_type: Literal["state", "result"] = "result",
    context: Optional[dict[str, Any]] = None,
) -> AsyncGenerator[R, None]:
    """
    Run an asynchronous generator flow, yielding each value the flow yields.

    Only ``return_type="result"`` is supported since values are streamed to
    the caller as they are produced. Exhaustion or early close both mark the
    run successful with ``None`` (async generators cannot return values);
    an early ``GeneratorExit`` is forwarded into the user's generator so its
    cleanup code runs. A failed final state is re-raised via
    ``engine.result()`` at the end.
    """
    if return_type != "result":
        raise ValueError("The return_type for a generator flow must be 'result'")

    engine = AsyncFlowRunEngine[P, R](
        flow=flow,
        parameters=parameters,
        flow_run=flow_run,
        wait_for=wait_for,
        context=context,
    )

    async with engine.start():
        while engine.is_running():
            async with engine.run_context():
                call_args, call_kwargs = parameters_to_args_kwargs(
                    flow.fn, engine.parameters or {}
                )
                gen = flow.fn(*call_args, **call_kwargs)
                try:
                    while True:
                        # can't use anext in Python < 3.10
                        gen_result = await gen.__anext__()
                        # link the current state to the result for dependency tracking
                        link_state_to_flow_run_result(engine.state, gen_result)
                        yield gen_result
                except (StopAsyncIteration, GeneratorExit) as exc:
                    await engine.handle_success(None)
                    if isinstance(exc, GeneratorExit):
                        gen.throw(exc)

    # async generators can't return, but we can raise failures here
    if engine.state.is_failed():
        await engine.result()
def run_flow(
    flow: Flow[P, R],
    flow_run: Optional[FlowRun] = None,
    parameters: Optional[Dict[str, Any]] = None,
    wait_for: Optional[Iterable[PrefectFuture[R]]] = None,
    return_type: Literal["state", "result"] = "result",
    error_logger: Optional[logging.Logger] = None,
    context: Optional[dict[str, Any]] = None,
) -> (
    R
    | State
    | None
    | Coroutine[Any, Any, R | State | None]
    | Generator[R, None, None]
    | AsyncGenerator[R, None]
):
    """Dispatch a flow to the appropriate engine runner.

    Selects among the sync/async and plain/generator runners based on the
    flow's ``isasync``/``isgenerator`` flags; async and generator variants
    return a coroutine or (async) generator for the caller to drive.

    Args:
        flow: The flow to run.
        flow_run: An existing flow run to use, if any.
        parameters: Explicit call parameters; when omitted they are resolved
            from the flow run and the flow function's defaults.
        wait_for: Futures to wait on before starting.
        return_type: ``"result"`` (default) or ``"state"``.
        error_logger: Logger used to record unexpected engine exceptions
            before they are re-raised.
        context: Serialized context to hydrate inside the engine.

    Raises:
        Abort, Pause: Propagated untouched as orchestration signals.
    """
    ret_val: Union[
        R,
        State,
        None,
        Coroutine[Any, Any, R | State | None],
        Generator[R, None, None],
        AsyncGenerator[R, None],
    ] = None

    try:
        kwargs: dict[str, Any] = dict(
            flow=flow,
            flow_run=flow_run,
            parameters=_flow_parameters(
                flow=flow, flow_run=flow_run, parameters=parameters
            ),
            wait_for=wait_for,
            return_type=return_type,
            context=context,
        )

        if flow.isasync and flow.isgenerator:
            ret_val = run_generator_flow_async(**kwargs)
        elif flow.isgenerator:
            ret_val = run_generator_flow_sync(**kwargs)
        elif flow.isasync:
            ret_val = run_flow_async(**kwargs)
        else:
            ret_val = run_flow_sync(**kwargs)
    except (Abort, Pause):
        # Orchestration signals pass through without logging.
        raise
    # Intentionally broad (and explicit, rather than a bare `except:` which
    # PEP 8 forbids): log any unexpected engine exit, then re-raise.
    except BaseException:
        if error_logger:
            error_logger.error(
                "Engine execution exited with unexpected exception", exc_info=True
            )
        raise
    return ret_val
def _flow_parameters(
    flow: Flow[P, R], flow_run: Optional[FlowRun], parameters: Optional[Dict[str, Any]]
) -> Dict[str, Any]:
    """Resolve the parameters for a flow invocation.

    Explicitly-supplied parameters win and are returned unchanged. Otherwise
    the flow is being executed indirectly: parameters are taken from the flow
    run (when present) and merged with the defaults declared on the flow
    function itself.
    """
    if parameters:
        # Direct invocation: the caller already supplied the parameters.
        return parameters

    run_parameters = flow_run.parameters if flow_run else {}
    resolved_args, resolved_kwargs = parameters_to_args_kwargs(flow.fn, run_parameters)
    return get_call_parameters(flow.fn, resolved_args, resolved_kwargs)
def run_flow_in_subprocess(
    flow: "Flow[..., Any]",
    flow_run: "FlowRun | None" = None,
    parameters: dict[str, Any] | None = None,
    wait_for: Iterable[PrefectFuture[Any]] | None = None,
    context: dict[str, Any] | None = None,
) -> multiprocessing.context.SpawnProcess:
    """
    Run a flow in a subprocess.

    Note the result of the flow will only be accessible if the flow is configured to
    persist its result.

    Args:
        flow: The flow to run.
        flow_run: The flow run object containing run metadata.
        parameters: The parameters to use when invoking the flow.
        wait_for: The futures to wait for before starting the flow.
        context: A serialized context to hydrate before running the flow. If not provided,
            the current context will be used. A serialized context should be provided if
            this function is called in a separate memory space from the parent run (e.g.
            in a subprocess or on another machine).

    Returns:
        A multiprocessing.context.SpawnProcess representing the process that is running the flow.
    """
    from prefect.flow_engine import run_flow

    @wraps(run_flow)
    def run_flow_with_env(
        *args: Any,
        env: dict[str, str] | None = None,
        **kwargs: Any,
    ):
        """
        Wrapper function to update environment variables and settings before running the flow.
        """
        os.environ.update(env or {})
        settings_context = get_settings_context()
        # Create a new settings context with a new settings object to pick up the updated
        # environment variables
        with SettingsContext(
            profile=settings_context.profile,
            settings=Settings(),
        ):
            with handle_engine_signals(getattr(flow_run, "id", None)):
                maybe_coro = run_flow(*args, **kwargs)
                if asyncio.iscoroutine(maybe_coro):
                    # This is running in a brand new process, so there won't be an existing
                    # event loop.
                    asyncio.run(maybe_coro)

    # "spawn" starts a fresh interpreter (no inherited state), so everything
    # the child needs must be pickled below.
    ctx = multiprocessing.get_context("spawn")

    # Capture the current context when one wasn't provided by the caller.
    context = context or serialize_context()

    process = ctx.Process(
        # cloudpickle is used so the closure (over `flow_run`) and the flow
        # object survive the trip into the spawned process.
        target=cloudpickle_wrapped_call(
            run_flow_with_env,
            # Child environment: current settings, overlaid by the parent's
            # os.environ, with cancellation/crash hooks disabled in-child
            # (the parent handles those).
            env=get_current_settings().to_environment_variables(exclude_unset=True)
            | os.environ
            | {
                # TODO: make this a thing we can pass into the engine
                "PREFECT__ENABLE_CANCELLATION_AND_CRASHED_HOOKS": "false",
            },
            flow=flow,
            flow_run=flow_run,
            parameters=parameters,
            wait_for=wait_for,
            context=context,
        ),
    )
    process.start()

    return process