Coverage for /usr/local/lib/python3.12/site-packages/prefect/tasks.py: 40%
474 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1"""
2Module containing the base workflow task class and decorator - for most use cases, using the `@task` decorator is preferred.
3"""
5# This file requires type-checking with pyright because mypy does not yet support PEP612
6# See https://github.com/python/mypy/issues/8645
7from __future__ import annotations 1a
9import datetime 1a
10import inspect 1a
11from copy import copy 1a
12from functools import partial, update_wrapper 1a
13from typing import ( 1a
14 TYPE_CHECKING,
15 Any,
16 Awaitable,
17 Callable,
18 Coroutine,
19 Generic,
20 Iterable,
21 NoReturn,
22 Optional,
23 Protocol,
24 TypeVar,
25 Union,
26 cast,
27 overload,
28)
29from uuid import UUID, uuid4 1a
31from typing_extensions import ( 1a
32 Literal,
33 ParamSpec,
34 Self,
35 Sequence,
36 TypeAlias,
37 TypedDict,
38 TypeIs,
39 Unpack,
40)
42import prefect.states 1a
43from prefect._internal.uuid7 import uuid7 1a
44from prefect.assets import Asset 1a
45from prefect.cache_policies import DEFAULT, NO_CACHE, CachePolicy 1a
46from prefect.client.orchestration import get_client 1a
47from prefect.client.schemas import TaskRun 1a
48from prefect.client.schemas.objects import ( 1a
49 RunInput,
50 StateDetails,
51 TaskRunPolicy,
52 TaskRunResult,
53)
54from prefect.context import ( 1a
55 FlowRunContext,
56 TagsContext,
57 TaskRunContext,
58 serialize_context,
59)
60from prefect.futures import PrefectDistributedFuture, PrefectFuture, PrefectFutureList 1a
61from prefect.logging.loggers import get_logger 1a
62from prefect.results import ( 1a
63 ResultSerializer,
64 ResultStorage,
65 ResultStore,
66 get_or_create_default_task_scheduling_storage,
67)
68from prefect.settings.context import get_current_settings 1a
69from prefect.states import Pending, Scheduled, State 1a
70from prefect.utilities.annotations import NotSet 1a
71from prefect.utilities.asyncutils import run_coro_as_sync, sync_compatible 1a
72from prefect.utilities.callables import ( 1a
73 expand_mapping_parameters,
74 get_call_parameters,
75 raise_for_reserved_arguments,
76)
77from prefect.utilities.hashing import hash_objects 1a
78from prefect.utilities.importtools import to_qualified_name 1a
79from prefect.utilities.urls import url_for 1a
81if TYPE_CHECKING: 81 ↛ 82line 81 didn't jump to line 82 because the condition on line 81 was never true1a
82 import logging
84 from prefect.client.orchestration import PrefectClient
85 from prefect.context import TaskRunContext
86 from prefect.transactions import Transaction
T = TypeVar("T")
R = TypeVar("R")  # The return type of the user's function
P = ParamSpec("P")  # The parameters of the task

# Number of hash characters kept when building dynamically generated task keys.
NUM_CHARS_DYNAMIC_KEY = 8

# Module-level logger for task definition and configuration warnings.
logger: "logging.Logger" = get_logger("tasks")

# A value that may be a concrete result or a future resolving to one.
FutureOrResult: TypeAlias = Union[PrefectFuture[T], T]
# A single future/result or an iterable of them (e.g. inputs to `.map`).
OneOrManyFutureOrResult: TypeAlias = Union[
    FutureOrResult[T], Iterable[FutureOrResult[T]]
]
class TaskRunNameCallbackWithParameters(Protocol):
    """Protocol for task run name callbacks that accept the run's parameters."""

    @classmethod
    def is_callback_with_parameters(cls, callable: Callable[..., str]) -> TypeIs[Self]:
        # A callback conforms when its signature declares a `parameters` argument.
        return "parameters" in inspect.signature(callable).parameters

    def __call__(self, parameters: dict[str, Any]) -> str: ...
# Signature of a state-transition hook: receives the task, its run, and the
# new state; may be synchronous or asynchronous.
StateHookCallable: TypeAlias = Callable[
    ["Task[..., Any]", TaskRun, State], Union[Awaitable[None], None]
]
# Predicate deciding whether a failed task run should proceed to its retry policy.
RetryConditionCallable: TypeAlias = Callable[
    ["Task[..., Any]", TaskRun, State], Union[Awaitable[bool], bool]
]
# A task run name: a static string, a zero-argument callable, or a callable
# that receives the run's parameters (see TaskRunNameCallbackWithParameters).
TaskRunNameValueOrCallable: TypeAlias = Union[
    Callable[[], str], TaskRunNameCallbackWithParameters, str
]
class TaskOptions(TypedDict, total=False):
    """
    A TypedDict representing all available task configuration options.

    This can be used with `Unpack` to provide type hints for **kwargs.
    """

    # NOTE: these keys mirror the keyword arguments of `Task.__init__` and the
    # `@task` decorator; keep them in sync when options are added or removed.
    name: Optional[str]
    description: Optional[str]
    tags: Optional[Iterable[str]]
    version: Optional[str]
    cache_policy: Union[CachePolicy, type[NotSet]]
    cache_key_fn: Union[
        Callable[["TaskRunContext", dict[str, Any]], Optional[str]], None
    ]
    cache_expiration: Optional[datetime.timedelta]
    task_run_name: Optional[TaskRunNameValueOrCallable]
    retries: Optional[int]
    retry_delay_seconds: Union[
        float, int, list[float], Callable[[int], list[float]], None
    ]
    retry_jitter_factor: Optional[float]
    persist_result: Optional[bool]
    result_storage: Optional[ResultStorage]
    result_serializer: Optional[ResultSerializer]
    result_storage_key: Optional[str]
    cache_result_in_memory: bool
    timeout_seconds: Union[int, float, None]
    log_prints: Optional[bool]
    refresh_cache: Optional[bool]
    on_completion: Optional[list[StateHookCallable]]
    on_failure: Optional[list[StateHookCallable]]
    on_running: Optional[list[StateHookCallable]]
    on_rollback: Optional[list[Callable[["Transaction"], None]]]
    on_commit: Optional[list[Callable[["Transaction"], None]]]
    retry_condition_fn: Optional[RetryConditionCallable]
    viz_return_value: Any
    asset_deps: Optional[list[Union[Asset, str]]]
def task_input_hash(
    context: "TaskRunContext", arguments: dict[str, Any]
) -> Optional[str]:
    """
    A task cache key implementation which hashes all inputs to the task using a JSON or
    cloudpickle serializer. If any arguments are not JSON serializable, the pickle
    serializer is used as a fallback. If cloudpickle fails, this will return a null key
    indicating that a cache key could not be generated for the given inputs.

    Arguments:
        context: the active `TaskRunContext`
        arguments: a dictionary of arguments to be passed to the underlying task

    Returns:
        a string hash if hashing succeeded, else `None`
    """
    task = context.task
    # Hash the task's qualified key together with the raw bytecode of its
    # function so the cache is invalidated whenever the implementation changes.
    bytecode_fingerprint = task.fn.__code__.co_code.hex()
    return hash_objects(task.task_key, bytecode_fingerprint, arguments)
def exponential_backoff(backoff_factor: float) -> Callable[[int], list[float]]:
    """
    A task retry backoff utility that configures exponential backoff for task retries.
    The exponential backoff design matches the urllib3 implementation.

    Arguments:
        backoff_factor: the base delay for the first retry, subsequent retries will
            increase the delay time by powers of 2.

    Returns:
        a callable that can be passed to the task constructor
    """

    def retry_backoff_callable(retries: int) -> list[float]:
        # Tasks support at most 50 configured retry delays.
        capped = min(retries, 50)
        delays: list[float] = []
        for attempt in range(capped):
            # Delay doubles on every successive retry.
            delays.append(backoff_factor * (2**attempt))
        return delays

    return retry_backoff_callable
def _infer_parent_task_runs(
    flow_run_context: Optional[FlowRunContext],
    task_run_context: Optional[TaskRunContext],
    parameters: dict[str, Any],
) -> list[TaskRunResult]:
    """
    Attempt to infer the parent task runs for this task run based on the
    provided flow run and task run contexts, as well as any parameters. It is
    assumed that the task run is running within those contexts.
    If any parameter comes from a running task run, that task run is considered
    a parent. This is expected to happen when task inputs are yielded from
    generator tasks.
    """
    parents: list[TaskRunResult] = []

    # A surrounding task run counts as a parent when either there is no flow
    # run at all, or the surrounding run belongs to the same flow run (a run
    # in a different flow run implies a subflow, which serves as the parent).
    if task_run_context:
        if not flow_run_context or task_run_context.task_run.flow_run_id == getattr(
            flow_run_context.flow_run, "id", None
        ):
            parents.append(TaskRunResult(id=task_run_context.task_run.id))

    # Dependency tracking: resolve each parameter value to an upstream state;
    # a still-running upstream state marks its task run as a parent. Only done
    # within an active flow run, since dependencies are tracked per flow run.
    if flow_run_context:
        for value in parameters.values():
            if isinstance(value, State):
                state = value
            elif isinstance(value, PrefectFuture):
                state = value.state
            else:
                state = None
                tracked = flow_run_context.run_results.get(id(value))
                if tracked:
                    state = tracked[0]

            if state and state.is_running():
                parents.append(
                    TaskRunResult(id=state.state_details.task_run_id)
                )

    return parents
def _generate_task_key(fn: Callable[..., Any]) -> str:
    """Generate a task key based on the function name and source code.

    We may eventually want some sort of top-level namespace here to
    disambiguate tasks with the same function name in different modules,
    in a more human-readable way, while avoiding relative import problems (see #12337).

    As long as the task implementations are unique (even if named the same), we should
    not have any collisions.

    Args:
        fn: The function to generate a task key for.
    """
    if not hasattr(fn, "__qualname__"):
        # Not a function-like object; fall back to the qualified type name.
        return to_qualified_name(type(fn))

    # Only the last segment of the qualified name is used; uniqueness comes
    # from the code hash suffix rather than the full dotted path.
    short_name = fn.__qualname__.split(".")[-1]

    try:
        # Plain functions expose __code__; callables fall back to __call__.
        code_obj = getattr(fn, "__code__", None) or fn.__call__.__code__
    except AttributeError:
        raise AttributeError(
            f"{fn} is not a standard Python function object and could not be converted to a task."
        ) from None

    digest = hash_objects(code_obj)
    suffix = digest[:NUM_CHARS_DYNAMIC_KEY] if digest else "unknown"

    return f"{short_name}-{suffix}"
300class Task(Generic[P, R]): 1a
301 """
302 A Prefect task definition.
304 Wraps a function with an entrypoint to the Prefect engine. Calling this class within a flow function
305 creates a new task run.
307 To preserve the input and output types, we use the generic type variables P and R for "Parameters" and
308 "Returns" respectively.
310 Args:
311 fn: The function defining the task.
312 name: An optional name for the task; if not provided, the name will be inferred
313 from the given function.
314 description: An optional string description for the task.
315 tags: An optional set of tags to be associated with runs of this task. These
316 tags are combined with any tags defined by a `prefect.tags` context at
317 task runtime.
318 version: An optional string specifying the version of this task definition
319 cache_policy: A cache policy that determines the level of caching for this task
320 cache_key_fn: An optional callable that, given the task run context and call
321 parameters, generates a string key; if the key matches a previous completed
322 state, that state result will be restored instead of running the task again.
323 cache_expiration: An optional amount of time indicating how long cached states
324 for this task should be restorable; if not provided, cached states will
325 never expire.
326 task_run_name: An optional name to distinguish runs of this task; this name can be provided
327 as a string template with the task's keyword arguments as variables,
328 or a function that returns a string.
329 retries: An optional number of times to retry on task run failure.
330 retry_delay_seconds: Optionally configures how long to wait before retrying the
331 task after failure. This is only applicable if `retries` is nonzero. This
332 setting can either be a number of seconds, a list of retry delays, or a
333 callable that, given the total number of retries, generates a list of retry
334 delays. If a number of seconds, that delay will be applied to all retries.
335 If a list, each retry will wait for the corresponding delay before retrying.
336 When passing a callable or a list, the number of configured retry delays
337 cannot exceed 50.
338 retry_jitter_factor: An optional factor that defines the factor to which a retry
339 can be jittered in order to avoid a "thundering herd".
340 persist_result: A toggle indicating whether the result of this task
341 should be persisted to result storage. Defaults to `None`, which
342 indicates that the global default should be used (which is `True` by
343 default).
344 result_storage: An optional block to use to persist the result of this task.
345 Defaults to the value set in the flow the task is called in.
346 result_storage_key: An optional key to store the result in storage at when persisted.
347 Defaults to a unique identifier.
348 result_serializer: An optional serializer to use to serialize the result of this
349 task for persistence. Defaults to the value set in the flow the task is
350 called in.
351 timeout_seconds: An optional number of seconds indicating a maximum runtime for
352 the task. If the task exceeds this runtime, it will be marked as failed.
353 log_prints: If set, `print` statements in the task will be redirected to the
354 Prefect logger for the task run. Defaults to `None`, which indicates
355 that the value from the flow should be used.
356 refresh_cache: If set, cached results for the cache key are not used.
357 Defaults to `None`, which indicates that a cached result from a previous
358 execution with matching cache key is used.
359 on_failure: An optional list of callables to run when the task enters a failed state.
360 on_completion: An optional list of callables to run when the task enters a completed state.
361 on_commit: An optional list of callables to run when the task's idempotency record is committed.
362 on_rollback: An optional list of callables to run when the task rolls back.
363 retry_condition_fn: An optional callable run when a task run returns a Failed state. Should
364 return `True` if the task should continue to its retry policy (e.g. `retries=3`), and `False` if the task
365 should end as failed. Defaults to `None`, indicating the task should always continue
366 to its retry policy.
367 viz_return_value: An optional value to return when the task dependency tree is visualized.
368 asset_deps: An optional list of upstream assets that this task depends on.
369 """
371 # NOTE: These parameters (types, defaults, and docstrings) should be duplicated
372 # exactly in the @task decorator
    def __init__(
        self,
        fn: Callable[P, R] | "classmethod[Any, P, R]" | "staticmethod[P, R]",
        name: Optional[str] = None,
        description: Optional[str] = None,
        tags: Optional[Iterable[str]] = None,
        version: Optional[str] = None,
        cache_policy: Union[CachePolicy, type[NotSet]] = NotSet,
        cache_key_fn: Optional[
            Callable[["TaskRunContext", dict[str, Any]], Optional[str]]
        ] = None,
        cache_expiration: Optional[datetime.timedelta] = None,
        task_run_name: Optional[TaskRunNameValueOrCallable] = None,
        retries: Optional[int] = None,
        retry_delay_seconds: Optional[
            Union[
                float,
                int,
                list[float],
                Callable[[int], list[float]],
            ]
        ] = None,
        retry_jitter_factor: Optional[float] = None,
        persist_result: Optional[bool] = None,
        result_storage: Optional[ResultStorage] = None,
        result_serializer: Optional[ResultSerializer] = None,
        result_storage_key: Optional[str] = None,
        cache_result_in_memory: bool = True,
        timeout_seconds: Union[int, float, None] = None,
        log_prints: Optional[bool] = False,
        refresh_cache: Optional[bool] = None,
        on_completion: Optional[list[StateHookCallable]] = None,
        on_failure: Optional[list[StateHookCallable]] = None,
        on_running: Optional[list[StateHookCallable]] = None,
        on_rollback: Optional[list[Callable[["Transaction"], None]]] = None,
        on_commit: Optional[list[Callable[["Transaction"], None]]] = None,
        retry_condition_fn: Optional[RetryConditionCallable] = None,
        viz_return_value: Optional[Any] = None,
        asset_deps: Optional[list[Union[str, Asset]]] = None,
    ):
        # See the class docstring for parameter documentation; these
        # parameters are duplicated exactly in the @task decorator.

        # Validate if hook passed is list and contains callables
        hook_categories = [on_completion, on_failure, on_running]
        hook_names = ["on_completion", "on_failure", "on_running"]
        for hooks, hook_name in zip(hook_categories, hook_names):
            if hooks is not None:
                try:
                    hooks = list(hooks)
                except TypeError:
                    raise TypeError(
                        f"Expected iterable for '{hook_name}'; got"
                        f" {type(hooks).__name__} instead. Please provide a list of"
                        f" hooks to '{hook_name}':\n\n"
                        f"@task({hook_name}=[hook1, hook2])\ndef"
                        " my_task():\n\tpass"
                    )

                for hook in hooks:
                    if not callable(hook):
                        raise TypeError(
                            f"Expected callables in '{hook_name}'; got"
                            f" {type(hook).__name__} instead. Please provide a list of"
                            f" hooks to '{hook_name}':\n\n"
                            f"@task({hook_name}=[hook1, hook2])\ndef"
                            " my_task():\n\tpass"
                        )

        # Unwrap classmethod/staticmethod wrappers and remember which kind the
        # task wraps so `__get__` can bind the owner/instance appropriately.
        if isinstance(fn, classmethod):
            fn = cast(Callable[P, R], fn.__func__)
            self._isclassmethod = True

        if isinstance(fn, staticmethod):
            fn = cast(Callable[P, R], fn.__func__)
            self._isstaticmethod = True

        if not callable(fn):
            raise TypeError("'fn' must be callable")

        self.description: str | None = description or inspect.getdoc(fn)
        # Copy __name__, __doc__, etc. from the wrapped function onto the task.
        update_wrapper(self, fn)
        self.fn = fn

        # the task is considered async if its function is async or an async
        # generator
        self.isasync: bool = inspect.iscoroutinefunction(
            self.fn
        ) or inspect.isasyncgenfunction(self.fn)

        # the task is considered a generator if its function is a generator or
        # an async generator
        self.isgenerator: bool = inspect.isgeneratorfunction(
            self.fn
        ) or inspect.isasyncgenfunction(self.fn)

        if not name:
            if not hasattr(self.fn, "__name__"):
                # Callable objects without __name__ fall back to the type name.
                self.name = type(self.fn).__name__
            else:
                self.name = self.fn.__name__
        else:
            self.name: str = name

        if task_run_name is not None:
            if not isinstance(task_run_name, str) and not callable(task_run_name):
                raise TypeError(
                    "Expected string or callable for 'task_run_name'; got"
                    f" {type(task_run_name).__name__} instead."
                )
        self.task_run_name = task_run_name

        self.version = version
        self.log_prints = log_prints

        # These argument names are injected by the engine and may not be used
        # by the wrapped function's own signature.
        raise_for_reserved_arguments(self.fn, ["return_state", "wait_for"])

        self.tags: set[str] = set(tags if tags else [])

        self.task_key: str = _generate_task_key(self.fn)

        # determine cache and result configuration
        settings = get_current_settings()
        if settings.tasks.default_no_cache and cache_policy is NotSet:
            cache_policy = NO_CACHE

        if cache_policy is not NotSet and cache_key_fn is not None:
            logger.warning(
                f"Both `cache_policy` and `cache_key_fn` are set on task {self}. `cache_key_fn` will be used."
            )

        if cache_key_fn:
            # A cache key function takes precedence over an explicit policy.
            cache_policy = CachePolicy.from_cache_key_fn(cache_key_fn)

        # TODO: manage expiration and cache refresh
        self.cache_key_fn = cache_key_fn
        self.cache_expiration = cache_expiration
        self.refresh_cache = refresh_cache

        # result persistence settings
        if persist_result is None:
            # Any caching or result-storage configuration implies persistence.
            if any(
                [
                    cache_policy
                    and cache_policy != NO_CACHE
                    and cache_policy != NotSet,
                    cache_key_fn is not None,
                    result_storage_key is not None,
                    result_storage is not None,
                    result_serializer is not None,
                ]
            ):
                persist_result = True

        # Check for global cache disable setting
        if settings.tasks.disable_caching:
            cache_policy = NO_CACHE

        if persist_result is False:
            # Caching requires persisted results; force NO_CACHE and warn if a
            # conflicting policy was configured.
            self.cache_policy = None if cache_policy is None else NO_CACHE
            if cache_policy and cache_policy is not NotSet and cache_policy != NO_CACHE:
                logger.warning(
                    "Ignoring `cache_policy` because `persist_result` is False"
                )
        elif cache_policy is NotSet and result_storage_key is None:
            self.cache_policy = DEFAULT
        elif cache_policy != NO_CACHE and result_storage_key:
            # TODO: handle this situation with double storage
            self.cache_policy = None
        else:
            self.cache_policy: Union[CachePolicy, type[NotSet], None] = cache_policy

        # TaskRunPolicy settings
        # TODO: We can instantiate a `TaskRunPolicy` and add Pydantic bound checks to
        # validate that the user passes positive numbers here

        self.retries: int = (
            retries if retries is not None else settings.tasks.default_retries
        )
        if retry_delay_seconds is None:
            retry_delay_seconds = settings.tasks.default_retry_delay_seconds

        if callable(retry_delay_seconds):
            # Callables receive the retry count and produce the delay list.
            self.retry_delay_seconds = retry_delay_seconds(self.retries)
        elif not isinstance(retry_delay_seconds, (list, int, float, type(None))):
            raise TypeError(
                f"Invalid `retry_delay_seconds` provided; must be an int, float, list or callable. Received type {type(retry_delay_seconds)}"
            )
        else:
            self.retry_delay_seconds: Union[float, int, list[float], None] = (
                retry_delay_seconds
            )

        if isinstance(self.retry_delay_seconds, list) and (
            len(self.retry_delay_seconds) > 50
        ):
            raise ValueError("Can not configure more than 50 retry delays per task.")

        if retry_jitter_factor is not None and retry_jitter_factor < 0:
            raise ValueError("`retry_jitter_factor` must be >= 0.")

        self.retry_jitter_factor = retry_jitter_factor
        self.persist_result = persist_result

        if result_storage and not isinstance(result_storage, str):
            # Block instances must already be saved server-side so they can be
            # referenced by document id at runtime.
            if getattr(result_storage, "_block_document_id", None) is None:
                raise TypeError(
                    "Result storage configuration must be persisted server-side."
                    " Please call `.save()` on your block before passing it in."
                )

        self.result_storage = result_storage
        self.result_serializer = result_serializer
        self.result_storage_key = result_storage_key
        self.cache_result_in_memory = cache_result_in_memory
        self.timeout_seconds: Union[float, None] = (
            float(timeout_seconds) if timeout_seconds else None
        )
        self.on_rollback_hooks: list[Callable[["Transaction"], None]] = (
            on_rollback or []
        )
        self.on_commit_hooks: list[Callable[["Transaction"], None]] = on_commit or []
        self.on_completion_hooks: list[StateHookCallable] = on_completion or []
        self.on_failure_hooks: list[StateHookCallable] = on_failure or []
        self.on_running_hooks: list[StateHookCallable] = on_running or []

        # retry_condition_fn must be a callable or None. If it is neither, raise a TypeError
        if retry_condition_fn is not None and not (callable(retry_condition_fn)):
            raise TypeError(
                "Expected `retry_condition_fn` to be callable, got"
                f" {type(retry_condition_fn).__name__} instead."
            )

        self.retry_condition_fn = retry_condition_fn
        self.viz_return_value = viz_return_value

        from prefect.assets import Asset

        # Normalize asset dependencies: bare strings become Asset keys.
        self.asset_deps: list[Asset] = (
            [Asset(key=a) if isinstance(a, str) else a for a in asset_deps]
            if asset_deps
            else []
        )
614 @property 1a
615 def ismethod(self) -> bool: 1a
616 return hasattr(self.fn, "__prefect_self__")
618 @property 1a
619 def isclassmethod(self) -> bool: 1a
620 return getattr(self, "_isclassmethod", False)
622 @property 1a
623 def isstaticmethod(self) -> bool: 1a
624 return getattr(self, "_isstaticmethod", False)
626 def __get__(self, instance: Any, owner: Any) -> "Task[P, R]": 1a
627 """
628 Implement the descriptor protocol so that the task can be used as an instance method.
629 When an instance method is loaded, this method is called with the "self" instance as
630 an argument. We return a copy of the task with that instance bound to the task's function.
631 """
632 # wrapped function is a classmethod
633 if self.isclassmethod:
634 bound_task = copy(self)
635 setattr(bound_task.fn, "__prefect_cls__", owner)
636 return bound_task
638 # if the task is being accessed on an instance, bind the instance to the __prefect_self__ attribute
639 # of the task's function. This will allow it to be automatically added to the task's parameters
640 if instance:
641 bound_task = copy(self)
642 bound_task.fn.__prefect_self__ = instance # type: ignore[attr-defined]
643 return bound_task
645 return self
    def with_options(
        self,
        *,
        name: Optional[str] = None,
        description: Optional[str] = None,
        tags: Optional[Iterable[str]] = None,
        cache_policy: Union[CachePolicy, type[NotSet]] = NotSet,
        cache_key_fn: Optional[
            Callable[["TaskRunContext", dict[str, Any]], Optional[str]]
        ] = None,
        task_run_name: Optional[
            Union[TaskRunNameValueOrCallable, type[NotSet]]
        ] = NotSet,
        cache_expiration: Optional[datetime.timedelta] = None,
        retries: Union[int, type[NotSet]] = NotSet,
        retry_delay_seconds: Union[
            float,
            int,
            list[float],
            Callable[[int], list[float]],
            type[NotSet],
        ] = NotSet,
        retry_jitter_factor: Union[float, type[NotSet]] = NotSet,
        persist_result: Union[bool, type[NotSet]] = NotSet,
        result_storage: Union[ResultStorage, type[NotSet]] = NotSet,
        result_serializer: Union[ResultSerializer, type[NotSet]] = NotSet,
        result_storage_key: Union[str, type[NotSet]] = NotSet,
        cache_result_in_memory: Optional[bool] = None,
        timeout_seconds: Union[int, float, None] = None,
        log_prints: Union[bool, type[NotSet]] = NotSet,
        refresh_cache: Union[bool, type[NotSet]] = NotSet,
        on_completion: Optional[list[StateHookCallable]] = None,
        on_failure: Optional[list[StateHookCallable]] = None,
        on_running: Optional[list[StateHookCallable]] = None,
        retry_condition_fn: Optional[RetryConditionCallable] = None,
        viz_return_value: Optional[Any] = None,
        asset_deps: Optional[list[Union[str, Asset]]] = None,
    ) -> "Task[P, R]":
        """
        Create a new task from the current object, updating provided options.

        Args:
            name: A new name for the task.
            description: A new description for the task.
            tags: A new set of tags for the task. If given, existing tags are ignored,
                not merged.
            cache_key_fn: A new cache key function for the task.
            cache_expiration: A new cache expiration time for the task.
            task_run_name: An optional name to distinguish runs of this task; this name can be provided
                as a string template with the task's keyword arguments as variables,
                or a function that returns a string.
            retries: A new number of times to retry on task run failure.
            retry_delay_seconds: Optionally configures how long to wait before retrying
                the task after failure. This is only applicable if `retries` is nonzero.
                This setting can either be a number of seconds, a list of retry delays,
                or a callable that, given the total number of retries, generates a list
                of retry delays. If a number of seconds, that delay will be applied to
                all retries. If a list, each retry will wait for the corresponding delay
                before retrying. When passing a callable or a list, the number of
                configured retry delays cannot exceed 50.
            retry_jitter_factor: An optional factor that defines the factor to which a
                retry can be jittered in order to avoid a "thundering herd".
            persist_result: A new option for enabling or disabling result persistence.
            result_storage: A new storage type to use for results.
            result_serializer: A new serializer to use for results.
            result_storage_key: A new key for the persisted result to be stored at.
            timeout_seconds: A new maximum time for the task to complete in seconds.
            log_prints: A new option for enabling or disabling redirection of `print` statements.
            refresh_cache: A new option for enabling or disabling cache refresh.
            on_completion: A new list of callables to run when the task enters a completed state.
            on_failure: A new list of callables to run when the task enters a failed state.
            retry_condition_fn: An optional callable run when a task run returns a Failed state.
                Should return `True` if the task should continue to its retry policy, and `False`
                if the task should end as failed. Defaults to `None`, indicating the task should
                always continue to its retry policy.
            viz_return_value: An optional value to return when the task dependency tree is visualized.

        Returns:
            A new `Task` instance.

        Examples:

            Create a new task from an existing task and update the name:

            ```python
            @task(name="My task")
            def my_task():
                return 1

            new_task = my_task.with_options(name="My new task")
            ```

            Create a new task from an existing task and update the retry settings:

            ```python
            from random import randint

            @task(retries=1, retry_delay_seconds=5)
            def my_task():
                x = randint(0, 5)
                if x >= 3:  # Make a task that fails sometimes
                    raise ValueError("Retry me please!")
                return x

            new_task = my_task.with_options(retries=5, retry_delay_seconds=2)
            ```

            Use a task with updated options within a flow:

            ```python
            @task(name="My task")
            def my_task():
                return 1

            @flow
            def my_flow():
                new_task = my_task.with_options(name="My new task")
                new_task()
            ```
        """
        # The NotSet sentinel distinguishes "option not provided" (inherit the
        # current task's value) from explicitly passing None/False.
        return Task(
            fn=self.fn,
            name=name or self.name,
            description=description or self.description,
            tags=tags or copy(self.tags),
            cache_policy=cache_policy
            if cache_policy is not NotSet
            else self.cache_policy,
            cache_key_fn=cache_key_fn or self.cache_key_fn,
            cache_expiration=cache_expiration or self.cache_expiration,
            task_run_name=task_run_name
            if task_run_name is not NotSet
            else self.task_run_name,
            retries=retries if retries is not NotSet else self.retries,
            retry_delay_seconds=(
                retry_delay_seconds
                if retry_delay_seconds is not NotSet
                else self.retry_delay_seconds
            ),
            retry_jitter_factor=(
                retry_jitter_factor
                if retry_jitter_factor is not NotSet
                else self.retry_jitter_factor
            ),
            persist_result=(
                persist_result if persist_result is not NotSet else self.persist_result
            ),
            result_storage=(
                result_storage if result_storage is not NotSet else self.result_storage
            ),
            result_storage_key=(
                result_storage_key
                if result_storage_key is not NotSet
                else self.result_storage_key
            ),
            result_serializer=(
                result_serializer
                if result_serializer is not NotSet
                else self.result_serializer
            ),
            cache_result_in_memory=(
                cache_result_in_memory
                if cache_result_in_memory is not None
                else self.cache_result_in_memory
            ),
            timeout_seconds=(
                timeout_seconds if timeout_seconds is not None else self.timeout_seconds
            ),
            log_prints=(log_prints if log_prints is not NotSet else self.log_prints),
            refresh_cache=(
                refresh_cache if refresh_cache is not NotSet else self.refresh_cache
            ),
            on_completion=on_completion or self.on_completion_hooks,
            on_failure=on_failure or self.on_failure_hooks,
            on_running=on_running or self.on_running_hooks,
            retry_condition_fn=retry_condition_fn or self.retry_condition_fn,
            viz_return_value=viz_return_value or self.viz_return_value,
            asset_deps=asset_deps or self.asset_deps,
        )
    def on_completion(self, fn: StateHookCallable) -> StateHookCallable:
        """Register `fn` as a hook to run when the task enters a completed state.

        Usable as a decorator; returns `fn` unchanged so the decorated
        callable keeps its original binding.
        """
        self.on_completion_hooks.append(fn)
        return fn
    def on_failure(self, fn: StateHookCallable) -> StateHookCallable:
        """Register `fn` as a hook to run when the task enters a failed state.

        Usable as a decorator; returns `fn` unchanged.
        """
        self.on_failure_hooks.append(fn)
        return fn
    def on_running(self, fn: StateHookCallable) -> StateHookCallable:
        """Register `fn` as a hook to run when the task enters a running state.

        Usable as a decorator; returns `fn` unchanged.
        """
        self.on_running_hooks.append(fn)
        return fn
    def on_commit(
        self, fn: Callable[["Transaction"], None]
    ) -> Callable[["Transaction"], None]:
        """Register `fn` as a hook to run when this task's transaction is committed.

        Usable as a decorator; returns `fn` unchanged.
        """
        self.on_commit_hooks.append(fn)
        return fn
    def on_rollback(
        self, fn: Callable[["Transaction"], None]
    ) -> Callable[["Transaction"], None]:
        """Register `fn` as a hook to run when this task's transaction is rolled back.

        Usable as a decorator; returns `fn` unchanged.
        """
        self.on_rollback_hooks.append(fn)
        return fn
    async def create_run(
        self,
        client: Optional["PrefectClient"] = None,
        id: Optional[UUID] = None,
        parameters: Optional[dict[str, Any]] = None,
        flow_run_context: Optional[FlowRunContext] = None,
        parent_task_run_context: Optional[TaskRunContext] = None,
        wait_for: Optional[OneOrManyFutureOrResult[Any]] = None,
        extra_task_inputs: Optional[dict[str, set[RunInput]]] = None,
        deferred: bool = False,
    ) -> TaskRun:
        """Create a task run for this task via the Prefect API.

        Args:
            client: Client used to create the run; a new one is obtained from
                `get_client()` when not provided.
            id: Optional pre-determined ID for the task run.
            parameters: Parameters the task will be called with.
            flow_run_context: Flow run context to associate the run with;
                looked up from the active context when not provided.
            parent_task_run_context: Parent task run context used to infer
                parent dependencies; looked up when not provided.
            wait_for: Upstream futures/results the run should wait for.
            extra_task_inputs: Additional task inputs to union into the
                collected inputs, keyed by parameter name.
            deferred: When True, create the run in a `Scheduled` state and
                persist its parameters so a task worker can retrieve them.

        Returns:
            The created `TaskRun`.
        """
        # Imported lazily to avoid circular imports with the engine modules.
        from prefect.utilities._engine import dynamic_key_for_task_run
        from prefect.utilities.engine import collect_task_run_inputs_sync

        if flow_run_context is None:
            flow_run_context = FlowRunContext.get()
        if parent_task_run_context is None:
            parent_task_run_context = TaskRunContext.get()
        if parameters is None:
            parameters = {}
        if client is None:
            client = get_client()

        async with client:
            if not flow_run_context:
                # Outside a flow run there is no stable dynamic key; a random
                # suffix keeps (task_key, dynamic_key) pairs unique.
                dynamic_key = f"{self.task_key}-{str(uuid4().hex)}"
                task_run_name = self.name
            else:
                dynamic_key = dynamic_key_for_task_run(
                    context=flow_run_context, task=self
                )
                task_run_name = f"{self.name}-{dynamic_key}"

            if deferred:
                state = Scheduled()
                state.state_details.deferred = True
            else:
                state = Pending()

            # store parameters for background tasks so that task worker
            # can retrieve them at runtime
            if deferred and (parameters or wait_for):
                from prefect.task_worker import store_parameters

                parameters_id = uuid4()
                state.state_details.task_parameters_id = parameters_id

                # TODO: Improve use of result storage for parameter storage / reference
                # NOTE: mutates the task so the worker persists its result too.
                self.persist_result = True

                store = await ResultStore(
                    result_storage=await get_or_create_default_task_scheduling_storage()
                ).update_for_task(self)
                context = serialize_context()
                data: dict[str, Any] = {"context": context}
                if parameters:
                    data["parameters"] = parameters
                if wait_for:
                    data["wait_for"] = wait_for
                await store_parameters(store, parameters_id, data)

            # collect task inputs
            task_inputs = {
                k: collect_task_run_inputs_sync(v) for k, v in parameters.items()
            }

            # collect all parent dependencies
            if task_parents := _infer_parent_task_runs(
                flow_run_context=flow_run_context,
                task_run_context=parent_task_run_context,
                parameters=parameters,
            ):
                task_inputs["__parents__"] = task_parents

            # check wait for dependencies
            if wait_for:
                task_inputs["wait_for"] = collect_task_run_inputs_sync(wait_for)

            # Join extra task inputs
            for k, extras in (extra_task_inputs or {}).items():
                task_inputs[k] = task_inputs[k].union(extras)

            # create the task run
            task_run = client.create_task_run(
                task=self,
                name=task_run_name,
                flow_run_id=(
                    getattr(flow_run_context.flow_run, "id", None)
                    if flow_run_context and flow_run_context.flow_run
                    else None
                ),
                dynamic_key=str(dynamic_key),
                id=id,
                state=state,
                task_inputs=task_inputs,
                extra_tags=TagsContext.get().current_tags,
            )
            # the new engine uses sync clients but old engines use async clients
            if inspect.isawaitable(task_run):
                task_run = await task_run

            return task_run
    async def create_local_run(
        self,
        client: Optional["PrefectClient"] = None,
        id: Optional[UUID] = None,
        parameters: Optional[dict[str, Any]] = None,
        flow_run_context: Optional[FlowRunContext] = None,
        parent_task_run_context: Optional[TaskRunContext] = None,
        wait_for: Optional[OneOrManyFutureOrResult[Any]] = None,
        extra_task_inputs: Optional[dict[str, set[RunInput]]] = None,
        deferred: bool = False,
    ) -> TaskRun:
        """Build a `TaskRun` object locally without creating it via the API.

        Mirrors `create_run` but constructs the `TaskRun` in memory (with a
        client-generated id) instead of calling `client.create_task_run`.

        Args:
            client: Client to enter for the duration of the call; a new one is
                obtained from `get_client()` when not provided.
            id: Optional pre-determined ID; a uuid7 is generated otherwise.
            parameters: Parameters the task will be called with.
            flow_run_context: Flow run context to associate the run with;
                looked up from the active context when not provided.
            parent_task_run_context: Parent task run context used to infer
                parent dependencies; looked up when not provided.
            wait_for: Upstream futures/results the run should wait for.
            extra_task_inputs: Additional task inputs to union into the
                collected inputs, keyed by parameter name.
            deferred: When True, parameters are persisted for a task worker.

        Returns:
            The locally constructed `TaskRun`.
        """
        from prefect.utilities._engine import dynamic_key_for_task_run
        from prefect.utilities.engine import (
            collect_task_run_inputs_sync,
        )

        if flow_run_context is None:
            flow_run_context = FlowRunContext.get()
        if parent_task_run_context is None:
            parent_task_run_context = TaskRunContext.get()
        if parameters is None:
            parameters = {}
        if client is None:
            client = get_client()

        async with client:
            if not flow_run_context:
                dynamic_key = f"{self.task_key}-{str(uuid4().hex)}"
                task_run_name = self.name
            else:
                # stable=False yields a fresh key per call; only the first 3
                # characters are used in the display name.
                dynamic_key = dynamic_key_for_task_run(
                    context=flow_run_context, task=self, stable=False
                )
                task_run_name = f"{self.name}-{dynamic_key[:3]}"

            if deferred:
                state = Scheduled()
                state.state_details.deferred = True
            else:
                state = Pending()

            # store parameters for background tasks so that task worker
            # can retrieve them at runtime
            if deferred and (parameters or wait_for):
                from prefect.task_worker import store_parameters

                parameters_id = uuid4()
                state.state_details.task_parameters_id = parameters_id

                # TODO: Improve use of result storage for parameter storage / reference
                self.persist_result = True

                store = await ResultStore(
                    result_storage=await get_or_create_default_task_scheduling_storage()
                ).update_for_task(self)
                context = serialize_context()
                data: dict[str, Any] = {"context": context}
                if parameters:
                    data["parameters"] = parameters
                if wait_for:
                    data["wait_for"] = wait_for
                await store_parameters(store, parameters_id, data)

            # collect task inputs
            task_inputs = {
                k: collect_task_run_inputs_sync(v) for k, v in parameters.items()
            }

            # collect all parent dependencies
            if task_parents := _infer_parent_task_runs(
                flow_run_context=flow_run_context,
                task_run_context=parent_task_run_context,
                parameters=parameters,
            ):
                task_inputs["__parents__"] = task_parents

            # check wait for dependencies
            if wait_for:
                task_inputs["wait_for"] = collect_task_run_inputs_sync(wait_for)

            # Join extra task inputs
            for k, extras in (extra_task_inputs or {}).items():
                task_inputs[k] = task_inputs[k].union(extras)

            flow_run_id = (
                getattr(flow_run_context.flow_run, "id", None)
                if flow_run_context and flow_run_context.flow_run
                else None
            )
            task_run_id = id or uuid7()

            # NOTE(review): this reassignment discards the `Scheduled` state
            # (and its `deferred` / `task_parameters_id` details) built above
            # when `deferred=True` — the returned run is always Pending.
            # Confirm this is intentional for local runs.
            state = prefect.states.Pending(
                state_details=StateDetails(
                    task_run_id=task_run_id,
                    flow_run_id=flow_run_id,
                )
            )
            task_run = TaskRun(
                id=task_run_id,
                name=task_run_name,
                flow_run_id=flow_run_id,
                task_key=self.task_key,
                dynamic_key=str(dynamic_key),
                task_version=self.version,
                empirical_policy=TaskRunPolicy(
                    retries=self.retries,
                    retry_delay=self.retry_delay_seconds,
                    retry_jitter_factor=self.retry_jitter_factor,
                ),
                tags=list(set(self.tags).union(TagsContext.get().current_tags or [])),
                task_inputs=task_inputs or {},
                expected_start_time=state.timestamp,
                state_id=state.id,
                state_type=state.type,
                state_name=state.name,
                state=state,
                created=state.timestamp,
                updated=state.timestamp,
            )

            return task_run
1077 # PRIORITY OVERLOADS: Clean ParamSpec signatures for normal usage (no return_state/wait_for)
1078 # These preserve full parameter type checking when users call tasks normally
    # PRIORITY OVERLOADS: Clean ParamSpec signatures for normal usage (no return_state/wait_for)
    # These preserve full parameter type checking when users call tasks normally
    @overload
    def __call__(
        self: "Task[P, Coroutine[Any, Any, R]]",
        *args: P.args,
        **kwargs: P.kwargs,
    ) -> Coroutine[Any, Any, R]: ...

    @overload
    def __call__(
        self: "Task[P, R]",
        *args: P.args,
        **kwargs: P.kwargs,
    ) -> R: ...

    @overload
    def __call__(
        self: "Task[P, NoReturn]",
        *args: P.args,
        **kwargs: P.kwargs,
    ) -> None:
        # `NoReturn` matches if a type can't be inferred for the function which stops a
        # sync function from matching the `Coroutine` overload
        ...

    # SECONDARY OVERLOADS: With return_state/wait_for using Any
    # When return_state or wait_for are used, we can't preserve ParamSpec semantics,
    # so we use Any for parameters. This is an acceptable tradeoff since these
    # are advanced use cases.
    @overload
    def __call__(
        self: "Task[..., Coroutine[Any, Any, R]]",
        *args: Any,
        return_state: Literal[False],
        wait_for: Optional[OneOrManyFutureOrResult[Any]] = None,
        **kwargs: Any,
    ) -> Coroutine[Any, Any, R]: ...

    @overload
    def __call__(
        self: "Task[..., Coroutine[Any, Any, R]]",
        *args: Any,
        return_state: Literal[True],
        wait_for: Optional[OneOrManyFutureOrResult[Any]] = None,
        **kwargs: Any,
    ) -> State[R]: ...

    @overload
    def __call__(
        self: "Task[..., R]",
        *args: Any,
        return_state: Literal[False],
        wait_for: Optional[OneOrManyFutureOrResult[Any]] = None,
        **kwargs: Any,
    ) -> R: ...

    @overload
    def __call__(
        self: "Task[..., R]",
        *args: Any,
        return_state: Literal[True],
        wait_for: Optional[OneOrManyFutureOrResult[Any]] = None,
        **kwargs: Any,
    ) -> State[R]: ...

    @overload
    def __call__(
        self: "Task[..., Coroutine[Any, Any, R]]",
        *args: Any,
        wait_for: OneOrManyFutureOrResult[Any],
        return_state: Literal[False] = False,
        **kwargs: Any,
    ) -> Coroutine[Any, Any, R]: ...

    @overload
    def __call__(
        self: "Task[..., R]",
        *args: Any,
        wait_for: OneOrManyFutureOrResult[Any],
        return_state: Literal[False] = False,
        **kwargs: Any,
    ) -> R: ...

    def __call__(
        self: "Union[Task[..., R], Task[..., NoReturn]]",
        *args: Any,
        return_state: bool = False,
        wait_for: Optional[OneOrManyFutureOrResult[Any]] = None,
        **kwargs: Any,
    ) -> Union[R, State[R], None]:
        """
        Run the task and return the result. If `return_state` is True, the
        result is wrapped in a Prefect State which provides error handling.
        """
        from prefect.utilities.visualization import (
            get_task_viz_tracker,
            track_viz_task,
        )

        # Convert the call args/kwargs to a parameter dict
        parameters = get_call_parameters(self.fn, args, kwargs)

        return_type = "state" if return_state else "result"

        # When running under `flow.visualize()`, record the task in the
        # dependency graph instead of executing it.
        task_run_tracker = get_task_viz_tracker()
        if task_run_tracker:
            return track_viz_task(
                self.isasync, self.name, parameters, self.viz_return_value
            )

        from prefect.task_engine import run_task

        return run_task(
            task=self,
            parameters=parameters,
            wait_for=wait_for,
            return_type=return_type,
        )
    @overload
    def submit(
        self: "Task[P, R]",
        *args: P.args,
        **kwargs: P.kwargs,
    ) -> PrefectFuture[R]: ...

    @overload
    def submit(
        self: "Task[P, Coroutine[Any, Any, R]]",
        *args: P.args,
        return_state: Literal[False],
        wait_for: Optional[OneOrManyFutureOrResult[Any]] = None,
        **kwargs: P.kwargs,
    ) -> PrefectFuture[R]: ...

    @overload
    def submit(
        self: "Task[P, R]",
        *args: P.args,
        return_state: Literal[False],
        wait_for: Optional[OneOrManyFutureOrResult[Any]] = None,
        **kwargs: P.kwargs,
    ) -> PrefectFuture[R]: ...

    @overload
    def submit(
        self: "Task[P, Coroutine[Any, Any, R]]",
        *args: P.args,
        return_state: Literal[True],
        wait_for: Optional[OneOrManyFutureOrResult[Any]] = None,
        **kwargs: P.kwargs,
    ) -> State[R]: ...

    @overload
    def submit(
        self: "Task[P, R]",
        *args: P.args,
        return_state: Literal[True],
        wait_for: Optional[OneOrManyFutureOrResult[Any]] = None,
        **kwargs: P.kwargs,
    ) -> State[R]: ...

    def submit(
        self: "Union[Task[P, R], Task[P, Coroutine[Any, Any, R]]]",
        *args: Any,
        return_state: bool = False,
        wait_for: Optional[OneOrManyFutureOrResult[Any]] = None,
        **kwargs: Any,
    ):
        """
        Submit a run of the task to the engine.

        Will create a new task run in the backing API and submit the task to the flow's
        task runner. This call only blocks execution while the task is being submitted,
        once it is submitted, the flow function will continue executing.

        This method is always synchronous, even if the underlying user function is asynchronous.

        Args:
            *args: Arguments to run the task with
            return_state: Return the result of the flow run wrapped in a
                Prefect State.
            wait_for: Upstream task futures to wait for before starting the task
            **kwargs: Keyword arguments to run the task with

        Returns:
            If `return_state` is False a future allowing asynchronous access to
                the state of the task
            If `return_state` is True a future wrapped in a Prefect State allowing asynchronous access to
                the state of the task

        Raises:
            RuntimeError: If called outside of a flow run context.

        Examples:

            Define a task

            ```python
            from prefect import task
            @task
            def my_task():
                return "hello"
            ```

            Run a task in a flow

            ```python
            from prefect import flow
            @flow
            def my_flow():
                my_task.submit()
            ```

            Wait for a task to finish

            ```python
            @flow
            def my_flow():
                my_task.submit().wait()
            ```

            Use the result from a task in a flow

            ```python
            @flow
            def my_flow():
                print(my_task.submit().result())

            my_flow()
            # hello
            ```

            Run an async task in an async flow

            ```python
            @task
            async def my_async_task():
                pass

            @flow
            async def my_flow():
                my_async_task.submit()
            ```

            Run a sync task in an async flow

            ```python
            @flow
            async def my_flow():
                my_task.submit()
            ```

            Enforce ordering between tasks that do not exchange data

            ```python
            @task
            def task_1():
                pass

            @task
            def task_2():
                pass

            @flow
            def my_flow():
                x = task_1.submit()

                # task 2 will wait for task_1 to complete
                y = task_2.submit(wait_for=[x])
            ```

        """

        from prefect.utilities.visualization import (
            VisualizationUnsupportedError,
            get_task_viz_tracker,
        )

        # Convert the call args/kwargs to a parameter dict
        parameters = get_call_parameters(self.fn, args, kwargs)
        flow_run_context = FlowRunContext.get()

        # Submission requires a task runner, which only exists inside a flow run.
        if not flow_run_context:
            raise RuntimeError(
                "Unable to determine task runner to use for submission. If you are"
                " submitting a task outside of a flow, please use `.delay`"
                " to submit the task run for deferred execution."
            )

        task_viz_tracker = get_task_viz_tracker()
        if task_viz_tracker:
            raise VisualizationUnsupportedError(
                "`task.submit()` is not currently supported by `flow.visualize()`"
            )

        task_runner = flow_run_context.task_runner
        future = task_runner.submit(self, parameters, wait_for)
        if return_state:
            # Block until the run finishes so a terminal state can be returned.
            future.wait()
            return future.state
        else:
            return future
    # NOTE(review): the first two overloads below are repeated verbatim by the
    # next two — the duplicates are harmless to type checkers but could be
    # removed. Confirm before cleaning up.
    @overload
    def map(
        self: "Task[P, R]",
        *args: Any,
        return_state: Literal[True],
        wait_for: Optional[Iterable[Union[PrefectFuture[R], R]]] = ...,
        deferred: bool = ...,
        **kwargs: Any,
    ) -> list[State[R]]: ...

    @overload
    def map(
        self: "Task[P, R]",
        *args: Any,
        wait_for: Optional[Iterable[Union[PrefectFuture[R], R]]] = ...,
        deferred: bool = ...,
        **kwargs: Any,
    ) -> PrefectFutureList[R]: ...

    @overload
    def map(
        self: "Task[P, R]",
        *args: Any,
        return_state: Literal[True],
        wait_for: Optional[Iterable[Union[PrefectFuture[R], R]]] = ...,
        deferred: bool = ...,
        **kwargs: Any,
    ) -> list[State[R]]: ...

    @overload
    def map(
        self: "Task[P, R]",
        *args: Any,
        wait_for: Optional[Iterable[Union[PrefectFuture[R], R]]] = ...,
        deferred: bool = ...,
        **kwargs: Any,
    ) -> PrefectFutureList[R]: ...

    @overload
    def map(
        self: "Task[P, Coroutine[Any, Any, R]]",
        *args: Any,
        return_state: Literal[True],
        wait_for: Optional[Iterable[Union[PrefectFuture[R], R]]] = ...,
        deferred: bool = ...,
        **kwargs: Any,
    ) -> list[State[R]]: ...

    @overload
    def map(
        self: "Task[P, Coroutine[Any, Any, R]]",
        *args: Any,
        return_state: Literal[False],
        wait_for: Optional[Iterable[Union[PrefectFuture[R], R]]] = ...,
        deferred: bool = ...,
        **kwargs: Any,
    ) -> PrefectFutureList[R]: ...

    def map(
        self,
        *args: Any,
        return_state: bool = False,
        wait_for: Optional[Iterable[Union[PrefectFuture[R], R]]] = None,
        deferred: bool = False,
        **kwargs: Any,
    ) -> Union[list[State[R]], PrefectFutureList[R]]:
        """
        Submit a mapped run of the task to a worker.

        Must be called within a flow run context. Will return a list of futures
        that should be waited on before exiting the flow context to ensure all
        mapped tasks have completed.

        Must be called with at least one iterable and all iterables must be
        the same length. Any arguments that are not iterable will be treated as
        a static value and each task run will receive the same value.

        Will create as many task runs as the length of the iterable(s) in the
        backing API and submit the task runs to the flow's task runner. This
        call blocks if given a future as input while the future is resolved. It
        also blocks while the tasks are being submitted, once they are
        submitted, the flow function will continue executing.

        This method is always synchronous, even if the underlying user function is asynchronous.

        Args:
            *args: Iterable and static arguments to run the tasks with
            return_state: Return a list of Prefect States that wrap the results
                of each task run.
            wait_for: Upstream task futures to wait for before starting the
                task
            deferred: Create deferred runs for a task worker instead of using
                the flow's task runner.
            **kwargs: Keyword iterable arguments to run the task with

        Returns:
            A list of futures allowing asynchronous access to the state of the
            tasks

        Raises:
            RuntimeError: If called outside of a flow run context without
                `deferred=True`.

        Examples:

            Define a task

            ```python
            from prefect import task
            @task
            def my_task(x):
                return x + 1
            ```

            Create mapped tasks

            ```python
            from prefect import flow
            @flow
            def my_flow():
                return my_task.map([1, 2, 3])
            ```

            Wait for all mapped tasks to finish

            ```python
            @flow
            def my_flow():
                futures = my_task.map([1, 2, 3])
                futures.wait()
                # Now all of the mapped tasks have finished
                my_task(10)
            ```

            Use the result from mapped tasks in a flow

            ```python
            @flow
            def my_flow():
                futures = my_task.map([1, 2, 3])
                for x in futures.result():
                    print(x)
            my_flow()
            # 2
            # 3
            # 4
            ```

            Enforce ordering between tasks that do not exchange data

            ```python
            @task
            def task_1(x):
                pass

            @task
            def task_2(y):
                pass

            @flow
            def my_flow():
                x = task_1.submit()

                # task 2 will wait for task_1 to complete
                y = task_2.map([1, 2, 3], wait_for=[x])
                return y
            ```

            Use a non-iterable input as a constant across mapped tasks

            ```python
            @task
            def display(prefix, item):
                print(prefix, item)

            @flow
            def my_flow():
                return display.map("Check it out: ", [1, 2, 3])

            my_flow()
            # Check it out: 1
            # Check it out: 2
            # Check it out: 3
            ```

            Use `unmapped` to treat an iterable argument as a constant

            ```python
            from prefect import unmapped

            @task
            def add_n_to_items(items, n):
                return [item + n for item in items]

            @flow
            def my_flow():
                return add_n_to_items.map(unmapped([10, 20]), n=[1, 2, 3])

            my_flow()
            # [[11, 21], [12, 22], [13, 23]]
            ```
        """

        from prefect.task_runners import TaskRunner
        from prefect.utilities.visualization import (
            VisualizationUnsupportedError,
            get_task_viz_tracker,
        )

        # Convert the call args/kwargs to a parameter dict; do not apply defaults
        # since they should not be mapped over
        parameters = get_call_parameters(self.fn, args, kwargs, apply_defaults=False)
        flow_run_context = FlowRunContext.get()

        task_viz_tracker = get_task_viz_tracker()
        if task_viz_tracker:
            raise VisualizationUnsupportedError(
                "`task.map()` is not currently supported by `flow.visualize()`"
            )

        if deferred:
            # Expand the iterables into one parameter dict per run and create
            # each run as a deferred (background) task.
            parameters_list = expand_mapping_parameters(self.fn, parameters)
            futures = [
                self.apply_async(kwargs=parameters, wait_for=wait_for)
                for parameters in parameters_list
            ]
        elif task_runner := getattr(flow_run_context, "task_runner", None):
            assert isinstance(task_runner, TaskRunner)
            futures = task_runner.map(self, parameters, wait_for)
        else:
            raise RuntimeError(
                "Unable to determine task runner to use for mapped task runs. If"
                " you are mapping a task outside of a flow, please provide"
                " `deferred=True` to submit the mapped task runs for deferred"
                " execution."
            )
        if return_state:
            # Resolve every future so terminal states can be returned.
            states: list[State[R]] = []
            for future in futures:
                future.wait()
                states.append(future.state)
            return states
        else:
            return futures
1618 # Background task methods
    def apply_async(
        self,
        args: Optional[tuple[Any, ...]] = None,
        kwargs: Optional[dict[str, Any]] = None,
        wait_for: Optional[Iterable[PrefectFuture[R]]] = None,
        dependencies: Optional[dict[str, set[RunInput]]] = None,
    ) -> PrefectDistributedFuture[R]:
        """
        Create a pending task run for a task worker to execute.

        Args:
            args: Arguments to run the task with
            kwargs: Keyword arguments to run the task with
            wait_for: Upstream task futures to wait for before starting the task
            dependencies: Extra task inputs to attach to the run, keyed by
                parameter name

        Returns:
            A PrefectDistributedFuture object representing the pending task run

        Examples:

            Define a task

            ```python
            from prefect import task
            @task
            def my_task(name: str = "world"):
                return f"hello {name}"
            ```

            Create a pending task run for the task

            ```python
            from prefect import flow
            @flow
            def my_flow():
                my_task.apply_async(("marvin",))
            ```

            Wait for a task to finish

            ```python
            @flow
            def my_flow():
                my_task.apply_async(("marvin",)).wait()
            ```

            ```python
            @flow
            def my_flow():
                print(my_task.apply_async(("marvin",)).result())

            my_flow()
            # hello marvin
            ```

            TODO: Enforce ordering between tasks that do not exchange data
            ```python
            @task
            def task_1():
                pass

            @task
            def task_2():
                pass

            @flow
            def my_flow():
                x = task_1.apply_async()

                # task 2 will wait for task_1 to complete
                y = task_2.apply_async(wait_for=[x])
            ```

        """
        from prefect.utilities.visualization import (
            VisualizationUnsupportedError,
            get_task_viz_tracker,
        )

        task_viz_tracker = get_task_viz_tracker()
        if task_viz_tracker:
            raise VisualizationUnsupportedError(
                "`task.apply_async()` is not currently supported by `flow.visualize()`"
            )
        args = args or ()
        kwargs = kwargs or {}

        # Convert the call args/kwargs to a parameter dict
        parameters = get_call_parameters(self.fn, args, kwargs)

        # `create_run` is async; run it to completion on this (sync) path.
        task_run: TaskRun = run_coro_as_sync(
            self.create_run(
                parameters=parameters,
                deferred=True,
                wait_for=wait_for,
                extra_task_inputs=dependencies,
            )
        )  # type: ignore

        from prefect.utilities.engine import emit_task_run_state_change_event

        # emit a `SCHEDULED` event for the task run
        emit_task_run_state_change_event(
            task_run=task_run,
            initial_state=None,
            validated_state=task_run.state,
        )

        # Log a UI link only when a UI URL is configured and resolvable.
        if get_current_settings().ui_url and (task_run_url := url_for(task_run)):
            logger.info(
                f"Created task run {task_run.name!r}. View it in the UI at {task_run_url!r}"
            )

        return PrefectDistributedFuture(task_run_id=task_run.id)
    def delay(self, *args: P.args, **kwargs: P.kwargs) -> PrefectDistributedFuture[R]:
        """
        An alias for `apply_async` with simpler calling semantics.

        Avoids having to use explicit "args" and "kwargs" arguments. Arguments
        will pass through as-is to the task.

        Returns:
            A PrefectDistributedFuture object representing the pending task run.

        Examples:

            Define a task

            ```python
            from prefect import task
            @task
            def my_task(name: str = "world"):
                return f"hello {name}"
            ```

            Create a pending task run for the task

            ```python
            from prefect import flow
            @flow
            def my_flow():
                my_task.delay("marvin")
            ```

            Wait for a task to finish

            ```python
            @flow
            def my_flow():
                my_task.delay("marvin").wait()
            ```

            Use the result from a task in a flow

            ```python
            @flow
            def my_flow():
                print(my_task.delay("marvin").result())

            my_flow()
            # hello marvin
            ```
        """
        return self.apply_async(args=args, kwargs=kwargs)
    @sync_compatible
    async def serve(self) -> NoReturn:
        """Serve the task using the provided task runner. This method is used to
        establish a websocket connection with the Prefect server and listen for
        submitted task runs to execute.

        Examples:
            Serve a task using the default task runner
            ```python
            @task
            def my_task():
                return 1

            my_task.serve()
            ```
        """
        from prefect.task_worker import serve

        # Runs the task worker loop; does not return under normal operation.
        await serve(self)
1807@overload 1a
1808def task(__fn: Callable[P, R]) -> Task[P, R]: ... 1808 ↛ exitline 1808 didn't return from function 'task' because 1a
1811# see https://github.com/PrefectHQ/prefect/issues/16380
1812@overload 1a
1813def task( 1813 ↛ exitline 1813 didn't return from function 'task' because 1a
1814 __fn: Literal[None] = None,
1815 *,
1816 name: Optional[str] = None,
1817 description: Optional[str] = None,
1818 tags: Optional[Iterable[str]] = None,
1819 version: Optional[str] = None,
1820 cache_policy: Union[CachePolicy, type[NotSet]] = NotSet,
1821 cache_key_fn: Optional[
1822 Callable[["TaskRunContext", dict[str, Any]], Optional[str]]
1823 ] = None,
1824 cache_expiration: Optional[datetime.timedelta] = None,
1825 task_run_name: Optional[TaskRunNameValueOrCallable] = None,
1826 retries: int = 0,
1827 retry_delay_seconds: Union[
1828 float, int, list[float], Callable[[int], list[float]], None
1829 ] = None,
1830 retry_jitter_factor: Optional[float] = None,
1831 persist_result: Optional[bool] = None,
1832 result_storage: Optional[ResultStorage] = None,
1833 result_storage_key: Optional[str] = None,
1834 result_serializer: Optional[ResultSerializer] = None,
1835 cache_result_in_memory: bool = True,
1836 timeout_seconds: Union[int, float, None] = None,
1837 log_prints: Optional[bool] = None,
1838 refresh_cache: Optional[bool] = None,
1839 on_completion: Optional[list[StateHookCallable]] = None,
1840 on_failure: Optional[list[StateHookCallable]] = None,
1841 on_running: Optional[list[StateHookCallable]] = None,
1842 retry_condition_fn: Optional[RetryConditionCallable] = None,
1843 viz_return_value: Any = None,
1844 asset_deps: Optional[list[Union[str, Asset]]] = None,
1845) -> Callable[[Callable[P, R]], Task[P, R]]: ...
1848# see https://github.com/PrefectHQ/prefect/issues/16380
# Overload for the parameterized-decorator form, `@task(...)`.
# `__fn: Literal[None] = None` (rather than omitting the parameter) lets type
# checkers discriminate this overload from the bare `@task` form; see the
# issue reference in the comment above this overload.
@overload
def task(
    __fn: Literal[None] = None,
    *,
    name: Optional[str] = None,
    description: Optional[str] = None,
    tags: Optional[Iterable[str]] = None,
    version: Optional[str] = None,
    cache_policy: Union[CachePolicy, type[NotSet]] = NotSet,
    cache_key_fn: Optional[
        Callable[["TaskRunContext", dict[str, Any]], Optional[str]]
    ] = None,
    cache_expiration: Optional[datetime.timedelta] = None,
    task_run_name: Optional[TaskRunNameValueOrCallable] = None,
    retries: int = 0,
    retry_delay_seconds: Union[
        float, int, list[float], Callable[[int], list[float]], None
    ] = None,
    retry_jitter_factor: Optional[float] = None,
    persist_result: Optional[bool] = None,
    result_storage: Optional[ResultStorage] = None,
    result_storage_key: Optional[str] = None,
    result_serializer: Optional[ResultSerializer] = None,
    cache_result_in_memory: bool = True,
    timeout_seconds: Union[int, float, None] = None,
    log_prints: Optional[bool] = None,
    refresh_cache: Optional[bool] = None,
    on_completion: Optional[list[StateHookCallable]] = None,
    on_failure: Optional[list[StateHookCallable]] = None,
    on_running: Optional[list[StateHookCallable]] = None,
    retry_condition_fn: Optional[RetryConditionCallable] = None,
    viz_return_value: Any = None,
    asset_deps: Optional[list[Union[str, Asset]]] = None,
) -> Callable[[Callable[P, R]], Task[P, R]]: ...
# Overload for the keyword-only decorator form without the positional
# `__fn` parameter.
@overload  # TODO: do we need this overload?
def task(
    *,
    name: Optional[str] = None,
    description: Optional[str] = None,
    tags: Optional[Iterable[str]] = None,
    version: Optional[str] = None,
    cache_policy: Union[CachePolicy, type[NotSet]] = NotSet,
    cache_key_fn: Optional[
        Callable[["TaskRunContext", dict[str, Any]], Optional[str]]
    ] = None,
    cache_expiration: Optional[datetime.timedelta] = None,
    task_run_name: Optional[TaskRunNameValueOrCallable] = None,
    retries: int = 0,
    # Aligned with the sibling overload and the implementation: the union
    # includes `None` and the default is `None` (it was previously a bare
    # union defaulting to `0`, which disagreed with the runtime signature).
    retry_delay_seconds: Union[
        float, int, list[float], Callable[[int], list[float]], None
    ] = None,
    retry_jitter_factor: Optional[float] = None,
    persist_result: Optional[bool] = None,
    result_storage: Optional[ResultStorage] = None,
    result_storage_key: Optional[str] = None,
    result_serializer: Optional[ResultSerializer] = None,
    cache_result_in_memory: bool = True,
    timeout_seconds: Union[int, float, None] = None,
    log_prints: Optional[bool] = None,
    refresh_cache: Optional[bool] = None,
    on_completion: Optional[list[StateHookCallable]] = None,
    on_failure: Optional[list[StateHookCallable]] = None,
    on_running: Optional[list[StateHookCallable]] = None,
    retry_condition_fn: Optional[RetryConditionCallable] = None,
    viz_return_value: Any = None,
    asset_deps: Optional[list[Union[str, Asset]]] = None,
) -> Callable[[Callable[P, R]], Task[P, R]]: ...
def task(
    __fn: Optional[Callable[P, R]] = None,
    *,
    name: Optional[str] = None,
    description: Optional[str] = None,
    tags: Optional[Iterable[str]] = None,
    version: Optional[str] = None,
    cache_policy: Union[CachePolicy, type[NotSet]] = NotSet,
    cache_key_fn: Union[
        Callable[["TaskRunContext", dict[str, Any]], Optional[str]], None
    ] = None,
    cache_expiration: Optional[datetime.timedelta] = None,
    task_run_name: Optional[TaskRunNameValueOrCallable] = None,
    retries: Optional[int] = None,
    retry_delay_seconds: Union[
        float, int, list[float], Callable[[int], list[float]], None
    ] = None,
    retry_jitter_factor: Optional[float] = None,
    persist_result: Optional[bool] = None,
    result_storage: Optional[ResultStorage] = None,
    result_storage_key: Optional[str] = None,
    result_serializer: Optional[ResultSerializer] = None,
    cache_result_in_memory: bool = True,
    timeout_seconds: Union[int, float, None] = None,
    log_prints: Optional[bool] = None,
    refresh_cache: Optional[bool] = None,
    on_completion: Optional[list[StateHookCallable]] = None,
    on_failure: Optional[list[StateHookCallable]] = None,
    on_running: Optional[list[StateHookCallable]] = None,
    retry_condition_fn: Optional[RetryConditionCallable] = None,
    viz_return_value: Any = None,
    asset_deps: Optional[list[Union[str, Asset]]] = None,
):
    """
    Decorator to designate a function as a task in a Prefect workflow.

    This decorator may be used for asynchronous or synchronous functions.

    Args:
        name: An optional name for the task; if not provided, the name will be inferred
            from the given function.
        description: An optional string description for the task.
        tags: An optional set of tags to be associated with runs of this task. These
            tags are combined with any tags defined by a `prefect.tags` context at
            task runtime.
        version: An optional string specifying the version of this task definition
        cache_key_fn: An optional callable that, given the task run context and call
            parameters, generates a string key; if the key matches a previous completed
            state, that state result will be restored instead of running the task again.
        cache_expiration: An optional amount of time indicating how long cached states
            for this task should be restorable; if not provided, cached states will
            never expire.
        task_run_name: An optional name to distinguish runs of this task; this name can be provided
            as a string template with the task's keyword arguments as variables,
            or a function that returns a string.
        retries: An optional number of times to retry on task run failure
        retry_delay_seconds: Optionally configures how long to wait before retrying the
            task after failure. This is only applicable if `retries` is nonzero. This
            setting can either be a number of seconds, a list of retry delays, or a
            callable that, given the total number of retries, generates a list of retry
            delays. If a number of seconds, that delay will be applied to all retries.
            If a list, each retry will wait for the corresponding delay before retrying.
            When passing a callable or a list, the number of
            configured retry delays cannot exceed 50.
        retry_jitter_factor: An optional factor that defines the factor to which a
            retry can be jittered in order to avoid a "thundering herd".
        persist_result: A toggle indicating whether the result of this task
            should be persisted to result storage. Defaults to `None`, which
            indicates that the global default should be used (which is `True` by
            default).
        result_storage: An optional block to use to persist the result of this task.
            Defaults to the value set in the flow the task is called in.
        result_storage_key: An optional key to store the result in storage at when persisted.
            Defaults to a unique identifier.
        result_serializer: An optional serializer to use to serialize the result of this
            task for persistence. Defaults to the value set in the flow the task is
            called in.
        timeout_seconds: An optional number of seconds indicating a maximum runtime for
            the task. If the task exceeds this runtime, it will be marked as failed.
        log_prints: If set, `print` statements in the task will be redirected to the
            Prefect logger for the task run. Defaults to `None`, which indicates
            that the value from the flow should be used.
        refresh_cache: If set, cached results for the cache key are not used.
            Defaults to `None`, which indicates that a cached result from a previous
            execution with matching cache key is used.
        on_failure: An optional list of callables to run when the task enters a failed state.
        on_completion: An optional list of callables to run when the task enters a completed state.
        retry_condition_fn: An optional callable run when a task run returns a Failed state. Should
            return `True` if the task should continue to its retry policy (e.g. `retries=3`), and `False` if the task
            should end as failed. Defaults to `None`, indicating the task should always continue
            to its retry policy.
        viz_return_value: An optional value to return when the task dependency tree is visualized.
        asset_deps: An optional list of upstream assets that this task depends on.

    Returns:
        A callable `Task` object which, when called, will submit the task for execution.

    Examples:
        Define a simple task

        ```python
        @task
        def add(x, y):
            return x + y
        ```

        Define an async task

        ```python
        @task
        async def add(x, y):
            return x + y
        ```

        Define a task with tags and a description

        ```python
        @task(tags={"a", "b"}, description="This task is empty but its my first!")
        def my_task():
            pass
        ```

        Define a task with a custom name

        ```python
        @task(name="The Ultimate Task")
        def my_task():
            pass
        ```

        Define a task that retries 3 times with a 5 second delay between attempts

        ```python
        from random import randint

        @task(retries=3, retry_delay_seconds=5)
        def my_task():
            x = randint(0, 5)
            if x >= 3:  # Make a task that fails sometimes
                raise ValueError("Retry me please!")
            return x
        ```

        Define a task that is cached for a day based on its inputs

        ```python
        from prefect.tasks import task_input_hash
        from datetime import timedelta

        @task(cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1))
        def my_task():
            return "hello"
        ```
    """
    # Gather every configuration option once so the immediate-wrap path and
    # the parameterized-decorator path cannot drift out of sync.
    task_settings: dict[str, Any] = dict(
        name=name,
        description=description,
        tags=tags,
        version=version,
        cache_policy=cache_policy,
        cache_key_fn=cache_key_fn,
        cache_expiration=cache_expiration,
        task_run_name=task_run_name,
        retries=retries,
        retry_delay_seconds=retry_delay_seconds,
        retry_jitter_factor=retry_jitter_factor,
        persist_result=persist_result,
        result_storage=result_storage,
        result_storage_key=result_storage_key,
        result_serializer=result_serializer,
        cache_result_in_memory=cache_result_in_memory,
        timeout_seconds=timeout_seconds,
        log_prints=log_prints,
        refresh_cache=refresh_cache,
        on_completion=on_completion,
        on_failure=on_failure,
        on_running=on_running,
        retry_condition_fn=retry_condition_fn,
        viz_return_value=viz_return_value,
        asset_deps=asset_deps,
    )

    if not __fn:
        # Used as `@task(...)`: return a decorator that will receive the
        # function on the next call, carrying all options along.
        return cast(
            Callable[[Callable[P, R]], Task[P, R]],
            partial(task, **task_settings),
        )

    # Used as bare `@task`: wrap the function immediately.
    return Task(fn=__fn, **task_settings)
class MaterializingTask(Task[P, R]):
    """
    A task that materializes Assets.

    Args:
        assets: List of Assets that this task materializes (can be str or Asset)
        materialized_by: An optional tool that materialized the asset e.g. "dbt" or "spark"
        **task_kwargs: All other Task arguments
    """

    def __init__(
        self,
        fn: Callable[P, R],
        *,
        assets: Sequence[Union[str, Asset]],
        materialized_by: str | None = None,
        **task_kwargs: Unpack[TaskOptions],
    ):
        super().__init__(fn=fn, **task_kwargs)

        # Normalize: bare string keys become Asset objects.
        self.assets: list[Asset] = [
            Asset(key=a) if isinstance(a, str) else a for a in assets
        ]
        self.materialized_by = materialized_by

    def with_options(
        self,
        assets: Optional[Sequence[Union[str, Asset]]] = None,
        **task_kwargs: Unpack[TaskOptions],
    ) -> "MaterializingTask[P, R]":
        """
        Create a new `MaterializingTask` copying this one, with the given
        options replaced.

        Args:
            assets: Replacement assets for the new task; if omitted, the
                current assets are carried over.
            **task_kwargs: Any `Task` constructor option to override; options
                not provided are copied from this task.

        Returns:
            A new `MaterializingTask` with the merged options.
        """
        # Fix: use the module-level `inspect` import; the previous
        # function-local `import inspect` shadowed it redundantly.
        sig = inspect.signature(Task.__init__)

        # Map parameter names to attribute names where they differ
        # from parameter to attribute.
        param_to_attr = {
            "on_completion": "on_completion_hooks",
            "on_failure": "on_failure_hooks",
            "on_running": "on_running_hooks",
            "on_rollback": "on_rollback_hooks",
            "on_commit": "on_commit_hooks",
        }

        # Build kwargs for the Task constructor: every constructor parameter
        # (except the ones handled explicitly below) takes either the
        # caller-supplied override or the current instance's value.
        init_kwargs = {}
        for param_name in sig.parameters:
            if param_name in ("self", "fn", "assets", "materialized_by"):
                continue

            attr_name = param_to_attr.get(param_name, param_name)
            init_kwargs[param_name] = task_kwargs.get(
                param_name, getattr(self, attr_name)
            )

        return MaterializingTask(
            fn=self.fn,
            assets=(
                [Asset(key=a) if isinstance(a, str) else a for a in assets]
                if assets is not None
                else self.assets
            ),
            materialized_by=self.materialized_by,
            # Now, the rest
            **init_kwargs,
        )