Coverage for /usr/local/lib/python3.12/site-packages/prefect/tasks.py: 40%

474 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1""" 

2Module containing the base workflow task class and decorator - for most use cases, using the `@task` decorator is preferred. 

3""" 

4 

5# This file requires type-checking with pyright because mypy does not yet support PEP612 

6# See https://github.com/python/mypy/issues/8645 

7from __future__ import annotations 1a

8 

9import datetime 1a

10import inspect 1a

11from copy import copy 1a

12from functools import partial, update_wrapper 1a

13from typing import ( 1a

14 TYPE_CHECKING, 

15 Any, 

16 Awaitable, 

17 Callable, 

18 Coroutine, 

19 Generic, 

20 Iterable, 

21 NoReturn, 

22 Optional, 

23 Protocol, 

24 TypeVar, 

25 Union, 

26 cast, 

27 overload, 

28) 

29from uuid import UUID, uuid4 1a

30 

31from typing_extensions import ( 1a

32 Literal, 

33 ParamSpec, 

34 Self, 

35 Sequence, 

36 TypeAlias, 

37 TypedDict, 

38 TypeIs, 

39 Unpack, 

40) 

41 

42import prefect.states 1a

43from prefect._internal.uuid7 import uuid7 1a

44from prefect.assets import Asset 1a

45from prefect.cache_policies import DEFAULT, NO_CACHE, CachePolicy 1a

46from prefect.client.orchestration import get_client 1a

47from prefect.client.schemas import TaskRun 1a

48from prefect.client.schemas.objects import ( 1a

49 RunInput, 

50 StateDetails, 

51 TaskRunPolicy, 

52 TaskRunResult, 

53) 

54from prefect.context import ( 1a

55 FlowRunContext, 

56 TagsContext, 

57 TaskRunContext, 

58 serialize_context, 

59) 

60from prefect.futures import PrefectDistributedFuture, PrefectFuture, PrefectFutureList 1a

61from prefect.logging.loggers import get_logger 1a

62from prefect.results import ( 1a

63 ResultSerializer, 

64 ResultStorage, 

65 ResultStore, 

66 get_or_create_default_task_scheduling_storage, 

67) 

68from prefect.settings.context import get_current_settings 1a

69from prefect.states import Pending, Scheduled, State 1a

70from prefect.utilities.annotations import NotSet 1a

71from prefect.utilities.asyncutils import run_coro_as_sync, sync_compatible 1a

72from prefect.utilities.callables import ( 1a

73 expand_mapping_parameters, 

74 get_call_parameters, 

75 raise_for_reserved_arguments, 

76) 

77from prefect.utilities.hashing import hash_objects 1a

78from prefect.utilities.importtools import to_qualified_name 1a

79from prefect.utilities.urls import url_for 1a

80 

81if TYPE_CHECKING: 81 ↛ 82line 81 didn't jump to line 82 because the condition on line 81 was never true1a

82 import logging 

83 

84 from prefect.client.orchestration import PrefectClient 

85 from prefect.context import TaskRunContext 

86 from prefect.transactions import Transaction 

87 

88T = TypeVar("T") 1a

89R = TypeVar("R") # The return type of the user's function 1a

90P = ParamSpec("P") # The parameters of the task 1a

91 

92NUM_CHARS_DYNAMIC_KEY = 8 1a

93 

94logger: "logging.Logger" = get_logger("tasks") 1a

95 

96FutureOrResult: TypeAlias = Union[PrefectFuture[T], T] 1a

97OneOrManyFutureOrResult: TypeAlias = Union[ 1a

98 FutureOrResult[T], Iterable[FutureOrResult[T]] 

99] 

100 

101 

102class TaskRunNameCallbackWithParameters(Protocol): 1a

103 @classmethod 1a

104 def is_callback_with_parameters(cls, callable: Callable[..., str]) -> TypeIs[Self]: 1a

105 sig = inspect.signature(callable) 

106 return "parameters" in sig.parameters 

107 

108 def __call__(self, parameters: dict[str, Any]) -> str: ... 108 ↛ exitline 108 didn't return from function '__call__' because 1a

109 

110 

111StateHookCallable: TypeAlias = Callable[ 1a

112 ["Task[..., Any]", TaskRun, State], Union[Awaitable[None], None] 

113] 

114RetryConditionCallable: TypeAlias = Callable[ 1a

115 ["Task[..., Any]", TaskRun, State], Union[Awaitable[bool], bool] 

116] 

117TaskRunNameValueOrCallable: TypeAlias = Union[ 1a

118 Callable[[], str], TaskRunNameCallbackWithParameters, str 

119] 

120 

121 

122class TaskOptions(TypedDict, total=False): 1a

123 """ 

124 A TypedDict representing all available task configuration options. 

125 

126 This can be used with `Unpack` to provide type hints for **kwargs. 

127 """ 

128 

129 name: Optional[str] 1a

130 description: Optional[str] 1a

131 tags: Optional[Iterable[str]] 1a

132 version: Optional[str] 1a

133 cache_policy: Union[CachePolicy, type[NotSet]] 1a

134 cache_key_fn: Union[ 1a

135 Callable[["TaskRunContext", dict[str, Any]], Optional[str]], None 

136 ] 

137 cache_expiration: Optional[datetime.timedelta] 1a

138 task_run_name: Optional[TaskRunNameValueOrCallable] 1a

139 retries: Optional[int] 1a

140 retry_delay_seconds: Union[ 1a

141 float, int, list[float], Callable[[int], list[float]], None 

142 ] 

143 retry_jitter_factor: Optional[float] 1a

144 persist_result: Optional[bool] 1a

145 result_storage: Optional[ResultStorage] 1a

146 result_serializer: Optional[ResultSerializer] 1a

147 result_storage_key: Optional[str] 1a

148 cache_result_in_memory: bool 1a

149 timeout_seconds: Union[int, float, None] 1a

150 log_prints: Optional[bool] 1a

151 refresh_cache: Optional[bool] 1a

152 on_completion: Optional[list[StateHookCallable]] 1a

153 on_failure: Optional[list[StateHookCallable]] 1a

154 on_running: Optional[list[StateHookCallable]] 1a

155 on_rollback: Optional[list[Callable[["Transaction"], None]]] 1a

156 on_commit: Optional[list[Callable[["Transaction"], None]]] 1a

157 retry_condition_fn: Optional[RetryConditionCallable] 1a

158 viz_return_value: Any 1a

159 asset_deps: Optional[list[Union[Asset, str]]] 1a

160 

161 

162def task_input_hash( 1a

163 context: "TaskRunContext", arguments: dict[str, Any] 

164) -> Optional[str]: 

165 """ 

166 A task cache key implementation which hashes all inputs to the task using a JSON or 

167 cloudpickle serializer. If any arguments are not JSON serializable, the pickle 

168 serializer is used as a fallback. If cloudpickle fails, this will return a null key 

169 indicating that a cache key could not be generated for the given inputs. 

170 

171 Arguments: 

172 context: the active `TaskRunContext` 

173 arguments: a dictionary of arguments to be passed to the underlying task 

174 

175 Returns: 

176 a string hash if hashing succeeded, else `None` 

177 """ 

178 return hash_objects( 

179 # We use the task key to get the qualified name for the task and include the 

180 # task functions `co_code` bytes to avoid caching when the underlying function 

181 # changes 

182 context.task.task_key, 

183 context.task.fn.__code__.co_code.hex(), 

184 arguments, 

185 ) 

186 

187 

188def exponential_backoff(backoff_factor: float) -> Callable[[int], list[float]]: 1a

189 """ 

190 A task retry backoff utility that configures exponential backoff for task retries. 

191 The exponential backoff design matches the urllib3 implementation. 

192 

193 Arguments: 

194 backoff_factor: the base delay for the first retry, subsequent retries will 

195 increase the delay time by powers of 2. 

196 

197 Returns: 

198 a callable that can be passed to the task constructor 

199 """ 

200 

201 def retry_backoff_callable(retries: int) -> list[float]: 

202 # no more than 50 retry delays can be configured on a task 

203 retries = min(retries, 50) 

204 

205 return [backoff_factor * max(0, 2**r) for r in range(retries)] 

206 

207 return retry_backoff_callable 

208 

209 

210def _infer_parent_task_runs( 1a

211 flow_run_context: Optional[FlowRunContext], 

212 task_run_context: Optional[TaskRunContext], 

213 parameters: dict[str, Any], 

214) -> list[TaskRunResult]: 

215 """ 

216 Attempt to infer the parent task runs for this task run based on the 

217 provided flow run and task run contexts, as well as any parameters. It is 

218 assumed that the task run is running within those contexts. 

219 If any parameter comes from a running task run, that task run is considered 

220 a parent. This is expected to happen when task inputs are yielded from 

221 generator tasks. 

222 """ 

223 parents: list[TaskRunResult] = [] 

224 

225 # check if this task has a parent task run based on running in another 

226 # task run's existing context. A task run is only considered a parent if 

227 # it is in the same flow run (because otherwise presumably the child is 

228 # in a subflow, so the subflow serves as the parent) or if there is no 

229 # flow run 

230 if task_run_context: 

231 # there is no flow run 

232 if not flow_run_context: 

233 parents.append(TaskRunResult(id=task_run_context.task_run.id)) 

234 # there is a flow run and the task run is in the same flow run 

235 elif flow_run_context and task_run_context.task_run.flow_run_id == getattr( 

236 flow_run_context.flow_run, "id", None 

237 ): 

238 parents.append(TaskRunResult(id=task_run_context.task_run.id)) 

239 

240 # parent dependency tracking: for every provided parameter value, try to 

241 # load the corresponding task run state. If the task run state is still 

242 # running, we consider it a parent task run. Note this is only done if 

243 # there is an active flow run context because dependencies are only 

244 # tracked within the same flow run. 

245 if flow_run_context: 

246 for v in parameters.values(): 

247 upstream_state = None 

248 

249 if isinstance(v, State): 

250 upstream_state = v 

251 elif isinstance(v, PrefectFuture): 

252 upstream_state = v.state 

253 else: 

254 res = flow_run_context.run_results.get(id(v)) 

255 if res: 

256 upstream_state, _ = res 

257 

258 if upstream_state and upstream_state.is_running(): 

259 parents.append( 

260 TaskRunResult(id=upstream_state.state_details.task_run_id) 

261 ) 

262 

263 return parents 

264 

265 

266def _generate_task_key(fn: Callable[..., Any]) -> str: 1a

267 """Generate a task key based on the function name and source code. 

268 

269 We may eventually want some sort of top-level namespace here to 

270 disambiguate tasks with the same function name in different modules, 

271 in a more human-readable way, while avoiding relative import problems (see #12337). 

272 

273 As long as the task implementations are unique (even if named the same), we should 

274 not have any collisions. 

275 

276 Args: 

277 fn: The function to generate a task key for. 

278 """ 

279 if not hasattr(fn, "__qualname__"): 279 ↛ 280line 279 didn't jump to line 280 because the condition on line 279 was never true1b

280 return to_qualified_name(type(fn)) 

281 

282 qualname = fn.__qualname__.split(".")[-1] 1b

283 

284 try: 1b

285 code_obj = getattr(fn, "__code__", None) 1b

286 if code_obj is None: 286 ↛ 287line 286 didn't jump to line 287 because the condition on line 286 was never true1b

287 code_obj = fn.__call__.__code__ 

288 except AttributeError: 

289 raise AttributeError( 

290 f"{fn} is not a standard Python function object and could not be converted to a task." 

291 ) from None 

292 

293 code_hash = ( 1b

294 h[:NUM_CHARS_DYNAMIC_KEY] if (h := hash_objects(code_obj)) else "unknown" 

295 ) 

296 

297 return f"{qualname}-{code_hash}" 1b

298 

299 

300class Task(Generic[P, R]): 1a

301 """ 

302 A Prefect task definition. 

303 

304 Wraps a function with an entrypoint to the Prefect engine. Calling this class within a flow function 

305 creates a new task run. 

306 

307 To preserve the input and output types, we use the generic type variables P and R for "Parameters" and 

308 "Returns" respectively. 

309 

310 Args: 

311 fn: The function defining the task. 

312 name: An optional name for the task; if not provided, the name will be inferred 

313 from the given function. 

314 description: An optional string description for the task. 

315 tags: An optional set of tags to be associated with runs of this task. These 

316 tags are combined with any tags defined by a `prefect.tags` context at 

317 task runtime. 

318 version: An optional string specifying the version of this task definition 

319 cache_policy: A cache policy that determines the level of caching for this task 

320 cache_key_fn: An optional callable that, given the task run context and call 

321 parameters, generates a string key; if the key matches a previous completed 

322 state, that state result will be restored instead of running the task again. 

323 cache_expiration: An optional amount of time indicating how long cached states 

324 for this task should be restorable; if not provided, cached states will 

325 never expire. 

326 task_run_name: An optional name to distinguish runs of this task; this name can be provided 

327 as a string template with the task's keyword arguments as variables, 

328 or a function that returns a string. 

329 retries: An optional number of times to retry on task run failure. 

330 retry_delay_seconds: Optionally configures how long to wait before retrying the 

331 task after failure. This is only applicable if `retries` is nonzero. This 

332 setting can either be a number of seconds, a list of retry delays, or a 

333 callable that, given the total number of retries, generates a list of retry 

334 delays. If a number of seconds, that delay will be applied to all retries. 

335 If a list, each retry will wait for the corresponding delay before retrying. 

336 When passing a callable or a list, the number of configured retry delays 

337 cannot exceed 50. 

338 retry_jitter_factor: An optional factor that defines the factor to which a retry 

339 can be jittered in order to avoid a "thundering herd". 

340 persist_result: A toggle indicating whether the result of this task 

341 should be persisted to result storage. Defaults to `None`, which 

342 indicates that the global default should be used (which is `True` by 

343 default). 

344 result_storage: An optional block to use to persist the result of this task. 

345 Defaults to the value set in the flow the task is called in. 

346 result_storage_key: An optional key to store the result in storage at when persisted. 

347 Defaults to a unique identifier. 

348 result_serializer: An optional serializer to use to serialize the result of this 

349 task for persistence. Defaults to the value set in the flow the task is 

350 called in. 

351 timeout_seconds: An optional number of seconds indicating a maximum runtime for 

352 the task. If the task exceeds this runtime, it will be marked as failed. 

353 log_prints: If set, `print` statements in the task will be redirected to the 

354 Prefect logger for the task run. Defaults to `None`, which indicates 

355 that the value from the flow should be used. 

356 refresh_cache: If set, cached results for the cache key are not used. 

357 Defaults to `None`, which indicates that a cached result from a previous 

358 execution with matching cache key is used. 

359 on_failure: An optional list of callables to run when the task enters a failed state. 

360 on_completion: An optional list of callables to run when the task enters a completed state. 

361 on_commit: An optional list of callables to run when the task's idempotency record is committed. 

362 on_rollback: An optional list of callables to run when the task rolls back. 

363 retry_condition_fn: An optional callable run when a task run returns a Failed state. Should 

364 return `True` if the task should continue to its retry policy (e.g. `retries=3`), and `False` if the task 

365 should end as failed. Defaults to `None`, indicating the task should always continue 

366 to its retry policy. 

367 viz_return_value: An optional value to return when the task dependency tree is visualized. 

368 asset_deps: An optional list of upstream assets that this task depends on. 

369 """ 

370 

371 # NOTE: These parameters (types, defaults, and docstrings) should be duplicated 

372 # exactly in the @task decorator 

373 def __init__( 1a

374 self, 

375 fn: Callable[P, R] | "classmethod[Any, P, R]" | "staticmethod[P, R]", 

376 name: Optional[str] = None, 

377 description: Optional[str] = None, 

378 tags: Optional[Iterable[str]] = None, 

379 version: Optional[str] = None, 

380 cache_policy: Union[CachePolicy, type[NotSet]] = NotSet, 

381 cache_key_fn: Optional[ 

382 Callable[["TaskRunContext", dict[str, Any]], Optional[str]] 

383 ] = None, 

384 cache_expiration: Optional[datetime.timedelta] = None, 

385 task_run_name: Optional[TaskRunNameValueOrCallable] = None, 

386 retries: Optional[int] = None, 

387 retry_delay_seconds: Optional[ 

388 Union[ 

389 float, 

390 int, 

391 list[float], 

392 Callable[[int], list[float]], 

393 ] 

394 ] = None, 

395 retry_jitter_factor: Optional[float] = None, 

396 persist_result: Optional[bool] = None, 

397 result_storage: Optional[ResultStorage] = None, 

398 result_serializer: Optional[ResultSerializer] = None, 

399 result_storage_key: Optional[str] = None, 

400 cache_result_in_memory: bool = True, 

401 timeout_seconds: Union[int, float, None] = None, 

402 log_prints: Optional[bool] = False, 

403 refresh_cache: Optional[bool] = None, 

404 on_completion: Optional[list[StateHookCallable]] = None, 

405 on_failure: Optional[list[StateHookCallable]] = None, 

406 on_running: Optional[list[StateHookCallable]] = None, 

407 on_rollback: Optional[list[Callable[["Transaction"], None]]] = None, 

408 on_commit: Optional[list[Callable[["Transaction"], None]]] = None, 

409 retry_condition_fn: Optional[RetryConditionCallable] = None, 

410 viz_return_value: Optional[Any] = None, 

411 asset_deps: Optional[list[Union[str, Asset]]] = None, 

412 ): 

413 # Validate if hook passed is list and contains callables 

414 hook_categories = [on_completion, on_failure, on_running] 1b

415 hook_names = ["on_completion", "on_failure", "on_running"] 1b

416 for hooks, hook_name in zip(hook_categories, hook_names): 1b

417 if hooks is not None: 417 ↛ 418line 417 didn't jump to line 418 because the condition on line 417 was never true1b

418 try: 

419 hooks = list(hooks) 

420 except TypeError: 

421 raise TypeError( 

422 f"Expected iterable for '{hook_name}'; got" 

423 f" {type(hooks).__name__} instead. Please provide a list of" 

424 f" hooks to '{hook_name}':\n\n" 

425 f"@task({hook_name}=[hook1, hook2])\ndef" 

426 " my_task():\n\tpass" 

427 ) 

428 

429 for hook in hooks: 

430 if not callable(hook): 

431 raise TypeError( 

432 f"Expected callables in '{hook_name}'; got" 

433 f" {type(hook).__name__} instead. Please provide a list of" 

434 f" hooks to '{hook_name}':\n\n" 

435 f"@task({hook_name}=[hook1, hook2])\ndef" 

436 " my_task():\n\tpass" 

437 ) 

438 

439 if isinstance(fn, classmethod): 439 ↛ 440line 439 didn't jump to line 440 because the condition on line 439 was never true1b

440 fn = cast(Callable[P, R], fn.__func__) 

441 self._isclassmethod = True 

442 

443 if isinstance(fn, staticmethod): 443 ↛ 444line 443 didn't jump to line 444 because the condition on line 443 was never true1b

444 fn = cast(Callable[P, R], fn.__func__) 

445 self._isstaticmethod = True 

446 

447 if not callable(fn): 447 ↛ 448line 447 didn't jump to line 448 because the condition on line 447 was never true1b

448 raise TypeError("'fn' must be callable") 

449 

450 self.description: str | None = description or inspect.getdoc(fn) 1b

451 update_wrapper(self, fn) 1b

452 self.fn = fn 1b

453 

454 # the task is considered async if its function is async or an async 

455 # generator 

456 self.isasync: bool = inspect.iscoroutinefunction( 1b

457 self.fn 

458 ) or inspect.isasyncgenfunction(self.fn) 

459 

460 # the task is considered a generator if its function is a generator or 

461 # an async generator 

462 self.isgenerator: bool = inspect.isgeneratorfunction( 1b

463 self.fn 

464 ) or inspect.isasyncgenfunction(self.fn) 

465 

466 if not name: 466 ↛ 472line 466 didn't jump to line 472 because the condition on line 466 was always true1b

467 if not hasattr(self.fn, "__name__"): 467 ↛ 468line 467 didn't jump to line 468 because the condition on line 467 was never true1b

468 self.name = type(self.fn).__name__ 

469 else: 

470 self.name = self.fn.__name__ 1b

471 else: 

472 self.name: str = name 

473 

474 if task_run_name is not None: 474 ↛ 475line 474 didn't jump to line 475 because the condition on line 474 was never true1b

475 if not isinstance(task_run_name, str) and not callable(task_run_name): 

476 raise TypeError( 

477 "Expected string or callable for 'task_run_name'; got" 

478 f" {type(task_run_name).__name__} instead." 

479 ) 

480 self.task_run_name = task_run_name 1b

481 

482 self.version = version 1b

483 self.log_prints = log_prints 1b

484 

485 raise_for_reserved_arguments(self.fn, ["return_state", "wait_for"]) 1b

486 

487 self.tags: set[str] = set(tags if tags else []) 1b

488 

489 self.task_key: str = _generate_task_key(self.fn) 1b

490 

491 # determine cache and result configuration 

492 settings = get_current_settings() 1b

493 if settings.tasks.default_no_cache and cache_policy is NotSet: 493 ↛ 494line 493 didn't jump to line 494 because the condition on line 493 was never true1b

494 cache_policy = NO_CACHE 

495 

496 if cache_policy is not NotSet and cache_key_fn is not None: 496 ↛ 497line 496 didn't jump to line 497 because the condition on line 496 was never true1b

497 logger.warning( 

498 f"Both `cache_policy` and `cache_key_fn` are set on task {self}. `cache_key_fn` will be used." 

499 ) 

500 

501 if cache_key_fn: 501 ↛ 502line 501 didn't jump to line 502 because the condition on line 501 was never true1b

502 cache_policy = CachePolicy.from_cache_key_fn(cache_key_fn) 

503 

504 # TODO: manage expiration and cache refresh 

505 self.cache_key_fn = cache_key_fn 1b

506 self.cache_expiration = cache_expiration 1b

507 self.refresh_cache = refresh_cache 1b

508 

509 # result persistence settings 

510 if persist_result is None: 510 ↛ 525line 510 didn't jump to line 525 because the condition on line 510 was always true1b

511 if any( 511 ↛ 522line 511 didn't jump to line 522 because the condition on line 511 was never true1b

512 [ 

513 cache_policy 

514 and cache_policy != NO_CACHE 

515 and cache_policy != NotSet, 

516 cache_key_fn is not None, 

517 result_storage_key is not None, 

518 result_storage is not None, 

519 result_serializer is not None, 

520 ] 

521 ): 

522 persist_result = True 

523 

524 # Check for global cache disable setting 

525 if settings.tasks.disable_caching: 525 ↛ 526line 525 didn't jump to line 526 because the condition on line 525 was never true1b

526 cache_policy = NO_CACHE 

527 

528 if persist_result is False: 528 ↛ 529line 528 didn't jump to line 529 because the condition on line 528 was never true1b

529 self.cache_policy = None if cache_policy is None else NO_CACHE 

530 if cache_policy and cache_policy is not NotSet and cache_policy != NO_CACHE: 

531 logger.warning( 

532 "Ignoring `cache_policy` because `persist_result` is False" 

533 ) 

534 elif cache_policy is NotSet and result_storage_key is None: 534 ↛ 536line 534 didn't jump to line 536 because the condition on line 534 was always true1b

535 self.cache_policy = DEFAULT 1b

536 elif cache_policy != NO_CACHE and result_storage_key: 

537 # TODO: handle this situation with double storage 

538 self.cache_policy = None 

539 else: 

540 self.cache_policy: Union[CachePolicy, type[NotSet], None] = cache_policy 

541 

542 # TaskRunPolicy settings 

543 # TODO: We can instantiate a `TaskRunPolicy` and add Pydantic bound checks to 

544 # validate that the user passes positive numbers here 

545 

546 self.retries: int = ( 1b

547 retries if retries is not None else settings.tasks.default_retries 

548 ) 

549 if retry_delay_seconds is None: 549 ↛ 552line 549 didn't jump to line 552 because the condition on line 549 was always true1b

550 retry_delay_seconds = settings.tasks.default_retry_delay_seconds 1b

551 

552 if callable(retry_delay_seconds): 552 ↛ 553line 552 didn't jump to line 553 because the condition on line 552 was never true1b

553 self.retry_delay_seconds = retry_delay_seconds(self.retries) 

554 elif not isinstance(retry_delay_seconds, (list, int, float, type(None))): 554 ↛ 555line 554 didn't jump to line 555 because the condition on line 554 was never true1b

555 raise TypeError( 

556 f"Invalid `retry_delay_seconds` provided; must be an int, float, list or callable. Received type {type(retry_delay_seconds)}" 

557 ) 

558 else: 

559 self.retry_delay_seconds: Union[float, int, list[float], None] = ( 1b

560 retry_delay_seconds 

561 ) 

562 

563 if isinstance(self.retry_delay_seconds, list) and ( 563 ↛ 566line 563 didn't jump to line 566 because the condition on line 563 was never true1b

564 len(self.retry_delay_seconds) > 50 

565 ): 

566 raise ValueError("Can not configure more than 50 retry delays per task.") 

567 

568 if retry_jitter_factor is not None and retry_jitter_factor < 0: 568 ↛ 569line 568 didn't jump to line 569 because the condition on line 568 was never true1b

569 raise ValueError("`retry_jitter_factor` must be >= 0.") 

570 

571 self.retry_jitter_factor = retry_jitter_factor 1b

572 self.persist_result = persist_result 1b

573 

574 if result_storage and not isinstance(result_storage, str): 574 ↛ 575line 574 didn't jump to line 575 because the condition on line 574 was never true1b

575 if getattr(result_storage, "_block_document_id", None) is None: 

576 raise TypeError( 

577 "Result storage configuration must be persisted server-side." 

578 " Please call `.save()` on your block before passing it in." 

579 ) 

580 

581 self.result_storage = result_storage 1b

582 self.result_serializer = result_serializer 1b

583 self.result_storage_key = result_storage_key 1b

584 self.cache_result_in_memory = cache_result_in_memory 1b

585 self.timeout_seconds: Union[float, None] = ( 1b

586 float(timeout_seconds) if timeout_seconds else None 

587 ) 

588 self.on_rollback_hooks: list[Callable[["Transaction"], None]] = ( 1b

589 on_rollback or [] 

590 ) 

591 self.on_commit_hooks: list[Callable[["Transaction"], None]] = on_commit or [] 1b

592 self.on_completion_hooks: list[StateHookCallable] = on_completion or [] 1b

593 self.on_failure_hooks: list[StateHookCallable] = on_failure or [] 1b

594 self.on_running_hooks: list[StateHookCallable] = on_running or [] 1b

595 

596 # retry_condition_fn must be a callable or None. If it is neither, raise a TypeError 

597 if retry_condition_fn is not None and not (callable(retry_condition_fn)): 597 ↛ 598line 597 didn't jump to line 598 because the condition on line 597 was never true1b

598 raise TypeError( 

599 "Expected `retry_condition_fn` to be callable, got" 

600 f" {type(retry_condition_fn).__name__} instead." 

601 ) 

602 

603 self.retry_condition_fn = retry_condition_fn 1b

604 self.viz_return_value = viz_return_value 1b

605 

606 from prefect.assets import Asset 1b

607 

608 self.asset_deps: list[Asset] = ( 1b

609 [Asset(key=a) if isinstance(a, str) else a for a in asset_deps] 

610 if asset_deps 

611 else [] 

612 ) 

613 

614 @property 1a

615 def ismethod(self) -> bool: 1a

616 return hasattr(self.fn, "__prefect_self__") 

617 

618 @property 1a

619 def isclassmethod(self) -> bool: 1a

620 return getattr(self, "_isclassmethod", False) 

621 

622 @property 1a

623 def isstaticmethod(self) -> bool: 1a

624 return getattr(self, "_isstaticmethod", False) 

625 

626 def __get__(self, instance: Any, owner: Any) -> "Task[P, R]": 1a

627 """ 

628 Implement the descriptor protocol so that the task can be used as an instance method. 

629 When an instance method is loaded, this method is called with the "self" instance as 

630 an argument. We return a copy of the task with that instance bound to the task's function. 

631 """ 

632 # wrapped function is a classmethod 

633 if self.isclassmethod: 

634 bound_task = copy(self) 

635 setattr(bound_task.fn, "__prefect_cls__", owner) 

636 return bound_task 

637 

638 # if the task is being accessed on an instance, bind the instance to the __prefect_self__ attribute 

639 # of the task's function. This will allow it to be automatically added to the task's parameters 

640 if instance: 

641 bound_task = copy(self) 

642 bound_task.fn.__prefect_self__ = instance # type: ignore[attr-defined] 

643 return bound_task 

644 

645 return self 

646 

647 def with_options( 1a

648 self, 

649 *, 

650 name: Optional[str] = None, 

651 description: Optional[str] = None, 

652 tags: Optional[Iterable[str]] = None, 

653 cache_policy: Union[CachePolicy, type[NotSet]] = NotSet, 

654 cache_key_fn: Optional[ 

655 Callable[["TaskRunContext", dict[str, Any]], Optional[str]] 

656 ] = None, 

657 task_run_name: Optional[ 

658 Union[TaskRunNameValueOrCallable, type[NotSet]] 

659 ] = NotSet, 

660 cache_expiration: Optional[datetime.timedelta] = None, 

661 retries: Union[int, type[NotSet]] = NotSet, 

662 retry_delay_seconds: Union[ 

663 float, 

664 int, 

665 list[float], 

666 Callable[[int], list[float]], 

667 type[NotSet], 

668 ] = NotSet, 

669 retry_jitter_factor: Union[float, type[NotSet]] = NotSet, 

670 persist_result: Union[bool, type[NotSet]] = NotSet, 

671 result_storage: Union[ResultStorage, type[NotSet]] = NotSet, 

672 result_serializer: Union[ResultSerializer, type[NotSet]] = NotSet, 

673 result_storage_key: Union[str, type[NotSet]] = NotSet, 

674 cache_result_in_memory: Optional[bool] = None, 

675 timeout_seconds: Union[int, float, None] = None, 

676 log_prints: Union[bool, type[NotSet]] = NotSet, 

677 refresh_cache: Union[bool, type[NotSet]] = NotSet, 

678 on_completion: Optional[list[StateHookCallable]] = None, 

679 on_failure: Optional[list[StateHookCallable]] = None, 

680 on_running: Optional[list[StateHookCallable]] = None, 

681 retry_condition_fn: Optional[RetryConditionCallable] = None, 

682 viz_return_value: Optional[Any] = None, 

683 asset_deps: Optional[list[Union[str, Asset]]] = None, 

684 ) -> "Task[P, R]": 

685 """ 

686 Create a new task from the current object, updating provided options. 

687 

688 Args: 

689 name: A new name for the task. 

690 description: A new description for the task. 

691 tags: A new set of tags for the task. If given, existing tags are ignored, 

692 not merged. 

693 cache_key_fn: A new cache key function for the task. 

694 cache_expiration: A new cache expiration time for the task. 

695 task_run_name: An optional name to distinguish runs of this task; this name can be provided 

696 as a string template with the task's keyword arguments as variables, 

697 or a function that returns a string. 

698 retries: A new number of times to retry on task run failure. 

699 retry_delay_seconds: Optionally configures how long to wait before retrying 

700 the task after failure. This is only applicable if `retries` is nonzero. 

701 This setting can either be a number of seconds, a list of retry delays, 

702 or a callable that, given the total number of retries, generates a list 

703 of retry delays. If a number of seconds, that delay will be applied to 

704 all retries. If a list, each retry will wait for the corresponding delay 

705 before retrying. When passing a callable or a list, the number of 

706 configured retry delays cannot exceed 50. 

707 retry_jitter_factor: An optional factor that defines the factor to which a 

708 retry can be jittered in order to avoid a "thundering herd". 

709 persist_result: A new option for enabling or disabling result persistence. 

710 result_storage: A new storage type to use for results. 

711 result_serializer: A new serializer to use for results. 

712 result_storage_key: A new key for the persisted result to be stored at. 

713 timeout_seconds: A new maximum time for the task to complete in seconds. 

714 log_prints: A new option for enabling or disabling redirection of `print` statements. 

715 refresh_cache: A new option for enabling or disabling cache refresh. 

716 on_completion: A new list of callables to run when the task enters a completed state. 

717 on_failure: A new list of callables to run when the task enters a failed state. 

718 retry_condition_fn: An optional callable run when a task run returns a Failed state. 

719 Should return `True` if the task should continue to its retry policy, and `False` 

720 if the task should end as failed. Defaults to `None`, indicating the task should 

721 always continue to its retry policy. 

722 viz_return_value: An optional value to return when the task dependency tree is visualized. 

723 

724 Returns: 

725 A new `Task` instance. 

726 

727 Examples: 

728 

729 Create a new task from an existing task and update the name: 

730 

731 ```python 

732 @task(name="My task") 

733 def my_task(): 

734 return 1 

735 

736 new_task = my_task.with_options(name="My new task") 

737 ``` 

738 

739 Create a new task from an existing task and update the retry settings: 

740 

741 ```python 

742 from random import randint 

743 

744 @task(retries=1, retry_delay_seconds=5) 

745 def my_task(): 

746 x = randint(0, 5) 

747 if x >= 3: # Make a task that fails sometimes 

748 raise ValueError("Retry me please!") 

749 return x 

750 

751 new_task = my_task.with_options(retries=5, retry_delay_seconds=2) 

752 ``` 

753 

754 Use a task with updated options within a flow: 

755 

756 ```python 

757 @task(name="My task") 

758 def my_task(): 

759 return 1 

760 

761 @flow 

762 my_flow(): 

763 new_task = my_task.with_options(name="My new task") 

764 new_task() 

765 ``` 

766 

767 """ 

768 return Task( 

769 fn=self.fn, 

770 name=name or self.name, 

771 description=description or self.description, 

772 tags=tags or copy(self.tags), 

773 cache_policy=cache_policy 

774 if cache_policy is not NotSet 

775 else self.cache_policy, 

776 cache_key_fn=cache_key_fn or self.cache_key_fn, 

777 cache_expiration=cache_expiration or self.cache_expiration, 

778 task_run_name=task_run_name 

779 if task_run_name is not NotSet 

780 else self.task_run_name, 

781 retries=retries if retries is not NotSet else self.retries, 

782 retry_delay_seconds=( 

783 retry_delay_seconds 

784 if retry_delay_seconds is not NotSet 

785 else self.retry_delay_seconds 

786 ), 

787 retry_jitter_factor=( 

788 retry_jitter_factor 

789 if retry_jitter_factor is not NotSet 

790 else self.retry_jitter_factor 

791 ), 

792 persist_result=( 

793 persist_result if persist_result is not NotSet else self.persist_result 

794 ), 

795 result_storage=( 

796 result_storage if result_storage is not NotSet else self.result_storage 

797 ), 

798 result_storage_key=( 

799 result_storage_key 

800 if result_storage_key is not NotSet 

801 else self.result_storage_key 

802 ), 

803 result_serializer=( 

804 result_serializer 

805 if result_serializer is not NotSet 

806 else self.result_serializer 

807 ), 

808 cache_result_in_memory=( 

809 cache_result_in_memory 

810 if cache_result_in_memory is not None 

811 else self.cache_result_in_memory 

812 ), 

813 timeout_seconds=( 

814 timeout_seconds if timeout_seconds is not None else self.timeout_seconds 

815 ), 

816 log_prints=(log_prints if log_prints is not NotSet else self.log_prints), 

817 refresh_cache=( 

818 refresh_cache if refresh_cache is not NotSet else self.refresh_cache 

819 ), 

820 on_completion=on_completion or self.on_completion_hooks, 

821 on_failure=on_failure or self.on_failure_hooks, 

822 on_running=on_running or self.on_running_hooks, 

823 retry_condition_fn=retry_condition_fn or self.retry_condition_fn, 

824 viz_return_value=viz_return_value or self.viz_return_value, 

825 asset_deps=asset_deps or self.asset_deps, 

826 ) 

827 

def on_completion(self, fn: StateHookCallable) -> StateHookCallable:
    """Register `fn` to run when this task enters a Completed state.

    Returns the hook unchanged so this method can be used as a decorator.
    """
    self.on_completion_hooks += [fn]
    return fn

831 

def on_failure(self, fn: StateHookCallable) -> StateHookCallable:
    """Register `fn` to run when this task enters a Failed state.

    Returns the hook unchanged so this method can be used as a decorator.
    """
    hooks = self.on_failure_hooks
    hooks.append(fn)
    return fn

835 

def on_running(self, fn: StateHookCallable) -> StateHookCallable:
    """Register `fn` to run when this task enters a Running state.

    Returns the hook unchanged so this method can be used as a decorator.
    """
    self.on_running_hooks += [fn]
    return fn

839 

def on_commit(
    self, fn: Callable[["Transaction"], None]
) -> Callable[["Transaction"], None]:
    """Register `fn` to run when this task's transaction is committed.

    Returns the hook unchanged so this method can be used as a decorator.
    """
    hooks = self.on_commit_hooks
    hooks.append(fn)
    return fn

845 

def on_rollback(
    self, fn: Callable[["Transaction"], None]
) -> Callable[["Transaction"], None]:
    """Register `fn` to run when this task's transaction is rolled back.

    Returns the hook unchanged so this method can be used as a decorator.
    """
    self.on_rollback_hooks += [fn]
    return fn

851 

async def create_run(
    self,
    client: Optional["PrefectClient"] = None,
    id: Optional[UUID] = None,
    parameters: Optional[dict[str, Any]] = None,
    flow_run_context: Optional[FlowRunContext] = None,
    parent_task_run_context: Optional[TaskRunContext] = None,
    wait_for: Optional[OneOrManyFutureOrResult[Any]] = None,
    extra_task_inputs: Optional[dict[str, set[RunInput]]] = None,
    deferred: bool = False,
) -> TaskRun:
    """
    Create a task run for this task in the backing API.

    Args:
        client: An optional Prefect client; one is created when omitted.
        id: An optional pre-determined ID for the task run.
        parameters: Parameters the task run will be called with.
        flow_run_context: Flow run context to associate the run with; pulled
            from the active context when omitted.
        parent_task_run_context: Parent task run context, if any; pulled from
            the active context when omitted.
        wait_for: Upstream futures/results the run should wait on.
        extra_task_inputs: Additional task-input dependencies keyed by
            parameter name, merged into the inferred inputs.
        deferred: When True, create the run in a Scheduled state for a task
            worker instead of a Pending state.

    Returns:
        The created TaskRun.
    """
    from prefect.utilities._engine import dynamic_key_for_task_run
    from prefect.utilities.engine import collect_task_run_inputs_sync

    # Fall back to the active contexts / defaults for anything not given.
    if flow_run_context is None:
        flow_run_context = FlowRunContext.get()
    if parent_task_run_context is None:
        parent_task_run_context = TaskRunContext.get()
    if parameters is None:
        parameters = {}
    if client is None:
        client = get_client()

    async with client:
        if flow_run_context:
            dynamic_key = dynamic_key_for_task_run(
                context=flow_run_context, task=self
            )
            task_run_name = f"{self.name}-{dynamic_key}"
        else:
            dynamic_key = f"{self.task_key}-{str(uuid4().hex)}"
            task_run_name = self.name

        if deferred:
            state = Scheduled()
            state.state_details.deferred = True
        else:
            state = Pending()

        # Background (deferred) runs stash their parameters in result storage
        # so a task worker can retrieve them at execution time.
        if deferred and (parameters or wait_for):
            from prefect.task_worker import store_parameters

            parameters_id = uuid4()
            state.state_details.task_parameters_id = parameters_id

            # TODO: Improve use of result storage for parameter storage / reference
            self.persist_result = True

            result_store = await ResultStore(
                result_storage=await get_or_create_default_task_scheduling_storage()
            ).update_for_task(self)
            payload: dict[str, Any] = {"context": serialize_context()}
            if parameters:
                payload["parameters"] = parameters
            if wait_for:
                payload["wait_for"] = wait_for
            await store_parameters(result_store, parameters_id, payload)

        # Track upstream dependencies implied by the call parameters.
        task_inputs = {
            name: collect_task_run_inputs_sync(value)
            for name, value in parameters.items()
        }

        # Record any parent task runs inferred from the surrounding contexts.
        if task_parents := _infer_parent_task_runs(
            flow_run_context=flow_run_context,
            task_run_context=parent_task_run_context,
            parameters=parameters,
        ):
            task_inputs["__parents__"] = task_parents

        # `wait_for` dependencies are tracked under a reserved key.
        if wait_for:
            task_inputs["wait_for"] = collect_task_run_inputs_sync(wait_for)

        # Merge in explicitly provided extra dependencies.
        for name, extras in (extra_task_inputs or {}).items():
            task_inputs[name] = task_inputs[name].union(extras)

        flow_run_id = (
            getattr(flow_run_context.flow_run, "id", None)
            if flow_run_context and flow_run_context.flow_run
            else None
        )
        task_run = client.create_task_run(
            task=self,
            name=task_run_name,
            flow_run_id=flow_run_id,
            dynamic_key=str(dynamic_key),
            id=id,
            state=state,
            task_inputs=task_inputs,
            extra_tags=TagsContext.get().current_tags,
        )
        # The new engine uses sync clients but old engines use async clients,
        # so the call may or may not return an awaitable.
        if inspect.isawaitable(task_run):
            task_run = await task_run

        return task_run

954 

async def create_local_run(
    self,
    client: Optional["PrefectClient"] = None,
    id: Optional[UUID] = None,
    parameters: Optional[dict[str, Any]] = None,
    flow_run_context: Optional[FlowRunContext] = None,
    parent_task_run_context: Optional[TaskRunContext] = None,
    wait_for: Optional[OneOrManyFutureOrResult[Any]] = None,
    extra_task_inputs: Optional[dict[str, set[RunInput]]] = None,
    deferred: bool = False,
) -> TaskRun:
    """
    Build a TaskRun object locally (client-side) without creating it in the API.

    Mirrors `create_run` but constructs the `TaskRun` in memory with a fresh
    Pending state instead of calling `client.create_task_run`.

    Args:
        client: An optional Prefect client; one is created when omitted.
        id: An optional pre-determined ID for the task run.
        parameters: Parameters the task run will be called with.
        flow_run_context: Flow run context to associate the run with; pulled
            from the active context when omitted.
        parent_task_run_context: Parent task run context, if any.
        wait_for: Upstream futures/results the run should wait on.
        extra_task_inputs: Additional task-input dependencies keyed by
            parameter name.
        deferred: When True, stores parameters for a task worker as in
            `create_run`.

    Returns:
        The locally constructed TaskRun.
    """
    from prefect.utilities._engine import dynamic_key_for_task_run
    from prefect.utilities.engine import collect_task_run_inputs_sync

    # Fall back to the active contexts / defaults for anything not given.
    if flow_run_context is None:
        flow_run_context = FlowRunContext.get()
    if parent_task_run_context is None:
        parent_task_run_context = TaskRunContext.get()
    if parameters is None:
        parameters = {}
    if client is None:
        client = get_client()

    async with client:
        if flow_run_context:
            # Local runs use an unstable dynamic key and a short name suffix.
            dynamic_key = dynamic_key_for_task_run(
                context=flow_run_context, task=self, stable=False
            )
            task_run_name = f"{self.name}-{dynamic_key[:3]}"
        else:
            dynamic_key = f"{self.task_key}-{str(uuid4().hex)}"
            task_run_name = self.name

        if deferred:
            state = Scheduled()
            state.state_details.deferred = True
        else:
            state = Pending()

        # Background (deferred) runs stash their parameters in result storage
        # so a task worker can retrieve them at execution time.
        if deferred and (parameters or wait_for):
            from prefect.task_worker import store_parameters

            parameters_id = uuid4()
            state.state_details.task_parameters_id = parameters_id

            # TODO: Improve use of result storage for parameter storage / reference
            self.persist_result = True

            result_store = await ResultStore(
                result_storage=await get_or_create_default_task_scheduling_storage()
            ).update_for_task(self)
            payload: dict[str, Any] = {"context": serialize_context()}
            if parameters:
                payload["parameters"] = parameters
            if wait_for:
                payload["wait_for"] = wait_for
            await store_parameters(result_store, parameters_id, payload)

        # Track upstream dependencies implied by the call parameters.
        task_inputs = {
            name: collect_task_run_inputs_sync(value)
            for name, value in parameters.items()
        }

        # Record any parent task runs inferred from the surrounding contexts.
        if task_parents := _infer_parent_task_runs(
            flow_run_context=flow_run_context,
            task_run_context=parent_task_run_context,
            parameters=parameters,
        ):
            task_inputs["__parents__"] = task_parents

        # `wait_for` dependencies are tracked under a reserved key.
        if wait_for:
            task_inputs["wait_for"] = collect_task_run_inputs_sync(wait_for)

        # Merge in explicitly provided extra dependencies.
        for name, extras in (extra_task_inputs or {}).items():
            task_inputs[name] = task_inputs[name].union(extras)

        flow_run_id = (
            getattr(flow_run_context.flow_run, "id", None)
            if flow_run_context and flow_run_context.flow_run
            else None
        )
        task_run_id = id or uuid7()

        # NOTE(review): this unconditionally replaces the state built above,
        # including any Scheduled state from the deferred branch — presumably
        # local runs are never actually deferred; confirm before relying on
        # `deferred=True` here.
        state = prefect.states.Pending(
            state_details=StateDetails(
                task_run_id=task_run_id,
                flow_run_id=flow_run_id,
            )
        )
        task_run = TaskRun(
            id=task_run_id,
            name=task_run_name,
            flow_run_id=flow_run_id,
            task_key=self.task_key,
            dynamic_key=str(dynamic_key),
            task_version=self.version,
            empirical_policy=TaskRunPolicy(
                retries=self.retries,
                retry_delay=self.retry_delay_seconds,
                retry_jitter_factor=self.retry_jitter_factor,
            ),
            tags=list(set(self.tags).union(TagsContext.get().current_tags or [])),
            task_inputs=task_inputs or {},
            expected_start_time=state.timestamp,
            state_id=state.id,
            state_type=state.type,
            state_name=state.name,
            state=state,
            created=state.timestamp,
            updated=state.timestamp,
        )

        return task_run

1076 

# PRIORITY OVERLOADS: clean ParamSpec signatures for normal usage
# (no return_state/wait_for) so full parameter type checking is preserved.
@overload
def __call__(
    self: "Task[P, Coroutine[Any, Any, R]]",
    *args: P.args,
    **kwargs: P.kwargs,
) -> Coroutine[Any, Any, R]: ...

@overload
def __call__(
    self: "Task[P, R]",
    *args: P.args,
    **kwargs: P.kwargs,
) -> R: ...

@overload
def __call__(
    self: "Task[P, NoReturn]",
    *args: P.args,
    **kwargs: P.kwargs,
) -> None:
    # `NoReturn` matches if a type can't be inferred for the function which stops a
    # sync function from matching the `Coroutine` overload
    ...

# SECONDARY OVERLOADS: with return_state/wait_for using Any.
# ParamSpec semantics can't be preserved alongside these keyword-only
# options, so parameters degrade to Any — acceptable for advanced usage.
@overload
def __call__(
    self: "Task[..., Coroutine[Any, Any, R]]",
    *args: Any,
    return_state: Literal[False],
    wait_for: Optional[OneOrManyFutureOrResult[Any]] = None,
    **kwargs: Any,
) -> Coroutine[Any, Any, R]: ...

@overload
def __call__(
    self: "Task[..., Coroutine[Any, Any, R]]",
    *args: Any,
    return_state: Literal[True],
    wait_for: Optional[OneOrManyFutureOrResult[Any]] = None,
    **kwargs: Any,
) -> State[R]: ...

@overload
def __call__(
    self: "Task[..., R]",
    *args: Any,
    return_state: Literal[False],
    wait_for: Optional[OneOrManyFutureOrResult[Any]] = None,
    **kwargs: Any,
) -> R: ...

@overload
def __call__(
    self: "Task[..., R]",
    *args: Any,
    return_state: Literal[True],
    wait_for: Optional[OneOrManyFutureOrResult[Any]] = None,
    **kwargs: Any,
) -> State[R]: ...

@overload
def __call__(
    self: "Task[..., Coroutine[Any, Any, R]]",
    *args: Any,
    wait_for: OneOrManyFutureOrResult[Any],
    return_state: Literal[False] = False,
    **kwargs: Any,
) -> Coroutine[Any, Any, R]: ...

@overload
def __call__(
    self: "Task[..., R]",
    *args: Any,
    wait_for: OneOrManyFutureOrResult[Any],
    return_state: Literal[False] = False,
    **kwargs: Any,
) -> R: ...

def __call__(
    self: "Union[Task[..., R], Task[..., NoReturn]]",
    *args: Any,
    return_state: bool = False,
    wait_for: Optional[OneOrManyFutureOrResult[Any]] = None,
    **kwargs: Any,
) -> Union[R, State[R], None]:
    """
    Run the task and return its result, or the final Prefect State wrapping
    the result (which provides error handling) when `return_state` is True.
    """
    from prefect.utilities.visualization import (
        get_task_viz_tracker,
        track_viz_task,
    )

    # Normalize the call args/kwargs into a parameter dict.
    parameters = get_call_parameters(self.fn, args, kwargs)

    # Under `flow.visualize()`, record the call instead of executing it.
    if get_task_viz_tracker():
        return track_viz_task(
            self.isasync, self.name, parameters, self.viz_return_value
        )

    from prefect.task_engine import run_task

    return run_task(
        task=self,
        parameters=parameters,
        wait_for=wait_for,
        return_type="state" if return_state else "result",
    )

1196 

@overload
def submit(
    self: "Task[P, R]",
    *args: P.args,
    **kwargs: P.kwargs,
) -> PrefectFuture[R]: ...

@overload
def submit(
    self: "Task[P, Coroutine[Any, Any, R]]",
    *args: P.args,
    return_state: Literal[False],
    wait_for: Optional[OneOrManyFutureOrResult[Any]] = None,
    **kwargs: P.kwargs,
) -> PrefectFuture[R]: ...

@overload
def submit(
    self: "Task[P, R]",
    *args: P.args,
    return_state: Literal[False],
    wait_for: Optional[OneOrManyFutureOrResult[Any]] = None,
    **kwargs: P.kwargs,
) -> PrefectFuture[R]: ...

@overload
def submit(
    self: "Task[P, Coroutine[Any, Any, R]]",
    *args: P.args,
    return_state: Literal[True],
    wait_for: Optional[OneOrManyFutureOrResult[Any]] = None,
    **kwargs: P.kwargs,
) -> State[R]: ...

@overload
def submit(
    self: "Task[P, R]",
    *args: P.args,
    return_state: Literal[True],
    wait_for: Optional[OneOrManyFutureOrResult[Any]] = None,
    **kwargs: P.kwargs,
) -> State[R]: ...

def submit(
    self: "Union[Task[P, R], Task[P, Coroutine[Any, Any, R]]]",
    *args: Any,
    return_state: bool = False,
    wait_for: Optional[OneOrManyFutureOrResult[Any]] = None,
    **kwargs: Any,
):
    """
    Submit a run of the task to the engine.

    Creates a new task run in the backing API and hands it to the enclosing
    flow's task runner. Blocks only while the task is being submitted; once
    submitted, the flow function continues executing. Always synchronous,
    even when the underlying user function is asynchronous.

    Args:
        *args: Arguments to run the task with.
        return_state: When True, wait for the run and return its final
            Prefect State instead of a future.
        wait_for: Upstream task futures to wait for before starting the task.
        **kwargs: Keyword arguments to run the task with.

    Returns:
        A `PrefectFuture` for the run, or its final `State` when
        `return_state` is True.

    Raises:
        RuntimeError: When called outside a flow run context.

    Examples:
        Submit a task within a flow and use its result:

        ```python
        from prefect import flow, task

        @task
        def my_task():
            return "hello"

        @flow
        def my_flow():
            print(my_task.submit().result())

        my_flow()
        # hello
        ```

        Enforce ordering between tasks that do not exchange data by passing
        `wait_for=[other_future]`.
    """

    from prefect.utilities.visualization import (
        VisualizationUnsupportedError,
        get_task_viz_tracker,
    )

    # Normalize the call args/kwargs into a parameter dict.
    parameters = get_call_parameters(self.fn, args, kwargs)
    flow_run_context = FlowRunContext.get()

    if not flow_run_context:
        raise RuntimeError(
            "Unable to determine task runner to use for submission. If you are"
            " submitting a task outside of a flow, please use `.delay`"
            " to submit the task run for deferred execution."
        )

    if get_task_viz_tracker():
        raise VisualizationUnsupportedError(
            "`task.submit()` is not currently supported by `flow.visualize()`"
        )

    future = flow_run_context.task_runner.submit(self, parameters, wait_for)
    if not return_state:
        return future
    future.wait()
    return future.state

1378 

# NOTE(review): the first two overload pairs below are exact duplicates of
# one another — presumably an editing slip; kept verbatim to avoid any
# change to the published typing surface.
@overload
def map(
    self: "Task[P, R]",
    *args: Any,
    return_state: Literal[True],
    wait_for: Optional[Iterable[Union[PrefectFuture[R], R]]] = ...,
    deferred: bool = ...,
    **kwargs: Any,
) -> list[State[R]]: ...

@overload
def map(
    self: "Task[P, R]",
    *args: Any,
    wait_for: Optional[Iterable[Union[PrefectFuture[R], R]]] = ...,
    deferred: bool = ...,
    **kwargs: Any,
) -> PrefectFutureList[R]: ...

@overload
def map(
    self: "Task[P, R]",
    *args: Any,
    return_state: Literal[True],
    wait_for: Optional[Iterable[Union[PrefectFuture[R], R]]] = ...,
    deferred: bool = ...,
    **kwargs: Any,
) -> list[State[R]]: ...

@overload
def map(
    self: "Task[P, R]",
    *args: Any,
    wait_for: Optional[Iterable[Union[PrefectFuture[R], R]]] = ...,
    deferred: bool = ...,
    **kwargs: Any,
) -> PrefectFutureList[R]: ...

@overload
def map(
    self: "Task[P, Coroutine[Any, Any, R]]",
    *args: Any,
    return_state: Literal[True],
    wait_for: Optional[Iterable[Union[PrefectFuture[R], R]]] = ...,
    deferred: bool = ...,
    **kwargs: Any,
) -> list[State[R]]: ...

@overload
def map(
    self: "Task[P, Coroutine[Any, Any, R]]",
    *args: Any,
    return_state: Literal[False],
    wait_for: Optional[Iterable[Union[PrefectFuture[R], R]]] = ...,
    deferred: bool = ...,
    **kwargs: Any,
) -> PrefectFutureList[R]: ...

def map(
    self,
    *args: Any,
    return_state: bool = False,
    wait_for: Optional[Iterable[Union[PrefectFuture[R], R]]] = None,
    deferred: bool = False,
    **kwargs: Any,
) -> Union[list[State[R]], PrefectFutureList[R]]:
    """
    Submit a mapped run of the task to a worker.

    Creates one task run per element of the iterable argument(s) — all
    iterables must be the same length; non-iterable arguments are treated as
    constants shared by every run (wrap an iterable in `unmapped` to treat it
    as a constant). Must be called within a flow run context unless
    `deferred=True`. Always synchronous, even when the underlying user
    function is asynchronous.

    Args:
        *args: Iterable and static arguments to run the tasks with.
        return_state: When True, wait for every run and return a list of
            final Prefect States instead of futures.
        wait_for: Upstream task futures to wait for before starting the task.
        deferred: When True, create deferred runs for a task worker instead
            of submitting to the flow's task runner.
        **kwargs: Keyword iterable arguments to run the task with.

    Returns:
        A `PrefectFutureList` of futures, or a list of final `State`s when
        `return_state` is True.

    Raises:
        RuntimeError: When called outside a flow run context without
            `deferred=True`.

    Examples:
        Map over an iterable inside a flow:

        ```python
        from prefect import flow, task

        @task
        def my_task(x):
            return x + 1

        @flow
        def my_flow():
            futures = my_task.map([1, 2, 3])
            for x in futures.result():
                print(x)

        my_flow()
        # 2
        # 3
        # 4
        ```

        Use `unmapped` to pass an iterable as a constant:

        ```python
        from prefect import unmapped

        add_n_to_items.map(unmapped([10, 20]), n=[1, 2, 3])
        ```
    """

    from prefect.task_runners import TaskRunner
    from prefect.utilities.visualization import (
        VisualizationUnsupportedError,
        get_task_viz_tracker,
    )

    # Normalize the call args/kwargs; defaults are not applied because they
    # should not participate in the mapping.
    parameters = get_call_parameters(self.fn, args, kwargs, apply_defaults=False)
    flow_run_context = FlowRunContext.get()

    if get_task_viz_tracker():
        raise VisualizationUnsupportedError(
            "`task.map()` is not currently supported by `flow.visualize()`"
        )

    if deferred:
        # Expand the mapped parameters and create one deferred run apiece.
        futures = [
            self.apply_async(kwargs=call_parameters, wait_for=wait_for)
            for call_parameters in expand_mapping_parameters(self.fn, parameters)
        ]
    elif task_runner := getattr(flow_run_context, "task_runner", None):
        assert isinstance(task_runner, TaskRunner)
        futures = task_runner.map(self, parameters, wait_for)
    else:
        raise RuntimeError(
            "Unable to determine task runner to use for mapped task runs. If"
            " you are mapping a task outside of a flow, please provide"
            " `deferred=True` to submit the mapped task runs for deferred"
            " execution."
        )
    if not return_state:
        return futures
    states: list[State[R]] = []
    for future in futures:
        future.wait()
        states.append(future.state)
    return states

1617 

1618 # Background task methods 

1619 

def apply_async(
    self,
    args: Optional[tuple[Any, ...]] = None,
    kwargs: Optional[dict[str, Any]] = None,
    wait_for: Optional[Iterable[PrefectFuture[R]]] = None,
    dependencies: Optional[dict[str, set[RunInput]]] = None,
) -> PrefectDistributedFuture[R]:
    """
    Create a pending (deferred) task run for a task worker to execute.

    Args:
        args: Positional arguments to run the task with.
        kwargs: Keyword arguments to run the task with.
        wait_for: Upstream task futures to wait for before starting the task.
        dependencies: Extra task-input dependencies keyed by parameter name.

    Returns:
        A `PrefectDistributedFuture` representing the pending task run.

    Examples:
        Create a deferred run and use its result:

        ```python
        from prefect import flow, task

        @task
        def my_task(name: str = "world"):
            return f"hello {name}"

        @flow
        def my_flow():
            print(my_task.apply_async(("marvin",)).result())

        my_flow()
        # hello marvin
        ```

        Enforce ordering between tasks that do not exchange data by passing
        `wait_for=[other_future]`.
    """
    from prefect.utilities.visualization import (
        VisualizationUnsupportedError,
        get_task_viz_tracker,
    )

    if get_task_viz_tracker():
        raise VisualizationUnsupportedError(
            "`task.apply_async()` is not currently supported by `flow.visualize()`"
        )
    args = args or ()
    kwargs = kwargs or {}

    # Normalize the call args/kwargs into a parameter dict.
    parameters = get_call_parameters(self.fn, args, kwargs)

    # `create_run` is async; drive it to completion synchronously.
    task_run: TaskRun = run_coro_as_sync(
        self.create_run(
            parameters=parameters,
            deferred=True,
            wait_for=wait_for,
            extra_task_inputs=dependencies,
        )
    )  # type: ignore

    from prefect.utilities.engine import emit_task_run_state_change_event

    # Emit a `SCHEDULED` event for the task run.
    emit_task_run_state_change_event(
        task_run=task_run,
        initial_state=None,
        validated_state=task_run.state,
    )

    if get_current_settings().ui_url and (task_run_url := url_for(task_run)):
        logger.info(
            f"Created task run {task_run.name!r}. View it in the UI at {task_run_url!r}"
        )

    return PrefectDistributedFuture(task_run_id=task_run.id)

1733 

def delay(self, *args: P.args, **kwargs: P.kwargs) -> PrefectDistributedFuture[R]:
    """
    An alias for `apply_async` with simpler calling semantics: arguments
    pass through as-is to the task rather than via explicit `args`/`kwargs`.

    Examples:
        Create a deferred run and use its result:

        ```python
        from prefect import flow, task

        @task
        def my_task(name: str = "world"):
            return f"hello {name}"

        @flow
        def my_flow():
            print(my_task.delay("marvin").result())

        my_flow()
        # hello marvin
        ```
    """
    future = self.apply_async(args=args, kwargs=kwargs)
    return future

1781 

1782 @sync_compatible 1a

1783 async def serve(self) -> NoReturn: 1a

1784 """Serve the task using the provided task runner. This method is used to 

1785 establish a websocket connection with the Prefect server and listen for 

1786 submitted task runs to execute. 

1787 

1788 Args: 

1789 task_runner: The task runner to use for serving the task. If not provided, 

1790 the default task runner will be used. 

1791 

1792 Examples: 

1793 Serve a task using the default task runner 

1794 ```python 

1795 @task 

1796 def my_task(): 

1797 return 1 

1798 

1799 my_task.serve() 

1800 ``` 

1801 """ 

1802 from prefect.task_worker import serve 

1803 

1804 await serve(self) 

1805 

1806 

# Overload 1: bare decoration — `@task` applied directly to a function
# returns the `Task` object itself.
@overload
def task(__fn: Callable[P, R]) -> Task[P, R]: ...


# Overload 2: parameterized decoration — `@task(...)` with keyword options
# returns a decorator that produces the `Task` when applied to a function.
# Duplicated below as a pyright workaround;
# see https://github.com/PrefectHQ/prefect/issues/16380
@overload
def task(
    __fn: Literal[None] = None,
    *,
    name: Optional[str] = None,
    description: Optional[str] = None,
    tags: Optional[Iterable[str]] = None,
    version: Optional[str] = None,
    cache_policy: Union[CachePolicy, type[NotSet]] = NotSet,
    cache_key_fn: Optional[
        Callable[["TaskRunContext", dict[str, Any]], Optional[str]]
    ] = None,
    cache_expiration: Optional[datetime.timedelta] = None,
    task_run_name: Optional[TaskRunNameValueOrCallable] = None,
    retries: int = 0,
    retry_delay_seconds: Union[
        float, int, list[float], Callable[[int], list[float]], None
    ] = None,
    retry_jitter_factor: Optional[float] = None,
    persist_result: Optional[bool] = None,
    result_storage: Optional[ResultStorage] = None,
    result_storage_key: Optional[str] = None,
    result_serializer: Optional[ResultSerializer] = None,
    cache_result_in_memory: bool = True,
    timeout_seconds: Union[int, float, None] = None,
    log_prints: Optional[bool] = None,
    refresh_cache: Optional[bool] = None,
    on_completion: Optional[list[StateHookCallable]] = None,
    on_failure: Optional[list[StateHookCallable]] = None,
    on_running: Optional[list[StateHookCallable]] = None,
    retry_condition_fn: Optional[RetryConditionCallable] = None,
    viz_return_value: Any = None,
    asset_deps: Optional[list[Union[str, Asset]]] = None,
) -> Callable[[Callable[P, R]], Task[P, R]]: ...


# Overload 3: intentionally identical to overload 2 — a duplicate registered
# to work around a pyright overload-resolution issue;
# see https://github.com/PrefectHQ/prefect/issues/16380
@overload
def task(
    __fn: Literal[None] = None,
    *,
    name: Optional[str] = None,
    description: Optional[str] = None,
    tags: Optional[Iterable[str]] = None,
    version: Optional[str] = None,
    cache_policy: Union[CachePolicy, type[NotSet]] = NotSet,
    cache_key_fn: Optional[
        Callable[["TaskRunContext", dict[str, Any]], Optional[str]]
    ] = None,
    cache_expiration: Optional[datetime.timedelta] = None,
    task_run_name: Optional[TaskRunNameValueOrCallable] = None,
    retries: int = 0,
    retry_delay_seconds: Union[
        float, int, list[float], Callable[[int], list[float]], None
    ] = None,
    retry_jitter_factor: Optional[float] = None,
    persist_result: Optional[bool] = None,
    result_storage: Optional[ResultStorage] = None,
    result_storage_key: Optional[str] = None,
    result_serializer: Optional[ResultSerializer] = None,
    cache_result_in_memory: bool = True,
    timeout_seconds: Union[int, float, None] = None,
    log_prints: Optional[bool] = None,
    refresh_cache: Optional[bool] = None,
    on_completion: Optional[list[StateHookCallable]] = None,
    on_failure: Optional[list[StateHookCallable]] = None,
    on_running: Optional[list[StateHookCallable]] = None,
    retry_condition_fn: Optional[RetryConditionCallable] = None,
    viz_return_value: Any = None,
    asset_deps: Optional[list[Union[str, Asset]]] = None,
) -> Callable[[Callable[P, R]], Task[P, R]]: ...


# Overload 4: keyword-only form with no `__fn` parameter and a non-optional
# `retry_delay_seconds` default of 0.
@overload  # TODO: do we need this overload?
def task(
    *,
    name: Optional[str] = None,
    description: Optional[str] = None,
    tags: Optional[Iterable[str]] = None,
    version: Optional[str] = None,
    cache_policy: Union[CachePolicy, type[NotSet]] = NotSet,
    cache_key_fn: Optional[
        Callable[["TaskRunContext", dict[str, Any]], Optional[str]]
    ] = None,
    cache_expiration: Optional[datetime.timedelta] = None,
    task_run_name: Optional[TaskRunNameValueOrCallable] = None,
    retries: int = 0,
    retry_delay_seconds: Union[
        float,
        int,
        list[float],
        Callable[[int], list[float]],
    ] = 0,
    retry_jitter_factor: Optional[float] = None,
    persist_result: Optional[bool] = None,
    result_storage: Optional[ResultStorage] = None,
    result_storage_key: Optional[str] = None,
    result_serializer: Optional[ResultSerializer] = None,
    cache_result_in_memory: bool = True,
    timeout_seconds: Union[int, float, None] = None,
    log_prints: Optional[bool] = None,
    refresh_cache: Optional[bool] = None,
    on_completion: Optional[list[StateHookCallable]] = None,
    on_failure: Optional[list[StateHookCallable]] = None,
    on_running: Optional[list[StateHookCallable]] = None,
    retry_condition_fn: Optional[RetryConditionCallable] = None,
    viz_return_value: Any = None,
    asset_deps: Optional[list[Union[str, Asset]]] = None,
) -> Callable[[Callable[P, R]], Task[P, R]]: ...

1921 

1922 

def task(
    __fn: Optional[Callable[P, R]] = None,
    *,
    name: Optional[str] = None,
    description: Optional[str] = None,
    tags: Optional[Iterable[str]] = None,
    version: Optional[str] = None,
    cache_policy: Union[CachePolicy, type[NotSet]] = NotSet,
    cache_key_fn: Union[
        Callable[["TaskRunContext", dict[str, Any]], Optional[str]], None
    ] = None,
    cache_expiration: Optional[datetime.timedelta] = None,
    task_run_name: Optional[TaskRunNameValueOrCallable] = None,
    retries: Optional[int] = None,
    retry_delay_seconds: Union[
        float, int, list[float], Callable[[int], list[float]], None
    ] = None,
    retry_jitter_factor: Optional[float] = None,
    persist_result: Optional[bool] = None,
    result_storage: Optional[ResultStorage] = None,
    result_storage_key: Optional[str] = None,
    result_serializer: Optional[ResultSerializer] = None,
    cache_result_in_memory: bool = True,
    timeout_seconds: Union[int, float, None] = None,
    log_prints: Optional[bool] = None,
    refresh_cache: Optional[bool] = None,
    on_completion: Optional[list[StateHookCallable]] = None,
    on_failure: Optional[list[StateHookCallable]] = None,
    on_running: Optional[list[StateHookCallable]] = None,
    retry_condition_fn: Optional[RetryConditionCallable] = None,
    viz_return_value: Any = None,
    asset_deps: Optional[list[Union[str, Asset]]] = None,
):
    """
    Decorator that turns a function into a Prefect workflow task.

    Works with both synchronous and asynchronous functions, and may be used
    bare (`@task`) or with options (`@task(...)`).

    Args:
        name: An optional name for the task; if not provided, the name will be inferred
            from the given function.
        description: An optional string description for the task.
        tags: An optional set of tags to be associated with runs of this task. These
            tags are combined with any tags defined by a `prefect.tags` context at
            task runtime.
        version: An optional string specifying the version of this task definition
        cache_key_fn: An optional callable that, given the task run context and call
            parameters, generates a string key; if the key matches a previous completed
            state, that state result will be restored instead of running the task again.
        cache_expiration: An optional amount of time indicating how long cached states
            for this task should be restorable; if not provided, cached states will
            never expire.
        task_run_name: An optional name to distinguish runs of this task; this name can be provided
            as a string template with the task's keyword arguments as variables,
            or a function that returns a string.
        retries: An optional number of times to retry on task run failure
        retry_delay_seconds: Optionally configures how long to wait before retrying the
            task after failure. This is only applicable if `retries` is nonzero. This
            setting can either be a number of seconds, a list of retry delays, or a
            callable that, given the total number of retries, generates a list of retry
            delays. If a number of seconds, that delay will be applied to all retries.
            If a list, each retry will wait for the corresponding delay before retrying.
            When passing a callable or a list, the number of
            configured retry delays cannot exceed 50.
        retry_jitter_factor: An optional factor that defines the factor to which a
            retry can be jittered in order to avoid a "thundering herd".
        persist_result: A toggle indicating whether the result of this task
            should be persisted to result storage. Defaults to `None`, which
            indicates that the global default should be used (which is `True` by
            default).
        result_storage: An optional block to use to persist the result of this task.
            Defaults to the value set in the flow the task is called in.
        result_storage_key: An optional key to store the result in storage at when persisted.
            Defaults to a unique identifier.
        result_serializer: An optional serializer to use to serialize the result of this
            task for persistence. Defaults to the value set in the flow the task is
            called in.
        timeout_seconds: An optional number of seconds indicating a maximum runtime for
            the task. If the task exceeds this runtime, it will be marked as failed.
        log_prints: If set, `print` statements in the task will be redirected to the
            Prefect logger for the task run. Defaults to `None`, which indicates
            that the value from the flow should be used.
        refresh_cache: If set, cached results for the cache key are not used.
            Defaults to `None`, which indicates that a cached result from a previous
            execution with matching cache key is used.
        on_failure: An optional list of callables to run when the task enters a failed state.
        on_completion: An optional list of callables to run when the task enters a completed state.
        retry_condition_fn: An optional callable run when a task run returns a Failed state. Should
            return `True` if the task should continue to its retry policy (e.g. `retries=3`), and `False` if the task
            should end as failed. Defaults to `None`, indicating the task should always continue
            to its retry policy.
        viz_return_value: An optional value to return when the task dependency tree is visualized.
        asset_deps: An optional list of upstream assets that this task depends on.

    Returns:
        A callable `Task` object which, when called, will submit the task for execution.

    Examples:
        Define a simple task

        ```python
        @task
        def add(x, y):
            return x + y
        ```

        Define an async task

        ```python
        @task
        async def add(x, y):
            return x + y
        ```

        Define a task with tags and a description

        ```python
        @task(tags={"a", "b"}, description="This task is empty but its my first!")
        def my_task():
            pass
        ```

        Define a task with a custom name

        ```python
        @task(name="The Ultimate Task")
        def my_task():
            pass
        ```

        Define a task that retries 3 times with a 5 second delay between attempts

        ```python
        from random import randint

        @task(retries=3, retry_delay_seconds=5)
        def my_task():
            x = randint(0, 5)
            if x >= 3:  # Make a task that fails sometimes
                raise ValueError("Retry me please!")
            return x
        ```

        Define a task that is cached for a day based on its inputs

        ```python
        from prefect.tasks import task_input_hash
        from datetime import timedelta

        @task(cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1))
        def my_task():
            return "hello"
        ```
    """
    # Gather every decorator option once so the direct-decoration and
    # parameterized-decoration paths cannot drift apart.
    task_settings: dict[str, Any] = {
        "name": name,
        "description": description,
        "tags": tags,
        "version": version,
        "cache_policy": cache_policy,
        "cache_key_fn": cache_key_fn,
        "cache_expiration": cache_expiration,
        "task_run_name": task_run_name,
        "retries": retries,
        "retry_delay_seconds": retry_delay_seconds,
        "retry_jitter_factor": retry_jitter_factor,
        "persist_result": persist_result,
        "result_storage": result_storage,
        "result_storage_key": result_storage_key,
        "result_serializer": result_serializer,
        "cache_result_in_memory": cache_result_in_memory,
        "timeout_seconds": timeout_seconds,
        "log_prints": log_prints,
        "refresh_cache": refresh_cache,
        "on_completion": on_completion,
        "on_failure": on_failure,
        "on_running": on_running,
        "retry_condition_fn": retry_condition_fn,
        "viz_return_value": viz_return_value,
        "asset_deps": asset_deps,
    }

    if __fn:
        # Bare usage (`@task`): wrap the function immediately.
        return Task(fn=__fn, **task_settings)

    # Parameterized usage (`@task(...)`): return a decorator that re-enters
    # this function with the options pre-bound, hitting the branch above.
    return cast(
        Callable[[Callable[P, R]], Task[P, R]],
        partial(task, **task_settings),
    )

2139 

2140 

class MaterializingTask(Task[P, R]):
    """
    A task that materializes Assets.

    Args:
        assets: List of Assets that this task materializes (can be str or Asset)
        materialized_by: An optional tool that materialized the asset e.g. "dbt" or "spark"
        **task_kwargs: All other Task arguments
    """

    def __init__(
        self,
        fn: Callable[P, R],
        *,
        assets: Sequence[Union[str, Asset]],
        materialized_by: str | None = None,
        **task_kwargs: Unpack[TaskOptions],
    ):
        super().__init__(fn=fn, **task_kwargs)

        # Normalize string keys to `Asset` objects so downstream code only
        # ever handles `Asset` instances.
        self.assets: list[Asset] = [
            Asset(key=a) if isinstance(a, str) else a for a in assets
        ]
        self.materialized_by = materialized_by

    def with_options(
        self,
        assets: Optional[Sequence[Union[str, Asset]]] = None,
        **task_kwargs: Unpack[TaskOptions],
    ) -> "MaterializingTask[P, R]":
        """
        Create a copy of this task with updated options.

        Args:
            assets: If provided, replaces the assets this task materializes;
                string keys are normalized to `Asset` objects. When omitted,
                the current assets are kept.
            **task_kwargs: Any `Task` option to override; options not given
                are copied from this task.

        Returns:
            A new `MaterializingTask`. Note that `materialized_by` is always
            carried over unchanged.
        """
        # Fix: use the module-level `inspect` import instead of a redundant
        # function-local `import inspect` that shadowed it.
        sig = inspect.signature(Task.__init__)

        # Map parameter names to attribute names where they differ
        # from parameter to attribute.
        param_to_attr = {
            "on_completion": "on_completion_hooks",
            "on_failure": "on_failure_hooks",
            "on_running": "on_running_hooks",
            "on_rollback": "on_rollback_hooks",
            "on_commit": "on_commit_hooks",
        }

        # Build kwargs for the Task constructor: explicit overrides in
        # `task_kwargs` win; otherwise fall back to this instance's value.
        init_kwargs = {}
        for param_name in sig.parameters:
            if param_name in ("self", "fn", "assets", "materialized_by"):
                continue

            attr_name = param_to_attr.get(param_name, param_name)
            init_kwargs[param_name] = task_kwargs.get(
                param_name, getattr(self, attr_name)
            )

        return MaterializingTask(
            fn=self.fn,
            assets=(
                [Asset(key=a) if isinstance(a, str) else a for a in assets]
                if assets is not None
                else self.assets
            ),
            materialized_by=self.materialized_by,
            # Now, the rest
            **init_kwargs,
        )