1"""
2Module containing the base workflow class and decorator - for most use cases, using the `@flow` decorator is preferred.
3"""

from __future__ import annotations

# This file requires type-checking with pyright because mypy does not yet support PEP612
# See https://github.com/python/mypy/issues/8645
import ast
import asyncio
import datetime
import importlib.util
import inspect
import os
import re
import sys
import tempfile
import uuid
import warnings
from copy import copy
from functools import partial, update_wrapper
from pathlib import Path
from typing import (
    TYPE_CHECKING,
    Any,
    Awaitable,
    Callable,
    Coroutine,
    Generic,
    Iterable,
    List,
    NoReturn,
    Optional,
    Protocol,
    Tuple,
    Type,
    TypeVar,
    Union,
    cast,
    overload,
)
from uuid import UUID

import pydantic
from exceptiongroup import BaseExceptionGroup, ExceptionGroup
from rich.console import Console
from typing_extensions import Literal, ParamSpec

from prefect._experimental.sla.objects import SlaTypes
from prefect._internal.concurrency.api import create_call, from_async
from prefect._versioning import VersionType
from prefect.client.schemas.filters import WorkerFilter, WorkerFilterStatus
from prefect.client.schemas.objects import ConcurrencyLimitConfig, FlowRun
from prefect.client.utilities import client_injector
from prefect.events import DeploymentTriggerTypes, TriggerTypes
from prefect.exceptions import (
    InvalidNameError,
    MissingFlowError,
    ObjectNotFound,
    ParameterTypeError,
    ScriptError,
    TerminationSignal,
    UnspecifiedFlowError,
)
from prefect.filesystems import LocalFileSystem, ReadableDeploymentStorage
from prefect.futures import PrefectFlowRunFuture, PrefectFuture
from prefect.logging import get_logger
from prefect.logging.loggers import flow_run_logger
from prefect.results import ResultSerializer, ResultStorage
from prefect.schedules import Schedule
from prefect.settings import (
    PREFECT_DEFAULT_WORK_POOL_NAME,
    PREFECT_FLOW_DEFAULT_RETRIES,
    PREFECT_FLOW_DEFAULT_RETRY_DELAY_SECONDS,
    PREFECT_TESTING_UNIT_TEST_MODE,
    PREFECT_UI_URL,
)
from prefect.states import State
from prefect.task_runners import TaskRunner, ThreadPoolTaskRunner
from prefect.types import BANNED_CHARACTERS, WITHOUT_BANNED_CHARACTERS
from prefect.types.entrypoint import EntrypointType
from prefect.utilities.annotations import NotSet
from prefect.utilities.asyncutils import (
    run_coro_as_sync,
    run_sync_in_worker_thread,
    sync_compatible,
)
from prefect.utilities.callables import (
    ParameterSchema,
    get_call_parameters,
    parameter_schema,
    parameters_to_args_kwargs,
    raise_for_reserved_arguments,
)
from prefect.utilities.collections import listrepr, visit_collection
from prefect.utilities.filesystem import relative_path_to_current_platform
from prefect.utilities.hashing import file_hash
from prefect.utilities.importtools import import_object, safe_load_namespace

from ._internal.compatibility.async_dispatch import async_dispatch, is_in_async_context
from ._internal.pydantic.v2_schema import is_v2_type
from ._internal.pydantic.validated_func import ValidatedFunction

if TYPE_CHECKING:
    from prefect.docker.docker_image import DockerImage
    from prefect.workers.base import BaseWorker


T = TypeVar("T")  # Generic type var for capturing the inner return type of async funcs
R = TypeVar("R")  # The return type of the user's function
P = ParamSpec("P")  # The parameters of the flow
F = TypeVar("F", bound="Flow[Any, Any]")  # The type of the flow


class FlowStateHook(Protocol, Generic[P, R]):
    """
    A callable that is invoked when a flow enters a given state.
    """

    __name__: str

    def __call__(
        self, flow: Flow[P, R], flow_run: FlowRun, state: State
    ) -> Awaitable[None] | None: ...
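
# A minimal sketch of a conforming hook (illustrative name, not part of this
# module). Hooks may be plain functions or coroutine functions; when a hook
# returns an awaitable, Prefect awaits it.
#
#     def log_state(flow: "Flow[..., Any]", flow_run: "FlowRun", state: State) -> None:
#         print(f"{flow.name} entered state {state.name!r}")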

if TYPE_CHECKING:
    import logging

    from prefect.client.orchestration import PrefectClient
    from prefect.client.schemas.objects import FlowRun
    from prefect.client.types.flexible_schedule_list import FlexibleScheduleList
    from prefect.deployments.runner import RunnerDeployment
    from prefect.runner.storage import RunnerStorage

logger: "logging.Logger" = get_logger("flows")


class Flow(Generic[P, R]):
    """
    A Prefect workflow definition.

    Wraps a function with an entrypoint to the Prefect engine. To preserve the input
    and output types, we use the generic type variables `P` and `R` for "Parameters" and
    "Returns" respectively.

    Args:
        fn: The function defining the workflow.
        name: An optional name for the flow; if not provided, the name will be inferred
            from the given function.
        version: An optional version string for the flow; if not provided, we will
            attempt to create a version string as a hash of the file containing the
            wrapped function; if the file cannot be located, the version will be null.
        flow_run_name: An optional name to distinguish runs of this flow; this name can
            be provided as a string template with the flow's parameters as variables,
            or a function that returns a string.
        task_runner: An optional task runner to use for task execution within the flow;
            if not provided, a `ThreadPoolTaskRunner` will be used.
        description: An optional string description for the flow; if not provided, the
            description will be pulled from the docstring for the decorated function.
        timeout_seconds: An optional number of seconds indicating a maximum runtime for
            the flow. If the flow exceeds this runtime, it will be marked as failed.
            Flow execution may continue until the next task is called.
        validate_parameters: By default, parameters passed to flows are validated by
            Pydantic. This will check that input values conform to the annotated types
            on the function. Where possible, values will be coerced into the correct
            type; for example, if a parameter is defined as `x: int` and "5" is passed,
            it will be resolved to `5`. If set to `False`, no validation will be
            performed on flow parameters.
        retries: An optional number of times to retry on flow run failure.
        retry_delay_seconds: An optional number of seconds to wait before retrying the
            flow after failure. This is only applicable if `retries` is nonzero.
        persist_result: An optional toggle indicating whether the result of this flow
            should be persisted to result storage. Defaults to `None`, which indicates
            that Prefect should choose whether the result should be persisted depending on
            the features being used.
        result_storage: An optional block to use to persist the result of this flow.
            This value will be used as the default for any tasks in this flow.
            If not provided, the local file system will be used unless called as
            a subflow, at which point the default will be loaded from the parent flow.
        result_serializer: An optional serializer to use to serialize the result of this
            flow for persistence. This value will be used as the default for any tasks
            in this flow. If not provided, the value of `PREFECT_RESULTS_DEFAULT_SERIALIZER`
            will be used unless called as a subflow, at which point the default will be
            loaded from the parent flow.
        on_failure: An optional list of callables to run when the flow enters a failed state.
        on_completion: An optional list of callables to run when the flow enters a completed state.
        on_cancellation: An optional list of callables to run when the flow enters a cancelling state.
        on_crashed: An optional list of callables to run when the flow enters a crashed state.
        on_running: An optional list of callables to run when the flow enters a running state.
    """

    # NOTE: These parameters (types, defaults, and docstrings) should be duplicated
    # exactly in the @flow decorator
    def __init__(
        self,
        fn: Callable[P, R] | "classmethod[Any, P, R]" | "staticmethod[P, R]",
        name: Optional[str] = None,
        version: Optional[str] = None,
        flow_run_name: Optional[Union[Callable[[], str], str]] = None,
        retries: Optional[int] = None,
        retry_delay_seconds: Optional[Union[int, float]] = None,
        task_runner: Union[
            Type[TaskRunner[PrefectFuture[Any]]], TaskRunner[PrefectFuture[Any]], None
        ] = None,
        description: Optional[str] = None,
        timeout_seconds: Union[int, float, None] = None,
        validate_parameters: bool = True,
        persist_result: Optional[bool] = None,
        result_storage: Optional[Union[ResultStorage, str]] = None,
        result_serializer: Optional[ResultSerializer] = None,
        cache_result_in_memory: bool = True,
        log_prints: Optional[bool] = None,
        on_completion: Optional[list[FlowStateHook[P, R]]] = None,
        on_failure: Optional[list[FlowStateHook[P, R]]] = None,
        on_cancellation: Optional[list[FlowStateHook[P, R]]] = None,
        on_crashed: Optional[list[FlowStateHook[P, R]]] = None,
        on_running: Optional[list[FlowStateHook[P, R]]] = None,
    ):
        if name is not None and not isinstance(name, str):  # pyright: ignore[reportUnnecessaryIsInstance]
            raise TypeError(
                "Expected string for flow parameter 'name'; got {} instead. {}".format(
                    type(name).__name__,
                    (
                        "Perhaps you meant to call it? e.g."
                        " '@flow(name=get_flow_run_name())'"
                        if callable(name)
                        else ""
                    ),
                )
            )

        # Validate that each provided hook argument is an iterable of callables
        hook_categories = [
            on_completion,
            on_failure,
            on_cancellation,
            on_crashed,
            on_running,
        ]
        hook_names = [
            "on_completion",
            "on_failure",
            "on_cancellation",
            "on_crashed",
            "on_running",
        ]
        for hooks, hook_name in zip(hook_categories, hook_names):
            if hooks is not None:
                try:
                    hooks = list(hooks)
                except TypeError:
                    raise TypeError(
                        f"Expected iterable for '{hook_name}'; got"
                        f" {type(hooks).__name__} instead. Please provide a list of"
                        f" hooks to '{hook_name}':\n\n"
                        f"@flow({hook_name}=[hook1, hook2])\ndef"
                        " my_flow():\n\tpass"
                    )

                for hook in hooks:
                    if not callable(hook):
                        raise TypeError(
                            f"Expected callables in '{hook_name}'; got"
                            f" {type(hook).__name__} instead. Please provide a list of"
                            f" hooks to '{hook_name}':\n\n"
                            f"@flow({hook_name}=[hook1, hook2])\ndef"
                            " my_flow():\n\tpass"
                        )

        if isinstance(fn, classmethod):
            fn = cast(Callable[P, R], fn.__func__)
            self._isclassmethod = True

        if isinstance(fn, staticmethod):
            fn = cast(Callable[P, R], fn.__func__)
            self._isstaticmethod = True

        if not callable(fn):
            raise TypeError("'fn' must be callable")

        self.name: str = name or fn.__name__.replace("_", "-").replace(
            "<lambda>",
            "unknown-lambda",  # the Prefect API will not accept "<" or ">" in flow names
        )
        _raise_on_name_with_banned_characters(self.name)

        if flow_run_name is not None:
            if not isinstance(flow_run_name, str) and not callable(flow_run_name):
                raise TypeError(
                    "Expected string or callable for 'flow_run_name'; got"
                    f" {type(flow_run_name).__name__} instead."
                )
        self.flow_run_name = flow_run_name

        if task_runner is None:
            self.task_runner: TaskRunner[PrefectFuture[Any]] = cast(
                TaskRunner[PrefectFuture[Any]], ThreadPoolTaskRunner()
            )
        else:
            self.task_runner: TaskRunner[PrefectFuture[Any]] = (
                task_runner() if isinstance(task_runner, type) else task_runner
            )

        self.log_prints = log_prints

        self.description: str | None = description or inspect.getdoc(fn)
        update_wrapper(self, fn)
        self.fn = fn

        # the flow is considered async if its function is async or an async
        # generator
        self.isasync: bool = inspect.iscoroutinefunction(
            self.fn
        ) or inspect.isasyncgenfunction(self.fn)

        # the flow is considered a generator if its function is a generator or
        # an async generator
        self.isgenerator: bool = inspect.isgeneratorfunction(
            self.fn
        ) or inspect.isasyncgenfunction(self.fn)

        raise_for_reserved_arguments(self.fn, ["return_state", "wait_for"])

        # Version defaults to a hash of the function's file
        if not version:
            try:
                flow_file = inspect.getsourcefile(self.fn)
                if flow_file is None:
                    raise FileNotFoundError
                version = file_hash(flow_file)
            except (FileNotFoundError, TypeError, OSError):
                pass  # `getsourcefile` can return null values and "<stdin>" for objects in repls
        self.version = version

        self.timeout_seconds: float | None = (
            float(timeout_seconds) if timeout_seconds else None
        )

        # FlowRunPolicy settings
        # TODO: We can instantiate a `FlowRunPolicy` and add Pydantic bound checks to
        #   validate that the user passes positive numbers here
        self.retries: int = (
            retries if retries is not None else PREFECT_FLOW_DEFAULT_RETRIES.value()
        )

        self.retry_delay_seconds: float | int = (
            retry_delay_seconds
            if retry_delay_seconds is not None
            else PREFECT_FLOW_DEFAULT_RETRY_DELAY_SECONDS.value()
        )

        self.parameters: ParameterSchema = parameter_schema(self.fn)
        self.should_validate_parameters = validate_parameters

        if self.should_validate_parameters:
            # Try to create the validated function now so that incompatibility can be
            # raised at declaration time rather than at runtime
            # We cannot, however, store the validated function on the flow because it
            # is not picklable in some environments
            try:
                ValidatedFunction(self.fn, config={"arbitrary_types_allowed": True})
            except Exception as exc:
                raise ValueError(
                    "Flow function is not compatible with `validate_parameters`. "
                    "Disable validation or change the argument names."
                ) from exc

        # result persistence settings
        if persist_result is None:
            if result_storage is not None or result_serializer is not None:
                persist_result = True

        self.persist_result = persist_result
        if result_storage and not isinstance(result_storage, str):
            if getattr(result_storage, "_block_document_id", None) is None:
                raise TypeError(
                    "Result storage configuration must be persisted server-side."
                    " Please call `.save()` on your block before passing it in."
                )
        self.result_storage = result_storage
        self.result_serializer = result_serializer
        self.cache_result_in_memory = cache_result_in_memory
        self.on_completion_hooks: list[FlowStateHook[P, R]] = on_completion or []
        self.on_failure_hooks: list[FlowStateHook[P, R]] = on_failure or []
        self.on_cancellation_hooks: list[FlowStateHook[P, R]] = on_cancellation or []
        self.on_crashed_hooks: list[FlowStateHook[P, R]] = on_crashed or []
        self.on_running_hooks: list[FlowStateHook[P, R]] = on_running or []

        # Used for flows loaded from remote storage
        self._storage: Optional["RunnerStorage"] = None
        self._entrypoint: Optional[str] = None

        module = fn.__module__
        if module and (module == "__main__" or module.startswith("__prefect_loader_")):
            module_name = inspect.getfile(fn)
            module = module_name if module_name != "__main__" else module

        self._entrypoint = f"{module}:{getattr(fn, '__qualname__', fn.__name__)}"

    @property
    def ismethod(self) -> bool:
        return hasattr(self.fn, "__prefect_self__")

    @property
    def isclassmethod(self) -> bool:
        return getattr(self, "_isclassmethod", False)

    @property
    def isstaticmethod(self) -> bool:
        return getattr(self, "_isstaticmethod", False)

    def __get__(self, instance: Any, owner: Any) -> "Flow[P, R]":
        """
        Implement the descriptor protocol so that the flow can be used as an instance or class method.
        When an instance method is loaded, this method is called with the "self" instance as
        an argument. We return a copy of the flow with that instance bound to the flow's function.
        """
        # wrapped function is a classmethod
        if self.isclassmethod:
            bound_flow = copy(self)
            setattr(bound_flow.fn, "__prefect_cls__", owner)
            return bound_flow

        # if the flow is being accessed on an instance, bind the instance to the
        # __prefect_self__ attribute of the flow's function. This will allow it to be
        # automatically added to the flow's parameters
        if instance:
            bound_flow = copy(self)
            bound_flow.fn.__prefect_self__ = instance  # type: ignore[attr-defined]
            return bound_flow

        return self
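
    # A minimal sketch of the descriptor behavior above (illustrative class,
    # not part of this module): accessing the flow through an instance binds
    # that instance, so it is injected into the flow's parameters on calls.
    #
    #     class Pipeline:
    #         @flow
    #         def run(self, x: int) -> int:
    #             return x + 1
    #
    #     Pipeline().run(1)  # `__get__` binds the instance before the call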

    def with_options(
        self,
        *,
        name: Optional[str] = None,
        version: Optional[str] = None,
        retries: Optional[int] = None,
        retry_delay_seconds: Optional[Union[int, float]] = None,
        description: Optional[str] = None,
        flow_run_name: Optional[Union[Callable[[], str], str]] = None,
        task_runner: Union[
            Type[TaskRunner[PrefectFuture[Any]]], TaskRunner[PrefectFuture[Any]], None
        ] = None,
        timeout_seconds: Union[int, float, None] = None,
        validate_parameters: Optional[bool] = None,
        persist_result: Optional[bool] = NotSet,  # type: ignore
        result_storage: Optional[ResultStorage] = NotSet,  # type: ignore
        result_serializer: Optional[ResultSerializer] = NotSet,  # type: ignore
        cache_result_in_memory: Optional[bool] = None,
        log_prints: Optional[bool] = NotSet,  # type: ignore
        on_completion: Optional[list[FlowStateHook[P, R]]] = None,
        on_failure: Optional[list[FlowStateHook[P, R]]] = None,
        on_cancellation: Optional[list[FlowStateHook[P, R]]] = None,
        on_crashed: Optional[list[FlowStateHook[P, R]]] = None,
        on_running: Optional[list[FlowStateHook[P, R]]] = None,
    ) -> "Flow[P, R]":
        """
        Create a new flow from the current object, updating provided options.

        Args:
            name: A new name for the flow.
            version: A new version for the flow.
            description: A new description for the flow.
            flow_run_name: An optional name to distinguish runs of this flow; this name
                can be provided as a string template with the flow's parameters as variables,
                or a function that returns a string.
            task_runner: A new task runner for the flow.
            timeout_seconds: A new number of seconds to fail the flow after if still
                running.
            validate_parameters: A new value indicating if flow calls should validate
                given parameters.
            retries: A new number of times to retry on flow run failure.
            retry_delay_seconds: A new number of seconds to wait before retrying the
                flow after failure. This is only applicable if `retries` is nonzero.
            persist_result: A new option for enabling or disabling result persistence.
            result_storage: A new storage type to use for results.
            result_serializer: A new serializer to use for results.
            cache_result_in_memory: A new value indicating if the flow's result should
                be cached in memory.
            on_failure: A new list of callables to run when the flow enters a failed state.
            on_completion: A new list of callables to run when the flow enters a completed state.
            on_cancellation: A new list of callables to run when the flow enters a cancelling state.
            on_crashed: A new list of callables to run when the flow enters a crashed state.
            on_running: A new list of callables to run when the flow enters a running state.

        Returns:
            A new `Flow` instance.

        Examples:

            Create a new flow from an existing flow and update the name:

            ```python
            from prefect import flow

            @flow(name="My flow")
            def my_flow():
                return 1

            new_flow = my_flow.with_options(name="My new flow")
            ```

            Create a new flow from an existing flow, update the task runner, and call
            it without an intermediate variable:

            ```python
            from prefect.task_runners import ThreadPoolTaskRunner

            @flow
            def my_flow(x, y):
                return x + y

            result = my_flow.with_options(task_runner=ThreadPoolTaskRunner)(1, 3)
            assert result == 4
            ```
        """
        new_task_runner = (
            task_runner() if isinstance(task_runner, type) else task_runner
        )
        if new_task_runner is None:
            new_task_runner = self.task_runner
        new_flow = Flow(
            fn=self.fn,
            name=name or self.name,
            description=description or self.description,
            flow_run_name=flow_run_name or self.flow_run_name,
            version=version or self.version,
            task_runner=new_task_runner,
            retries=retries if retries is not None else self.retries,
            retry_delay_seconds=(
                retry_delay_seconds
                if retry_delay_seconds is not None
                else self.retry_delay_seconds
            ),
            timeout_seconds=(
                timeout_seconds if timeout_seconds is not None else self.timeout_seconds
            ),
            validate_parameters=(
                validate_parameters
                if validate_parameters is not None
                else self.should_validate_parameters
            ),
            persist_result=(
                persist_result if persist_result is not NotSet else self.persist_result
            ),
            result_storage=(
                result_storage if result_storage is not NotSet else self.result_storage
            ),
            result_serializer=(
                result_serializer
                if result_serializer is not NotSet
                else self.result_serializer
            ),
            cache_result_in_memory=(
                cache_result_in_memory
                if cache_result_in_memory is not None
                else self.cache_result_in_memory
            ),
            log_prints=log_prints if log_prints is not NotSet else self.log_prints,
            on_completion=on_completion or self.on_completion_hooks,
            on_failure=on_failure or self.on_failure_hooks,
            on_cancellation=on_cancellation or self.on_cancellation_hooks,
            on_crashed=on_crashed or self.on_crashed_hooks,
            on_running=on_running or self.on_running_hooks,
        )
        new_flow._storage = self._storage
        new_flow._entrypoint = self._entrypoint
        return new_flow

    def validate_parameters(self, parameters: dict[str, Any]) -> dict[str, Any]:
        """
        Validate parameters for compatibility with the flow by attempting to cast the inputs to the
        associated types specified by the function's type annotations.

        Returns:
            A new dict of parameters that have been cast to the appropriate types

        Raises:
            ParameterTypeError: if the provided parameters are not valid
        """

        def resolve_block_reference(data: Any | dict[str, Any]) -> Any:
            if isinstance(data, dict) and "$ref" in data:
                from prefect.blocks.core import Block

                return Block.load_from_ref(data["$ref"], _sync=True)
            return data

        try:
            parameters = visit_collection(
                parameters, resolve_block_reference, return_data=True
            )
        except (ValueError, RuntimeError) as exc:
            raise ParameterTypeError(
                "Failed to resolve block references in parameters."
            ) from exc

        args, kwargs = parameters_to_args_kwargs(self.fn, parameters)

        if sys.version_info >= (3, 14):  # Pydantic v1 is not supported in Python 3.14+
            has_v1_models = False
        else:
            from pydantic.v1 import BaseModel as V1BaseModel

            with warnings.catch_warnings():
                warnings.filterwarnings(
                    "ignore", category=pydantic.warnings.PydanticDeprecatedSince20
                )
                has_v1_models = any(isinstance(o, V1BaseModel) for o in args) or any(
                    isinstance(o, V1BaseModel) for o in kwargs.values()
                )

        has_v2_types = any(is_v2_type(o) for o in args) or any(
            is_v2_type(o) for o in kwargs.values()
        )

        if has_v1_models and has_v2_types:
            raise ParameterTypeError(
                "Cannot mix Pydantic v1 and v2 types as arguments to a flow."
            )

        try:
            if has_v1_models:
                from pydantic.v1.decorator import (
                    ValidatedFunction as V1ValidatedFunction,
                )

                validated_fn = V1ValidatedFunction(
                    self.fn, config=dict(arbitrary_types_allowed=True)
                )
                with warnings.catch_warnings():
                    warnings.filterwarnings(
                        "ignore", category=pydantic.warnings.PydanticDeprecatedSince20
                    )
                    model = validated_fn.init_model_instance(*args, **kwargs)

                # Get the updated parameter dict with cast values from the model
                cast_parameters = {
                    k: v
                    for k, v in dict(iter(model)).items()
                    if k in model.model_fields_set
                    or type(model).model_fields[k].default_factory
                }
                return cast_parameters
            else:
                validated_fn = ValidatedFunction(
                    self.fn, config=pydantic.ConfigDict(arbitrary_types_allowed=True)
                )
                return validated_fn.validate_call_args(args, kwargs)

        except pydantic.ValidationError as exc:
            # We capture the pydantic exception and raise our own because the pydantic
            # exception is not picklable when using a cythonized pydantic installation
            logger.error(
                f"Parameter validation failed for flow {self.name!r}: {exc.errors()}"
                f"\nParameters: {parameters}"
            )
            raise ParameterTypeError.from_validation_error(exc) from None
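
    # A minimal sketch of the coercion performed above (illustrative flow, not
    # part of this module): values are cast to the annotated types where
    # possible, and a `ParameterTypeError` is raised where they cannot be.
    #
    #     @flow
    #     def add(x: int, y: int) -> int:
    #         return x + y
    #
    #     add.validate_parameters({"x": "1", "y": "2"})  # -> {"x": 1, "y": 2}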

    def serialize_parameters(
        self, parameters: dict[str, Any | PrefectFuture[Any] | State]
    ) -> dict[str, Any]:
        """
        Convert parameters to a serializable form.

        Uses FastAPI's `jsonable_encoder` to convert to JSON compatible objects without
        converting everything directly to a string. This maintains basic types like
        integers during API roundtrips.
        """
        serialized_parameters: dict[str, Any] = {}
        for key, value in parameters.items():
            # do not serialize the bound self object
            if self.ismethod and value is getattr(self.fn, "__prefect_self__", None):
                continue
            if self.isclassmethod and value is getattr(
                self.fn, "__prefect_cls__", None
            ):
                continue
            if isinstance(value, (PrefectFuture, State)):
                # Don't call jsonable_encoder() on a PrefectFuture or State to
                # avoid triggering a __getitem__ call
                serialized_parameters[key] = f"<{type(value).__name__}>"
                continue
            try:
                from fastapi.encoders import jsonable_encoder

                serialized_parameters[key] = jsonable_encoder(value)
            except (TypeError, ValueError):
                logger.debug(
                    f"Parameter {key!r} for flow {self.name!r} is of unserializable"
                    f" type {type(value).__name__!r} and will not be stored"
                    " in the backend."
                )
                serialized_parameters[key] = f"<{type(value).__name__}>"
        return serialized_parameters
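
    # A minimal sketch of the behavior above (illustrative values, not part of
    # this module): JSON-compatible values pass through `jsonable_encoder`,
    # while unserializable ones are replaced with a type-name placeholder.
    #
    #     my_flow.serialize_parameters({"n": 5, "when": datetime.date(2025, 1, 1)})
    #     # -> {"n": 5, "when": "2025-01-01"}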

    async def ato_deployment(
        self,
        name: str,
        interval: Optional[
            Union[
                Iterable[Union[int, float, datetime.timedelta]],
                int,
                float,
                datetime.timedelta,
            ]
        ] = None,
        cron: Optional[Union[Iterable[str], str]] = None,
        rrule: Optional[Union[Iterable[str], str]] = None,
        paused: Optional[bool] = None,
        schedule: Optional[Schedule] = None,
        schedules: Optional["FlexibleScheduleList"] = None,
        concurrency_limit: Optional[Union[int, ConcurrencyLimitConfig, None]] = None,
        parameters: Optional[dict[str, Any]] = None,
        triggers: Optional[list[Union[DeploymentTriggerTypes, TriggerTypes]]] = None,
        description: Optional[str] = None,
        tags: Optional[list[str]] = None,
        version: Optional[str] = None,
        version_type: Optional[VersionType] = None,
        enforce_parameter_schema: bool = True,
        work_pool_name: Optional[str] = None,
        work_queue_name: Optional[str] = None,
        job_variables: Optional[dict[str, Any]] = None,
        entrypoint_type: EntrypointType = EntrypointType.FILE_PATH,
        _sla: Optional[Union[SlaTypes, list[SlaTypes]]] = None,  # experimental
    ) -> "RunnerDeployment":
        """
        Asynchronously creates a runner deployment object for this flow.

        Args:
            name: The name to give the created deployment.
            interval: An interval on which to execute the new deployment. Accepts either a number
                or a timedelta object. If a number is given, it will be interpreted as seconds.
            cron: A cron schedule of when to execute runs of this deployment.
            rrule: An rrule schedule of when to execute runs of this deployment.
            paused: Whether or not to set this deployment as paused.
            schedule: A schedule object defining when to execute runs of this deployment.
                Used to provide additional scheduling options like `timezone` or `parameters`.
            schedules: A list of schedule objects defining when to execute runs of this deployment.
                Used to define multiple schedules or additional scheduling options such as `timezone`.
            concurrency_limit: The maximum number of runs of this deployment that can run at the same time.
            parameters: A dictionary of default parameter values to pass to runs of this deployment.
            triggers: A list of triggers that will kick off runs of this deployment.
            description: A description for the created deployment. Defaults to the flow's
                description if not provided.
            tags: A list of tags to associate with the created deployment for organizational
                purposes.
            version: A version for the created deployment. Defaults to the flow's version.
            version_type: The type of version to use for the created deployment. The version type
                will be inferred if not provided.
            enforce_parameter_schema: Whether or not the Prefect API should enforce the
                parameter schema for the created deployment.
            work_pool_name: The name of the work pool to use for this deployment.
            work_queue_name: The name of the work queue to use for this deployment's scheduled runs.
                If not provided the default work queue for the work pool will be used.
            job_variables: Settings used to override the values specified in the default base job
                template of the chosen work pool. Refer to the base job template of the chosen
                work pool for available settings.
            entrypoint_type: Type of entrypoint to use for the deployment. When using a module path
                entrypoint, ensure that the module will be importable in the execution environment.
            _sla: (Experimental) SLA configuration for the deployment. May be removed or modified at any time. Currently only supported on Prefect Cloud.

        Examples:
            Prepare two deployments and serve them:

            ```python
            from prefect import flow, serve

            @flow
            def my_flow(name):
                print(f"hello {name}")

            @flow
            def my_other_flow(name):
                print(f"goodbye {name}")

            if __name__ == "__main__":
                hello_deploy = my_flow.to_deployment("hello", tags=["dev"])
                bye_deploy = my_other_flow.to_deployment("goodbye", tags=["dev"])
                serve(hello_deploy, bye_deploy)
            ```
        """
        from prefect.deployments.runner import RunnerDeployment

        if not name.endswith(".py"):
            _raise_on_name_with_banned_characters(name)

        if self._storage and self._entrypoint:
            return await RunnerDeployment.afrom_storage(
                storage=self._storage,
                entrypoint=self._entrypoint,
                name=name,
                flow_name=self.name,
                interval=interval,
                cron=cron,
                rrule=rrule,
                paused=paused,
                schedule=schedule,
                schedules=schedules,
                concurrency_limit=concurrency_limit,
                tags=tags,
                triggers=triggers,
                parameters=parameters or {},
                description=description,
                version=version,
                version_type=version_type,
                enforce_parameter_schema=enforce_parameter_schema,
                work_pool_name=work_pool_name,
                work_queue_name=work_queue_name,
                job_variables=job_variables,
                _sla=_sla,
            )
        else:
            return RunnerDeployment.from_flow(
                flow=self,
                name=name,
                interval=interval,
                cron=cron,
                rrule=rrule,
                paused=paused,
                schedule=schedule,
                schedules=schedules,
                concurrency_limit=concurrency_limit,
                tags=tags,
                triggers=triggers,
                parameters=parameters or {},
                description=description,
                version=version,
                version_type=version_type,
                enforce_parameter_schema=enforce_parameter_schema,
                work_pool_name=work_pool_name,
                work_queue_name=work_queue_name,
                job_variables=job_variables,
                entrypoint_type=entrypoint_type,
                _sla=_sla,
            )

    @async_dispatch(ato_deployment)
    def to_deployment(
        self,
        name: str,
        interval: Optional[
            Union[
                Iterable[Union[int, float, datetime.timedelta]],
                int,
                float,
                datetime.timedelta,
            ]
        ] = None,
        cron: Optional[Union[Iterable[str], str]] = None,
        rrule: Optional[Union[Iterable[str], str]] = None,
        paused: Optional[bool] = None,
        schedule: Optional[Schedule] = None,
        schedules: Optional["FlexibleScheduleList"] = None,
        concurrency_limit: Optional[Union[int, ConcurrencyLimitConfig, None]] = None,
        parameters: Optional[dict[str, Any]] = None,
        triggers: Optional[list[Union[DeploymentTriggerTypes, TriggerTypes]]] = None,
        description: Optional[str] = None,
        tags: Optional[list[str]] = None,
        version: Optional[str] = None,
        version_type: Optional[VersionType] = None,
        enforce_parameter_schema: bool = True,
        work_pool_name: Optional[str] = None,
        work_queue_name: Optional[str] = None,
        job_variables: Optional[dict[str, Any]] = None,
        entrypoint_type: EntrypointType = EntrypointType.FILE_PATH,
        _sla: Optional[Union[SlaTypes, list[SlaTypes]]] = None,  # experimental
    ) -> "RunnerDeployment":
        """
        Creates a runner deployment object for this flow.

        Args:
            name: The name to give the created deployment.
            interval: An interval on which to execute the new deployment. Accepts either a number
                or a timedelta object. If a number is given, it will be interpreted as seconds.
            cron: A cron schedule of when to execute runs of this deployment.
            rrule: An rrule schedule of when to execute runs of this deployment.
            paused: Whether or not to set this deployment as paused.
            schedule: A schedule object defining when to execute runs of this deployment.
                Used to provide additional scheduling options like `timezone` or `parameters`.
            schedules: A list of schedule objects defining when to execute runs of this deployment.
                Used to define multiple schedules or additional scheduling options such as `timezone`.
            concurrency_limit: The maximum number of runs of this deployment that can run at the same time.
            parameters: A dictionary of default parameter values to pass to runs of this deployment.
            triggers: A list of triggers that will kick off runs of this deployment.
            description: A description for the created deployment. Defaults to the flow's
                description if not provided.
            tags: A list of tags to associate with the created deployment for organizational
                purposes.
            version: A version for the created deployment. Defaults to the flow's version.
            version_type: The type of version to use for the created deployment. The version type
                will be inferred if not provided.
            enforce_parameter_schema: Whether or not the Prefect API should enforce the
                parameter schema for the created deployment.
            work_pool_name: The name of the work pool to use for this deployment.
            work_queue_name: The name of the work queue to use for this deployment's scheduled runs.
                If not provided the default work queue for the work pool will be used.
            job_variables: Settings used to override the values specified in the default base job
                template of the chosen work pool. Refer to the base job template of the chosen
                work pool for available settings.
            entrypoint_type: Type of entrypoint to use for the deployment. When using a module path
                entrypoint, ensure that the module will be importable in the execution environment.
            _sla: (Experimental) SLA configuration for the deployment. May be removed or modified at any time. Currently only supported on Prefect Cloud.

        Examples:
            Prepare two deployments and serve them:

            ```python
            from prefect import flow, serve

            @flow
            def my_flow(name):
                print(f"hello {name}")

            @flow
            def my_other_flow(name):
                print(f"goodbye {name}")

            if __name__ == "__main__":
                hello_deploy = my_flow.to_deployment("hello", tags=["dev"])
                bye_deploy = my_other_flow.to_deployment("goodbye", tags=["dev"])
                serve(hello_deploy, bye_deploy)
            ```
        """
        from prefect.deployments.runner import RunnerDeployment

        if not name.endswith(".py"):
            _raise_on_name_with_banned_characters(name)

        if self._storage and self._entrypoint:
            return cast(
                RunnerDeployment,
                RunnerDeployment.from_storage(
                    storage=self._storage,
                    entrypoint=self._entrypoint,
                    name=name,
                    flow_name=self.name,
                    interval=interval,
                    cron=cron,
                    rrule=rrule,
                    paused=paused,
                    schedule=schedule,
                    schedules=schedules,
                    concurrency_limit=concurrency_limit,
                    tags=tags,
                    triggers=triggers,
                    parameters=parameters or {},
                    description=description,
                    version=version,
                    version_type=version_type,
                    enforce_parameter_schema=enforce_parameter_schema,
                    work_pool_name=work_pool_name,
                    work_queue_name=work_queue_name,
                    job_variables=job_variables,
                    _sla=_sla,
                    _sync=True,  # pyright: ignore[reportCallIssue] _sync is valid because .from_storage is decorated with async_dispatch
                ),
            )
        else:
            return RunnerDeployment.from_flow(
                flow=self,
                name=name,
                interval=interval,
                cron=cron,
                rrule=rrule,
                paused=paused,
                schedule=schedule,
                schedules=schedules,
                concurrency_limit=concurrency_limit,
                tags=tags,
                triggers=triggers,
                parameters=parameters or {},
                description=description,
                version=version,
                version_type=version_type,
                enforce_parameter_schema=enforce_parameter_schema,
                work_pool_name=work_pool_name,
                work_queue_name=work_queue_name,
                job_variables=job_variables,
                entrypoint_type=entrypoint_type,
                _sla=_sla,
            )

    def on_completion(self, fn: FlowStateHook[P, R]) -> FlowStateHook[P, R]:
        self.on_completion_hooks.append(fn)
        return fn

    def on_cancellation(self, fn: FlowStateHook[P, R]) -> FlowStateHook[P, R]:
        self.on_cancellation_hooks.append(fn)
        return fn

    def on_crashed(self, fn: FlowStateHook[P, R]) -> FlowStateHook[P, R]:
        self.on_crashed_hooks.append(fn)
        return fn

    def on_running(self, fn: FlowStateHook[P, R]) -> FlowStateHook[P, R]:
        self.on_running_hooks.append(fn)
        return fn

    def on_failure(self, fn: FlowStateHook[P, R]) -> FlowStateHook[P, R]:
        self.on_failure_hooks.append(fn)
        return fn
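
    # These registration methods double as decorators; a minimal sketch
    # (illustrative hook name, not part of this module):
    #
    #     @my_flow.on_completion
    #     def report_success(flow, flow_run, state):
    #         print(f"{flow.name} finished in state {state.name!r}")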

    def serve(
        self,
        name: Optional[str] = None,
        interval: Optional[
            Union[
                Iterable[Union[int, float, datetime.timedelta]],
                int,
                float,
                datetime.timedelta,
            ]
        ] = None,
        cron: Optional[Union[Iterable[str], str]] = None,
        rrule: Optional[Union[Iterable[str], str]] = None,
        paused: Optional[bool] = None,
        schedule: Optional[Schedule] = None,
        schedules: Optional["FlexibleScheduleList"] = None,
        global_limit: Optional[Union[int, ConcurrencyLimitConfig, None]] = None,
        triggers: Optional[list[Union[DeploymentTriggerTypes, TriggerTypes]]] = None,
        parameters: Optional[dict[str, Any]] = None,
        description: Optional[str] = None,
        tags: Optional[list[str]] = None,
        version: Optional[str] = None,
        enforce_parameter_schema: bool = True,
        pause_on_shutdown: bool = True,
        print_starting_message: bool = True,
        limit: Optional[int] = None,
        webserver: bool = False,
        entrypoint_type: EntrypointType = EntrypointType.FILE_PATH,
    ) -> None:
        """
        Creates a deployment for this flow and starts a runner to monitor for scheduled work.

        Args:
            name: The name to give the created deployment. Defaults to the name of the flow.
            interval: An interval on which to execute the deployment. Accepts a number or a
                timedelta object to create a single schedule. If a number is given, it will be
                interpreted as seconds. Also accepts an iterable of numbers or timedelta to create
                multiple schedules.
            cron: A cron schedule string of when to execute runs of this deployment.
                Also accepts an iterable of cron schedule strings to create multiple schedules.
            rrule: An rrule schedule string of when to execute runs of this deployment.
                Also accepts an iterable of rrule schedule strings to create multiple schedules.
            triggers: A list of triggers that will kick off runs of this deployment.
            paused: Whether or not to set this deployment as paused.
            schedule: A schedule object defining when to execute runs of this deployment.
                Used to provide additional scheduling options like `timezone` or `parameters`.
            schedules: A list of schedule objects defining when to execute runs of this deployment.
                Used to define multiple schedules or additional scheduling options like `timezone`.
            global_limit: The maximum number of concurrent runs allowed across all served flow
                instances associated with the same deployment.
            parameters: A dictionary of default parameter values to pass to runs of this deployment.
            description: A description for the created deployment. Defaults to the flow's
                description if not provided.
            tags: A list of tags to associate with the created deployment for organizational
                purposes.
            version: A version for the created deployment. Defaults to the flow's version.
            enforce_parameter_schema: Whether or not the Prefect API should enforce the
                parameter schema for the created deployment.
            pause_on_shutdown: If True, provided schedules will be paused when the serve function
                is stopped. If False, the schedules will continue running.
            print_starting_message: Whether or not to print the starting message when flow is served.
            limit: The maximum number of runs that can be executed concurrently by the created
                runner; only applies to this served flow. To apply a limit across multiple served
                flows, use `global_limit`.
            webserver: Whether or not to start a monitoring webserver for this flow.
            entrypoint_type: Type of entrypoint to use for the deployment. When using a module path
                entrypoint, ensure that the module will be importable in the execution environment.

        Examples:
            Serve a flow:

            ```python
            from prefect import flow

            @flow
            def my_flow(name):
                print(f"hello {name}")

            if __name__ == "__main__":
                my_flow.serve("example-deployment")
            ```

            Serve a flow and run it every hour:

            ```python
            from prefect import flow

            @flow
            def my_flow(name):
                print(f"hello {name}")

            if __name__ == "__main__":
                my_flow.serve("example-deployment", interval=3600)
            ```
        """
        from prefect.runner import Runner

        if not name:
            name = self.name
        else:
            # Only strip the extension if the name is a file path
            if (p := Path(name)).is_file():
                name = p.stem

        runner = Runner(name=name, pause_on_shutdown=pause_on_shutdown, limit=limit)
        deployment_id = runner.add_flow(
            self,
            name=name,
            triggers=triggers,
            interval=interval,
            cron=cron,
            rrule=rrule,
            paused=paused,
            schedule=schedule,
            schedules=schedules,
            concurrency_limit=global_limit,
            parameters=parameters,
            description=description,
            tags=tags,
            version=version,
            enforce_parameter_schema=enforce_parameter_schema,
            entrypoint_type=entrypoint_type,
        )
        if print_starting_message:
            help_message = (
                f"[green]Your flow {self.name!r} is being served and polling for"
                " scheduled runs!\n[/]\nTo trigger a run for this flow, use the"
                " following command:\n[blue]\n\t$ prefect deployment run"
                f" '{self.name}/{name}'\n[/]"
            )
            if PREFECT_UI_URL:
                help_message += (
                    "\nYou can also run your flow via the Prefect UI:"
                    f" [blue]{PREFECT_UI_URL.value()}/deployments/deployment/{deployment_id}[/]\n"
                )

            console = Console()
            console.print(help_message, soft_wrap=True)

        try:
            loop = asyncio.get_running_loop()
        except RuntimeError as exc:
            if "no running event loop" in str(exc):
                loop = None
            else:
                raise

        try:
            if loop is not None:
                loop.run_until_complete(runner.start(webserver=webserver))
            else:
                asyncio.run(runner.start(webserver=webserver))
        except (KeyboardInterrupt, TerminationSignal) as exc:
            logger.info(f"Received {type(exc).__name__}, shutting down...")
            if loop is not None:
                loop.stop()

    @classmethod
    async def afrom_source(
        cls,
        source: Union[str, Path, "RunnerStorage", ReadableDeploymentStorage],
        entrypoint: str,
    ) -> "Flow[..., Any]":
        """
        Loads a flow from a remote source asynchronously.

        Args:
            source: Either a URL to a git repository or a storage object.
            entrypoint: The path to a file containing a flow and the name of the flow function in
                the format `./path/to/file.py:flow_func_name`.

        Returns:
            A new `Flow` instance.

        Examples:
            Load a flow from a public git repository:

            ```python
            from prefect import flow
            from prefect.runner.storage import GitRepository
            from prefect.blocks.system import Secret

            my_flow = flow.from_source(
                source="https://github.com/org/repo.git",
                entrypoint="flows.py:my_flow",
            )

            my_flow()
            ```

            Load a flow from a private git repository using an access token stored in a `Secret` block:

            ```python
            from prefect import flow
            from prefect.runner.storage import GitRepository
            from prefect.blocks.system import Secret

            my_flow = flow.from_source(
                source=GitRepository(
                    url="https://github.com/org/repo.git",
                    credentials={"access_token": Secret.load("github-access-token")}
                ),
                entrypoint="flows.py:my_flow",
            )

            my_flow()
            ```

            Load a flow from a local directory:

            ```python
            # from_local_source.py

            from pathlib import Path
            from prefect import flow

            @flow(log_prints=True)
            def my_flow(name: str = "world"):
                print(f"Hello {name}! I'm a flow from a Python script!")

            if __name__ == "__main__":
                my_flow.from_source(
                    source=str(Path(__file__).parent),
                    entrypoint="from_local_source.py:my_flow",
                ).deploy(
                    name="my-deployment",
                    parameters=dict(name="Marvin"),
                    work_pool_name="local",
                )
            ```
        """
        from prefect.runner.storage import (
            BlockStorageAdapter,
            LocalStorage,
            RunnerStorage,
            create_storage_from_source,
        )

        if isinstance(source, (Path, str)):
            if isinstance(source, Path):
                source = str(source)
            storage = create_storage_from_source(source)
        elif isinstance(source, RunnerStorage):
            storage = source
        elif hasattr(source, "get_directory"):
            storage = BlockStorageAdapter(source)
        else:
            raise TypeError(
                f"Unsupported source type {type(source).__name__!r}. Please provide a"
                " URL to remote storage or a storage object."
            )
        with tempfile.TemporaryDirectory() as tmpdir:
            if not isinstance(storage, LocalStorage):
                storage.set_base_path(Path(tmpdir))
                await storage.pull_code()

            full_entrypoint = str(storage.destination / entrypoint)
            flow = cast(
                "Flow[..., Any]",
                await from_async.wait_for_call_in_new_thread(
                    create_call(load_flow_from_entrypoint, full_entrypoint)
                ),
            )
            flow._storage = storage
            flow._entrypoint = entrypoint

        return flow

    @classmethod
    @async_dispatch(afrom_source)
    def from_source(
        cls,
        source: Union[str, Path, "RunnerStorage", ReadableDeploymentStorage],
        entrypoint: str,
    ) -> "Flow[..., Any]":
        """
        Loads a flow from a remote source.

        Args:
            source: Either a URL to a git repository or a storage object.
            entrypoint: The path to a file containing a flow and the name of the flow function in
                the format `./path/to/file.py:flow_func_name`.

        Returns:
            A new `Flow` instance.

        Examples:
            Load a flow from a public git repository:

            ```python
            from prefect import flow
            from prefect.runner.storage import GitRepository
            from prefect.blocks.system import Secret

            my_flow = flow.from_source(
                source="https://github.com/org/repo.git",
                entrypoint="flows.py:my_flow",
            )

            my_flow()
            ```

            Load a flow from a private git repository using an access token stored in a `Secret` block:

            ```python
            from prefect import flow
            from prefect.runner.storage import GitRepository
            from prefect.blocks.system import Secret

            my_flow = flow.from_source(
                source=GitRepository(
                    url="https://github.com/org/repo.git",
                    credentials={"access_token": Secret.load("github-access-token")}
                ),
                entrypoint="flows.py:my_flow",
            )

            my_flow()
            ```

            Load a flow from a local directory:

            ```python
            # from_local_source.py

            from pathlib import Path
            from prefect import flow

            @flow(log_prints=True)
            def my_flow(name: str = "world"):
                print(f"Hello {name}! I'm a flow from a Python script!")

            if __name__ == "__main__":
                my_flow.from_source(
                    source=str(Path(__file__).parent),
                    entrypoint="from_local_source.py:my_flow",
                ).deploy(
                    name="my-deployment",
                    parameters=dict(name="Marvin"),
                    work_pool_name="local",
                )
            ```
        """
        from prefect.runner.storage import (
            BlockStorageAdapter,
            LocalStorage,
            RunnerStorage,
            create_storage_from_source,
        )

        if isinstance(source, (Path, str)):
            if isinstance(source, Path):
                source = str(source)
            storage = create_storage_from_source(source)
        elif isinstance(source, RunnerStorage):
            storage = source
        elif hasattr(source, "get_directory"):
            storage = BlockStorageAdapter(source)
        else:
            raise TypeError(
                f"Unsupported source type {type(source).__name__!r}. Please provide a"
                " URL to remote storage or a storage object."
            )
        with tempfile.TemporaryDirectory() as tmpdir:
            if not isinstance(storage, LocalStorage):
                storage.set_base_path(Path(tmpdir))
                run_coro_as_sync(storage.pull_code())

            full_entrypoint = str(storage.destination / entrypoint)
            flow = load_flow_from_entrypoint(full_entrypoint)
            flow._storage = storage
            flow._entrypoint = entrypoint

        return flow

    @sync_compatible
    async def deploy(
        self,
        name: str,
        work_pool_name: Optional[str] = None,
        image: Optional[Union[str, "DockerImage"]] = None,
        build: bool = True,
        push: bool = True,
        work_queue_name: Optional[str] = None,
        job_variables: Optional[dict[str, Any]] = None,
        interval: Optional[Union[int, float, datetime.timedelta]] = None,
        cron: Optional[str] = None,
        rrule: Optional[str] = None,
        paused: Optional[bool] = None,
        schedule: Optional[Schedule] = None,
        schedules: Optional[list[Schedule]] = None,
        concurrency_limit: Optional[Union[int, ConcurrencyLimitConfig, None]] = None,
        triggers: Optional[list[Union[DeploymentTriggerTypes, TriggerTypes]]] = None,
        parameters: Optional[dict[str, Any]] = None,
        description: Optional[str] = None,
        tags: Optional[list[str]] = None,
        version: Optional[str] = None,
        version_type: Optional[VersionType] = None,
        enforce_parameter_schema: bool = True,
        entrypoint_type: EntrypointType = EntrypointType.FILE_PATH,
        print_next_steps: bool = True,
        ignore_warnings: bool = False,
        _sla: Optional[Union[SlaTypes, list[SlaTypes]]] = None,
    ) -> UUID:
        """
        Deploys a flow to run on dynamic infrastructure via a work pool.

        By default, calling this method will build a Docker image for the flow, push it to a registry,
        and create a deployment via the Prefect API that will run the flow on the given schedule.

        If you want to use an existing image, you can pass `build=False` to skip building and pushing
        an image.

        Args:
            name: The name to give the created deployment.
            work_pool_name: The name of the work pool to use for this deployment. Defaults to
                the value of `PREFECT_DEFAULT_WORK_POOL_NAME`.
            image: The name of the Docker image to build, including the registry and
                repository. Pass a DockerImage instance to customize the Dockerfile used
                and build arguments.
            build: Whether or not to build a new image for the flow. If False, the provided
                image will be used as-is and pulled at runtime.
            push: Whether or not to push the built image to a registry.
            work_queue_name: The name of the work queue to use for this deployment's scheduled runs.
                If not provided the default work queue for the work pool will be used.
            job_variables: Settings used to override the values specified in the default base job
                template of the chosen work pool. Refer to the base job template of the chosen
                work pool for available settings.
            interval: An interval on which to execute the deployment. Accepts a number or a
                timedelta object to create a single schedule. If a number is given, it will be
                interpreted as seconds. Also accepts an iterable of numbers or timedelta to create
                multiple schedules.
            cron: A cron schedule string of when to execute runs of this deployment.
                Also accepts an iterable of cron schedule strings to create multiple schedules.
            rrule: An rrule schedule string of when to execute runs of this deployment.
                Also accepts an iterable of rrule schedule strings to create multiple schedules.
            triggers: A list of triggers that will kick off runs of this deployment.
            paused: Whether or not to set this deployment as paused.
            schedule: A schedule object defining when to execute runs of this deployment.
                Used to provide additional scheduling options like `timezone` or `parameters`.
            schedules: A list of schedule objects defining when to execute runs of this deployment.
                Used to define multiple schedules or additional scheduling options like `timezone`.
            concurrency_limit: The maximum number of runs that can be executed concurrently.
            parameters: A dictionary of default parameter values to pass to runs of this deployment.
            description: A description for the created deployment. Defaults to the flow's
                description if not provided.
            tags: A list of tags to associate with the created deployment for organizational
                purposes.
            version: A version for the created deployment. Defaults to the flow's version.
            version_type: The type of version to use for the created deployment. The version type
                will be inferred if not provided.
            enforce_parameter_schema: Whether or not the Prefect API should enforce the
                parameter schema for the created deployment.
            entrypoint_type: Type of entrypoint to use for the deployment. When using a module path
                entrypoint, ensure that the module will be importable in the execution environment.
            print_next_steps: Whether or not to print a message with next steps
                after deploying the deployments.
            ignore_warnings: Whether or not to ignore warnings about the work pool type.
            _sla: (Experimental) SLA configuration for the deployment. May be removed or modified at any time. Currently only supported on Prefect Cloud.

        Returns:
            The ID of the created/updated deployment.

        Examples:
            Deploy a local flow to a work pool:

            ```python
            from prefect import flow

            @flow
            def my_flow(name):
                print(f"hello {name}")

            if __name__ == "__main__":
                my_flow.deploy(
                    "example-deployment",
                    work_pool_name="my-work-pool",
                    image="my-repository/my-image:dev",
                )
            ```

            Deploy a remotely stored flow to a work pool:

            ```python
            from prefect import flow

            if __name__ == "__main__":
                flow.from_source(
                    source="https://github.com/org/repo.git",
                    entrypoint="flows.py:my_flow",
                ).deploy(
                    "example-deployment",
                    work_pool_name="my-work-pool",
                    image="my-repository/my-image:dev",
                )
            ```
        """
        if not (
            work_pool_name := work_pool_name or PREFECT_DEFAULT_WORK_POOL_NAME.value()
        ):
            raise ValueError(
                "No work pool name provided. Please provide a `work_pool_name` or set the"
                " `PREFECT_DEFAULT_WORK_POOL_NAME` environment variable."
            )

        from prefect.client.orchestration import get_client

        try:
            async with get_client() as client:
                work_pool = await client.read_work_pool(work_pool_name)
                active_workers = await client.read_workers_for_work_pool(
                    work_pool_name,
                    worker_filter=WorkerFilter(
                        status=WorkerFilterStatus(any_=["ONLINE"])
                    ),
                )
        except ObjectNotFound as exc:
            raise ValueError(
                f"Could not find work pool {work_pool_name!r}. Please create it before"
                " deploying this flow."
            ) from exc

        to_deployment_coro = self.to_deployment(
            name=name,
            interval=interval,
            cron=cron,
            rrule=rrule,
            schedule=schedule,
            schedules=schedules,
            concurrency_limit=concurrency_limit,
            paused=paused,
            triggers=triggers,
            parameters=parameters,
            description=description,
            tags=tags,
            version=version,
            version_type=version_type,
            enforce_parameter_schema=enforce_parameter_schema,
            work_queue_name=work_queue_name,
            job_variables=job_variables,
            entrypoint_type=entrypoint_type,
            _sla=_sla,
        )

        if inspect.isawaitable(to_deployment_coro):
            deployment = await to_deployment_coro
        else:
            deployment = to_deployment_coro

        from prefect.deployments.runner import deploy

        deploy_coro = deploy(
            deployment,
            work_pool_name=work_pool_name,
            image=image,
            build=build,
            push=push,
            print_next_steps_message=False,
            ignore_warnings=ignore_warnings,
        )
        if TYPE_CHECKING:
            assert inspect.isawaitable(deploy_coro)

        deployment_ids = await deploy_coro

        if print_next_steps:
            console = Console()
            if (
                not work_pool.is_push_pool
                and not work_pool.is_managed_pool
                and not active_workers
            ):
                console.print(
                    "\nTo execute flow runs from this deployment, start a worker in a"
                    " separate terminal that pulls work from the"
                    f" {work_pool_name!r} work pool:"
                )
                console.print(
                    f"\n\t$ prefect worker start --pool {work_pool_name!r}",
                    style="blue",
                )
            console.print(
                "\nTo schedule a run for this deployment, use the following command:"
            )
            console.print(
                f"\n\t$ prefect deployment run '{self.name}/{name}'\n",
                style="blue",
            )
            if PREFECT_UI_URL:
                message = (
                    "\nYou can also run your flow via the Prefect UI:"
                    f" [blue]{PREFECT_UI_URL.value()}/deployments/deployment/{deployment_ids[0]}[/]\n"
                )
                console.print(message, soft_wrap=True)

        return deployment_ids[0]

    @overload
    def __call__(self: "Flow[P, NoReturn]", *args: P.args, **kwargs: P.kwargs) -> None:
1603        # `NoReturn` matches if a type can't be inferred for the function, which stops a
1604 # sync function from matching the `Coroutine` overload
1605 ...
1607 @overload 1a
1608    def __call__( 1a
1609 self: "Flow[P, Coroutine[Any, Any, T]]", *args: P.args, **kwargs: P.kwargs
1610 ) -> Coroutine[Any, Any, T]: ...
1612 @overload 1a
1613    def __call__( 1a
1614 self: "Flow[P, T]",
1615 *args: P.args,
1616 **kwargs: P.kwargs,
1617 ) -> T: ...
1619 @overload 1a
1620    def __call__( 1a
1621 self: "Flow[P, Coroutine[Any, Any, T]]",
1622 *args: P.args,
1623 return_state: Literal[True],
1624 **kwargs: P.kwargs,
1625 ) -> Awaitable[State[T]]: ...
1627 @overload 1a
1628    def __call__( 1a
1629 self: "Flow[P, T]",
1630 *args: P.args,
1631 return_state: Literal[True],
1632 **kwargs: P.kwargs,
1633 ) -> State[T]: ...
1635 def __call__( 1a
1636 self,
1637 *args: "P.args",
1638 return_state: bool = False,
1639 wait_for: Optional[Iterable[PrefectFuture[Any]]] = None,
1640 **kwargs: "P.kwargs",
1641 ):
1642 """
1643 Run the flow and return its result.
1646 Flow parameter values must be serializable by Pydantic.
1648 If writing an async flow, this call must be awaited.
1650 This will create a new flow run in the API.
1652 Args:
1653 *args: Arguments to run the flow with.
1654 return_state: Return a Prefect State containing the result of the
1655 flow run.
1656            wait_for: Upstream task futures to wait for before starting the flow if called as a subflow.
1657 **kwargs: Keyword arguments to run the flow with.
1659 Returns:
1660 If `return_state` is False, returns the result of the flow run.
1661 If `return_state` is True, returns the result of the flow run
1662 wrapped in a Prefect State which provides error handling.
1664 Examples:
1666 Define a flow
1668 ```python
1669 @flow
1670 def my_flow(name):
1671 print(f"hello {name}")
1672 return f"goodbye {name}"
1673 ```
1675 Run a flow
1677 ```python
1678 my_flow("marvin")
1679 ```
1681 Run a flow with additional tags
1683 ```python
1684 from prefect import tags
1686 with tags("db", "blue"):
1687 my_flow("foo")
1688 ```
1689 """
1690 from prefect.utilities.visualization import (
1691 get_task_viz_tracker,
1692 track_viz_task,
1693 )
1695 # Convert the call args/kwargs to a parameter dict
1696 parameters = get_call_parameters(self.fn, args, kwargs)
1698 return_type = "state" if return_state else "result"
1700 task_viz_tracker = get_task_viz_tracker()
1701 if task_viz_tracker:
1702            # This is a subflow; for now, return a single task and do not go further.
1703            # We can add support for exploring subflows for tasks in the future.
1704 return track_viz_task(self.isasync, self.name, parameters)
1706 from prefect.flow_engine import run_flow
1708 return run_flow(
1709 flow=self,
1710 parameters=parameters,
1711 wait_for=wait_for,
1712 return_type=return_type,
1713 )
1715 @sync_compatible 1a
1716 async def visualize(self, *args: "P.args", **kwargs: "P.kwargs"): 1a
1717 """
1718 Generates a graphviz object representing the current flow. In IPython notebooks,
1719 it's rendered inline, otherwise in a new window as a PNG.
1721 Raises:
1722            ImportError: If `graphviz` isn't installed.
1723            GraphvizExecutableNotFoundError: If the `dot` executable isn't found.
1724            FlowVisualizationError: If the flow can't be visualized for any other reason.
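        Example:
            A minimal sketch, assuming `graphviz` is installed; the flow and
            task names are hypothetical:

            ```python
            from prefect import flow, task

            @task(viz_return_value=[1, 2, 3])
            def get_data():
                return [1, 2, 3]

            @flow
            def my_flow():
                return get_data()

            if __name__ == "__main__":
                my_flow.visualize()
            ```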
1725 """
1726 from prefect.utilities.visualization import (
1727 FlowVisualizationError,
1728 GraphvizExecutableNotFoundError,
1729 GraphvizImportError,
1730 TaskVizTracker,
1731 VisualizationUnsupportedError,
1732 build_task_dependencies,
1733 visualize_task_dependencies,
1734 )
1736 if not PREFECT_TESTING_UNIT_TEST_MODE:
1737 warnings.warn(
1738 "`flow.visualize()` will execute code inside of your flow that is not"
1739 " decorated with `@task` or `@flow`."
1740 )
1742 try:
1743 with TaskVizTracker() as tracker:
1744 if self.isasync:
1745 await self.fn(*args, **kwargs) # type: ignore[reportGeneralTypeIssues]
1746 else:
1747 self.fn(*args, **kwargs)
1749 graph = build_task_dependencies(tracker)
1751 visualize_task_dependencies(graph, self.name)
1753 except GraphvizImportError:
1754 raise
1755 except GraphvizExecutableNotFoundError:
1756 raise
1757 except VisualizationUnsupportedError:
1758 raise
1759 except FlowVisualizationError:
1760 raise
1761 except Exception as e:
1762 msg = (
1763 "It's possible you are trying to visualize a flow that contains "
1764 "code that directly interacts with the result of a task"
1765 " inside of the flow. \nTry passing a `viz_return_value` "
1766 "to the task decorator, e.g. `@task(viz_return_value=[1, 2, 3]).`"
1767 )
1769 new_exception = type(e)(str(e) + "\n" + msg)
1770 # Copy traceback information from the original exception
1771 new_exception.__traceback__ = e.__traceback__
1772 raise new_exception
1775class FlowDecorator: 1a
1776 @overload 1a
1777    def __call__(self, __fn: Callable[P, R]) -> Flow[P, R]: ... 1a
1779 @overload 1a
1780    def __call__( 1a
1781 self,
1782 __fn: None = None,
1783 *,
1784 name: Optional[str] = None,
1785 version: Optional[str] = None,
1786 flow_run_name: Optional[Union[Callable[[], str], str]] = None,
1787 retries: Optional[int] = None,
1788 retry_delay_seconds: Optional[Union[int, float]] = None,
1789 task_runner: None = None,
1790 description: Optional[str] = None,
1791 timeout_seconds: Union[int, float, None] = None,
1792 validate_parameters: bool = True,
1793 persist_result: Optional[bool] = None,
1794 result_storage: Optional[ResultStorage] = None,
1795 result_serializer: Optional[ResultSerializer] = None,
1796 cache_result_in_memory: bool = True,
1797 log_prints: Optional[bool] = None,
1798 on_completion: Optional[list[FlowStateHook[..., Any]]] = None,
1799 on_failure: Optional[list[FlowStateHook[..., Any]]] = None,
1800 on_cancellation: Optional[list[FlowStateHook[..., Any]]] = None,
1801 on_crashed: Optional[list[FlowStateHook[..., Any]]] = None,
1802 on_running: Optional[list[FlowStateHook[..., Any]]] = None,
1803 ) -> Callable[[Callable[P, R]], Flow[P, R]]: ...
1805 @overload 1a
1806    def __call__( 1a
1807 self,
1808 __fn: None = None,
1809 *,
1810 name: Optional[str] = None,
1811 version: Optional[str] = None,
1812 flow_run_name: Optional[Union[Callable[[], str], str]] = None,
1813 retries: Optional[int] = None,
1814 retry_delay_seconds: Optional[Union[int, float]] = None,
1815 task_runner: Optional[TaskRunner[PrefectFuture[Any]]] = None,
1816 description: Optional[str] = None,
1817 timeout_seconds: Union[int, float, None] = None,
1818 validate_parameters: bool = True,
1819 persist_result: Optional[bool] = None,
1820 result_storage: Optional[ResultStorage] = None,
1821 result_serializer: Optional[ResultSerializer] = None,
1822 cache_result_in_memory: bool = True,
1823 log_prints: Optional[bool] = None,
1824 on_completion: Optional[list[FlowStateHook[..., Any]]] = None,
1825 on_failure: Optional[list[FlowStateHook[..., Any]]] = None,
1826 on_cancellation: Optional[list[FlowStateHook[..., Any]]] = None,
1827 on_crashed: Optional[list[FlowStateHook[..., Any]]] = None,
1828 on_running: Optional[list[FlowStateHook[..., Any]]] = None,
1829 ) -> Callable[[Callable[P, R]], Flow[P, R]]: ...
1831 def __call__( 1a
1832 self,
1833 __fn: Optional[Callable[P, R]] = None,
1834 *,
1835 name: Optional[str] = None,
1836 version: Optional[str] = None,
1837 flow_run_name: Optional[Union[Callable[[], str], str]] = None,
1838 retries: Optional[int] = None,
1839 retry_delay_seconds: Union[int, float, None] = None,
1840 task_runner: Optional[TaskRunner[PrefectFuture[Any]]] = None,
1841 description: Optional[str] = None,
1842 timeout_seconds: Union[int, float, None] = None,
1843 validate_parameters: bool = True,
1844 persist_result: Optional[bool] = None,
1845 result_storage: Optional[ResultStorage] = None,
1846 result_serializer: Optional[ResultSerializer] = None,
1847 cache_result_in_memory: bool = True,
1848 log_prints: Optional[bool] = None,
1849 on_completion: Optional[list[FlowStateHook[..., Any]]] = None,
1850 on_failure: Optional[list[FlowStateHook[..., Any]]] = None,
1851 on_cancellation: Optional[list[FlowStateHook[..., Any]]] = None,
1852 on_crashed: Optional[list[FlowStateHook[..., Any]]] = None,
1853 on_running: Optional[list[FlowStateHook[..., Any]]] = None,
1854 ) -> Union[Flow[P, R], Callable[[Callable[P, R]], Flow[P, R]]]:
1855 """
1856 Decorator to designate a function as a Prefect workflow.
1858 This decorator may be used for asynchronous or synchronous functions.
1860 Flow parameters must be serializable by Pydantic.
1862 Args:
1863 name: An optional name for the flow; if not provided, the name will be inferred
1864 from the given function.
1865 version: An optional version string for the flow; if not provided, we will
1866 attempt to create a version string as a hash of the file containing the
1867 wrapped function; if the file cannot be located, the version will be null.
1868 flow_run_name: An optional name to distinguish runs of this flow; this name can
1869 be provided as a string template with the flow's parameters as variables,
1870 or a function that returns a string.
1871 retries: An optional number of times to retry on flow run failure.
1872 retry_delay_seconds: An optional number of seconds to wait before retrying the
1873 flow after failure. This is only applicable if `retries` is nonzero.
1874 task_runner: An optional task runner to use for task execution within the flow; if
1875                not provided, a `ThreadPoolTaskRunner` will be instantiated.
1876 description: An optional string description for the flow; if not provided, the
1877 description will be pulled from the docstring for the decorated function.
1878 timeout_seconds: An optional number of seconds indicating a maximum runtime for
1879 the flow. If the flow exceeds this runtime, it will be marked as failed.
1880 Flow execution may continue until the next task is called.
1881 validate_parameters: By default, parameters passed to flows are validated by
1882 Pydantic. This will check that input values conform to the annotated types
1883 on the function. Where possible, values will be coerced into the correct
1884 type; for example, if a parameter is defined as `x: int` and "5" is passed,
1885 it will be resolved to `5`. If set to `False`, no validation will be
1886 performed on flow parameters.
1887 persist_result: An optional toggle indicating whether the result of this flow
1888 should be persisted to result storage. Defaults to `None`, which indicates
1889 that Prefect should choose whether the result should be persisted depending on
1890 the features being used.
1891 result_storage: An optional block to use to persist the result of this flow.
1892 This value will be used as the default for any tasks in this flow.
1893 If not provided, the local file system will be used unless called as
1894 a subflow, at which point the default will be loaded from the parent flow.
1895 result_serializer: An optional serializer to use to serialize the result of this
1896 flow for persistence. This value will be used as the default for any tasks
1897 in this flow. If not provided, the value of `PREFECT_RESULTS_DEFAULT_SERIALIZER`
1898 will be used unless called as a subflow, at which point the default will be
1899 loaded from the parent flow.
1900 cache_result_in_memory: An optional toggle indicating whether the cached result of
1901                running the flow should be stored in memory. Defaults to `True`.
1902 log_prints: If set, `print` statements in the flow will be redirected to the
1903 Prefect logger for the flow run. Defaults to `None`, which indicates that
1904 the value from the parent flow should be used. If this is a parent flow,
1905 the default is pulled from the `PREFECT_LOGGING_LOG_PRINTS` setting.
1906 on_completion: An optional list of functions to call when the flow run is
1907 completed. Each function should accept three arguments: the flow, the flow
1908 run, and the final state of the flow run.
1909 on_failure: An optional list of functions to call when the flow run fails. Each
1910 function should accept three arguments: the flow, the flow run, and the
1911 final state of the flow run.
1912 on_cancellation: An optional list of functions to call when the flow run is
1913 cancelled. These functions will be passed the flow, flow run, and final state.
1914 on_crashed: An optional list of functions to call when the flow run crashes. Each
1915 function should accept three arguments: the flow, the flow run, and the
1916 final state of the flow run.
1917 on_running: An optional list of functions to call when the flow run is started. Each
1918                function should accept three arguments: the flow, the flow run, and the current state.
1920 Returns:
1921 A callable `Flow` object which, when called, will run the flow and return its
1922 final state.
1924 Examples:
1925 Define a simple flow
1927 ```python
1928 from prefect import flow
1930 @flow
1931 def add(x, y):
1932 return x + y
1933 ```
1935 Define an async flow
1937 ```python
1938 @flow
1939 async def add(x, y):
1940 return x + y
1941 ```
1943 Define a flow with a version and description
1945 ```python
1946 @flow(version="first-flow", description="This flow is empty!")
1947 def my_flow():
1948 pass
1949 ```
1951 Define a flow with a custom name
1953 ```python
1954 @flow(name="The Ultimate Flow")
1955 def my_flow():
1956 pass
1957 ```
1959 Define a flow that submits its tasks to dask
1961 ```python
1962 from prefect_dask.task_runners import DaskTaskRunner
1964 @flow(task_runner=DaskTaskRunner)
1965 def my_flow():
1966 pass
1967 ```
1968 """
1969        if __fn: 1a
1970 return Flow( 1a
1971 fn=__fn,
1972 name=name,
1973 version=version,
1974 flow_run_name=flow_run_name,
1975 task_runner=task_runner,
1976 description=description,
1977 timeout_seconds=timeout_seconds,
1978 validate_parameters=validate_parameters,
1979 retries=retries,
1980 retry_delay_seconds=retry_delay_seconds,
1981 persist_result=persist_result,
1982 result_storage=result_storage,
1983 result_serializer=result_serializer,
1984 cache_result_in_memory=cache_result_in_memory,
1985 log_prints=log_prints,
1986 on_completion=on_completion,
1987 on_failure=on_failure,
1988 on_cancellation=on_cancellation,
1989 on_crashed=on_crashed,
1990 on_running=on_running,
1991 )
1992 else:
1993 return cast(
1994 Callable[[Callable[P, R]], Flow[P, R]],
1995 partial(
1996 flow,
1997 name=name,
1998 version=version,
1999 flow_run_name=flow_run_name,
2000 task_runner=task_runner,
2001 description=description,
2002 timeout_seconds=timeout_seconds,
2003 validate_parameters=validate_parameters,
2004 retries=retries,
2005 retry_delay_seconds=retry_delay_seconds,
2006 persist_result=persist_result,
2007 result_storage=result_storage,
2008 result_serializer=result_serializer,
2009 cache_result_in_memory=cache_result_in_memory,
2010 log_prints=log_prints,
2011 on_completion=on_completion,
2012 on_failure=on_failure,
2013 on_cancellation=on_cancellation,
2014 on_crashed=on_crashed,
2015 on_running=on_running,
2016 ),
2017 )
2019    if not TYPE_CHECKING: 1a
2020 # Add from_source so it is available on the flow function we all know and love
2021 from_source = staticmethod(Flow.from_source) 1a
2022 else:
2023 # Mypy loses the plot somewhere along the line, so the annotation is reconstructed
2024 # manually here.
2025 @staticmethod
2026 def from_source(
2027 source: Union[str, Path, "RunnerStorage", ReadableDeploymentStorage],
2028 entrypoint: str,
2029 ) -> Union["Flow[..., Any]", Coroutine[Any, Any, "Flow[..., Any]"]]: ...
2032flow: FlowDecorator = FlowDecorator() 1a
2035class InfrastructureBoundFlow(Flow[P, R]): 1a
2036 """
2037 EXPERIMENTAL: This class is experimental and may be removed or changed in future
2038 releases.
2040 A flow that is bound to running on a specific infrastructure.
2042 Attributes:
2043 work_pool: The name of the work pool to run the flow on. The base job
2044 configuration of the work pool will determine the configuration of the
2045 infrastructure the flow will run on.
2046 job_variables: Infrastructure configuration that will override the base job
2047 configuration of the work pool.
2048 worker_cls: The class of the worker to use to spin up infrastructure and submit
2049 the flow to it.
2050 """
2052 def __init__( 1a
2053 self,
2054 *args: Any,
2055 work_pool: str,
2056 job_variables: dict[str, Any],
2057 worker_cls: type["BaseWorker[Any, Any, Any]"],
2058 **kwargs: Any,
2059 ):
2060 super().__init__(*args, **kwargs)
2061 self.work_pool = work_pool
2062 self.job_variables = job_variables
2063 self.worker_cls = worker_cls
2065 @overload 1a
2066 def __call__(self: "Flow[P, NoReturn]", *args: P.args, **kwargs: P.kwargs) -> None: 1a
2067        # `NoReturn` matches if a type can't be inferred for the function, which stops a
2068 # sync function from matching the `Coroutine` overload
2069 ...
2071 @overload 1a
2072    def __call__( 1a
2073 self: "Flow[P, Coroutine[Any, Any, T]]",
2074 *args: P.args,
2075 **kwargs: P.kwargs,
2076 ) -> Coroutine[Any, Any, T]: ...
2078 @overload 1a
2079    def __call__( 1a
2080 self: "Flow[P, T]",
2081 *args: P.args,
2082 **kwargs: P.kwargs,
2083 ) -> T: ...
2085 @overload 1a
2086    def __call__( 1a
2087 self: "Flow[P, Coroutine[Any, Any, T]]",
2088 *args: P.args,
2089 return_state: Literal[True],
2090 **kwargs: P.kwargs,
2091 ) -> Awaitable[State[T]]: ...
2093 @overload 1a
2094    def __call__( 1a
2095 self: "Flow[P, T]",
2096 *args: P.args,
2097 return_state: Literal[True],
2098 **kwargs: P.kwargs,
2099 ) -> State[T]: ...
2101 def __call__( 1a
2102 self,
2103 *args: "P.args",
2104 return_state: bool = False,
2105 wait_for: Optional[Iterable[PrefectFuture[Any]]] = None,
2106 **kwargs: "P.kwargs",
2107 ):
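        """
        Run the flow on its bound infrastructure and return the result, or a
        Prefect `State` when `return_state=True`. For async flows this returns
        a coroutine that must be awaited.
        """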
2108 async def modified_call(
2109 *args: P.args,
2110 return_state: bool = False,
2111 # TODO: Handle wait_for once we have an asynchronous way to wait for futures
2112 # We should wait locally for futures to resolve before spinning up
2113 # infrastructure.
2114 wait_for: Optional[Iterable[PrefectFuture[Any]]] = None,
2115 **kwargs: P.kwargs,
2116 ) -> R | State[R]:
2117 try:
2118 async with self.worker_cls(work_pool_name=self.work_pool) as worker:
2119 parameters = get_call_parameters(self, args, kwargs)
2120 future = await worker.submit(
2121 flow=self,
2122 parameters=parameters,
2123 job_variables=self.job_variables,
2124 )
2125 if return_state:
2126 await future.wait_async()
2127 return future.state
2128 return await future.aresult()
2129 except (ExceptionGroup, BaseExceptionGroup) as exc:
2130 # For less verbose tracebacks
2131 exceptions = exc.exceptions
2132 if len(exceptions) == 1:
2133 raise exceptions[0] from None
2134 else:
2135 raise
2137 if inspect.iscoroutinefunction(self.fn):
2138 return modified_call(
2139 *args, return_state=return_state, wait_for=wait_for, **kwargs
2140 )
2141 else:
2142 return run_coro_as_sync(
2143 modified_call(
2144 *args,
2145 return_state=return_state,
2146 wait_for=wait_for,
2147 **kwargs,
2148 )
2149 )
2151 def submit(self, *args: P.args, **kwargs: P.kwargs) -> PrefectFlowRunFuture[R]: 1a
2152 """
2153 EXPERIMENTAL: This method is experimental and may be removed or changed in future
2154 releases.
2156 Submit the flow to run on remote infrastructure.
2158 This method will spin up a local worker to submit the flow to remote infrastructure. To
2159 submit the flow to remote infrastructure without spinning up a local worker, use
2160 `submit_to_work_pool` instead.
2162 Args:
2163 *args: Positional arguments to pass to the flow.
2164 **kwargs: Keyword arguments to pass to the flow.
2166 Returns:
2167 A `PrefectFlowRunFuture` that can be used to retrieve the result of the flow run.
2169 Examples:
2170 Submit a flow to run on Kubernetes:
2172 ```python
2173 from prefect import flow
2174 from prefect_kubernetes.experimental import kubernetes
2176 @kubernetes(work_pool="my-kubernetes-work-pool")
2177 @flow
2178 def my_flow(x: int, y: int):
2179 return x + y
2181 future = my_flow.submit(x=1, y=2)
2182 result = future.result()
2183 print(result)
2184 ```
2185 """
2187 async def submit_func():
2188 async with self.worker_cls(work_pool_name=self.work_pool) as worker:
2189 parameters = get_call_parameters(self, args, kwargs)
2190 return await worker.submit(
2191 flow=self,
2192 parameters=parameters,
2193 job_variables=self.job_variables,
2194 )
2196 return run_coro_as_sync(submit_func())
2198 def submit_to_work_pool( 1a
2199 self, *args: P.args, **kwargs: P.kwargs
2200 ) -> PrefectFlowRunFuture[R]:
2201 """
2202 EXPERIMENTAL: This method is experimental and may be removed or changed in future
2203 releases.
2205        Submit the flow to run on remote infrastructure.
2207 This method will create a flow run for an existing worker to submit to remote infrastructure.
2208 If you don't have a worker available, use `submit` instead.
2210 Args:
2211 *args: Positional arguments to pass to the flow.
2212 **kwargs: Keyword arguments to pass to the flow.
2214 Returns:
2215 A `PrefectFlowRunFuture` that can be used to retrieve the result of the flow run.
2217 Examples:
2218 Dispatch a flow to run on Kubernetes:
2220 ```python
2221 from prefect import flow
2222 from prefect_kubernetes.experimental import kubernetes
2224 @kubernetes(work_pool="my-kubernetes-work-pool")
2225 @flow
2226 def my_flow(x: int, y: int):
2227 return x + y
2229 future = my_flow.submit_to_work_pool(x=1, y=2)
2230 result = future.result()
2231 print(result)
2232 ```
2233 """
2234 warnings.warn(
2235 "Dispatching flows to remote infrastructure is experimental. The interface "
2236 "and behavior of this method are subject to change.",
2237 category=FutureWarning,
2238 )
2239 from prefect import get_client
2240 from prefect._experimental.bundles import (
2241 convert_step_to_command,
2242 create_bundle_for_flow_run,
2243 upload_bundle_to_storage,
2244 )
2245 from prefect.context import FlowRunContext, TagsContext
2246 from prefect.results import get_result_store, resolve_result_storage
2247 from prefect.states import Pending, Scheduled
2248 from prefect.tasks import Task
2250 # Get parameters to error early if they are invalid
2251 parameters = get_call_parameters(self, args, kwargs)
2253 with get_client(sync_client=True) as client:
2254 work_pool = client.read_work_pool(self.work_pool)
2256 if (
2257 work_pool.storage_configuration.bundle_upload_step is None
2258 or work_pool.storage_configuration.bundle_execution_step is None
2259 ):
2260 raise RuntimeError(
2261 f"Storage is not configured for work pool {work_pool.name!r}. "
2262 "Please configure storage for the work pool by running `prefect "
2263 "work-pool storage configure`."
2264 )
2266 current_result_store = get_result_store()
2267 # Check result storage and use the work pool default if needed
2268            if (
2269                current_result_store.result_storage is None
2270                or (isinstance(current_result_store.result_storage, LocalFileSystem)
2271                    and self.result_storage is None)
2272            ):
2273 if (
2274 work_pool.storage_configuration.default_result_storage_block_id
2275 is None
2276 ):
2277 logger.warning(
2278 f"Flow {self.name!r} has no result storage configured. Please configure "
2279 "result storage for the flow if you want to retrieve the result for the flow run."
2280 )
2281 else:
2282 # Use the work pool's default result storage block for the flow run to ensure the caller can retrieve the result
2283 flow = self.with_options(
2284 result_storage=resolve_result_storage(
2285 work_pool.storage_configuration.default_result_storage_block_id,
2286 _sync=True,
2287 ),
2288 persist_result=True,
2289 )
2290 else:
2291 flow = self
2293 bundle_key = str(uuid.uuid4())
2294 upload_command = convert_step_to_command(
2295 work_pool.storage_configuration.bundle_upload_step,
2296 bundle_key,
2297 quiet=True,
2298 )
2299 execute_command = convert_step_to_command(
2300 work_pool.storage_configuration.bundle_execution_step, bundle_key
2301 )
2303 job_variables = (self.job_variables or {}) | {
2304 "command": " ".join(execute_command)
2305 }
2307 # Create a parent task run if this is a child flow run to ensure it shows up as a child flow in the UI
2308 parent_task_run = None
2309 if flow_run_ctx := FlowRunContext.get():
2310 parent_task = Task[Any, Any](
2311 name=flow.name,
2312 fn=flow.fn,
2313 version=flow.version,
2314 )
2315 parent_task_run = run_coro_as_sync(
2316 parent_task.create_run(
2317 flow_run_context=flow_run_ctx,
2318 parameters=parameters,
2319 )
2320 )
2322 flow_run = client.create_flow_run(
2323 flow,
2324 parameters=flow.serialize_parameters(parameters),
2325 # Start out in pending to prevent a worker from starting the flow run before the bundle is uploaded
2326 state=Pending(),
2327 job_variables=job_variables,
2328 work_pool_name=work_pool.name,
2329 tags=TagsContext.get().current_tags,
2330 parent_task_run_id=getattr(parent_task_run, "id", None),
2331 )
2333 bundle = create_bundle_for_flow_run(flow=flow, flow_run=flow_run)
2334 upload_bundle_to_storage(bundle, bundle_key, upload_command)
2336 # Set flow run to scheduled now that the bundle is uploaded and ready to be executed
2337 client.set_flow_run_state(flow_run.id, state=Scheduled())
2339 # TODO: It'd be nice to be able to return the future sooner
2340 return PrefectFlowRunFuture(flow_run_id=flow_run.id)
2342 def with_options( 1a
2343 self,
2344 *,
2345 name: Optional[str] = None,
2346 version: Optional[str] = None,
2347 retries: Optional[int] = None,
2348 retry_delay_seconds: Optional[Union[int, float]] = None,
2349 description: Optional[str] = None,
2350 flow_run_name: Optional[Union[Callable[[], str], str]] = None,
2351 task_runner: Union[
2352 Type[TaskRunner[PrefectFuture[Any]]], TaskRunner[PrefectFuture[Any]], None
2353 ] = None,
2354 timeout_seconds: Union[int, float, None] = None,
2355 validate_parameters: Optional[bool] = None,
2356 persist_result: Optional[bool] = NotSet, # type: ignore
2357 result_storage: Optional[ResultStorage] = NotSet, # type: ignore
2358 result_serializer: Optional[ResultSerializer] = NotSet, # type: ignore
2359 cache_result_in_memory: Optional[bool] = None,
2360 log_prints: Optional[bool] = NotSet, # type: ignore
2361 on_completion: Optional[list[FlowStateHook[P, R]]] = None,
2362 on_failure: Optional[list[FlowStateHook[P, R]]] = None,
2363 on_cancellation: Optional[list[FlowStateHook[P, R]]] = None,
2364 on_crashed: Optional[list[FlowStateHook[P, R]]] = None,
2365 on_running: Optional[list[FlowStateHook[P, R]]] = None,
2366 job_variables: Optional[dict[str, Any]] = None,
2367 ) -> "InfrastructureBoundFlow[P, R]":
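        """
        Create a copy of this infrastructure-bound flow with the given options
        applied; options left unset are inherited from this flow, including its
        work pool, worker class, and job variables.
        """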
2368 new_flow = super().with_options(
2369 name=name,
2370 version=version,
2371 retries=retries,
2372 retry_delay_seconds=retry_delay_seconds,
2373 description=description,
2374 flow_run_name=flow_run_name,
2375 task_runner=task_runner,
2376 timeout_seconds=timeout_seconds,
2377 validate_parameters=validate_parameters,
2378 persist_result=persist_result,
2379 result_storage=result_storage,
2380 result_serializer=result_serializer,
2381 cache_result_in_memory=cache_result_in_memory,
2382 log_prints=log_prints,
2383 on_completion=on_completion,
2384 on_failure=on_failure,
2385 on_cancellation=on_cancellation,
2386 on_crashed=on_crashed,
2387 on_running=on_running,
2388 )
2389 new_infrastructure_bound_flow = bind_flow_to_infrastructure(
2390 new_flow,
2391 self.work_pool,
2392 self.worker_cls,
2393 job_variables=job_variables
2394 if job_variables is not None
2395 else self.job_variables,
2396 )
2397 return new_infrastructure_bound_flow
2400def bind_flow_to_infrastructure( 1a
2401 flow: Flow[P, R],
2402 work_pool: str,
2403 worker_cls: type["BaseWorker[Any, Any, Any]"],
2404 job_variables: dict[str, Any] | None = None,
2405) -> InfrastructureBoundFlow[P, R]:
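    """
    Bind a flow to a work pool and worker class, returning an
    `InfrastructureBoundFlow` that runs on that infrastructure when called.
    """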
2406 new = InfrastructureBoundFlow[P, R](
2407 flow.fn,
2408 work_pool=work_pool,
2409 job_variables=job_variables or {},
2410 worker_cls=worker_cls,
2411 )
2412 # Copy all attributes from the original flow
2413 for attr, value in flow.__dict__.items():
2414 setattr(new, attr, value)
2415 return new
2418def _raise_on_name_with_banned_characters(name: Optional[str]) -> Optional[str]: 1a
2419 """
2420 Raise an InvalidNameError if the given name contains any invalid
2421 characters.
2422 """
2423    if name is None: 1a
2424 return name
2426    if not re.match(WITHOUT_BANNED_CHARACTERS, name): 1a
2427 raise InvalidNameError(
2428 f"Name {name!r} contains an invalid character. "
2429 f"Must not contain any of: {BANNED_CHARACTERS}."
2430 )
2432 return name 1a
2435def select_flow( 1a
2436 flows: Iterable[Flow[P, R]],
2437 flow_name: Optional[str] = None,
2438 from_message: Optional[str] = None,
2439) -> Flow[P, R]:
2440 """
2441 Select the only flow in an iterable or a flow specified by name.
2443    Returns:
2444        A single flow object.
2446 Raises:
2447 MissingFlowError: If no flows exist in the iterable
2448 MissingFlowError: If a flow name is provided and that flow does not exist
2449 UnspecifiedFlowError: If multiple flows exist but no flow name was provided
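    Example:
        A minimal sketch with two hypothetical flows:

        ```python
        from prefect import flow
        from prefect.flows import select_flow

        @flow
        def etl(): ...

        @flow
        def report(): ...

        # Selects `etl` by name; omitting `flow_name` here would raise
        # `UnspecifiedFlowError` because more than one flow is present.
        selected = select_flow([etl, report], flow_name="etl")
        ```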
2450 """
2451 # Convert to flows by name
2452 flows_dict = {f.name: f for f in flows}
2454 # Add a leading space if given, otherwise use an empty string
2455 from_message = (" " + from_message) if from_message else ""
2456    if not flows_dict:
2457 raise MissingFlowError(f"No flows found{from_message}.")
2459 elif flow_name and flow_name not in flows_dict:
2460 raise MissingFlowError(
2461 f"Flow {flow_name!r} not found{from_message}. "
2462 f"Found the following flows: {listrepr(flows_dict.keys())}. "
2463 "Check to make sure that your flow function is decorated with `@flow`."
2464 )
2466 elif not flow_name and len(flows_dict) > 1:
2467 raise UnspecifiedFlowError(
2468 (
2469 f"Found {len(flows_dict)} flows{from_message}:"
2470 f" {listrepr(sorted(flows_dict.keys()))}. Specify a flow name to select a"
2471 " flow."
2472 ),
2473 )
2475 if flow_name:
2476 return flows_dict[flow_name]
2477 else:
2478 return list(flows_dict.values())[0]
2481def load_flow_from_entrypoint( 1a
2482 entrypoint: str,
2483 use_placeholder_flow: bool = True,
2484) -> Flow[P, Any]:
2485 """
2486 Extract a flow object from a script at an entrypoint by running all of the code in the file.
2488 Args:
2489 entrypoint: a string in the format `<path_to_script>:<flow_func_name>`
2490 or a string in the format `<path_to_script>:<class_name>.<flow_method_name>`
2491 or a module path to a flow function
2492 use_placeholder_flow: if True, use a placeholder Flow object if the actual flow object
2493 cannot be loaded from the entrypoint (e.g. dependencies are missing)
2495 Returns:
2496 The flow object from the script
2498 Raises:
2499 ScriptError: If an exception is encountered while running the script
2500 MissingFlowError: If the flow function specified in the entrypoint does not exist
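    Example:
        A minimal sketch; the file, module, and flow names are hypothetical:

        ```python
        from prefect.flows import load_flow_from_entrypoint

        # Script-path form
        flow = load_flow_from_entrypoint("flows.py:my_flow")

        # Module-path form
        flow = load_flow_from_entrypoint("my_package.flows.my_flow")
        ```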
2501 """
2503 if ":" in entrypoint:
2504        # split by the last colon once to handle Windows paths with drive letters, e.g. C:\path\to\file.py:do_stuff
2505 path, func_name = entrypoint.rsplit(":", maxsplit=1)
2506 else:
2507 path, func_name = entrypoint.rsplit(".", maxsplit=1)
2508 try:
2509 flow: Flow[P, Any] = import_object(entrypoint) # pyright: ignore[reportRedeclaration]
2510 except AttributeError as exc:
2511 raise MissingFlowError(
2512 f"Flow function with name {func_name!r} not found in {path!r}. "
2513 ) from exc
2514 except ScriptError:
2515 # If the flow has dependencies that are not installed in the current
2516        # environment, fall back to loading the flow via AST parsing.
2517 if use_placeholder_flow:
2518 flow: Optional[Flow[P, Any]] = safe_load_flow_from_entrypoint(entrypoint)
2519 if flow is None:
2520 raise
2521 else:
2522 raise
2524 if not isinstance(flow, Flow): # pyright: ignore[reportUnnecessaryIsInstance]
2525 raise MissingFlowError(
2526 f"Function with name {func_name!r} is not a flow. Make sure that it is "
2527 "decorated with '@flow'."
2528 )
2530 return flow
2533def load_function_and_convert_to_flow(entrypoint: str) -> Flow[P, Any]: 1a
2534 """
2535 Loads a function from an entrypoint and converts it to a flow if it is not already a flow.
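    Example:
        A minimal sketch; the file and function names are hypothetical:

        ```python
        from prefect.flows import load_function_and_convert_to_flow

        # An undecorated function is wrapped in a `Flow` with `log_prints=True`.
        flow = load_function_and_convert_to_flow("script.py:plain_function")
        ```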
2536 """
2538 if ":" in entrypoint:
2539        # split by the last colon once to handle Windows paths with drive letters, e.g. C:\path\to\file.py:do_stuff
2540 path, func_name = entrypoint.rsplit(":", maxsplit=1)
2541 else:
2542 path, func_name = entrypoint.rsplit(".", maxsplit=1)
2543 try:
2544 func = import_object(entrypoint) # pyright: ignore[reportRedeclaration]
2545 except AttributeError as exc:
2546 raise RuntimeError(
2547 f"Function with name {func_name!r} not found in {path!r}."
2548 ) from exc
2550 if isinstance(func, Flow):
2551 return func
2552 else:
2553 return Flow(func, log_prints=True)
2556def serve( 1a
2557 *args: "RunnerDeployment",
2558 pause_on_shutdown: bool = True,
2559 print_starting_message: bool = True,
2560 limit: Optional[int] = None,
2561 **kwargs: Any,
2562) -> None:
2563 """
2564 Serve the provided list of deployments.
2566 Args:
2567 *args: A list of deployments to serve.
2568 pause_on_shutdown: A boolean for whether or not to automatically pause
2569 deployment schedules on shutdown.
2570        print_starting_message: Whether or not to print a message to the console
2571 on startup.
2572 limit: The maximum number of runs that can be executed concurrently.
2573 **kwargs: Additional keyword arguments to pass to the runner.
2575 Examples:
2576 Prepare two deployments and serve them:
2578 ```python
2579 import datetime
2581 from prefect import flow, serve
2583 @flow
2584 def my_flow(name):
2585 print(f"hello {name}")
2587 @flow
2588 def my_other_flow(name):
2589 print(f"goodbye {name}")
2591 if __name__ == "__main__":
2592 # Run once a day
2593 hello_deploy = my_flow.to_deployment(
2594 "hello", tags=["dev"], interval=datetime.timedelta(days=1)
2595 )
2597 # Run every Sunday at 4:00 AM
2598 bye_deploy = my_other_flow.to_deployment(
2599 "goodbye", tags=["dev"], cron="0 4 * * sun"
2600 )
2602 serve(hello_deploy, bye_deploy)
2603 ```
2604 """
2606 from prefect.runner import Runner
2608 if is_in_async_context():
2609 raise RuntimeError(
2610 "Cannot call `serve` in an asynchronous context. Use `aserve` instead."
2611 )
2613 runner = Runner(pause_on_shutdown=pause_on_shutdown, limit=limit, **kwargs)
2614 for deployment in args:
2615 if deployment.work_pool_name:
2616 warnings.warn(
2617 "Work pools are not necessary for served deployments - "
2618 "the `work_pool_name` argument will be ignored. Omit the "
2619 f"`work_pool_name` argument from `to_deployment` for {deployment.name!r}.",
2620 UserWarning,
2621 )
2622 deployment.work_pool_name = None
2623 runner.add_deployment(deployment)
2625 if print_starting_message:
2626 _display_serve_start_message(*args)
2628 try:
2629 asyncio.run(runner.start())
2630 except (KeyboardInterrupt, TerminationSignal) as exc:
2631 logger.info(f"Received {type(exc).__name__}, shutting down...")
2634async def aserve( 1a
2635 *args: "RunnerDeployment",
2636 pause_on_shutdown: bool = True,
2637 print_starting_message: bool = True,
2638 limit: Optional[int] = None,
2639 **kwargs: Any,
2640) -> None:
2641 """
2642 Asynchronously serve the provided list of deployments.
2644 Use `serve` instead if calling from a synchronous context.
2646 Args:
2647 *args: A list of deployments to serve.
2648 pause_on_shutdown: A boolean for whether or not to automatically pause
2649 deployment schedules on shutdown.
2650        print_starting_message: Whether or not to print a message to the console
2651 on startup.
2652 limit: The maximum number of runs that can be executed concurrently.
2653 **kwargs: Additional keyword arguments to pass to the runner.
2655 Examples:
2656 Prepare deployment and asynchronous initialization function and serve them:
2658 ```python
2659 import asyncio
2660 import datetime
2662 from prefect import flow, aserve, get_client
2665 async def init():
2666 await set_concurrency_limit()
2669 async def set_concurrency_limit():
2670 async with get_client() as client:
2671 await client.create_concurrency_limit(tag='dev', concurrency_limit=3)
2674 @flow
2675 async def my_flow(name):
2676 print(f"hello {name}")
2679 async def main():
2680 # Initialization function
2681 await init()
2683 # Run once a day
2684 hello_deploy = await my_flow.to_deployment(
2685 "hello", tags=["dev"], interval=datetime.timedelta(days=1)
2686 )
2688 await aserve(hello_deploy)
2691 if __name__ == "__main__":
2692 asyncio.run(main())
2693 """
2695 from prefect.runner import Runner
2697 runner = Runner(pause_on_shutdown=pause_on_shutdown, limit=limit, **kwargs)
2698 for deployment in args:
2699 add_deployment_coro = runner.add_deployment(deployment)
2700 if TYPE_CHECKING:
2701 assert inspect.isawaitable(add_deployment_coro)
2703 await add_deployment_coro
2705 if print_starting_message:
2706 _display_serve_start_message(*args)
2708 await runner.start()
2711def _display_serve_start_message(*args: "RunnerDeployment"): 1a
2712 from rich.console import Console, Group
2713 from rich.table import Table
2715 help_message_top = (
2716 "[green]Your deployments are being served and polling for scheduled runs!\n[/]"
2717 )
2719 table = Table(title="Deployments", show_header=False)
2721 table.add_column(style="blue", no_wrap=True)
2723 for deployment in args:
2724 table.add_row(f"{deployment.flow_name}/{deployment.name}")
2726 help_message_bottom = (
2727 "\nTo trigger any of these deployments, use the"
2728 " following command:\n[blue]\n\t$ prefect deployment run"
2729 " [DEPLOYMENT_NAME]\n[/]"
2730 )
2731 if PREFECT_UI_URL:
2732 help_message_bottom += (
2733 "\nYou can also trigger your deployments via the Prefect UI:"
2734 f" [blue]{PREFECT_UI_URL.value()}/deployments[/]\n"
2735 )
2737 console = Console()
2738 console.print(Group(help_message_top, table, help_message_bottom), soft_wrap=True)
2741@client_injector 1a
2742async def load_flow_from_flow_run( 1a
2743 client: "PrefectClient",
2744 flow_run: "FlowRun",
2745 ignore_storage: bool = False,
2746 storage_base_path: Optional[str] = None,
2747 use_placeholder_flow: bool = True,
2748) -> Flow[..., Any]:
2749 """
2750 Load a flow from the location/script provided in a deployment's storage document.
2752 If `ignore_storage=True` is provided, no pull from remote storage occurs. This flag
2753 is largely for testing, and assumes the flow is already available locally.
2754 """
2755 if flow_run.deployment_id is None:
2756 raise ValueError("Flow run does not have an associated deployment")
2758 deployment = await client.read_deployment(flow_run.deployment_id)
2760 if deployment.entrypoint is None:
2761 raise ValueError(
2762 f"Deployment {deployment.id} does not have an entrypoint and can not be run."
2763 )
2765 run_logger = flow_run_logger(flow_run)
2767 runner_storage_base_path = storage_base_path or os.environ.get(
2768 "PREFECT__STORAGE_BASE_PATH"
2769 )
2771 # If there's no colon, assume it's a module path
2772 if ":" not in deployment.entrypoint:
2773 run_logger.debug(
2774 f"Importing flow code from module path {deployment.entrypoint}"
2775 )
2776 flow = await run_sync_in_worker_thread(
2777 load_flow_from_entrypoint,
2778 deployment.entrypoint,
2779 use_placeholder_flow=use_placeholder_flow,
2780 )
2781 return flow
2783 if not ignore_storage and not deployment.pull_steps:
2784 sys.path.insert(0, ".")
2785 if deployment.storage_document_id:
2786 storage_document = await client.read_block_document(
2787 deployment.storage_document_id
2788 )
2789 from prefect.blocks.core import Block
2791 storage_block = Block._from_block_document(storage_document)
2792 else:
2793 basepath = deployment.path
2794 if runner_storage_base_path:
2795 basepath = str(basepath).replace(
2796 "$STORAGE_BASE_PATH", runner_storage_base_path
2797 )
2798 storage_block = LocalFileSystem(basepath=basepath)
2800 from_path = (
2801 str(deployment.path).replace("$STORAGE_BASE_PATH", runner_storage_base_path)
2802 if runner_storage_base_path and deployment.path
2803 else deployment.path
2804 )
2805 run_logger.info(f"Downloading flow code from storage at {from_path!r}")
2806 await storage_block.get_directory(from_path=from_path, local_path=".")
2808 if deployment.pull_steps:
2809 run_logger.debug(
2810 f"Running {len(deployment.pull_steps)} deployment pull step(s)"
2811 )
2813 from prefect.deployments.steps.core import StepExecutionError, run_steps
2815 try:
2816 output = await run_steps(
2817 deployment.pull_steps,
2818 print_function=run_logger.info,
2819 deployment=deployment,
2820 flow_run=flow_run,
2821 logger=run_logger,
2822 )
2823 except StepExecutionError as e:
2824 e = e.__cause__ or e
2825 run_logger.error(str(e))
2826 raise
2828 if output.get("directory"):
2829 run_logger.debug(f"Changing working directory to {output['directory']!r}")
2830 os.chdir(output["directory"])
2832 import_path = relative_path_to_current_platform(deployment.entrypoint)
2833 run_logger.debug(f"Importing flow code from '{import_path}'")
2835 try:
2836 flow = await run_sync_in_worker_thread(
2837 load_flow_from_entrypoint,
2838 str(import_path),
2839 use_placeholder_flow=use_placeholder_flow,
2840 )
2841 except MissingFlowError:
2842 flow = await run_sync_in_worker_thread(
2843 load_function_and_convert_to_flow,
2844 str(import_path),
2845 )
2847 return flow
2850def load_placeholder_flow(entrypoint: str, raises: Exception) -> Flow[P, Any]: 1a
2851 """
2852 Load a placeholder flow that is initialized with the same arguments as the
2853    flow specified in the entrypoint. If called, the flow will raise `raises`.
2855 This is useful when a flow can't be loaded due to missing dependencies or
2856 other issues but the base metadata defining the flow is still needed.
2858 Args:
2859 entrypoint: a string in the format `<path_to_script>:<flow_func_name>`
2860 or a module path to a flow function
2861 raises: an exception to raise when the flow is called
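    Example:
        A minimal sketch; the entrypoint is hypothetical:

        ```python
        from prefect.flows import load_placeholder_flow

        placeholder = load_placeholder_flow(
            "flows.py:my_flow", raises=ImportError("pandas is not installed")
        )
        placeholder()  # raises the given ImportError
        ```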
2862 """
2864 def _base_placeholder():
2865 raise raises
2867 def sync_placeholder_flow(*args: "P.args", **kwargs: "P.kwargs"):
2868 _base_placeholder()
2870 async def async_placeholder_flow(*args: "P.args", **kwargs: "P.kwargs"):
2871 _base_placeholder()
2873 placeholder_flow = (
2874 async_placeholder_flow
2875 if is_entrypoint_async(entrypoint)
2876 else sync_placeholder_flow
2877 )
2879 arguments = load_flow_arguments_from_entrypoint(entrypoint)
2880 arguments["fn"] = placeholder_flow
2882 return Flow(**arguments)
2885def safe_load_flow_from_entrypoint(entrypoint: str) -> Optional[Flow[P, Any]]: 1a
2886 """
2887 Safely load a Prefect flow from an entrypoint string. Returns None if loading fails.
2889 Args:
2890 entrypoint (str): A string identifying the flow to load. Can be in one of the following formats:
2891 - `<path_to_script>:<flow_func_name>`
2892 - `<path_to_script>:<class_name>.<flow_method_name>`
2893 - `<module_path>.<flow_func_name>`
2895 Returns:
2896 Optional[Flow]: The loaded Prefect flow object, or None if loading fails due to errors
2897 (e.g. unresolved dependencies, syntax errors, or missing objects).
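    Example:
        A minimal sketch; the entrypoint is hypothetical:

        ```python
        from prefect.flows import safe_load_flow_from_entrypoint

        maybe_flow = safe_load_flow_from_entrypoint("flows.py:my_flow")
        if maybe_flow is not None:
            print(maybe_flow.name)
        ```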
2898 """
2899 func_or_cls_def, source_code, parts = _entrypoint_definition_and_source(entrypoint)
2901 path = entrypoint.rsplit(":", maxsplit=1)[0] if ":" in entrypoint else None
2902 namespace = safe_load_namespace(source_code, filepath=path)
2904 if parts[0] not in namespace:
2905 # If the object is not in the namespace, it may be due to missing dependencies
2906 # in annotations or default values. We will attempt to sanitize them by removing
2907 # anything that cannot be compiled, and then recompile the function or class.
2908 if isinstance(func_or_cls_def, (ast.FunctionDef, ast.AsyncFunctionDef)):
2909 return _sanitize_and_load_flow(func_or_cls_def, namespace)
2910 elif (
2911 isinstance(func_or_cls_def, ast.ClassDef)
2912 and len(parts) >= 2
2913 and func_or_cls_def.name == parts[0]
2914 ):
2915 method_name = parts[1]
2916 method_def = next(
2917 (
2918 stmt
2919 for stmt in func_or_cls_def.body
2920 if isinstance(stmt, (ast.FunctionDef, ast.AsyncFunctionDef))
2921 and stmt.name == method_name
2922 ),
2923 None,
2924 )
2925 if method_def is not None:
2926 return _sanitize_and_load_flow(method_def, namespace)
2927 else:
2928 return None
2930 obj = namespace.get(parts[0])
2931 for part in parts[1:]:
2932 obj = getattr(obj, part, None)
2933 if obj is None:
2934 return None
2935 return obj
2938def _sanitize_and_load_flow( 1a
2939 func_def: Union[ast.FunctionDef, ast.AsyncFunctionDef], namespace: dict[str, Any]
2940) -> Optional[Flow[P, Any]]:
2941 """
2942 Attempt to load a flow from the function definition after sanitizing the annotations
2943 and defaults that can't be compiled.
2945 Args:
2946 func_def: the function definition
2947 namespace: the namespace to load the function into
2949 Returns:
2950 The loaded function or None if the function can't be loaded
2951 after sanitizing the annotations and defaults.
2952 """
2953 args = func_def.args.posonlyargs + func_def.args.args + func_def.args.kwonlyargs
2954 if func_def.args.vararg:
2955 args.append(func_def.args.vararg)
2956 if func_def.args.kwarg:
2957 args.append(func_def.args.kwarg)
2958 # Remove annotations that can't be compiled
2959 for arg in args:
2960 if arg.annotation is not None:
2961 try:
2962 code = compile(
2963 ast.Expression(arg.annotation),
2964 filename="<ast>",
2965 mode="eval",
2966 )
2967 exec(code, namespace)
2968 except Exception as e:
2969 logger.debug(
2970 "Failed to evaluate annotation for argument %s due to the following error. Ignoring annotation.",
2971 arg.arg,
2972 exc_info=e,
2973 )
2974 arg.annotation = None
2976 # Remove defaults that can't be compiled
2977 new_defaults: list[Any] = []
2978 for default in func_def.args.defaults:
2979 try:
2980 code = compile(ast.Expression(default), "<ast>", "eval")
2981 exec(code, namespace)
2982 new_defaults.append(default)
2983 except Exception as e:
2984 logger.debug(
2985 "Failed to evaluate default value %s due to the following error. Ignoring default.",
2986 default,
2987 exc_info=e,
2988 )
2989 new_defaults.append(
2990 ast.Constant(
2991 value=None, lineno=default.lineno, col_offset=default.col_offset
2992 )
2993 )
2994 func_def.args.defaults = new_defaults
2996 # Remove kw_defaults that can't be compiled
2997 new_kw_defaults: list[Any] = []
2998 for default in func_def.args.kw_defaults:
2999 if default is not None:
3000 try:
3001 code = compile(ast.Expression(default), "<ast>", "eval")
3002 exec(code, namespace)
3003 new_kw_defaults.append(default)
3004 except Exception as e:
3005 logger.debug(
3006 "Failed to evaluate default value %s due to the following error. Ignoring default.",
3007 default,
3008 exc_info=e,
3009 )
3010 new_kw_defaults.append(
3011 ast.Constant(
3012 value=None,
3013 lineno=default.lineno,
3014 col_offset=default.col_offset,
3015 )
3016 )
3017 else:
3018 new_kw_defaults.append(
3019 ast.Constant(
3020 value=None,
3021 lineno=func_def.lineno,
3022 col_offset=func_def.col_offset,
3023 )
3024 )
3025 func_def.args.kw_defaults = new_kw_defaults
3027 if func_def.returns is not None:
3028 try:
3029 code = compile(
3030 ast.Expression(func_def.returns), filename="<ast>", mode="eval"
3031 )
3032 exec(code, namespace)
3033 except Exception as e:
3034 logger.debug(
3035 "Failed to evaluate return annotation due to the following error. Ignoring annotation.",
3036 exc_info=e,
3037 )
3038 func_def.returns = None
3040 # Attempt to compile the function without annotations and defaults that
3041 # can't be compiled
3042 try:
3043 code = compile(
3044 ast.Module(body=[func_def], type_ignores=[]),
3045 filename="<ast>",
3046 mode="exec",
3047 )
3048 exec(code, namespace)
3049 except Exception as e:
3050 logger.debug("Failed to compile: %s", e)
3051 else:
3052 return namespace.get(func_def.name)
3055def load_flow_arguments_from_entrypoint( 1a
3056 entrypoint: str, arguments: Optional[Union[list[str], set[str]]] = None
3057) -> dict[str, Any]:
3058 """
3059 Extract flow arguments from an entrypoint string.
3061 Loads the source code of the entrypoint and extracts the flow arguments
3062 from the `flow` decorator.
3064 Args:
3065 entrypoint: a string in the format `<path_to_script>:<flow_func_name>`
3066 or a module path to a flow function
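    Example:
        A minimal sketch for a hypothetical file whose flow is decorated with
        `@flow(name="etl", retries=2)`; values are returned as strings without
        importing the module:

        ```python
        from prefect.flows import load_flow_arguments_from_entrypoint

        args = load_flow_arguments_from_entrypoint("flows.py:my_flow")
        # -> {"name": "etl", "retries": "2"}
        ```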
3067 """
3069 func_def, source_code, _ = _entrypoint_definition_and_source(entrypoint)
3070 path = None
3071 if ":" in entrypoint:
3072 path = entrypoint.rsplit(":")[0]
3074 if arguments is None:
3075        # If no arguments are provided, default to known arguments that are of
3076 # built-in types.
3077 arguments = {
3078 "name",
3079 "version",
3080 "retries",
3081 "retry_delay_seconds",
3082 "description",
3083 "timeout_seconds",
3084 "validate_parameters",
3085 "persist_result",
3086 "cache_result_in_memory",
3087 "log_prints",
3088 }
3090 result: dict[str, Any] = {}
3092 for decorator in func_def.decorator_list:
3093 if (
3094 isinstance(decorator, ast.Call)
3095 and getattr(decorator.func, "id", "") == "flow"
3096 ):
3097 for keyword in decorator.keywords:
3098 if keyword.arg not in arguments:
3099 continue
3101 if isinstance(keyword.value, ast.Constant):
3102 # Use the string value of the argument
3103 result[cast(str, keyword.arg)] = str(keyword.value.value)
3104 continue
3106 # if the arg value is not a raw str (i.e. a variable or expression),
3107 # then attempt to evaluate it
3108 namespace = safe_load_namespace(source_code, filepath=path)
3109 literal_arg_value = ast.get_source_segment(source_code, keyword.value)
3110 cleaned_value = (
3111 literal_arg_value.replace("\n", "") if literal_arg_value else ""
3112 )
3114 try:
3115 evaluated_value = eval(cleaned_value, namespace) # type: ignore
3116 result[cast(str, keyword.arg)] = str(evaluated_value)
3117 except Exception as e:
3118 logger.info(
3119 "Failed to parse @flow argument: `%s=%s` due to the following error. Ignoring and falling back to default behavior.",
3120 keyword.arg,
3121 literal_arg_value,
3122 exc_info=e,
3123 )
3124                    # ignore the decorator arg and fall back to default behavior
3125 continue
3127 if "name" in arguments and "name" not in result:
3128        # If no matching decorator or keyword argument for `name` is found,
3129        # fall back to the function name.
3130 result["name"] = func_def.name.replace("_", "-")
3132 return result
3135def is_entrypoint_async(entrypoint: str) -> bool: 1a
3136 """
3137 Determine if the function specified in the entrypoint is asynchronous.
3139 Args:
3140 entrypoint: A string in the format `<path_to_script>:<func_name>` or
3141 a module path to a function.
3143 Returns:
3144 True if the function is asynchronous, False otherwise.
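    Example:
        A minimal sketch; the entrypoint is hypothetical:

        ```python
        from prefect.flows import is_entrypoint_async

        if is_entrypoint_async("flows.py:my_async_flow"):
            print("flow is async")
        ```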
3145 """
3146 func_def, _, _ = _entrypoint_definition_and_source(entrypoint)
3147 return isinstance(func_def, ast.AsyncFunctionDef)
3150def _entrypoint_definition_and_source( 1a
3151 entrypoint: str,
3152) -> Tuple[Union[ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef], str, List[str]]:
3153 """
3154 Resolves and parses the source definition of a given entrypoint.
3156 The entrypoint can be provided in one of the following formats:
3157 - '<path_to_script>:<flow_func_name>'
3158 - '<path_to_script>:<class_name>.<flow_method_name>'
3159 - '<module_path.to.flow_function>'
3161 Returns:
3162 A tuple containing:
3163 - The AST node (FunctionDef, AsyncFunctionDef, or ClassDef) of the base object.
3164 - The full source code of the file or module as a string.
3165 - A list of attribute access parts from the object path (e.g., ['MyFlowClass', 'run']).
3167 Raises:
3168 ValueError: If the module or target object cannot be found.
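    Example:
        A minimal sketch; the entrypoint is hypothetical:

        ```python
        node, source, parts = _entrypoint_definition_and_source(
            "flows.py:MyFlowClass.run"
        )
        # parts == ["MyFlowClass", "run"]
        ```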
3169 """
3170 if ":" in entrypoint:
3171 path, object_path = entrypoint.rsplit(":", maxsplit=1)
3172 source_code = Path(path).read_text()
3173 else:
3174 path, object_path = entrypoint.rsplit(".", maxsplit=1)
3175 spec = importlib.util.find_spec(path)
3176 if not spec or not spec.origin:
3177 raise ValueError(f"Could not find module {path!r}")
3178 source_code = Path(spec.origin).read_text()
3180 parsed_code = ast.parse(source_code)
3181 parts = object_path.split(".")
3182 base_name = parts[0]
3184 base_def = next(
3185 (
3186 node
3187 for node in ast.walk(parsed_code)
3188 if isinstance(
3189 node,
3190 (
3191 ast.FunctionDef,
3192 ast.AsyncFunctionDef,
3193 ast.ClassDef, # flow can be staticmethod/classmethod
3194 ),
3195 )
3196 and node.name == base_name
3197 ),
3198 None,
3199 )
3201 if not base_def:
3202 raise ValueError(f"Could not find object {base_name!r} in {path!r}")
3204 return base_def, source_code, parts