Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/task.py: 29%

47 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1from __future__ import annotations 1a

2 

3import inspect 1a

4import logging 1a

5from typing import Any, Optional 1a

6 

7import typer 1a

8 

9from prefect.cli._types import PrefectTyper 1a

10from prefect.cli._utilities import exit_with_error 1a

11from prefect.cli.root import app 1a

12from prefect.logging import get_logger 1a

13from prefect.task_worker import serve as task_serve 1a

14from prefect.tasks import Task 1a

15from prefect.utilities.importtools import import_object, load_module 1a

16 

# Typer sub-application that groups the commands defined in this module
# under the name "task".
task_app: PrefectTyper = PrefectTyper(
    name="task",
    help="Work with task scheduling.",
)
# Attach the sub-application to the root CLI application.
app.add_typer(task_app, aliases=["task"])

# Logger shared by the commands in this module.
logger: logging.Logger = get_logger("prefect.cli.task")

21 

22 

def _import_tasks_from_module(module: str) -> list[Task[Any, Any]]:
    """Load `module` and collect every `Task` instance among its members.

    Args:
        module: Module reference passed straight to `load_module`.

    Returns:
        The `Task` objects reported by `inspect.getmembers`, in the order
        that function yields them. Empty when the module exposes no tasks.
    """
    loaded = load_module(module)
    found: list[Task[Any, Any]] = []
    for _name, member in inspect.getmembers(loaded):
        # Only Task instances are of interest.
        if not isinstance(member, Task):
            continue
        # Module objects are never collected, even if they pass the check above.
        if inspect.ismodule(member):
            continue
        found.append(member)
    return found

30 

31 

@task_app.command()
async def serve(
    entrypoints: Optional[list[str]] = typer.Argument(
        None,
        help="The paths to one or more tasks, in the form of `./path/to/file.py:task_func_name`.",
    ),
    module: Optional[list[str]] = typer.Option(
        None,
        "--module",
        "-m",
        help="The module(s) to import the tasks from.",
    ),
    limit: int = typer.Option(
        10,
        help="The maximum number of tasks that can be run concurrently. Defaults to 10.",
    ),
):
    """
    Serve the provided tasks so that their runs may be submitted to and
    executed in the engine.

    Args:
        entrypoints: List of strings representing the paths to one or more
            tasks. Each path should be in the format
            `./path/to/file.py:task_func_name`.
        module: The module(s) to import the task definitions from.
        limit: The maximum number of tasks that can be run concurrently. Defaults to 10.
    """
    # NOTE(review): the flow below relies on `exit_with_error` not returning
    # (e.g. `module_tasks` is read right after an error branch) -- confirm
    # against its definition in `prefect.cli._utilities`.

    # Entrypoints and modules are mutually exclusive sources of tasks.
    if (entrypoints and any(entrypoints)) and (module and any(module)):
        exit_with_error(
            "You may provide entrypoints or modules, but not both at the same time."
        )

    # At least one source must be given.
    if not entrypoints and not module:
        exit_with_error(
            "You must provide either `path/to/file.py:task_func` or `--module module_name`."
        )

    tasks: list[Task[Any, Any]] = []

    if entrypoints:
        for entrypoint in entrypoints:
            if ".py:" not in entrypoint:
                exit_with_error(
                    (
                        f"Error: Invalid entrypoint format {entrypoint!r}. It "
                        "must be of the form `./path/to/file.py:task_func_name`."
                    )
                )

            try:
                tasks.append(import_object(entrypoint))
            except Exception:
                # Split on the LAST colon only. A path that itself contains a
                # colon (e.g. a Windows drive letter, `C:\proj\tasks.py:fn`)
                # would otherwise yield three parts and raise ValueError
                # here, hiding the clean error message behind a traceback.
                # The `.py:` check above guarantees at least one colon, so
                # this always unpacks into exactly two values.
                mod, task_name = entrypoint.rsplit(":", 1)
                exit_with_error(
                    f"Error: {mod!r} has no function {task_name!r}.", style="red"
                )

    elif module:
        for mod in module:
            try:
                module_tasks = _import_tasks_from_module(mod)
            except Exception as e:
                exit_with_error(
                    f"Module '{mod}' could not be imported. Please check the module name and try again.\n\n{e.__class__.__name__}: {e}"
                )
            plural = "s" if len(module_tasks) != 1 else ""
            logger.debug(f"Found {len(module_tasks)} task{plural} in {mod!r}.")
            tasks.extend(module_tasks)

    # Nothing was collected from whichever source was used: report it.
    if not tasks:
        sources = (
            f"entrypoints: {entrypoints!r}" if entrypoints else f"modules: {module!r}"
        )
        exit_with_error(f"No tasks found to serve in {sources}.")

    await task_serve(*tasks, limit=limit)