Coverage for /usr/local/lib/python3.12/site-packages/prefect/flows.py: 23%

758 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1""" 

2Module containing the base workflow class and decorator - for most use cases, using the `@flow` decorator is preferred. 

3""" 

4 

5from __future__ import annotations 1a

6 

7# This file requires type-checking with pyright because mypy does not yet support PEP612 

8# See https://github.com/python/mypy/issues/8645 

9import ast 1a

10import asyncio 1a

11import datetime 1a

12import importlib.util 1a

13import inspect 1a

14import os 1a

15import re 1a

16import sys 1a

17import tempfile 1a

18import uuid 1a

19import warnings 1a

20from copy import copy 1a

21from functools import partial, update_wrapper 1a

22from pathlib import Path 1a

23from typing import ( 1a

24 TYPE_CHECKING, 

25 Any, 

26 Awaitable, 

27 Callable, 

28 Coroutine, 

29 Generic, 

30 Iterable, 

31 List, 

32 NoReturn, 

33 Optional, 

34 Protocol, 

35 Tuple, 

36 Type, 

37 TypeVar, 

38 Union, 

39 cast, 

40 overload, 

41) 

42from uuid import UUID 1a

43 

44import pydantic 1a

45from exceptiongroup import BaseExceptionGroup, ExceptionGroup 1a

46from rich.console import Console 1a

47from typing_extensions import Literal, ParamSpec 1a

48 

49from prefect._experimental.sla.objects import SlaTypes 1a

50from prefect._internal.concurrency.api import create_call, from_async 1a

51from prefect._versioning import VersionType 1a

52from prefect.client.schemas.filters import WorkerFilter, WorkerFilterStatus 1a

53from prefect.client.schemas.objects import ConcurrencyLimitConfig, FlowRun 1a

54from prefect.client.utilities import client_injector 1a

55from prefect.events import DeploymentTriggerTypes, TriggerTypes 1a

56from prefect.exceptions import ( 1a

57 InvalidNameError, 

58 MissingFlowError, 

59 ObjectNotFound, 

60 ParameterTypeError, 

61 ScriptError, 

62 TerminationSignal, 

63 UnspecifiedFlowError, 

64) 

65from prefect.filesystems import LocalFileSystem, ReadableDeploymentStorage 1a

66from prefect.futures import PrefectFlowRunFuture, PrefectFuture 1a

67from prefect.logging import get_logger 1a

68from prefect.logging.loggers import flow_run_logger 1a

69from prefect.results import ResultSerializer, ResultStorage 1a

70from prefect.schedules import Schedule 1a

71from prefect.settings import ( 1a

72 PREFECT_DEFAULT_WORK_POOL_NAME, 

73 PREFECT_FLOW_DEFAULT_RETRIES, 

74 PREFECT_FLOW_DEFAULT_RETRY_DELAY_SECONDS, 

75 PREFECT_TESTING_UNIT_TEST_MODE, 

76 PREFECT_UI_URL, 

77) 

78from prefect.states import State 1a

79from prefect.task_runners import TaskRunner, ThreadPoolTaskRunner 1a

80from prefect.types import BANNED_CHARACTERS, WITHOUT_BANNED_CHARACTERS 1a

81from prefect.types.entrypoint import EntrypointType 1a

82from prefect.utilities.annotations import NotSet 1a

83from prefect.utilities.asyncutils import ( 1a

84 run_coro_as_sync, 

85 run_sync_in_worker_thread, 

86 sync_compatible, 

87) 

88from prefect.utilities.callables import ( 1a

89 ParameterSchema, 

90 get_call_parameters, 

91 parameter_schema, 

92 parameters_to_args_kwargs, 

93 raise_for_reserved_arguments, 

94) 

95from prefect.utilities.collections import listrepr, visit_collection 1a

96from prefect.utilities.filesystem import relative_path_to_current_platform 1a

97from prefect.utilities.hashing import file_hash 1a

98from prefect.utilities.importtools import import_object, safe_load_namespace 1a

99 

100from ._internal.compatibility.async_dispatch import async_dispatch, is_in_async_context 1a

101from ._internal.pydantic.v2_schema import is_v2_type 1a

102from ._internal.pydantic.validated_func import ValidatedFunction 1a

103 

104if TYPE_CHECKING: 104 ↛ 105line 104 didn't jump to line 105 because the condition on line 104 was never true1a

105 from prefect.docker.docker_image import DockerImage 

106 from prefect.workers.base import BaseWorker 

107 

108 

# Type variables used by the generic definitions in this module
# (`FlowStateHook` and `Flow` are both parameterised on `P` and `R`).
T = TypeVar("T")  # Generic type var for capturing the inner return type of async funcs
R = TypeVar("R")  # The return type of the user's function
P = ParamSpec("P")  # The parameters of the flow
F = TypeVar("F", bound="Flow[Any, Any]")  # The type of the flow

113 

114 

class FlowStateHook(Protocol, Generic[P, R]):
    """
    A callable that is invoked when a flow enters a given state.

    Structural (duck-typed) interface: any object with a `__name__` and a
    compatible `__call__` satisfies it. The hook receives the flow, the flow
    run, and the state, and may return either `None` or an awaitable
    resolving to `None`.
    """

    # Implementations must expose a name, as plain functions do.
    __name__: str

    def __call__(
        self, flow: Flow[P, R], flow_run: FlowRun, state: State
    ) -> Awaitable[None] | None: ...

125 

126 

127if TYPE_CHECKING: 127 ↛ 128line 127 didn't jump to line 128 because the condition on line 127 was never true1a

128 import logging 

129 

130 from prefect.client.orchestration import PrefectClient 

131 from prefect.client.schemas.objects import FlowRun 

132 from prefect.client.types.flexible_schedule_list import FlexibleScheduleList 

133 from prefect.deployments.runner import RunnerDeployment 

134 from prefect.runner.storage import RunnerStorage 

135 

# Module-level logger. The annotation is a string because `logging` is only
# imported under `TYPE_CHECKING` in this module.
logger: "logging.Logger" = get_logger("flows")

137 

138 

139class Flow(Generic[P, R]): 1a

140 """ 

141 A Prefect workflow definition. 

142 

143 Wraps a function with an entrypoint to the Prefect engine. To preserve the input 

144 and output types, we use the generic type variables `P` and `R` for "Parameters" and 

145 "Returns" respectively. 

146 

147 Args: 

148 fn: The function defining the workflow. 

149 name: An optional name for the flow; if not provided, the name will be inferred 

150 from the given function. 

151 version: An optional version string for the flow; if not provided, we will 

152 attempt to create a version string as a hash of the file containing the 

153 wrapped function; if the file cannot be located, the version will be null. 

154 flow_run_name: An optional name to distinguish runs of this flow; this name can 

155 be provided as a string template with the flow's parameters as variables, 

156 or a function that returns a string. 

157 task_runner: An optional task runner to use for task execution within the flow; 

158 if not provided, a `ThreadPoolTaskRunner` will be used. 

159 description: An optional string description for the flow; if not provided, the 

160 description will be pulled from the docstring for the decorated function. 

161 timeout_seconds: An optional number of seconds indicating a maximum runtime for 

162 the flow. If the flow exceeds this runtime, it will be marked as failed. 

163 Flow execution may continue until the next task is called. 

164 validate_parameters: By default, parameters passed to flows are validated by 

165 Pydantic. This will check that input values conform to the annotated types 

166 on the function. Where possible, values will be coerced into the correct 

167 type; for example, if a parameter is defined as `x: int` and "5" is passed, 

168 it will be resolved to `5`. If set to `False`, no validation will be 

169 performed on flow parameters. 

170 retries: An optional number of times to retry on flow run failure. 

171 retry_delay_seconds: An optional number of seconds to wait before retrying the 

172 flow after failure. This is only applicable if `retries` is nonzero. 

173 persist_result: An optional toggle indicating whether the result of this flow 

174 should be persisted to result storage. Defaults to `None`, which indicates 

175 that Prefect should choose whether the result should be persisted depending on 

176 the features being used. 

177 result_storage: An optional block to use to persist the result of this flow. 

178 This value will be used as the default for any tasks in this flow. 

179 If not provided, the local file system will be used unless called as 

180 a subflow, at which point the default will be loaded from the parent flow. 

181 result_serializer: An optional serializer to use to serialize the result of this 

182 flow for persistence. This value will be used as the default for any tasks 

183 in this flow. If not provided, the value of `PREFECT_RESULTS_DEFAULT_SERIALIZER` 

184 will be used unless called as a subflow, at which point the default will be 

185 loaded from the parent flow. 

186 on_failure: An optional list of callables to run when the flow enters a failed state. 

187 on_completion: An optional list of callables to run when the flow enters a completed state. 

188 on_cancellation: An optional list of callables to run when the flow enters a cancelling state. 

189 on_crashed: An optional list of callables to run when the flow enters a crashed state. 

190 on_running: An optional list of callables to run when the flow enters a running state. 

191 """ 

192 

193 # NOTE: These parameters (types, defaults, and docstrings) should be duplicated 

194 # exactly in the @flow decorator 

def __init__(
    self,
    fn: Callable[P, R] | "classmethod[Any, P, R]" | "staticmethod[P, R]",
    name: Optional[str] = None,
    version: Optional[str] = None,
    flow_run_name: Optional[Union[Callable[[], str], str]] = None,
    retries: Optional[int] = None,
    retry_delay_seconds: Optional[Union[int, float]] = None,
    task_runner: Union[
        Type[TaskRunner[PrefectFuture[Any]]], TaskRunner[PrefectFuture[Any]], None
    ] = None,
    description: Optional[str] = None,
    timeout_seconds: Union[int, float, None] = None,
    validate_parameters: bool = True,
    persist_result: Optional[bool] = None,
    result_storage: Optional[Union[ResultStorage, str]] = None,
    result_serializer: Optional[ResultSerializer] = None,
    cache_result_in_memory: bool = True,
    log_prints: Optional[bool] = None,
    on_completion: Optional[list[FlowStateHook[P, R]]] = None,
    on_failure: Optional[list[FlowStateHook[P, R]]] = None,
    on_cancellation: Optional[list[FlowStateHook[P, R]]] = None,
    on_crashed: Optional[list[FlowStateHook[P, R]]] = None,
    on_running: Optional[list[FlowStateHook[P, R]]] = None,
):
    """
    Build a flow around `fn`; see the class docstring for the meaning of each argument.

    Raises:
        TypeError: if `name` is not a string, a hook collection is not iterable
            or contains a non-callable, `fn` is not callable, `flow_run_name` is
            neither a string nor a callable, or a result storage block has no
            `_block_document_id`.
        ValueError: if `validate_parameters` is enabled and a validated function
            cannot be built for `fn`.
    """
    if name is not None and not isinstance(name, str):  # pyright: ignore[reportUnnecessaryIsInstance]
        raise TypeError(
            "Expected string for flow parameter 'name'; got {} instead. {}".format(
                type(name).__name__,
                (
                    "Perhaps you meant to call it? e.g."
                    " '@flow(name=get_flow_run_name())'"
                    if callable(name)
                    else ""
                ),
            )
        )

    # Validate each hook collection and keep the *materialised* list. Storing
    # the validated copy (rather than the raw argument) matters for one-shot
    # iterables such as generators: validation consumes them, so the raw
    # argument would be empty by the time the hooks are run.
    validated_hooks: dict[str, list[FlowStateHook[P, R]]] = {}
    for hook_name, hooks in (
        ("on_completion", on_completion),
        ("on_failure", on_failure),
        ("on_cancellation", on_cancellation),
        ("on_crashed", on_crashed),
        ("on_running", on_running),
    ):
        if hooks is None:
            validated_hooks[hook_name] = []
            continue

        try:
            hook_list = list(hooks)
        except TypeError as exc:
            raise TypeError(
                f"Expected iterable for '{hook_name}'; got"
                f" {type(hooks).__name__} instead. Please provide a list of"
                f" hooks to '{hook_name}':\n\n"
                f"@flow({hook_name}=[hook1, hook2])\ndef"
                " my_flow():\n\tpass"
            ) from exc

        for hook in hook_list:
            if not callable(hook):
                raise TypeError(
                    f"Expected callables in '{hook_name}'; got"
                    f" {type(hook).__name__} instead. Please provide a list of"
                    f" hooks to '{hook_name}':\n\n"
                    f"@flow({hook_name}=[hook1, hook2])\ndef"
                    " my_flow():\n\tpass"
                )

        validated_hooks[hook_name] = hook_list

    # Unwrap classmethod/staticmethod objects to the underlying function and
    # remember which kind was wrapped (read back by the `isclassmethod` /
    # `isstaticmethod` properties).
    if isinstance(fn, classmethod):
        fn = cast(Callable[P, R], fn.__func__)
        self._isclassmethod = True

    if isinstance(fn, staticmethod):
        fn = cast(Callable[P, R], fn.__func__)
        self._isstaticmethod = True

    if not callable(fn):
        raise TypeError("'fn' must be callable")

    self.name: str = name or fn.__name__.replace("_", "-").replace(
        "<lambda>",
        "unknown-lambda",  # prefect API will not accept "<" or ">" in flow names
    )
    _raise_on_name_with_banned_characters(self.name)

    if flow_run_name is not None:
        if not isinstance(flow_run_name, str) and not callable(flow_run_name):
            raise TypeError(
                "Expected string or callable for 'flow_run_name'; got"
                f" {type(flow_run_name).__name__} instead."
            )
    self.flow_run_name = flow_run_name

    # A task runner may be given as a class (instantiated here) or an instance.
    if task_runner is None:
        self.task_runner: TaskRunner[PrefectFuture[Any]] = cast(
            TaskRunner[PrefectFuture[Any]], ThreadPoolTaskRunner()
        )
    else:
        self.task_runner: TaskRunner[PrefectFuture[Any]] = (
            task_runner() if isinstance(task_runner, type) else task_runner
        )

    self.log_prints = log_prints

    # The description is resolved before `update_wrapper` copies the wrapped
    # function's metadata onto this object.
    self.description: str | None = description or inspect.getdoc(fn)
    update_wrapper(self, fn)
    self.fn = fn

    # the flow is considered async if its function is async or an async
    # generator
    self.isasync: bool = inspect.iscoroutinefunction(
        self.fn
    ) or inspect.isasyncgenfunction(self.fn)

    # the flow is considered a generator if its function is a generator or
    # an async generator
    self.isgenerator: bool = inspect.isgeneratorfunction(
        self.fn
    ) or inspect.isasyncgenfunction(self.fn)

    raise_for_reserved_arguments(self.fn, ["return_state", "wait_for"])

    # Version defaults to a hash of the function's file
    if not version:
        try:
            flow_file = inspect.getsourcefile(self.fn)
            if flow_file is None:
                raise FileNotFoundError
            version = file_hash(flow_file)
        except (FileNotFoundError, TypeError, OSError):
            pass  # `getsourcefile` can return null values and "<stdin>" for objects in repls
    self.version = version

    # Note: a falsy timeout (including 0) is stored as None.
    self.timeout_seconds: float | None = (
        float(timeout_seconds) if timeout_seconds else None
    )

    # FlowRunPolicy settings
    # TODO: We can instantiate a `FlowRunPolicy` and add Pydantic bound checks to
    #       validate that the user passes positive numbers here
    self.retries: int = (
        retries if retries is not None else PREFECT_FLOW_DEFAULT_RETRIES.value()
    )

    self.retry_delay_seconds: float | int = (
        retry_delay_seconds
        if retry_delay_seconds is not None
        else PREFECT_FLOW_DEFAULT_RETRY_DELAY_SECONDS.value()
    )

    self.parameters: ParameterSchema = parameter_schema(self.fn)
    self.should_validate_parameters = validate_parameters

    if self.should_validate_parameters:
        # Try to create the validated function now so that incompatibility can be
        # raised at declaration time rather than at runtime
        # We cannot, however, store the validated function on the flow because it
        # is not picklable in some environments
        try:
            ValidatedFunction(self.fn, config={"arbitrary_types_allowed": True})
        except Exception as exc:
            raise ValueError(
                "Flow function is not compatible with `validate_parameters`. "
                "Disable validation or change the argument names."
            ) from exc

    # result persistence settings: configuring storage or a serializer implies
    # persistence unless the caller set `persist_result` explicitly
    if persist_result is None:
        if result_storage is not None or result_serializer is not None:
            persist_result = True

    self.persist_result = persist_result
    if result_storage and not isinstance(result_storage, str):
        if getattr(result_storage, "_block_document_id", None) is None:
            raise TypeError(
                "Result storage configuration must be persisted server-side."
                " Please call `.save()` on your block before passing it in."
            )
    self.result_storage = result_storage
    self.result_serializer = result_serializer
    self.cache_result_in_memory = cache_result_in_memory
    self.on_completion_hooks: list[FlowStateHook[P, R]] = validated_hooks[
        "on_completion"
    ]
    self.on_failure_hooks: list[FlowStateHook[P, R]] = validated_hooks["on_failure"]
    self.on_cancellation_hooks: list[FlowStateHook[P, R]] = validated_hooks[
        "on_cancellation"
    ]
    self.on_crashed_hooks: list[FlowStateHook[P, R]] = validated_hooks["on_crashed"]
    self.on_running_hooks: list[FlowStateHook[P, R]] = validated_hooks["on_running"]

    # Used for flows loaded from remote storage
    self._storage: Optional["RunnerStorage"] = None
    self._entrypoint: Optional[str] = None

    # For functions defined in `__main__` or a `__prefect_loader_*` module, the
    # entrypoint uses the file path reported by `inspect.getfile` instead of
    # the module name.
    module = fn.__module__
    if module and (module == "__main__" or module.startswith("__prefect_loader_")):
        module_name = inspect.getfile(fn)
        module = module_name if module_name != "__main__" else module

    self._entrypoint = f"{module}:{getattr(fn, '__qualname__', fn.__name__)}"

400 

@property
def ismethod(self) -> bool:
    """Whether the wrapped function currently carries a `__prefect_self__` attribute."""
    absent = object()
    return getattr(self.fn, "__prefect_self__", absent) is not absent

404 

@property
def isclassmethod(self) -> bool:
    """Value of the `_isclassmethod` flag, or `False` when the flag was never set."""
    try:
        return self._isclassmethod
    except AttributeError:
        return False

408 

@property
def isstaticmethod(self) -> bool:
    """Value of the `_isstaticmethod` flag, or `False` when the flag was never set."""
    try:
        return self._isstaticmethod
    except AttributeError:
        return False

412 

def __get__(self, instance: Any, owner: Any) -> "Flow[P, R]":
    """
    Implement the descriptor protocol so that the flow can be used as an instance or class method.

    Args:
        instance: The object the flow is being accessed through, or `None` when it
            is accessed on the class itself.
        owner: The class the flow is being accessed on.

    Returns:
        A shallow copy of the flow with `owner` recorded as `__prefect_cls__`
        (classmethod flows) or `instance` recorded as `__prefect_self__`
        (instance access) on the flow's function; otherwise the flow itself.
        Because the copy is shallow, the marker attribute is set on the
        function object shared with the original flow.
    """
    # wrapped function is a classmethod
    if self.isclassmethod:
        bound_flow = copy(self)
        setattr(bound_flow.fn, "__prefect_cls__", owner)
        return bound_flow

    # Compare against None rather than relying on truthiness: an instance
    # whose class defines a falsy `__bool__`/`__len__` (e.g. an empty
    # container type) must still be bound to the flow's function.
    if instance is not None:
        bound_flow = copy(self)
        bound_flow.fn.__prefect_self__ = instance  # type: ignore[attr-defined]
        return bound_flow

    return self

433 

def with_options(
    self,
    *,
    name: Optional[str] = None,
    version: Optional[str] = None,
    retries: Optional[int] = None,
    retry_delay_seconds: Optional[Union[int, float]] = None,
    description: Optional[str] = None,
    flow_run_name: Optional[Union[Callable[[], str], str]] = None,
    task_runner: Union[
        Type[TaskRunner[PrefectFuture[Any]]], TaskRunner[PrefectFuture[Any]], None
    ] = None,
    timeout_seconds: Union[int, float, None] = None,
    validate_parameters: Optional[bool] = None,
    persist_result: Optional[bool] = NotSet,  # type: ignore
    result_storage: Optional[ResultStorage] = NotSet,  # type: ignore
    result_serializer: Optional[ResultSerializer] = NotSet,  # type: ignore
    cache_result_in_memory: Optional[bool] = None,
    log_prints: Optional[bool] = NotSet,  # type: ignore
    on_completion: Optional[list[FlowStateHook[P, R]]] = None,
    on_failure: Optional[list[FlowStateHook[P, R]]] = None,
    on_cancellation: Optional[list[FlowStateHook[P, R]]] = None,
    on_crashed: Optional[list[FlowStateHook[P, R]]] = None,
    on_running: Optional[list[FlowStateHook[P, R]]] = None,
) -> "Flow[P, R]":
    """
    Return a new flow wrapping the same function, with the given options overridden.

    Options left at their default are inherited from this flow. Three
    different "not provided" conventions are in play:

    - `name`, `description`, `flow_run_name`, `version` and the hook lists
      are replaced only by a truthy value (an empty string or empty list
      keeps the current setting).
    - `retries`, `retry_delay_seconds`, `timeout_seconds`,
      `validate_parameters` and `cache_result_in_memory` are replaced by any
      value other than `None`.
    - `persist_result`, `result_storage`, `result_serializer` and
      `log_prints` use the `NotSet` sentinel, so `None` can be passed
      explicitly to override the current value.

    Args:
        name: A new name for the flow.
        version: A new version for the flow.
        description: A new description for the flow.
        flow_run_name: An optional name to distinguish runs of this flow; this name
            can be provided as a string template with the flow's parameters as variables,
            or a function that returns a string.
        task_runner: A new task runner for the flow; a class is instantiated.
        timeout_seconds: A new number of seconds to fail the flow after if still
            running.
        validate_parameters: A new value indicating if flow calls should validate
            given parameters.
        retries: A new number of times to retry on flow run failure.
        retry_delay_seconds: A new number of seconds to wait before retrying the
            flow after failure. This is only applicable if `retries` is nonzero.
        persist_result: A new option for enabling or disabling result persistence.
        result_storage: A new storage type to use for results.
        result_serializer: A new serializer to use for results.
        cache_result_in_memory: A new value indicating if the flow's result should
            be cached in memory.
        log_prints: A new value for the flow's `log_prints` setting.
        on_failure: A new list of callables to run when the flow enters a failed state.
        on_completion: A new list of callables to run when the flow enters a completed state.
        on_cancellation: A new list of callables to run when the flow enters a cancelling state.
        on_crashed: A new list of callables to run when the flow enters a crashed state.
        on_running: A new list of callables to run when the flow enters a running state.

    Returns:
        A new `Flow` instance.

    Examples:

        Create a new flow from an existing flow and update the name:

        ```python
        from prefect import flow

        @flow(name="My flow")
        def my_flow():
            return 1

        new_flow = my_flow.with_options(name="My new flow")
        ```

        Create a new flow from an existing flow, update the task runner, and call
        it without an intermediate variable:

        ```python
        from prefect import flow
        from prefect.task_runners import ThreadPoolTaskRunner

        @flow
        def my_flow(x, y):
            return x + y

        state = my_flow.with_options(task_runner=ThreadPoolTaskRunner)(1, 3)
        assert state.result() == 4
        ```
    """

    def override(candidate: Any, current: Any, unset: Any = None) -> Any:
        # Use `candidate` unless it is the "not provided" marker.
        return current if candidate is unset else candidate

    # A task runner class is instantiated; no runner at all means "keep ours".
    runner = task_runner
    if isinstance(runner, type):
        runner = runner()
    if runner is None:
        runner = self.task_runner

    derived = Flow(
        fn=self.fn,
        name=name or self.name,
        description=description or self.description,
        flow_run_name=flow_run_name or self.flow_run_name,
        version=version or self.version,
        task_runner=runner,
        retries=override(retries, self.retries),
        retry_delay_seconds=override(retry_delay_seconds, self.retry_delay_seconds),
        timeout_seconds=override(timeout_seconds, self.timeout_seconds),
        validate_parameters=override(
            validate_parameters, self.should_validate_parameters
        ),
        persist_result=override(persist_result, self.persist_result, NotSet),
        result_storage=override(result_storage, self.result_storage, NotSet),
        result_serializer=override(
            result_serializer, self.result_serializer, NotSet
        ),
        cache_result_in_memory=override(
            cache_result_in_memory, self.cache_result_in_memory
        ),
        log_prints=override(log_prints, self.log_prints, NotSet),
        on_completion=on_completion or self.on_completion_hooks,
        on_failure=on_failure or self.on_failure_hooks,
        on_cancellation=on_cancellation or self.on_cancellation_hooks,
        on_crashed=on_crashed or self.on_crashed_hooks,
        on_running=on_running or self.on_running_hooks,
    )

    # The constructor recomputes the entrypoint and resets storage, so carry
    # both over from this flow explicitly.
    derived._storage = self._storage
    derived._entrypoint = self._entrypoint
    return derived

571 

def validate_parameters(self, parameters: dict[str, Any]) -> dict[str, Any]:
    """
    Validate parameters for compatibility with the flow by attempting to cast the inputs to the
    associated types specified by the function's type annotations.

    Before validation, any dict in `parameters` that contains a `"$ref"` key is
    replaced by the block loaded from that reference.

    Args:
        parameters: Mapping of parameter name to the value supplied for it.

    Returns:
        A new dict of parameters that have been cast to the appropriate types

    Raises:
        ParameterTypeError: if the provided parameters are not valid, if block
            references cannot be resolved, or if Pydantic v1 models and v2
            types are mixed in the same call
    """

    def resolve_block_reference(data: Any | dict[str, Any]) -> Any:
        # Only dicts carrying a "$ref" key are treated as block references;
        # everything else passes through untouched.
        if isinstance(data, dict) and "$ref" in data:
            from prefect.blocks.core import Block

            return Block.load_from_ref(data["$ref"], _sync=True)
        return data

    try:
        # Walk the parameter collection and swap block references for blocks.
        parameters = visit_collection(
            parameters, resolve_block_reference, return_data=True
        )
    except (ValueError, RuntimeError) as exc:
        raise ParameterTypeError(
            "Failed to resolve block references in parameters."
        ) from exc

    args, kwargs = parameters_to_args_kwargs(self.fn, parameters)

    if sys.version_info >= (3, 14):  # Pydantic v1 is not supported in Python 3.14+
        has_v1_models = False
    else:
        from pydantic.v1 import BaseModel as V1BaseModel

        # Importing/using the v1 shim can emit deprecation warnings; silence
        # them for the duration of the check.
        with warnings.catch_warnings():
            warnings.filterwarnings(
                "ignore", category=pydantic.warnings.PydanticDeprecatedSince20
            )
            has_v1_models = any(isinstance(o, V1BaseModel) for o in args) or any(
                isinstance(o, V1BaseModel) for o in kwargs.values()
            )

    has_v2_types = any(is_v2_type(o) for o in args) or any(
        is_v2_type(o) for o in kwargs.values()
    )

    if has_v1_models and has_v2_types:
        raise ParameterTypeError(
            "Cannot mix Pydantic v1 and v2 types as arguments to a flow."
        )

    try:
        if has_v1_models:
            # At least one argument is a Pydantic v1 model: validate with the
            # v1 `ValidatedFunction` instead of the project's v2 one.
            from pydantic.v1.decorator import (
                ValidatedFunction as V1ValidatedFunction,
            )

            validated_fn = V1ValidatedFunction(
                self.fn, config=dict(arbitrary_types_allowed=True)
            )
            with warnings.catch_warnings():
                warnings.filterwarnings(
                    "ignore", category=pydantic.warnings.PydanticDeprecatedSince20
                )
                model = validated_fn.init_model_instance(*args, **kwargs)

            # Get the updated parameter dict with cast values from the model.
            # Only fields that were explicitly set, or whose field definition
            # has a default factory, are kept.
            # NOTE(review): `model_fields_set` / `model_fields` are Pydantic v2
            # attribute names, while `model` is produced by the v1
            # `ValidatedFunction` — confirm these attributes exist on v1 models.
            cast_parameters = {
                k: v
                for k, v in dict(iter(model)).items()
                if k in model.model_fields_set
                or type(model).model_fields[k].default_factory
            }
            return cast_parameters
        else:
            validated_fn = ValidatedFunction(
                self.fn, config=pydantic.ConfigDict(arbitrary_types_allowed=True)
            )
            return validated_fn.validate_call_args(args, kwargs)

    # NOTE(review): this catches the v2 `pydantic.ValidationError`; presumably
    # errors raised by the v1 branch use `pydantic.v1`'s own exception class —
    # verify whether they are meant to be converted here as well.
    except pydantic.ValidationError as exc:
        # We capture the pydantic exception and raise our own because the pydantic
        # exception is not picklable when using a cythonized pydantic installation
        logger.error(
            f"Parameter validation failed for flow {self.name!r}: {exc.errors()}"
            f"\nParameters: {parameters}"
        )
        raise ParameterTypeError.from_validation_error(exc) from None

661 

def serialize_parameters(
    self, parameters: dict[str, Any | PrefectFuture[Any] | State]
) -> dict[str, Any]:
    """
    Convert parameters to a serializable form.

    Values are passed through FastAPI's `jsonable_encoder`, which yields JSON
    compatible objects without converting everything directly to a string, so
    basic types like integers survive API roundtrips.

    Special cases:
        - The bound instance (`__prefect_self__`) or class (`__prefect_cls__`)
          recorded on the flow's function is omitted entirely.
        - Futures, states, and values the encoder rejects are replaced with a
          `"<TypeName>"` placeholder string.
    """

    def placeholder(obj: Any) -> str:
        # Stand-in stored for values that are not (or must not be) encoded.
        return f"<{type(obj).__name__}>"

    encoded_parameters: dict[str, Any] = {}
    for param_name, param_value in parameters.items():
        # do not serialize the bound self/cls object
        if (
            self.ismethod
            and param_value is getattr(self.fn, "__prefect_self__", None)
        ) or (
            self.isclassmethod
            and param_value is getattr(self.fn, "__prefect_cls__", None)
        ):
            continue

        if isinstance(param_value, (PrefectFuture, State)):
            # Don't call jsonable_encoder() on a PrefectFuture or State to
            # avoid triggering a __getitem__ call
            encoded_parameters[param_name] = placeholder(param_value)
            continue

        try:
            # Imported lazily, only once a value actually needs encoding.
            from fastapi.encoders import jsonable_encoder

            encoded = jsonable_encoder(param_value)
        except (TypeError, ValueError):
            logger.debug(
                f"Parameter {param_name!r} for flow {self.name!r} is unserializable. "
                f"Type {type(param_value).__name__!r} and will not be stored "
                "in the backend."
            )
            encoded = placeholder(param_value)
        encoded_parameters[param_name] = encoded

    return encoded_parameters

698 

699 async def ato_deployment( 1a

700 self, 

701 name: str, 

702 interval: Optional[ 

703 Union[ 

704 Iterable[Union[int, float, datetime.timedelta]], 

705 int, 

706 float, 

707 datetime.timedelta, 

708 ] 

709 ] = None, 

710 cron: Optional[Union[Iterable[str], str]] = None, 

711 rrule: Optional[Union[Iterable[str], str]] = None, 

712 paused: Optional[bool] = None, 

713 schedule: Optional[Schedule] = None, 

714 schedules: Optional["FlexibleScheduleList"] = None, 

715 concurrency_limit: Optional[Union[int, ConcurrencyLimitConfig, None]] = None, 

716 parameters: Optional[dict[str, Any]] = None, 

717 triggers: Optional[list[Union[DeploymentTriggerTypes, TriggerTypes]]] = None, 

718 description: Optional[str] = None, 

719 tags: Optional[list[str]] = None, 

720 version: Optional[str] = None, 

721 version_type: Optional[VersionType] = None, 

722 enforce_parameter_schema: bool = True, 

723 work_pool_name: Optional[str] = None, 

724 work_queue_name: Optional[str] = None, 

725 job_variables: Optional[dict[str, Any]] = None, 

726 entrypoint_type: EntrypointType = EntrypointType.FILE_PATH, 

727 _sla: Optional[Union[SlaTypes, list[SlaTypes]]] = None, # experimental 

728 ) -> "RunnerDeployment": 

729 """ 

730 Asynchronously creates a runner deployment object for this flow. 

731 

732 Args: 

733 name: The name to give the created deployment. 

734 interval: An interval on which to execute the new deployment. Accepts either a number 

735 or a timedelta object. If a number is given, it will be interpreted as seconds. 

736 cron: A cron schedule of when to execute runs of this deployment. 

737 rrule: An rrule schedule of when to execute runs of this deployment. 

738 paused: Whether or not to set this deployment as paused. 

739 schedule: A schedule object defining when to execute runs of this deployment. 

740 Used to provide additional scheduling options like `timezone` or `parameters`. 

741 schedules: A list of schedule objects defining when to execute runs of this deployment. 

742 Used to define multiple schedules or additional scheduling options such as `timezone`. 

743 concurrency_limit: The maximum number of runs of this deployment that can run at the same time. 

744 parameters: A dictionary of default parameter values to pass to runs of this deployment. 

745 triggers: A list of triggers that will kick off runs of this deployment. 

746 description: A description for the created deployment. Defaults to the flow's 

747 description if not provided. 

748 tags: A list of tags to associate with the created deployment for organizational 

749 purposes. 

750 version: A version for the created deployment. Defaults to the flow's version. 

751 version_type: The type of version to use for the created deployment. The version type 

752 will be inferred if not provided. 

753 enforce_parameter_schema: Whether or not the Prefect API should enforce the 

754 parameter schema for the created deployment. 

755 work_pool_name: The name of the work pool to use for this deployment. 

756 work_queue_name: The name of the work queue to use for this deployment's scheduled runs. 

757 If not provided the default work queue for the work pool will be used. 

758 job_variables: Settings used to override the values specified default base job template 

759 of the chosen work pool. Refer to the base job template of the chosen work pool for 

760 entrypoint_type: Type of entrypoint to use for the deployment. When using a module path 

761 entrypoint, ensure that the module will be importable in the execution environment. 

762 _sla: (Experimental) SLA configuration for the deployment. May be removed or modified at any time. Currently only supported on Prefect Cloud. 

763 

764 Examples: 

765 Prepare two deployments and serve them: 

766 

767 ```python 

768 from prefect import flow, serve 

769 

770 @flow 

771 def my_flow(name): 

772 print(f"hello {name}") 

773 

774 @flow 

775 def my_other_flow(name): 

776 print(f"goodbye {name}") 

777 

778 if __name__ == "__main__": 

779 hello_deploy = my_flow.to_deployment("hello", tags=["dev"]) 

780 bye_deploy = my_other_flow.to_deployment("goodbye", tags=["dev"]) 

781 serve(hello_deploy, bye_deploy) 

782 ``` 

783 """ 

784 from prefect.deployments.runner import RunnerDeployment 

785 

786 if not name.endswith(".py"): 

787 _raise_on_name_with_banned_characters(name) 

788 

789 if self._storage and self._entrypoint: 

790 return await RunnerDeployment.afrom_storage( 

791 storage=self._storage, 

792 entrypoint=self._entrypoint, 

793 name=name, 

794 flow_name=self.name, 

795 interval=interval, 

796 cron=cron, 

797 rrule=rrule, 

798 paused=paused, 

799 schedule=schedule, 

800 schedules=schedules, 

801 concurrency_limit=concurrency_limit, 

802 tags=tags, 

803 triggers=triggers, 

804 parameters=parameters or {}, 

805 description=description, 

806 version=version, 

807 version_type=version_type, 

808 enforce_parameter_schema=enforce_parameter_schema, 

809 work_pool_name=work_pool_name, 

810 work_queue_name=work_queue_name, 

811 job_variables=job_variables, 

812 _sla=_sla, 

813 ) 

814 else: 

815 return RunnerDeployment.from_flow( 

816 flow=self, 

817 name=name, 

818 interval=interval, 

819 cron=cron, 

820 rrule=rrule, 

821 paused=paused, 

822 schedule=schedule, 

823 schedules=schedules, 

824 concurrency_limit=concurrency_limit, 

825 tags=tags, 

826 triggers=triggers, 

827 parameters=parameters or {}, 

828 description=description, 

829 version=version, 

830 version_type=version_type, 

831 enforce_parameter_schema=enforce_parameter_schema, 

832 work_pool_name=work_pool_name, 

833 work_queue_name=work_queue_name, 

834 job_variables=job_variables, 

835 entrypoint_type=entrypoint_type, 

836 _sla=_sla, 

837 ) 

838 

@async_dispatch(ato_deployment)
def to_deployment(
    self,
    name: str,
    interval: Optional[
        Union[
            Iterable[Union[int, float, datetime.timedelta]],
            int,
            float,
            datetime.timedelta,
        ]
    ] = None,
    cron: Optional[Union[Iterable[str], str]] = None,
    rrule: Optional[Union[Iterable[str], str]] = None,
    paused: Optional[bool] = None,
    schedule: Optional[Schedule] = None,
    schedules: Optional["FlexibleScheduleList"] = None,
    concurrency_limit: Optional[Union[int, ConcurrencyLimitConfig, None]] = None,
    parameters: Optional[dict[str, Any]] = None,
    triggers: Optional[list[Union[DeploymentTriggerTypes, TriggerTypes]]] = None,
    description: Optional[str] = None,
    tags: Optional[list[str]] = None,
    version: Optional[str] = None,
    version_type: Optional[VersionType] = None,
    enforce_parameter_schema: bool = True,
    work_pool_name: Optional[str] = None,
    work_queue_name: Optional[str] = None,
    job_variables: Optional[dict[str, Any]] = None,
    entrypoint_type: EntrypointType = EntrypointType.FILE_PATH,
    _sla: Optional[Union[SlaTypes, list[SlaTypes]]] = None,  # experimental
) -> "RunnerDeployment":
    """
    Creates a runner deployment object for this flow.

    When both `self._storage` and `self._entrypoint` are set, the deployment is
    built with `RunnerDeployment.from_storage`; otherwise it is built with
    `RunnerDeployment.from_flow`. `entrypoint_type` is only forwarded on the
    `from_flow` path.

    Args:
        name: The name to give the created deployment. Unless it ends with `.py`,
            it is checked by `_raise_on_name_with_banned_characters`.
        interval: An interval on which to execute the new deployment. Accepts either a number
            or a timedelta object. If a number is given, it will be interpreted as seconds.
        cron: A cron schedule of when to execute runs of this deployment.
        rrule: An rrule schedule of when to execute runs of this deployment.
        paused: Whether or not to set this deployment as paused.
        schedule: A schedule object defining when to execute runs of this deployment.
            Used to provide additional scheduling options like `timezone` or `parameters`.
        schedules: A list of schedule objects defining when to execute runs of this deployment.
            Used to define multiple schedules or additional scheduling options such as `timezone`.
        concurrency_limit: The maximum number of runs of this deployment that can run at the same time.
        parameters: A dictionary of default parameter values to pass to runs of this deployment.
            A falsy value is forwarded as an empty dictionary.
        triggers: A list of triggers that will kick off runs of this deployment.
        description: A description for the created deployment. Defaults to the flow's
            description if not provided.
        tags: A list of tags to associate with the created deployment for organizational
            purposes.
        version: A version for the created deployment. Defaults to the flow's version.
        version_type: The type of version to use for the created deployment. The version type
            will be inferred if not provided.
        enforce_parameter_schema: Whether or not the Prefect API should enforce the
            parameter schema for the created deployment.
        work_pool_name: The name of the work pool to use for this deployment.
        work_queue_name: The name of the work queue to use for this deployment's scheduled runs.
            If not provided the default work queue for the work pool will be used.
        job_variables: Settings used to override the values specified default base job template
            of the chosen work pool. Refer to the base job template of the chosen work pool for
            available settings.
        entrypoint_type: Type of entrypoint to use for the deployment. When using a module path
            entrypoint, ensure that the module will be importable in the execution environment.
        _sla: (Experimental) SLA configuration for the deployment. May be removed or modified at any time. Currently only supported on Prefect Cloud.

    Returns:
        The `RunnerDeployment` built for this flow.

    Examples:
        Prepare two deployments and serve them:

        ```python
        from prefect import flow, serve

        @flow
        def my_flow(name):
            print(f"hello {name}")

        @flow
        def my_other_flow(name):
            print(f"goodbye {name}")

        if __name__ == "__main__":
            hello_deploy = my_flow.to_deployment("hello", tags=["dev"])
            bye_deploy = my_other_flow.to_deployment("goodbye", tags=["dev"])
            serve(hello_deploy, bye_deploy)
        ```
    """
    from prefect.deployments.runner import RunnerDeployment

    if not name.endswith(".py"):
        _raise_on_name_with_banned_characters(name)

    # Keyword arguments that both construction paths receive unchanged.
    shared_options: dict[str, Any] = dict(
        name=name,
        interval=interval,
        cron=cron,
        rrule=rrule,
        paused=paused,
        schedule=schedule,
        schedules=schedules,
        concurrency_limit=concurrency_limit,
        tags=tags,
        triggers=triggers,
        parameters=parameters or {},
        description=description,
        version=version,
        version_type=version_type,
        enforce_parameter_schema=enforce_parameter_schema,
        work_pool_name=work_pool_name,
        work_queue_name=work_queue_name,
        job_variables=job_variables,
        _sla=_sla,
    )

    # Without both a storage object and an entrypoint, build from the flow itself.
    if not (self._storage and self._entrypoint):
        return RunnerDeployment.from_flow(
            flow=self,
            entrypoint_type=entrypoint_type,
            **shared_options,
        )

    return cast(
        RunnerDeployment,
        RunnerDeployment.from_storage(
            storage=self._storage,
            entrypoint=self._entrypoint,
            flow_name=self.name,
            _sync=True,  # pyright: ignore[reportCallIssue] _sync is valid because .from_storage is decorated with async_dispatch
            **shared_options,
        ),
    )

983 

def on_completion(self, fn: FlowStateHook[P, R]) -> FlowStateHook[P, R]:
    """
    Append `fn` to `self.on_completion_hooks` and return it unchanged.

    Because the same callable is handed back, this method can be applied
    as a decorator without replacing the decorated function.
    """
    self.on_completion_hooks.append(fn)
    return fn

987 

def on_cancellation(self, fn: FlowStateHook[P, R]) -> FlowStateHook[P, R]:
    """
    Append `fn` to `self.on_cancellation_hooks` and return it unchanged.

    Because the same callable is handed back, this method can be applied
    as a decorator without replacing the decorated function.
    """
    self.on_cancellation_hooks.append(fn)
    return fn

991 

def on_crashed(self, fn: FlowStateHook[P, R]) -> FlowStateHook[P, R]:
    """
    Append `fn` to `self.on_crashed_hooks` and return it unchanged.

    Because the same callable is handed back, this method can be applied
    as a decorator without replacing the decorated function.
    """
    self.on_crashed_hooks.append(fn)
    return fn

995 

def on_running(self, fn: FlowStateHook[P, R]) -> FlowStateHook[P, R]:
    """
    Append `fn` to `self.on_running_hooks` and return it unchanged.

    Because the same callable is handed back, this method can be applied
    as a decorator without replacing the decorated function.
    """
    self.on_running_hooks.append(fn)
    return fn

999 

def on_failure(self, fn: FlowStateHook[P, R]) -> FlowStateHook[P, R]:
    """
    Append `fn` to `self.on_failure_hooks` and return it unchanged.

    Because the same callable is handed back, this method can be applied
    as a decorator without replacing the decorated function.
    """
    self.on_failure_hooks.append(fn)
    return fn

1003 

def serve(
    self,
    name: Optional[str] = None,
    interval: Optional[
        Union[
            Iterable[Union[int, float, datetime.timedelta]],
            int,
            float,
            datetime.timedelta,
        ]
    ] = None,
    cron: Optional[Union[Iterable[str], str]] = None,
    rrule: Optional[Union[Iterable[str], str]] = None,
    paused: Optional[bool] = None,
    schedule: Optional[Schedule] = None,
    schedules: Optional["FlexibleScheduleList"] = None,
    global_limit: Optional[Union[int, ConcurrencyLimitConfig, None]] = None,
    triggers: Optional[list[Union[DeploymentTriggerTypes, TriggerTypes]]] = None,
    parameters: Optional[dict[str, Any]] = None,
    description: Optional[str] = None,
    tags: Optional[list[str]] = None,
    version: Optional[str] = None,
    enforce_parameter_schema: bool = True,
    pause_on_shutdown: bool = True,
    print_starting_message: bool = True,
    limit: Optional[int] = None,
    webserver: bool = False,
    entrypoint_type: EntrypointType = EntrypointType.FILE_PATH,
) -> None:
    """
    Creates a deployment for this flow and starts a runner to monitor for scheduled work.

    The flow is registered on a new `Runner` via `add_flow`, an optional help
    message is printed, and `runner.start` is then driven to completion.
    `KeyboardInterrupt` and `TerminationSignal` raised while the runner is
    running are logged and swallowed.

    Args:
        name: The name to give the created deployment. Defaults to the name of the flow.
            If the value is the path of an existing file, only its stem is used.
        interval: An interval on which to execute the deployment. Accepts a number or a
            timedelta object to create a single schedule. If a number is given, it will be
            interpreted as seconds. Also accepts an iterable of numbers or timedelta to create
            multiple schedules.
        cron: A cron schedule string of when to execute runs of this deployment.
            Also accepts an iterable of cron schedule strings to create multiple schedules.
        rrule: An rrule schedule string of when to execute runs of this deployment.
            Also accepts an iterable of rrule schedule strings to create multiple schedules.
        paused: Whether or not to set this deployment as paused.
        schedule: A schedule object defining when to execute runs of this deployment.
            Used to provide additional scheduling options like `timezone` or `parameters`.
        schedules: A list of schedule objects defining when to execute runs of this deployment.
            Used to define multiple schedules or additional scheduling options like `timezone`.
        global_limit: The maximum number of concurrent runs allowed across all served flow instances
            associated with the same deployment. Passed to the runner as `concurrency_limit`.
        triggers: A list of triggers that will kick off runs of this deployment.
        parameters: A dictionary of default parameter values to pass to runs of this deployment.
        description: A description for the created deployment. Defaults to the flow's
            description if not provided.
        tags: A list of tags to associate with the created deployment for organizational
            purposes.
        version: A version for the created deployment. Defaults to the flow's version.
        enforce_parameter_schema: Whether or not the Prefect API should enforce the
            parameter schema for the created deployment.
        pause_on_shutdown: If True, provided schedule will be paused when the serve function is stopped.
            If False, the schedules will continue running.
        print_starting_message: Whether or not to print the starting message when flow is served.
        limit: The maximum number of runs that can be executed concurrently by the created runner;
            only applies to this served flow. To apply a limit across multiple served flows,
            use `global_limit`.
        webserver: Whether or not to start a monitoring webserver for this flow.
        entrypoint_type: Type of entrypoint to use for the deployment. When using a module path
            entrypoint, ensure that the module will be importable in the execution environment.

    Examples:
        Serve a flow:

        ```python
        from prefect import flow

        @flow
        def my_flow(name):
            print(f"hello {name}")

        if __name__ == "__main__":
            my_flow.serve("example-deployment")
        ```

        Serve a flow and run it every hour:

        ```python
        from prefect import flow

        @flow
        def my_flow(name):
            print(f"hello {name}")

        if __name__ == "__main__":
            my_flow.serve("example-deployment", interval=3600)
        ```
    """
    from prefect.runner import Runner

    if not name:
        name = self.name
    elif (candidate := Path(name)).is_file():
        # Only strip extension if it is a file path
        name = candidate.stem

    runner = Runner(name=name, pause_on_shutdown=pause_on_shutdown, limit=limit)
    deployment_id = runner.add_flow(
        self,
        name=name,
        triggers=triggers,
        interval=interval,
        cron=cron,
        rrule=rrule,
        paused=paused,
        schedule=schedule,
        schedules=schedules,
        concurrency_limit=global_limit,
        parameters=parameters,
        description=description,
        tags=tags,
        version=version,
        enforce_parameter_schema=enforce_parameter_schema,
        entrypoint_type=entrypoint_type,
    )

    if print_starting_message:
        help_message = (
            f"[green]Your flow {self.name!r} is being served and polling for"
            " scheduled runs!\n[/]\nTo trigger a run for this flow, use the"
            " following command:\n[blue]\n\t$ prefect deployment run"
            f" '{self.name}/{name}'\n[/]"
        )
        if PREFECT_UI_URL:
            help_message += (
                "\nYou can also run your flow via the Prefect UI:"
                f" [blue]{PREFECT_UI_URL.value()}/deployments/deployment/{deployment_id}[/]\n"
            )

        Console().print(help_message, soft_wrap=True)

    # Find out whether the caller is already inside an event loop. Only the
    # "no running event loop" RuntimeError is treated as "not inside one".
    try:
        running_loop = asyncio.get_running_loop()
    except RuntimeError as exc:
        if "no running event loop" not in str(exc):
            raise
        running_loop = None

    try:
        if running_loop is None:
            asyncio.run(runner.start(webserver=webserver))
        else:
            # NOTE(review): `get_running_loop()` only returns a loop that is
            # already running, and asyncio documents `run_until_complete` as
            # raising RuntimeError on a running loop - confirm this branch
            # behaves as intended.
            running_loop.run_until_complete(runner.start(webserver=webserver))
    except (KeyboardInterrupt, TerminationSignal) as exc:
        logger.info(f"Received {type(exc).__name__}, shutting down...")
        if running_loop is not None:
            running_loop.stop()

1157 

@classmethod
async def afrom_source(
    cls,
    source: Union[str, Path, "RunnerStorage", ReadableDeploymentStorage],
    entrypoint: str,
) -> "Flow[..., Any]":
    """
    Loads a flow from a remote source asynchronously.

    The source is resolved to a storage object. Unless that object is a
    `LocalStorage`, its base path is pointed at a temporary directory and
    the code is pulled. The flow is then loaded from
    `storage.destination / entrypoint` in a separate thread, and the storage
    and entrypoint are recorded on it as `_storage` and `_entrypoint`.

    Args:
        source: A `str` or `Path` handed to `create_storage_from_source`
            (for example the URL of a git repository), a `RunnerStorage`
            instance, or any object with a `get_directory` attribute, which
            is wrapped in a `BlockStorageAdapter`.
        entrypoint: The path to a file containing a flow and the name of the flow function in
            the format `./path/to/file.py:flow_func_name`.

    Returns:
        A new `Flow` instance.

    Raises:
        TypeError: If `source` is none of the supported kinds.

    Examples:
        Load a flow from a public git repository:

        ```python
        from prefect import flow

        my_flow = flow.from_source(
            source="https://github.com/org/repo.git",
            entrypoint="flows.py:my_flow",
        )

        my_flow()
        ```

        Load a flow from a private git repository using an access token stored in a `Secret` block:

        ```python
        from prefect import flow
        from prefect.runner.storage import GitRepository
        from prefect.blocks.system import Secret

        my_flow = flow.from_source(
            source=GitRepository(
                url="https://github.com/org/repo.git",
                credentials={"access_token": Secret.load("github-access-token")}
            ),
            entrypoint="flows.py:my_flow",
        )

        my_flow()
        ```

        Load a flow from a local directory:

        ```python
        # from_local_source.py

        from pathlib import Path
        from prefect import flow

        @flow(log_prints=True)
        def my_flow(name: str = "world"):
            print(f"Hello {name}! I'm a flow from a Python script!")

        if __name__ == "__main__":
            my_flow.from_source(
                source=str(Path(__file__).parent),
                entrypoint="from_local_source.py:my_flow",
            ).deploy(
                name="my-deployment",
                parameters=dict(name="Marvin"),
                work_pool_name="local",
            )
        ```
    """

    from prefect.runner.storage import (
        BlockStorageAdapter,
        LocalStorage,
        RunnerStorage,
        create_storage_from_source,
    )

    # Resolve whatever the caller passed into a storage object.
    if isinstance(source, Path):
        storage = create_storage_from_source(str(source))
    elif isinstance(source, str):
        storage = create_storage_from_source(source)
    elif isinstance(source, RunnerStorage):
        storage = source
    elif hasattr(source, "get_directory"):
        storage = BlockStorageAdapter(source)
    else:
        raise TypeError(
            f"Unsupported source type {type(source).__name__!r}. Please provide a"
            " URL to remote storage or a storage object."
        )

    with tempfile.TemporaryDirectory() as scratch_dir:
        if not isinstance(storage, LocalStorage):
            storage.set_base_path(Path(scratch_dir))
            await storage.pull_code()

        full_entrypoint = str(storage.destination / entrypoint)
        # Run the load call in a new thread and wait for its result.
        load_call = create_call(load_flow_from_entrypoint, full_entrypoint)
        loaded = await from_async.wait_for_call_in_new_thread(load_call)
        flow = cast("Flow[..., Any]", loaded)
        flow._storage = storage
        flow._entrypoint = entrypoint

    return flow

1270 

@classmethod
@async_dispatch(afrom_source)
def from_source(
    cls,
    source: Union[str, Path, "RunnerStorage", ReadableDeploymentStorage],
    entrypoint: str,
) -> "Flow[..., Any]":
    """
    Loads a flow from a remote source.

    The source is resolved to a storage object. Unless that object is a
    `LocalStorage`, its base path is pointed at a temporary directory and
    the code is pulled synchronously. The flow is then loaded from
    `storage.destination / entrypoint`, and the storage and entrypoint are
    recorded on it as `_storage` and `_entrypoint`.

    Args:
        source: A `str` or `Path` handed to `create_storage_from_source`
            (for example the URL of a git repository), a `RunnerStorage`
            instance, or any object with a `get_directory` attribute, which
            is wrapped in a `BlockStorageAdapter`.
        entrypoint: The path to a file containing a flow and the name of the flow function in
            the format `./path/to/file.py:flow_func_name`.

    Returns:
        A new `Flow` instance.

    Raises:
        TypeError: If `source` is none of the supported kinds.

    Examples:
        Load a flow from a public git repository:

        ```python
        from prefect import flow

        my_flow = flow.from_source(
            source="https://github.com/org/repo.git",
            entrypoint="flows.py:my_flow",
        )

        my_flow()
        ```

        Load a flow from a private git repository using an access token stored in a `Secret` block:

        ```python
        from prefect import flow
        from prefect.runner.storage import GitRepository
        from prefect.blocks.system import Secret

        my_flow = flow.from_source(
            source=GitRepository(
                url="https://github.com/org/repo.git",
                credentials={"access_token": Secret.load("github-access-token")}
            ),
            entrypoint="flows.py:my_flow",
        )

        my_flow()
        ```

        Load a flow from a local directory:

        ```python
        # from_local_source.py

        from pathlib import Path
        from prefect import flow

        @flow(log_prints=True)
        def my_flow(name: str = "world"):
            print(f"Hello {name}! I'm a flow from a Python script!")

        if __name__ == "__main__":
            my_flow.from_source(
                source=str(Path(__file__).parent),
                entrypoint="from_local_source.py:my_flow",
            ).deploy(
                name="my-deployment",
                parameters=dict(name="Marvin"),
                work_pool_name="local",
            )
        ```
    """

    from prefect.runner.storage import (
        BlockStorageAdapter,
        LocalStorage,
        RunnerStorage,
        create_storage_from_source,
    )

    # Resolve whatever the caller passed into a storage object.
    if isinstance(source, Path):
        storage = create_storage_from_source(str(source))
    elif isinstance(source, str):
        storage = create_storage_from_source(source)
    elif isinstance(source, RunnerStorage):
        storage = source
    elif hasattr(source, "get_directory"):
        storage = BlockStorageAdapter(source)
    else:
        raise TypeError(
            f"Unsupported source type {type(source).__name__!r}. Please provide a"
            " URL to remote storage or a storage object."
        )

    with tempfile.TemporaryDirectory() as scratch_dir:
        if not isinstance(storage, LocalStorage):
            storage.set_base_path(Path(scratch_dir))
            run_coro_as_sync(storage.pull_code())

        entrypoint_path = storage.destination / entrypoint
        flow = load_flow_from_entrypoint(str(entrypoint_path))
        flow._storage = storage
        flow._entrypoint = entrypoint

    return flow

1379 

@sync_compatible
async def deploy(
    self,
    name: str,
    work_pool_name: Optional[str] = None,
    image: Optional[Union[str, "DockerImage"]] = None,
    build: bool = True,
    push: bool = True,
    work_queue_name: Optional[str] = None,
    job_variables: Optional[dict[str, Any]] = None,
    interval: Optional[Union[int, float, datetime.timedelta]] = None,
    cron: Optional[str] = None,
    rrule: Optional[str] = None,
    paused: Optional[bool] = None,
    schedule: Optional[Schedule] = None,
    schedules: Optional[list[Schedule]] = None,
    concurrency_limit: Optional[Union[int, ConcurrencyLimitConfig, None]] = None,
    triggers: Optional[list[Union[DeploymentTriggerTypes, TriggerTypes]]] = None,
    parameters: Optional[dict[str, Any]] = None,
    description: Optional[str] = None,
    tags: Optional[list[str]] = None,
    version: Optional[str] = None,
    version_type: Optional[VersionType] = None,
    enforce_parameter_schema: bool = True,
    entrypoint_type: EntrypointType = EntrypointType.FILE_PATH,
    print_next_steps: bool = True,
    ignore_warnings: bool = False,
    _sla: Optional[Union[SlaTypes, list[SlaTypes]]] = None,
) -> UUID:
    """
    Deploys a flow to run on dynamic infrastructure via a work pool.

    By default, calling this method will build a Docker image for the flow, push it to a registry,
    and create a deployment via the Prefect API that will run the flow on the given schedule.

    If you want to use an existing image, you can pass `build=False` to skip building and pushing
    an image.

    Args:
        name: The name to give the created deployment.
        work_pool_name: The name of the work pool to use for this deployment. Defaults to
            the value of `PREFECT_DEFAULT_WORK_POOL_NAME`.
        image: The name of the Docker image to build, including the registry and
            repository. Pass a DockerImage instance to customize the Dockerfile used
            and build arguments.
        build: Whether or not to build a new image for the flow. If False, the provided
            image will be used as-is and pulled at runtime.
        push: Whether or not to push the built image to a registry.
        work_queue_name: The name of the work queue to use for this deployment's scheduled runs.
            If not provided the default work queue for the work pool will be used.
        job_variables: Settings used to override the values specified default base job template
            of the chosen work pool. Refer to the base job template of the chosen work pool for
            available settings.
        interval: An interval on which to execute the deployment. Accepts a number or a
            timedelta object; a number is interpreted as seconds. Forwarded to `to_deployment`.
        cron: A cron schedule string of when to execute runs of this deployment.
            Forwarded to `to_deployment`.
        rrule: An rrule schedule string of when to execute runs of this deployment.
            Forwarded to `to_deployment`.
        paused: Whether or not to set this deployment as paused.
        schedule: A schedule object defining when to execute runs of this deployment.
            Used to provide additional scheduling options like `timezone` or `parameters`.
        schedules: A list of schedule objects defining when to execute runs of this deployment.
            Used to define multiple schedules or additional scheduling options like `timezone`.
        concurrency_limit: The maximum number of runs that can be executed concurrently.
        triggers: A list of triggers that will kick off runs of this deployment.
        parameters: A dictionary of default parameter values to pass to runs of this deployment.
        description: A description for the created deployment. Defaults to the flow's
            description if not provided.
        tags: A list of tags to associate with the created deployment for organizational
            purposes.
        version: A version for the created deployment. Defaults to the flow's version.
        version_type: The type of version to use for the created deployment. The version type
            will be inferred if not provided.
        enforce_parameter_schema: Whether or not the Prefect API should enforce the
            parameter schema for the created deployment.
        entrypoint_type: Type of entrypoint to use for the deployment. When using a module path
            entrypoint, ensure that the module will be importable in the execution environment.
        print_next_steps: Whether or not to print a message with next steps
            after deploying the deployment.
        ignore_warnings: Whether or not to ignore warnings about the work pool type.
        _sla: (Experimental) SLA configuration for the deployment. May be removed or modified at any time. Currently only supported on Prefect Cloud.

    Returns:
        The ID of the created/updated deployment.

    Raises:
        ValueError: If no work pool name is given and `PREFECT_DEFAULT_WORK_POOL_NAME`
            has no value, or if reading the work pool raises `ObjectNotFound`.

    Examples:
        Deploy a local flow to a work pool:

        ```python
        from prefect import flow

        @flow
        def my_flow(name):
            print(f"hello {name}")

        if __name__ == "__main__":
            my_flow.deploy(
                "example-deployment",
                work_pool_name="my-work-pool",
                image="my-repository/my-image:dev",
            )
        ```

        Deploy a remotely stored flow to a work pool:

        ```python
        from prefect import flow

        if __name__ == "__main__":
            flow.from_source(
                source="https://github.com/org/repo.git",
                entrypoint="flows.py:my_flow",
            ).deploy(
                "example-deployment",
                work_pool_name="my-work-pool",
                image="my-repository/my-image:dev",
            )
        ```
    """
    # Fall back to the configured default pool when none is given explicitly.
    work_pool_name = work_pool_name or PREFECT_DEFAULT_WORK_POOL_NAME.value()
    if not work_pool_name:
        raise ValueError(
            "No work pool name provided. Please provide a `work_pool_name` or set the"
            " `PREFECT_DEFAULT_WORK_POOL_NAME` environment variable."
        )

    from prefect.client.orchestration import get_client

    try:
        async with get_client() as client:
            work_pool = await client.read_work_pool(work_pool_name)
            online_only = WorkerFilter(status=WorkerFilterStatus(any_=["ONLINE"]))
            active_workers = await client.read_workers_for_work_pool(
                work_pool_name,
                worker_filter=online_only,
            )
    except ObjectNotFound as exc:
        raise ValueError(
            f"Could not find work pool {work_pool_name!r}. Please create it before"
            " deploying this flow."
        ) from exc

    # `to_deployment` may hand back either a finished deployment or an
    # awaitable, so only await when there is something to await.
    maybe_deployment = self.to_deployment(
        name=name,
        interval=interval,
        cron=cron,
        rrule=rrule,
        schedule=schedule,
        schedules=schedules,
        concurrency_limit=concurrency_limit,
        paused=paused,
        triggers=triggers,
        parameters=parameters,
        description=description,
        tags=tags,
        version=version,
        version_type=version_type,
        enforce_parameter_schema=enforce_parameter_schema,
        work_queue_name=work_queue_name,
        job_variables=job_variables,
        entrypoint_type=entrypoint_type,
        _sla=_sla,
    )
    deployment = (
        await maybe_deployment
        if inspect.isawaitable(maybe_deployment)
        else maybe_deployment
    )

    from prefect.deployments.runner import deploy

    # The runner-level helper prints nothing; next steps are printed below.
    deploy_coro = deploy(
        deployment,
        work_pool_name=work_pool_name,
        image=image,
        build=build,
        push=push,
        print_next_steps_message=False,
        ignore_warnings=ignore_warnings,
    )
    if TYPE_CHECKING:
        assert inspect.isawaitable(deploy_coro)

    deployment_ids = await deploy_coro

    if print_next_steps:
        console = Console()
        # The worker hint is skipped for push pools, managed pools, and pools
        # that already have an online worker.
        worker_hint_needed = not (
            work_pool.is_push_pool or work_pool.is_managed_pool or active_workers
        )
        if worker_hint_needed:
            console.print(
                "\nTo execute flow runs from this deployment, start a worker in a"
                " separate terminal that pulls work from the"
                f" {work_pool_name!r} work pool:"
            )
            console.print(
                f"\n\t$ prefect worker start --pool {work_pool_name!r}",
                style="blue",
            )
        console.print(
            "\nTo schedule a run for this deployment, use the following command:"
        )
        console.print(
            f"\n\t$ prefect deployment run '{self.name}/{name}'\n",
            style="blue",
        )
        if PREFECT_UI_URL:
            ui_message = (
                "\nYou can also run your flow via the Prefect UI:"
                f" [blue]{PREFECT_UI_URL.value()}/deployments/deployment/{deployment_ids[0]}[/]\n"
            )
            console.print(ui_message, soft_wrap=True)

    return deployment_ids[0]

1600 

@overload
def __call__(self: "Flow[P, NoReturn]", *args: P.args, **kwargs: P.kwargs) -> None:
    # `NoReturn` is matched when no return type can be inferred; listing it first
    # keeps a sync function from falling into the `Coroutine` overload below.
    ...

@overload
def __call__(
    self: "Flow[P, Coroutine[Any, Any, T]]",
    *args: P.args,
    **kwargs: P.kwargs,
) -> Coroutine[Any, Any, T]: ...

@overload
def __call__(
    self: "Flow[P, T]",
    *args: P.args,
    **kwargs: P.kwargs,
) -> T: ...

@overload
def __call__(
    self: "Flow[P, Coroutine[Any, Any, T]]",
    *args: P.args,
    return_state: Literal[True],
    **kwargs: P.kwargs,
) -> Awaitable[State[T]]: ...

@overload
def __call__(
    self: "Flow[P, T]",
    *args: P.args,
    return_state: Literal[True],
    **kwargs: P.kwargs,
) -> State[T]: ...

def __call__(
    self,
    *args: "P.args",
    return_state: bool = False,
    wait_for: Optional[Iterable[PrefectFuture[Any]]] = None,
    **kwargs: "P.kwargs",
):
    """
    Execute the flow and hand back its result.

    Flow parameter values must be serializable by Pydantic. When the flow is
    async, the value returned by this call must be awaited. Calling a flow
    creates a new flow run in the API.

    Args:
        *args: Positional arguments forwarded to the flow function.
        return_state: When true, return a Prefect State wrapping the result
            instead of the bare result.
        wait_for: Upstream task futures to wait for before starting the flow
            when it is called as a subflow.
        **kwargs: Keyword arguments forwarded to the flow function.

    Returns:
        The result of the flow run when `return_state` is False; otherwise the
        result wrapped in a Prefect State, which provides error handling.

    Examples:

        Define a flow

        ```python
        @flow
        def my_flow(name):
            print(f"hello {name}")
            return f"goodbye {name}"
        ```

        Run a flow

        ```python
        my_flow("marvin")
        ```

        Run a flow with additional tags

        ```python
        from prefect import tags

        with tags("db", "blue"):
            my_flow("foo")
        ```
    """
    from prefect.utilities.visualization import (
        get_task_viz_tracker,
        track_viz_task,
    )

    # Bind the call's args/kwargs to the flow function's parameter names.
    call_parameters = get_call_parameters(self.fn, args, kwargs)

    if get_task_viz_tracker():
        # A tracker is active, so this call is a subflow being visualized:
        # record it as a single task and do not descend into it.
        return track_viz_task(self.isasync, self.name, call_parameters)

    # Imported only on the execution path, as in the rest of this method.
    from prefect.flow_engine import run_flow

    return run_flow(
        flow=self,
        parameters=call_parameters,
        wait_for=wait_for,
        return_type="state" if return_state else "result",
    )

1714 

@sync_compatible
async def visualize(self, *args: "P.args", **kwargs: "P.kwargs"):
    """
    Build a graphviz representation of this flow and display it.

    In IPython notebooks the graph is rendered inline; otherwise it opens in a
    new window as a PNG. The flow function itself is invoked (with the given
    arguments) under a `TaskVizTracker` so that its task calls can be recorded.

    Raises:
        - ImportError: If `graphviz` isn't installed.
        - GraphvizExecutableNotFoundError: If the `dot` executable isn't found.
        - FlowVisualizationError: If the flow can't be visualized for any other reason.
    """
    from prefect.utilities.visualization import (
        FlowVisualizationError,
        GraphvizExecutableNotFoundError,
        GraphvizImportError,
        TaskVizTracker,
        VisualizationUnsupportedError,
        build_task_dependencies,
        visualize_task_dependencies,
    )

    if not PREFECT_TESTING_UNIT_TEST_MODE:
        warnings.warn(
            "`flow.visualize()` will execute code inside of your flow that is not"
            " decorated with `@task` or `@flow`."
        )

    try:
        with TaskVizTracker() as tracker:
            # Run the undecorated flow body; async flows have to be awaited.
            if self.isasync:
                await self.fn(*args, **kwargs)  # type: ignore[reportGeneralTypeIssues]
            else:
                self.fn(*args, **kwargs)

            dependency_graph = build_task_dependencies(tracker)
            visualize_task_dependencies(dependency_graph, self.name)
    except (
        GraphvizImportError,
        GraphvizExecutableNotFoundError,
        VisualizationUnsupportedError,
        FlowVisualizationError,
    ):
        # Visualization-specific errors already carry a useful message; let
        # them propagate untouched.
        raise
    except Exception as e:
        hint = (
            "It's possible you are trying to visualize a flow that contains "
            "code that directly interacts with the result of a task"
            " inside of the flow. \nTry passing a `viz_return_value` "
            "to the task decorator, e.g. `@task(viz_return_value=[1, 2, 3]).`"
        )

        # Re-create the error with the same type and the hint appended, keeping
        # the original traceback so the failing line is still reported.
        annotated_error = type(e)(str(e) + "\n" + hint)
        annotated_error.__traceback__ = e.__traceback__
        raise annotated_error

1773 

1774 

class FlowDecorator:
    """Callable that turns a function into a `Flow`; exposed as the `flow` decorator."""

    @overload
    def __call__(self, __fn: Callable[P, R]) -> Flow[P, R]: ...

    @overload
    def __call__(
        self,
        __fn: None = None,
        *,
        name: Optional[str] = None,
        version: Optional[str] = None,
        flow_run_name: Optional[Union[Callable[[], str], str]] = None,
        retries: Optional[int] = None,
        retry_delay_seconds: Optional[Union[int, float]] = None,
        task_runner: None = None,
        description: Optional[str] = None,
        timeout_seconds: Union[int, float, None] = None,
        validate_parameters: bool = True,
        persist_result: Optional[bool] = None,
        result_storage: Optional[ResultStorage] = None,
        result_serializer: Optional[ResultSerializer] = None,
        cache_result_in_memory: bool = True,
        log_prints: Optional[bool] = None,
        on_completion: Optional[list[FlowStateHook[..., Any]]] = None,
        on_failure: Optional[list[FlowStateHook[..., Any]]] = None,
        on_cancellation: Optional[list[FlowStateHook[..., Any]]] = None,
        on_crashed: Optional[list[FlowStateHook[..., Any]]] = None,
        on_running: Optional[list[FlowStateHook[..., Any]]] = None,
    ) -> Callable[[Callable[P, R]], Flow[P, R]]: ...

    @overload
    def __call__(
        self,
        __fn: None = None,
        *,
        name: Optional[str] = None,
        version: Optional[str] = None,
        flow_run_name: Optional[Union[Callable[[], str], str]] = None,
        retries: Optional[int] = None,
        retry_delay_seconds: Optional[Union[int, float]] = None,
        task_runner: Optional[TaskRunner[PrefectFuture[Any]]] = None,
        description: Optional[str] = None,
        timeout_seconds: Union[int, float, None] = None,
        validate_parameters: bool = True,
        persist_result: Optional[bool] = None,
        result_storage: Optional[ResultStorage] = None,
        result_serializer: Optional[ResultSerializer] = None,
        cache_result_in_memory: bool = True,
        log_prints: Optional[bool] = None,
        on_completion: Optional[list[FlowStateHook[..., Any]]] = None,
        on_failure: Optional[list[FlowStateHook[..., Any]]] = None,
        on_cancellation: Optional[list[FlowStateHook[..., Any]]] = None,
        on_crashed: Optional[list[FlowStateHook[..., Any]]] = None,
        on_running: Optional[list[FlowStateHook[..., Any]]] = None,
    ) -> Callable[[Callable[P, R]], Flow[P, R]]: ...

    def __call__(
        self,
        __fn: Optional[Callable[P, R]] = None,
        *,
        name: Optional[str] = None,
        version: Optional[str] = None,
        flow_run_name: Optional[Union[Callable[[], str], str]] = None,
        retries: Optional[int] = None,
        retry_delay_seconds: Union[int, float, None] = None,
        task_runner: Optional[TaskRunner[PrefectFuture[Any]]] = None,
        description: Optional[str] = None,
        timeout_seconds: Union[int, float, None] = None,
        validate_parameters: bool = True,
        persist_result: Optional[bool] = None,
        result_storage: Optional[ResultStorage] = None,
        result_serializer: Optional[ResultSerializer] = None,
        cache_result_in_memory: bool = True,
        log_prints: Optional[bool] = None,
        on_completion: Optional[list[FlowStateHook[..., Any]]] = None,
        on_failure: Optional[list[FlowStateHook[..., Any]]] = None,
        on_cancellation: Optional[list[FlowStateHook[..., Any]]] = None,
        on_crashed: Optional[list[FlowStateHook[..., Any]]] = None,
        on_running: Optional[list[FlowStateHook[..., Any]]] = None,
    ) -> Union[Flow[P, R], Callable[[Callable[P, R]], Flow[P, R]]]:
        """
        Decorator to designate a function as a Prefect workflow.

        Works for both synchronous and asynchronous functions, and can be used
        bare (`@flow`) or with keyword options (`@flow(name=...)`).

        Flow parameters must be serializable by Pydantic.

        Args:
            name: An optional name for the flow; if not provided, the name will be
                inferred from the given function.
            version: An optional version string for the flow; if not provided, we will
                attempt to create a version string as a hash of the file containing the
                wrapped function; if the file cannot be located, the version will be null.
            flow_run_name: An optional name to distinguish runs of this flow; this name
                can be provided as a string template with the flow's parameters as
                variables, or a function that returns a string.
            retries: An optional number of times to retry on flow run failure.
            retry_delay_seconds: An optional number of seconds to wait before retrying
                the flow after failure. This is only applicable if `retries` is nonzero.
            task_runner: An optional task runner to use for task execution within the
                flow; if not provided, a `ConcurrentTaskRunner` will be instantiated.
            description: An optional string description for the flow; if not provided,
                the description will be pulled from the docstring for the decorated
                function.
            timeout_seconds: An optional number of seconds indicating a maximum runtime
                for the flow. If the flow exceeds this runtime, it will be marked as
                failed. Flow execution may continue until the next task is called.
            validate_parameters: By default, parameters passed to flows are validated
                by Pydantic. This will check that input values conform to the annotated
                types on the function. Where possible, values will be coerced into the
                correct type; for example, if a parameter is defined as `x: int` and
                "5" is passed, it will be resolved to `5`. If set to `False`, no
                validation will be performed on flow parameters.
            persist_result: An optional toggle indicating whether the result of this
                flow should be persisted to result storage. Defaults to `None`, which
                indicates that Prefect should choose whether the result should be
                persisted depending on the features being used.
            result_storage: An optional block to use to persist the result of this
                flow. This value will be used as the default for any tasks in this
                flow. If not provided, the local file system will be used unless called
                as a subflow, at which point the default will be loaded from the parent
                flow.
            result_serializer: An optional serializer to use to serialize the result of
                this flow for persistence. This value will be used as the default for
                any tasks in this flow. If not provided, the value of
                `PREFECT_RESULTS_DEFAULT_SERIALIZER` will be used unless called as a
                subflow, at which point the default will be loaded from the parent flow.
            cache_result_in_memory: An optional toggle indicating whether the cached
                result of running the flow should be stored in memory. Defaults to
                `True`.
            log_prints: If set, `print` statements in the flow will be redirected to
                the Prefect logger for the flow run. Defaults to `None`, which
                indicates that the value from the parent flow should be used. If this
                is a parent flow, the default is pulled from the
                `PREFECT_LOGGING_LOG_PRINTS` setting.
            on_completion: An optional list of functions to call when the flow run is
                completed. Each function should accept three arguments: the flow, the
                flow run, and the final state of the flow run.
            on_failure: An optional list of functions to call when the flow run fails.
                Each function should accept three arguments: the flow, the flow run,
                and the final state of the flow run.
            on_cancellation: An optional list of functions to call when the flow run is
                cancelled. These functions will be passed the flow, flow run, and final
                state.
            on_crashed: An optional list of functions to call when the flow run
                crashes. Each function should accept three arguments: the flow, the
                flow run, and the final state of the flow run.
            on_running: An optional list of functions to call when the flow run is
                started. Each function should accept three arguments: the flow, the
                flow run, and the current state of the flow run.

        Returns:
            When a function is passed directly, a callable `Flow` object which, when
            called, will run the flow and return its result (or its state when called
            with `return_state=True`). When only options are passed, a decorator that
            accepts the function and produces that `Flow`.

        Examples:
            Define a simple flow

            ```python
            from prefect import flow

            @flow
            def add(x, y):
                return x + y
            ```

            Define an async flow

            ```python
            @flow
            async def add(x, y):
                return x + y
            ```

            Define a flow with a version and description

            ```python
            @flow(version="first-flow", description="This flow is empty!")
            def my_flow():
                pass
            ```

            Define a flow with a custom name

            ```python
            @flow(name="The Ultimate Flow")
            def my_flow():
                pass
            ```

            Define a flow that submits its tasks to dask

            ```python
            from prefect_dask.task_runners import DaskTaskRunner

            @flow(task_runner=DaskTaskRunner)
            def my_flow():
                pass
            ```
        """
        # Both usages (bare decorator and decorator factory) forward the exact
        # same options, so collect them once.
        flow_options: dict[str, Any] = dict(
            name=name,
            version=version,
            flow_run_name=flow_run_name,
            task_runner=task_runner,
            description=description,
            timeout_seconds=timeout_seconds,
            validate_parameters=validate_parameters,
            retries=retries,
            retry_delay_seconds=retry_delay_seconds,
            persist_result=persist_result,
            result_storage=result_storage,
            result_serializer=result_serializer,
            cache_result_in_memory=cache_result_in_memory,
            log_prints=log_prints,
            on_completion=on_completion,
            on_failure=on_failure,
            on_cancellation=on_cancellation,
            on_crashed=on_crashed,
            on_running=on_running,
        )

        if not __fn:
            # `@flow(...)` usage: defer construction until the function arrives
            # by pre-binding the options onto the module-level `flow` decorator.
            return cast(
                Callable[[Callable[P, R]], Flow[P, R]],
                partial(flow, **flow_options),
            )

        # `@flow` usage: the function is already here, so build the Flow now.
        return Flow(fn=__fn, **flow_options)

    if TYPE_CHECKING:
        # Mypy loses the plot somewhere along the line, so the annotation is
        # reconstructed manually here.
        @staticmethod
        def from_source(
            source: Union[str, Path, "RunnerStorage", ReadableDeploymentStorage],
            entrypoint: str,
        ) -> Union["Flow[..., Any]", Coroutine[Any, Any, "Flow[..., Any]"]]: ...
    else:
        # Add from_source so it is available on the flow function we all know and love
        from_source = staticmethod(Flow.from_source)


# The decorator instance used throughout user code as `@flow`.
flow: FlowDecorator = FlowDecorator()

2033 

2034 

class InfrastructureBoundFlow(Flow[P, R]):
    """
    EXPERIMENTAL: This class is experimental and may be removed or changed in future
    releases.

    A flow that is bound to running on a specific infrastructure.

    Attributes:
        work_pool: The name of the work pool to run the flow on. The base job
            configuration of the work pool will determine the configuration of the
            infrastructure the flow will run on.
        job_variables: Infrastructure configuration that will override the base job
            configuration of the work pool.
        worker_cls: The class of the worker to use to spin up infrastructure and submit
            the flow to it.
    """

    def __init__(
        self,
        *args: Any,
        work_pool: str,
        job_variables: dict[str, Any],
        worker_cls: type["BaseWorker[Any, Any, Any]"],
        **kwargs: Any,
    ):
        """
        Create the flow and record its infrastructure binding.

        Positional and remaining keyword arguments are passed through to
        `Flow.__init__`; `work_pool`, `job_variables` and `worker_cls` are stored
        on the instance unchanged.
        """
        super().__init__(*args, **kwargs)
        self.work_pool = work_pool
        self.job_variables = job_variables
        self.worker_cls = worker_cls

    @overload
    def __call__(self: "Flow[P, NoReturn]", *args: P.args, **kwargs: P.kwargs) -> None:
        # `NoReturn` matches if a type can't be inferred for the function which stops a
        # sync function from matching the `Coroutine` overload
        ...

    @overload
    def __call__(
        self: "Flow[P, Coroutine[Any, Any, T]]",
        *args: P.args,
        **kwargs: P.kwargs,
    ) -> Coroutine[Any, Any, T]: ...

    @overload
    def __call__(
        self: "Flow[P, T]",
        *args: P.args,
        **kwargs: P.kwargs,
    ) -> T: ...

    @overload
    def __call__(
        self: "Flow[P, Coroutine[Any, Any, T]]",
        *args: P.args,
        return_state: Literal[True],
        **kwargs: P.kwargs,
    ) -> Awaitable[State[T]]: ...

    @overload
    def __call__(
        self: "Flow[P, T]",
        *args: P.args,
        return_state: Literal[True],
        **kwargs: P.kwargs,
    ) -> State[T]: ...

    def __call__(
        self,
        *args: "P.args",
        return_state: bool = False,
        wait_for: Optional[Iterable[PrefectFuture[Any]]] = None,
        **kwargs: "P.kwargs",
    ):
        """
        Run the flow by submitting it through a `worker_cls` worker.

        A worker for `self.work_pool` is opened, the flow is submitted with
        `self.job_variables`, and the resulting future is awaited. If the wrapped
        function is a coroutine function the un-awaited coroutine is returned;
        otherwise the coroutine is driven to completion synchronously.

        Args:
            *args: Positional arguments for the flow.
            return_state: When true, return the future's state instead of its result.
            wait_for: Accepted for signature compatibility; not used yet (see the
                TODO inside `modified_call`).
            **kwargs: Keyword arguments for the flow.
        """

        async def modified_call(
            *args: P.args,
            return_state: bool = False,
            # TODO: Handle wait_for once we have an asynchronous way to wait for futures
            # We should wait locally for futures to resolve before spinning up
            # infrastructure.
            wait_for: Optional[Iterable[PrefectFuture[Any]]] = None,
            **kwargs: P.kwargs,
        ) -> R | State[R]:
            try:
                async with self.worker_cls(work_pool_name=self.work_pool) as worker:
                    parameters = get_call_parameters(self, args, kwargs)
                    future = await worker.submit(
                        flow=self,
                        parameters=parameters,
                        job_variables=self.job_variables,
                    )
                    if return_state:
                        await future.wait_async()
                        return future.state
                    return await future.aresult()
            except (ExceptionGroup, BaseExceptionGroup) as exc:
                # For less verbose tracebacks: a group holding exactly one
                # exception is unwrapped and that exception raised directly.
                exceptions = exc.exceptions
                if len(exceptions) == 1:
                    raise exceptions[0] from None
                else:
                    raise

        if inspect.iscoroutinefunction(self.fn):
            # Async flow: the caller awaits the coroutine.
            return modified_call(
                *args, return_state=return_state, wait_for=wait_for, **kwargs
            )
        else:
            return run_coro_as_sync(
                modified_call(
                    *args,
                    return_state=return_state,
                    wait_for=wait_for,
                    **kwargs,
                )
            )

    def submit(self, *args: P.args, **kwargs: P.kwargs) -> PrefectFlowRunFuture[R]:
        """
        EXPERIMENTAL: This method is experimental and may be removed or changed in future
        releases.

        Submit the flow to run on remote infrastructure.

        This method will spin up a local worker to submit the flow to remote infrastructure. To
        submit the flow to remote infrastructure without spinning up a local worker, use
        `submit_to_work_pool` instead.

        Args:
            *args: Positional arguments to pass to the flow.
            **kwargs: Keyword arguments to pass to the flow.

        Returns:
            A `PrefectFlowRunFuture` that can be used to retrieve the result of the flow run.

        Examples:
            Submit a flow to run on Kubernetes:

            ```python
            from prefect import flow
            from prefect_kubernetes.experimental import kubernetes

            @kubernetes(work_pool="my-kubernetes-work-pool")
            @flow
            def my_flow(x: int, y: int):
                return x + y

            future = my_flow.submit(x=1, y=2)
            result = future.result()
            print(result)
            ```
        """

        async def submit_func():
            async with self.worker_cls(work_pool_name=self.work_pool) as worker:
                parameters = get_call_parameters(self, args, kwargs)
                return await worker.submit(
                    flow=self,
                    parameters=parameters,
                    job_variables=self.job_variables,
                )

        return run_coro_as_sync(submit_func())

    def submit_to_work_pool(
        self, *args: P.args, **kwargs: P.kwargs
    ) -> PrefectFlowRunFuture[R]:
        """
        EXPERIMENTAL: This method is experimental and may be removed or changed in future
        releases.

        Submits the flow to run on remote infrastructure.

        This method will create a flow run for an existing worker to submit to remote infrastructure.
        If you don't have a worker available, use `submit` instead.

        Args:
            *args: Positional arguments to pass to the flow.
            **kwargs: Keyword arguments to pass to the flow.

        Returns:
            A `PrefectFlowRunFuture` that can be used to retrieve the result of the flow run.

        Raises:
            RuntimeError: If the work pool has no bundle upload step or no bundle
                execution step configured.

        Examples:
            Dispatch a flow to run on Kubernetes:

            ```python
            from prefect import flow
            from prefect_kubernetes.experimental import kubernetes

            @kubernetes(work_pool="my-kubernetes-work-pool")
            @flow
            def my_flow(x: int, y: int):
                return x + y

            future = my_flow.submit_to_work_pool(x=1, y=2)
            result = future.result()
            print(result)
            ```
        """
        warnings.warn(
            "Dispatching flows to remote infrastructure is experimental. The interface "
            "and behavior of this method are subject to change.",
            category=FutureWarning,
        )
        from prefect import get_client
        from prefect._experimental.bundles import (
            convert_step_to_command,
            create_bundle_for_flow_run,
            upload_bundle_to_storage,
        )
        from prefect.context import FlowRunContext, TagsContext
        from prefect.results import get_result_store, resolve_result_storage
        from prefect.states import Pending, Scheduled
        from prefect.tasks import Task

        # Get parameters to error early if they are invalid
        parameters = get_call_parameters(self, args, kwargs)

        with get_client(sync_client=True) as client:
            work_pool = client.read_work_pool(self.work_pool)

            if (
                work_pool.storage_configuration.bundle_upload_step is None
                or work_pool.storage_configuration.bundle_execution_step is None
            ):
                raise RuntimeError(
                    f"Storage is not configured for work pool {work_pool.name!r}. "
                    "Please configure storage for the work pool by running `prefect "
                    "work-pool storage configure`."
                )

            # Submit this flow as-is unless the work pool's default result
            # storage has to be applied below. Binding `flow` up front matters:
            # the warning-only path below assigns nothing, and without this
            # default every later use of `flow` raised UnboundLocalError.
            flow = self

            current_result_store = get_result_store()
            # Check result storage and use the work pool default if needed.
            # NOTE(review): `and` binds tighter than `or`, so a missing result
            # storage on the current store triggers this branch even when
            # `self.result_storage` is set. The parentheses only make the
            # existing evaluation explicit; confirm it is the intended grouping.
            if current_result_store.result_storage is None or (
                isinstance(current_result_store.result_storage, LocalFileSystem)
                and self.result_storage is None
            ):
                if (
                    work_pool.storage_configuration.default_result_storage_block_id
                    is None
                ):
                    logger.warning(
                        f"Flow {self.name!r} has no result storage configured. Please configure "
                        "result storage for the flow if you want to retrieve the result for the flow run."
                    )
                else:
                    # Use the work pool's default result storage block for the flow run to ensure the caller can retrieve the result
                    flow = self.with_options(
                        result_storage=resolve_result_storage(
                            work_pool.storage_configuration.default_result_storage_block_id,
                            _sync=True,
                        ),
                        persist_result=True,
                    )

            # One random key ties the upload and execution commands to the same bundle.
            bundle_key = str(uuid.uuid4())
            upload_command = convert_step_to_command(
                work_pool.storage_configuration.bundle_upload_step,
                bundle_key,
                quiet=True,
            )
            execute_command = convert_step_to_command(
                work_pool.storage_configuration.bundle_execution_step, bundle_key
            )

            # The bundle execution command always overrides any "command" job variable.
            job_variables = (self.job_variables or {}) | {
                "command": " ".join(execute_command)
            }

            # Create a parent task run if this is a child flow run to ensure it shows up as a child flow in the UI
            parent_task_run = None
            if flow_run_ctx := FlowRunContext.get():
                parent_task = Task[Any, Any](
                    name=flow.name,
                    fn=flow.fn,
                    version=flow.version,
                )
                parent_task_run = run_coro_as_sync(
                    parent_task.create_run(
                        flow_run_context=flow_run_ctx,
                        parameters=parameters,
                    )
                )

            flow_run = client.create_flow_run(
                flow,
                parameters=flow.serialize_parameters(parameters),
                # Start out in pending to prevent a worker from starting the flow run before the bundle is uploaded
                state=Pending(),
                job_variables=job_variables,
                work_pool_name=work_pool.name,
                tags=TagsContext.get().current_tags,
                parent_task_run_id=getattr(parent_task_run, "id", None),
            )

            bundle = create_bundle_for_flow_run(flow=flow, flow_run=flow_run)
            upload_bundle_to_storage(bundle, bundle_key, upload_command)

            # Set flow run to scheduled now that the bundle is uploaded and ready to be executed
            client.set_flow_run_state(flow_run.id, state=Scheduled())

            # TODO: It'd be nice to be able to return the future sooner
            return PrefectFlowRunFuture(flow_run_id=flow_run.id)

    def with_options(
        self,
        *,
        name: Optional[str] = None,
        version: Optional[str] = None,
        retries: Optional[int] = None,
        retry_delay_seconds: Optional[Union[int, float]] = None,
        description: Optional[str] = None,
        flow_run_name: Optional[Union[Callable[[], str], str]] = None,
        task_runner: Union[
            Type[TaskRunner[PrefectFuture[Any]]], TaskRunner[PrefectFuture[Any]], None
        ] = None,
        timeout_seconds: Union[int, float, None] = None,
        validate_parameters: Optional[bool] = None,
        persist_result: Optional[bool] = NotSet,  # type: ignore
        result_storage: Optional[ResultStorage] = NotSet,  # type: ignore
        result_serializer: Optional[ResultSerializer] = NotSet,  # type: ignore
        cache_result_in_memory: Optional[bool] = None,
        log_prints: Optional[bool] = NotSet,  # type: ignore
        on_completion: Optional[list[FlowStateHook[P, R]]] = None,
        on_failure: Optional[list[FlowStateHook[P, R]]] = None,
        on_cancellation: Optional[list[FlowStateHook[P, R]]] = None,
        on_crashed: Optional[list[FlowStateHook[P, R]]] = None,
        on_running: Optional[list[FlowStateHook[P, R]]] = None,
        job_variables: Optional[dict[str, Any]] = None,
    ) -> "InfrastructureBoundFlow[P, R]":
        """
        Return a copy of this flow with the given options changed.

        All flow options are delegated to `Flow.with_options`; the result is then
        re-bound to this flow's `work_pool` and `worker_cls`.

        Args:
            job_variables: Replacement job variables for the new flow. When `None`,
                this flow's current `job_variables` are carried over.

        Returns:
            A new `InfrastructureBoundFlow` with the updated options.
        """
        new_flow = super().with_options(
            name=name,
            version=version,
            retries=retries,
            retry_delay_seconds=retry_delay_seconds,
            description=description,
            flow_run_name=flow_run_name,
            task_runner=task_runner,
            timeout_seconds=timeout_seconds,
            validate_parameters=validate_parameters,
            persist_result=persist_result,
            result_storage=result_storage,
            result_serializer=result_serializer,
            cache_result_in_memory=cache_result_in_memory,
            log_prints=log_prints,
            on_completion=on_completion,
            on_failure=on_failure,
            on_cancellation=on_cancellation,
            on_crashed=on_crashed,
            on_running=on_running,
        )
        new_infrastructure_bound_flow = bind_flow_to_infrastructure(
            new_flow,
            self.work_pool,
            self.worker_cls,
            job_variables=job_variables
            if job_variables is not None
            else self.job_variables,
        )
        return new_infrastructure_bound_flow

2398 

2399 

def bind_flow_to_infrastructure(
    flow: Flow[P, R],
    work_pool: str,
    worker_cls: type["BaseWorker[Any, Any, Any]"],
    job_variables: dict[str, Any] | None = None,
) -> InfrastructureBoundFlow[P, R]:
    """
    Wrap an existing flow in an `InfrastructureBoundFlow`.

    Args:
        flow: The flow whose function and attributes are carried over.
        work_pool: Name of the work pool to bind the flow to.
        worker_cls: Worker class stored on the bound flow.
        job_variables: Job variables for the bound flow; a falsy value becomes `{}`.

    Returns:
        A new `InfrastructureBoundFlow` built from `flow.fn`, with every entry of
        `flow.__dict__` copied onto it.
    """
    bound_flow = InfrastructureBoundFlow[P, R](
        flow.fn,
        work_pool=work_pool,
        job_variables=job_variables or {},
        worker_cls=worker_cls,
    )
    # Mirror the source flow's instance attributes. This runs after
    # construction, so any attribute present on `flow` replaces the value the
    # constructor just set under the same name.
    for attr_name, attr_value in vars(flow).items():
        setattr(bound_flow, attr_name, attr_value)
    return bound_flow

2416 

2417 

2418def _raise_on_name_with_banned_characters(name: Optional[str]) -> Optional[str]: 1a

2419 """ 

2420 Raise an InvalidNameError if the given name contains any invalid 

2421 characters. 

2422 """ 

2423 if name is None: 2423 ↛ 2424line 2423 didn't jump to line 2424 because the condition on line 2423 was never true1a

2424 return name 

2425 

2426 if not re.match(WITHOUT_BANNED_CHARACTERS, name): 2426 ↛ 2427line 2426 didn't jump to line 2427 because the condition on line 2426 was never true1a

2427 raise InvalidNameError( 

2428 f"Name {name!r} contains an invalid character. " 

2429 f"Must not contain any of: {BANNED_CHARACTERS}." 

2430 ) 

2431 

2432 return name 1a

2433 

2434 

def select_flow(
    flows: "Iterable[Flow[P, R]]",
    flow_name: Optional[str] = None,
    from_message: Optional[str] = None,
) -> "Flow[P, R]":
    """
    Select the only flow in an iterable or a flow specified by name.

    Args:
        flows: The candidate flows. They are indexed by `name`, so when two
            flows share a name the later one wins.
        flow_name: Name of the flow to pick; required when more than one flow
            is present.
        from_message: Optional text describing where the flows came from; it is
            appended to error messages after a single space.

    Returns:
        A single flow object

    Raises:
        MissingFlowError: If no flows exist in the iterable
        MissingFlowError: If a flow name is provided and that flow does not exist
        UnspecifiedFlowError: If multiple flows exist but no flow name was provided
    """
    # Convert to flows by name
    flows_dict = {f.name: f for f in flows}

    # Add a leading space if given, otherwise use an empty string
    from_message = (" " + from_message) if from_message else ""

    # Test the collected flows themselves. The previous check tested
    # `typing.Optional`, which is always truthy, so an empty iterable slipped
    # through and ended in an IndexError instead of the documented error.
    if not flows_dict:
        raise MissingFlowError(f"No flows found{from_message}.")

    elif flow_name and flow_name not in flows_dict:
        raise MissingFlowError(
            f"Flow {flow_name!r} not found{from_message}. "
            f"Found the following flows: {listrepr(flows_dict.keys())}. "
            "Check to make sure that your flow function is decorated with `@flow`."
        )

    elif not flow_name and len(flows_dict) > 1:
        raise UnspecifiedFlowError(
            (
                f"Found {len(flows_dict)} flows{from_message}:"
                f" {listrepr(sorted(flows_dict.keys()))}. Specify a flow name to select a"
                " flow."
            ),
        )

    if flow_name:
        return flows_dict[flow_name]

    # Exactly one flow remains at this point.
    return next(iter(flows_dict.values()))

2479 

2480 

def load_flow_from_entrypoint(
    entrypoint: str,
    use_placeholder_flow: bool = True,
) -> Flow[P, Any]:
    """
    Import the object named by an entrypoint and return it as a flow.

    Args:
        entrypoint: a string in the format `<path_to_script>:<flow_func_name>`
            or a string in the format `<path_to_script>:<class_name>.<flow_method_name>`
            or a module path to a flow function
        use_placeholder_flow: if True, fall back to `safe_load_flow_from_entrypoint`
            when importing the entrypoint raises a `ScriptError`
            (e.g. dependencies are missing)

    Returns:
        The flow object from the script

    Raises:
        ScriptError: If an exception is encountered while running the script and
            no fallback flow could be loaded
        MissingFlowError: If the flow function specified in the entrypoint does not
            exist, or the loaded object is not a flow
    """
    # Split on the last separator only, which keeps Windows paths with drive
    # letters intact, i.e C:\path\to\file.py:do_stuff
    separator = ":" if ":" in entrypoint else "."
    path, func_name = entrypoint.rsplit(separator, maxsplit=1)

    try:
        flow: Flow[P, Any] = import_object(entrypoint)  # pyright: ignore[reportRedeclaration]
    except AttributeError as exc:
        raise MissingFlowError(
            f"Flow function with name {func_name!r} not found in {path!r}. "
        ) from exc
    except ScriptError:
        if not use_placeholder_flow:
            raise
        # If the flow has dependencies that are not installed in the current
        # environment, fallback to loading the flow via AST parsing.
        flow: Optional[Flow[P, Any]] = safe_load_flow_from_entrypoint(entrypoint)
        if flow is None:
            raise

    if isinstance(flow, Flow):  # pyright: ignore[reportUnnecessaryIsInstance]
        return flow

    raise MissingFlowError(
        f"Function with name {func_name!r} is not a flow. Make sure that it is "
        "decorated with '@flow'."
    )

2531 

2532 

def load_function_and_convert_to_flow(entrypoint: str) -> Flow[P, Any]:
    """
    Import the object at an entrypoint and make sure the result is a flow.

    An object that is already a `Flow` is returned untouched; anything else is
    wrapped in a new `Flow` created with `log_prints=True`.

    Raises:
        RuntimeError: If importing the entrypoint raises an `AttributeError`
    """
    # Split on the last separator only, which keeps Windows paths with drive
    # letters intact, i.e C:\path\to\file.py:do_stuff
    separator = ":" if ":" in entrypoint else "."
    path, func_name = entrypoint.rsplit(separator, maxsplit=1)

    try:
        func = import_object(entrypoint)  # pyright: ignore[reportRedeclaration]
    except AttributeError as exc:
        raise RuntimeError(
            f"Function with name {func_name!r} not found in {path!r}."
        ) from exc

    return func if isinstance(func, Flow) else Flow(func, log_prints=True)

2554 

2555 

def serve(
    *args: "RunnerDeployment",
    pause_on_shutdown: bool = True,
    print_starting_message: bool = True,
    limit: Optional[int] = None,
    **kwargs: Any,
) -> None:
    """
    Serve the provided deployments from a synchronous context.

    Every deployment is registered with a single `Runner`, which is then run
    with `asyncio.run` until it stops or is interrupted.

    Args:
        *args: A list of deployments to serve.
        pause_on_shutdown: A boolean for whether or not to automatically pause
            deployment schedules on shutdown.
        print_starting_message: Whether or not to print message to the console
            on startup.
        limit: The maximum number of runs that can be executed concurrently.
        **kwargs: Additional keyword arguments to pass to the runner.

    Raises:
        RuntimeError: If called from an asynchronous context.

    Examples:
        Prepare two deployments and serve them:

        ```python
        import datetime

        from prefect import flow, serve

        @flow
        def my_flow(name):
            print(f"hello {name}")

        @flow
        def my_other_flow(name):
            print(f"goodbye {name}")

        if __name__ == "__main__":
            # Run once a day
            hello_deploy = my_flow.to_deployment(
                "hello", tags=["dev"], interval=datetime.timedelta(days=1)
            )

            # Run every Sunday at 4:00 AM
            bye_deploy = my_other_flow.to_deployment(
                "goodbye", tags=["dev"], cron="0 4 * * sun"
            )

            serve(hello_deploy, bye_deploy)
        ```
    """

    from prefect.runner import Runner

    if is_in_async_context():
        raise RuntimeError(
            "Cannot call `serve` in an asynchronous context. Use `aserve` instead."
        )

    runner = Runner(pause_on_shutdown=pause_on_shutdown, limit=limit, **kwargs)
    for deployment in args:
        if deployment.work_pool_name:
            # A served deployment does not use a work pool: warn, then clear it.
            ignored_pool_message = (
                "Work pools are not necessary for served deployments - "
                "the `work_pool_name` argument will be ignored. Omit the "
                f"`work_pool_name` argument from `to_deployment` for {deployment.name!r}."
            )
            warnings.warn(ignored_pool_message, UserWarning)
            deployment.work_pool_name = None
        runner.add_deployment(deployment)

    if print_starting_message:
        _display_serve_start_message(*args)

    try:
        asyncio.run(runner.start())
    except (KeyboardInterrupt, TerminationSignal) as exc:
        # An interrupt or termination signal is logged instead of propagated.
        logger.info(f"Received {type(exc).__name__}, shutting down...")

2632 

2633 

async def aserve(
    *args: "RunnerDeployment",
    pause_on_shutdown: bool = True,
    print_starting_message: bool = True,
    limit: Optional[int] = None,
    **kwargs: Any,
) -> None:
    """
    Asynchronously serve the provided list of deployments.

    Use `serve` instead if calling from a synchronous context.

    Args:
        *args: A list of deployments to serve.
        pause_on_shutdown: A boolean for whether or not to automatically pause
            deployment schedules on shutdown.
        print_starting_message: Whether or not to print message to the console
            on startup.
        limit: The maximum number of runs that can be executed concurrently.
        **kwargs: Additional keyword arguments to pass to the runner.

    Examples:
        Prepare deployment and asynchronous initialization function and serve them:

        ```python
        import asyncio
        import datetime

        from prefect import flow, aserve, get_client


        async def init():
            await set_concurrency_limit()


        async def set_concurrency_limit():
            async with get_client() as client:
                await client.create_concurrency_limit(tag='dev', concurrency_limit=3)


        @flow
        async def my_flow(name):
            print(f"hello {name}")


        async def main():
            # Initialization function
            await init()

            # Run once a day
            hello_deploy = await my_flow.to_deployment(
                "hello", tags=["dev"], interval=datetime.timedelta(days=1)
            )

            await aserve(hello_deploy)


        if __name__ == "__main__":
            asyncio.run(main())
        ```
    """

    from prefect.runner import Runner

    runner = Runner(pause_on_shutdown=pause_on_shutdown, limit=limit, **kwargs)
    for deployment in args:
        pending_registration = runner.add_deployment(deployment)
        if TYPE_CHECKING:
            # Narrowing for the type checker only; never executed at runtime.
            assert inspect.isawaitable(pending_registration)

        await pending_registration

    if print_starting_message:
        _display_serve_start_message(*args)

    await runner.start()

2709 

2710 

def _display_serve_start_message(*args: "RunnerDeployment"):
    """
    Print the startup banner for served deployments to the console.

    The banner is a green status line, a table with one
    `<flow_name>/<name>` row per deployment, and a hint on how to trigger a
    run. A link to the deployments page is appended when `PREFECT_UI_URL`
    is truthy.
    """
    from rich.console import Console, Group
    from rich.table import Table

    deployments_table = Table(title="Deployments", show_header=False)
    deployments_table.add_column(style="blue", no_wrap=True)
    for deployment in args:
        deployments_table.add_row(f"{deployment.flow_name}/{deployment.name}")

    header = (
        "[green]Your deployments are being served and polling for scheduled runs!\n[/]"
    )

    footer = (
        "\nTo trigger any of these deployments, use the"
        " following command:\n[blue]\n\t$ prefect deployment run"
        " [DEPLOYMENT_NAME]\n[/]"
    )
    if PREFECT_UI_URL:
        footer += (
            "\nYou can also trigger your deployments via the Prefect UI:"
            f" [blue]{PREFECT_UI_URL.value()}/deployments[/]\n"
        )

    Console().print(Group(header, deployments_table, footer), soft_wrap=True)

2739 

2740 

@client_injector
async def load_flow_from_flow_run(
    client: "PrefectClient",
    flow_run: "FlowRun",
    ignore_storage: bool = False,
    storage_base_path: Optional[str] = None,
    use_placeholder_flow: bool = True,
) -> Flow[..., Any]:
    """
    Load the flow for a flow run from its deployment's entrypoint.

    The deployment is read through `client`. An entrypoint without a colon is
    treated as a module path and imported directly. Otherwise the flow code is
    first fetched (from the deployment's storage block or local path when there
    are no pull steps, or by running the pull steps) and then imported.

    If `ignore_storage=True` is provided, no pull from remote storage occurs. This flag
    is largely for testing, and assumes the flow is already available locally.

    Args:
        client: the client used to read the deployment and storage block document
        flow_run: the flow run whose deployment defines the flow to load
        ignore_storage: skip downloading from the deployment's storage
        storage_base_path: replacement for the `$STORAGE_BASE_PATH` placeholder in
            the deployment path; defaults to the `PREFECT__STORAGE_BASE_PATH`
            environment variable
        use_placeholder_flow: forwarded to `load_flow_from_entrypoint`

    Raises:
        ValueError: If the flow run has no deployment or the deployment has no
            entrypoint
    """
    if flow_run.deployment_id is None:
        raise ValueError("Flow run does not have an associated deployment")

    deployment = await client.read_deployment(flow_run.deployment_id)

    if deployment.entrypoint is None:
        raise ValueError(
            f"Deployment {deployment.id} does not have an entrypoint and can not be run."
        )

    run_logger = flow_run_logger(flow_run)

    runner_storage_base_path = storage_base_path or os.environ.get(
        "PREFECT__STORAGE_BASE_PATH"
    )

    # If there's no colon, assume it's a module path
    if ":" not in deployment.entrypoint:
        run_logger.debug(
            f"Importing flow code from module path {deployment.entrypoint}"
        )
        return await run_sync_in_worker_thread(
            load_flow_from_entrypoint,
            deployment.entrypoint,
            use_placeholder_flow=use_placeholder_flow,
        )

    if not (ignore_storage or deployment.pull_steps):
        sys.path.insert(0, ".")
        if deployment.storage_document_id:
            storage_document = await client.read_block_document(
                deployment.storage_document_id
            )
            from prefect.blocks.core import Block

            storage_block = Block._from_block_document(storage_document)
        else:
            basepath = deployment.path
            if runner_storage_base_path:
                basepath = str(basepath).replace(
                    "$STORAGE_BASE_PATH", runner_storage_base_path
                )
            storage_block = LocalFileSystem(basepath=basepath)

        # The download source only gets the placeholder substituted when both
        # a base path and a deployment path are present.
        if runner_storage_base_path and deployment.path:
            from_path = str(deployment.path).replace(
                "$STORAGE_BASE_PATH", runner_storage_base_path
            )
        else:
            from_path = deployment.path
        run_logger.info(f"Downloading flow code from storage at {from_path!r}")
        await storage_block.get_directory(from_path=from_path, local_path=".")

    if deployment.pull_steps:
        run_logger.debug(
            f"Running {len(deployment.pull_steps)} deployment pull step(s)"
        )

        from prefect.deployments.steps.core import StepExecutionError, run_steps

        try:
            output = await run_steps(
                deployment.pull_steps,
                print_function=run_logger.info,
                deployment=deployment,
                flow_run=flow_run,
                logger=run_logger,
            )
        except StepExecutionError as e:
            # Log the underlying cause when there is one, then re-raise the
            # step error itself.
            run_logger.error(str(e.__cause__ or e))
            raise

        if output.get("directory"):
            run_logger.debug(f"Changing working directory to {output['directory']!r}")
            os.chdir(output["directory"])

    import_path = relative_path_to_current_platform(deployment.entrypoint)
    run_logger.debug(f"Importing flow code from '{import_path}'")

    try:
        flow = await run_sync_in_worker_thread(
            load_flow_from_entrypoint,
            str(import_path),
            use_placeholder_flow=use_placeholder_flow,
        )
    except MissingFlowError:
        # The entrypoint did not yield a flow; load the object it names and
        # convert it to a flow instead.
        flow = await run_sync_in_worker_thread(
            load_function_and_convert_to_flow,
            str(import_path),
        )

    return flow

2848 

2849 

def load_placeholder_flow(entrypoint: str, raises: Exception) -> Flow[P, Any]:
    """
    Build a stand-in flow that carries the entrypoint's flow arguments but
    raises `raises` as soon as it is called.

    This is useful when a flow can't be loaded due to missing dependencies or
    other issues but the base metadata defining the flow is still needed.

    Args:
        entrypoint: a string in the format `<path_to_script>:<flow_func_name>`
            or a module path to a flow function
        raises: an exception to raise when the flow is called

    Returns:
        A `Flow` whose function is asynchronous when the entrypoint's function
        is asynchronous and synchronous otherwise
    """

    def sync_placeholder_flow(*args: "P.args", **kwargs: "P.kwargs"):
        raise raises

    async def async_placeholder_flow(*args: "P.args", **kwargs: "P.kwargs"):
        raise raises

    # Match the sync/async nature of the real flow function.
    if is_entrypoint_async(entrypoint):
        placeholder_flow = async_placeholder_flow
    else:
        placeholder_flow = sync_placeholder_flow

    flow_kwargs = load_flow_arguments_from_entrypoint(entrypoint)
    flow_kwargs["fn"] = placeholder_flow

    return Flow(**flow_kwargs)

2883 

2884 

def safe_load_flow_from_entrypoint(entrypoint: str) -> Optional[Flow[P, Any]]:
    """
    Load a Prefect flow from an entrypoint string via its parsed source and a
    safely loaded namespace. Returns None if the object cannot be resolved.

    Args:
        entrypoint (str): A string identifying the flow to load. Can be in one of the following formats:
            - `<path_to_script>:<flow_func_name>`
            - `<path_to_script>:<class_name>.<flow_method_name>`
            - `<module_path>.<flow_func_name>`

    Returns:
        Optional[Flow]: The loaded Prefect flow object, or None if loading fails due to errors
        (e.g. unresolved dependencies, syntax errors, or missing objects).
    """
    definition, source_code, parts = _entrypoint_definition_and_source(entrypoint)

    script_path = None
    if ":" in entrypoint:
        script_path = entrypoint.rsplit(":", maxsplit=1)[0]
    namespace = safe_load_namespace(source_code, filepath=script_path)

    root_name = parts[0]
    if root_name not in namespace:
        # If the object is not in the namespace, it may be due to missing dependencies
        # in annotations or default values. We will attempt to sanitize them by removing
        # anything that cannot be compiled, and then recompile the function or class.
        function_nodes = (ast.FunctionDef, ast.AsyncFunctionDef)
        if isinstance(definition, function_nodes):
            return _sanitize_and_load_flow(definition, namespace)
        elif (
            isinstance(definition, ast.ClassDef)
            and len(parts) >= 2
            and definition.name == root_name
        ):
            # Use the first method in the class body with the requested name.
            for stmt in definition.body:
                if isinstance(stmt, function_nodes) and stmt.name == parts[1]:
                    return _sanitize_and_load_flow(stmt, namespace)
        else:
            return None

    # Walk the dotted attribute path, giving up as soon as a part is missing.
    resolved = namespace.get(root_name)
    for attribute in parts[1:]:
        resolved = getattr(resolved, attribute, None)
        if resolved is None:
            return None
    return resolved

2936 

2937 

2938def _sanitize_and_load_flow( 1a

2939 func_def: Union[ast.FunctionDef, ast.AsyncFunctionDef], namespace: dict[str, Any] 

2940) -> Optional[Flow[P, Any]]: 

2941 """ 

2942 Attempt to load a flow from the function definition after sanitizing the annotations 

2943 and defaults that can't be compiled. 

2944 

2945 Args: 

2946 func_def: the function definition 

2947 namespace: the namespace to load the function into 

2948 

2949 Returns: 

2950 The loaded function or None if the function can't be loaded 

2951 after sanitizing the annotations and defaults. 

2952 """ 

2953 args = func_def.args.posonlyargs + func_def.args.args + func_def.args.kwonlyargs 

2954 if func_def.args.vararg: 

2955 args.append(func_def.args.vararg) 

2956 if func_def.args.kwarg: 

2957 args.append(func_def.args.kwarg) 

2958 # Remove annotations that can't be compiled 

2959 for arg in args: 

2960 if arg.annotation is not None: 

2961 try: 

2962 code = compile( 

2963 ast.Expression(arg.annotation), 

2964 filename="<ast>", 

2965 mode="eval", 

2966 ) 

2967 exec(code, namespace) 

2968 except Exception as e: 

2969 logger.debug( 

2970 "Failed to evaluate annotation for argument %s due to the following error. Ignoring annotation.", 

2971 arg.arg, 

2972 exc_info=e, 

2973 ) 

2974 arg.annotation = None 

2975 

2976 # Remove defaults that can't be compiled 

2977 new_defaults: list[Any] = [] 

2978 for default in func_def.args.defaults: 

2979 try: 

2980 code = compile(ast.Expression(default), "<ast>", "eval") 

2981 exec(code, namespace) 

2982 new_defaults.append(default) 

2983 except Exception as e: 

2984 logger.debug( 

2985 "Failed to evaluate default value %s due to the following error. Ignoring default.", 

2986 default, 

2987 exc_info=e, 

2988 ) 

2989 new_defaults.append( 

2990 ast.Constant( 

2991 value=None, lineno=default.lineno, col_offset=default.col_offset 

2992 ) 

2993 ) 

2994 func_def.args.defaults = new_defaults 

2995 

2996 # Remove kw_defaults that can't be compiled 

2997 new_kw_defaults: list[Any] = [] 

2998 for default in func_def.args.kw_defaults: 

2999 if default is not None: 

3000 try: 

3001 code = compile(ast.Expression(default), "<ast>", "eval") 

3002 exec(code, namespace) 

3003 new_kw_defaults.append(default) 

3004 except Exception as e: 

3005 logger.debug( 

3006 "Failed to evaluate default value %s due to the following error. Ignoring default.", 

3007 default, 

3008 exc_info=e, 

3009 ) 

3010 new_kw_defaults.append( 

3011 ast.Constant( 

3012 value=None, 

3013 lineno=default.lineno, 

3014 col_offset=default.col_offset, 

3015 ) 

3016 ) 

3017 else: 

3018 new_kw_defaults.append( 

3019 ast.Constant( 

3020 value=None, 

3021 lineno=func_def.lineno, 

3022 col_offset=func_def.col_offset, 

3023 ) 

3024 ) 

3025 func_def.args.kw_defaults = new_kw_defaults 

3026 

3027 if func_def.returns is not None: 

3028 try: 

3029 code = compile( 

3030 ast.Expression(func_def.returns), filename="<ast>", mode="eval" 

3031 ) 

3032 exec(code, namespace) 

3033 except Exception as e: 

3034 logger.debug( 

3035 "Failed to evaluate return annotation due to the following error. Ignoring annotation.", 

3036 exc_info=e, 

3037 ) 

3038 func_def.returns = None 

3039 

3040 # Attempt to compile the function without annotations and defaults that 

3041 # can't be compiled 

3042 try: 

3043 code = compile( 

3044 ast.Module(body=[func_def], type_ignores=[]), 

3045 filename="<ast>", 

3046 mode="exec", 

3047 ) 

3048 exec(code, namespace) 

3049 except Exception as e: 

3050 logger.debug("Failed to compile: %s", e) 

3051 else: 

3052 return namespace.get(func_def.name) 

3053 

3054 

def load_flow_arguments_from_entrypoint(
    entrypoint: str, arguments: Optional[Union[list[str], set[str]]] = None
) -> dict[str, Any]:
    """
    Extract flow arguments from an entrypoint string.

    Loads the source code of the entrypoint and extracts the flow arguments
    from the `flow` decorator.

    Args:
        entrypoint: a string in the format `<path_to_script>:<flow_func_name>`
            or a module path to a flow function
        arguments: the names of the decorator keyword arguments to extract;
            defaults to a set of known arguments that are of built-in types

    Returns:
        A mapping of argument name to the string form of its value. Arguments
        whose value cannot be evaluated are left out. When `name` is requested
        but not found, the function name with underscores replaced by dashes
        is used.
    """

    func_def, source_code, _ = _entrypoint_definition_and_source(entrypoint)
    path = None
    if ":" in entrypoint:
        # Split on the last colon only so that paths which themselves contain
        # a colon (e.g. Windows drive letters) are kept whole.
        path = entrypoint.rsplit(":", maxsplit=1)[0]

    if arguments is None:
        # If no arguments are provided default to known arguments that are of
        # built-in types.
        arguments = {
            "name",
            "version",
            "retries",
            "retry_delay_seconds",
            "description",
            "timeout_seconds",
            "validate_parameters",
            "persist_result",
            "cache_result_in_memory",
            "log_prints",
        }

    result: dict[str, Any] = {}

    # Loaded lazily, and at most once, the first time a non-constant argument
    # value has to be evaluated.
    namespace: Optional[dict[str, Any]] = None

    for decorator in func_def.decorator_list:
        if not (
            isinstance(decorator, ast.Call)
            and getattr(decorator.func, "id", "") == "flow"
        ):
            continue

        for keyword in decorator.keywords:
            if keyword.arg not in arguments:
                continue

            if isinstance(keyword.value, ast.Constant):
                # Use the string value of the argument
                result[cast(str, keyword.arg)] = str(keyword.value.value)
                continue

            # if the arg value is not a raw str (i.e. a variable or expression),
            # then attempt to evaluate it
            if namespace is None:
                namespace = safe_load_namespace(source_code, filepath=path)
            literal_arg_value = ast.get_source_segment(source_code, keyword.value)
            cleaned_value = (
                literal_arg_value.replace("\n", "") if literal_arg_value else ""
            )

            try:
                # NOTE(review): evaluates an expression taken from the flow's
                # own source file; only appropriate for trusted source.
                evaluated_value = eval(cleaned_value, namespace)  # type: ignore
                result[cast(str, keyword.arg)] = str(evaluated_value)
            except Exception as e:
                logger.info(
                    "Failed to parse @flow argument: `%s=%s` due to the following error. Ignoring and falling back to default behavior.",
                    keyword.arg,
                    literal_arg_value,
                    exc_info=e,
                )
                # ignore the decorator arg and fallback to default behavior
                continue

    if "name" in arguments and "name" not in result:
        # If no matching decorator or keyword argument for `name' is found
        # fallback to the function name.
        result["name"] = func_def.name.replace("_", "-")

    return result

3133 

3134 

def is_entrypoint_async(entrypoint: str) -> bool:
    """
    Report whether the definition found for an entrypoint is an `async def`.

    Args:
        entrypoint: A string in the format `<path_to_script>:<func_name>` or
            a module path to a function.

    Returns:
        True if the function is asynchronous, False otherwise.
    """
    definition = _entrypoint_definition_and_source(entrypoint)[0]
    return isinstance(definition, ast.AsyncFunctionDef)

3148 

3149 

3150def _entrypoint_definition_and_source( 1a

3151 entrypoint: str, 

3152) -> Tuple[Union[ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef], str, List[str]]: 

3153 """ 

3154 Resolves and parses the source definition of a given entrypoint. 

3155 

3156 The entrypoint can be provided in one of the following formats: 

3157 - '<path_to_script>:<flow_func_name>' 

3158 - '<path_to_script>:<class_name>.<flow_method_name>' 

3159 - '<module_path.to.flow_function>' 

3160 

3161 Returns: 

3162 A tuple containing: 

3163 - The AST node (FunctionDef, AsyncFunctionDef, or ClassDef) of the base object. 

3164 - The full source code of the file or module as a string. 

3165 - A list of attribute access parts from the object path (e.g., ['MyFlowClass', 'run']). 

3166 

3167 Raises: 

3168 ValueError: If the module or target object cannot be found. 

3169 """ 

3170 if ":" in entrypoint: 

3171 path, object_path = entrypoint.rsplit(":", maxsplit=1) 

3172 source_code = Path(path).read_text() 

3173 else: 

3174 path, object_path = entrypoint.rsplit(".", maxsplit=1) 

3175 spec = importlib.util.find_spec(path) 

3176 if not spec or not spec.origin: 

3177 raise ValueError(f"Could not find module {path!r}") 

3178 source_code = Path(spec.origin).read_text() 

3179 

3180 parsed_code = ast.parse(source_code) 

3181 parts = object_path.split(".") 

3182 base_name = parts[0] 

3183 

3184 base_def = next( 

3185 ( 

3186 node 

3187 for node in ast.walk(parsed_code) 

3188 if isinstance( 

3189 node, 

3190 ( 

3191 ast.FunctionDef, 

3192 ast.AsyncFunctionDef, 

3193 ast.ClassDef, # flow can be staticmethod/classmethod 

3194 ), 

3195 ) 

3196 and node.name == base_name 

3197 ), 

3198 None, 

3199 ) 

3200 

3201 if not base_def: 

3202 raise ValueError(f"Could not find object {base_name!r} in {path!r}") 

3203 

3204 return base_def, source_code, parts