Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/task.py: 29%
47 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1from __future__ import annotations 1a
3import inspect 1a
4import logging 1a
5from typing import Any, Optional 1a
7import typer 1a
9from prefect.cli._types import PrefectTyper 1a
10from prefect.cli._utilities import exit_with_error 1a
11from prefect.cli.root import app 1a
12from prefect.logging import get_logger 1a
13from prefect.task_worker import serve as task_serve 1a
14from prefect.tasks import Task 1a
15from prefect.utilities.importtools import import_object, load_module 1a
# Typer sub-application for the `task` command group; commands such as `serve`
# are attached to it with `@task_app.command()`.
17task_app: PrefectTyper = PrefectTyper(name="task", help="Work with task scheduling.") 1a
# Register the sub-application on the root CLI `app`, passing "task" as an alias.
18app.add_typer(task_app, aliases=["task"]) 1a
# Module-level logger obtained via `get_logger` under the name "prefect.cli.task".
20logger: logging.Logger = get_logger("prefect.cli.task") 1a
def _import_tasks_from_module(module: str) -> list[Task[Any, Any]]:
    """Collect every `Task` instance exposed as a member of `module`.

    Args:
        module: The module identifier handed to `load_module`.

    Returns:
        The `Task` objects found among the loaded module's members, in the
        order `inspect.getmembers` yields them. Members that are themselves
        modules are never included.
    """
    loaded = load_module(module)
    discovered: list[Task[Any, Any]] = []
    for _name, member in inspect.getmembers(loaded):
        if not isinstance(member, Task):
            continue
        if inspect.ismodule(member):
            continue
        discovered.append(member)
    return discovered
@task_app.command()
async def serve(
    entrypoints: Optional[list[str]] = typer.Argument(
        None,
        help="The paths to one or more tasks, in the form of `./path/to/file.py:task_func_name`.",
    ),
    module: Optional[list[str]] = typer.Option(
        None,
        "--module",
        "-m",
        help="The module(s) to import the tasks from.",
    ),
    limit: int = typer.Option(
        10,
        help="The maximum number of tasks that can be run concurrently. Defaults to 10.",
    ),
):
    """
    Serve the provided tasks so that their runs may be submitted to and
    executed in the engine.

    Args:
        entrypoints: List of strings representing the paths to one or more
            tasks. Each path should be in the format
            `./path/to/file.py:task_func_name`.
        module: The module(s) to import the task definitions from.
        limit: The maximum number of tasks that can be run concurrently. Defaults to 10.
    """
    # Entrypoints and modules are mutually exclusive ways of naming tasks.
    if (entrypoints and any(entrypoints)) and (module and any(module)):
        exit_with_error(
            "You may provide entrypoints or modules, but not both at the same time."
        )

    if not entrypoints and not module:
        exit_with_error(
            "You must provide either `path/to/file.py:task_func` or `--module module_name`."
        )

    tasks: list[Task[Any, Any]] = []

    if entrypoints:
        for entrypoint in entrypoints:
            if ".py:" not in entrypoint:
                exit_with_error(
                    (
                        f"Error: Invalid entrypoint format {entrypoint!r}. It "
                        "must be of the form `./path/to/file.py:task_func_name`."
                    )
                )

            try:
                tasks.append(import_object(entrypoint))
            except Exception:
                # Split on the LAST colon only: the path part may itself
                # contain colons (e.g. a Windows drive such as `C:\...`), and a
                # plain `split(":")` would then fail to unpack and mask the
                # intended error message with a ValueError traceback.
                path, task_name = entrypoint.rsplit(":", 1)
                exit_with_error(
                    f"Error: {path!r} has no function {task_name!r}.", style="red"
                )

    elif module:
        for mod in module:
            try:
                module_tasks = _import_tasks_from_module(mod)
            except Exception as e:
                exit_with_error(
                    f"Module '{mod}' could not be imported. Please check the module name and try again.\n\n{e.__class__.__name__}: {e}"
                )
            plural = "s" if len(module_tasks) != 1 else ""
            logger.debug(f"Found {len(module_tasks)} task{plural} in {mod!r}.")
            tasks.extend(module_tasks)

    # Nothing importable was found: report which sources were searched.
    if not tasks:
        sources = (
            f"entrypoints: {entrypoints!r}" if entrypoints else f"modules: {module!r}"
        )
        exit_with_error(f"No tasks found to serve in {sources}.")

    await task_serve(*tasks, limit=limit)