Coverage for /usr/local/lib/python3.12/site-packages/prefect/utilities/_ast.py: 18%
54 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1import ast 1a
2import math 1a
3from typing import TYPE_CHECKING, Any, Literal 1a
5import anyio 1a
6from typing_extensions import TypeAlias 1a
8from prefect.logging.loggers import get_logger 1a
9from prefect.settings import get_current_settings 1a
10from prefect.utilities.asyncutils import LazySemaphore 1a
11from prefect.utilities.filesystem import get_open_file_limit 1a
13# Only allow half of the open file limit to be open at once to allow for other
14# actors to open files.
15OPEN_FILE_SEMAPHORE = LazySemaphore(lambda: math.floor(get_open_file_limit() * 0.5)) 1a
17# this potentially could be a TypedDict, but you
18# need some way to convince the type checker that
19# Literal["flow_name", "task_name"] are being provided
20DecoratedFnMetadata: TypeAlias = dict[str, Any] 1a
23async def find_prefect_decorated_functions_in_file( 1a
24 path: anyio.Path, decorator_module: str, decorator_name: Literal["flow", "task"]
25) -> list[DecoratedFnMetadata]:
26 logger = get_logger()
27 decorator_name_key = f"{decorator_name}_name"
28 decorated_functions: list[DecoratedFnMetadata] = []
30 async with OPEN_FILE_SEMAPHORE:
31 try:
32 async with await anyio.open_file(path) as f:
33 try:
34 tree = ast.parse(await f.read())
35 except SyntaxError:
36 if get_current_settings().debug_mode:
37 logger.debug(
38 f"Could not parse {path} as a Python file. Skipping."
39 )
40 return decorated_functions
41 except Exception as exc:
42 if get_current_settings().debug_mode:
43 logger.debug(f"Could not open {path}: {exc}. Skipping.")
44 return decorated_functions
46 for node in ast.walk(tree):
47 if isinstance(
48 node,
49 (
50 ast.FunctionDef,
51 ast.AsyncFunctionDef,
52 ),
53 ):
54 for decorator in node.decorator_list:
55 # handles @decorator_name
56 is_name_match = (
57 isinstance(decorator, ast.Name) and decorator.id == decorator_name
58 )
59 # handles @decorator_name()
60 is_func_name_match = (
61 isinstance(decorator, ast.Call)
62 and isinstance(decorator.func, ast.Name)
63 and decorator.func.id == decorator_name
64 )
65 # handles @decorator_module.decorator_name
66 is_module_attribute_match = (
67 isinstance(decorator, ast.Attribute)
68 and isinstance(decorator.value, ast.Name)
69 and decorator.value.id == decorator_module
70 and decorator.attr == decorator_name
71 )
72 # handles @decorator_module.decorator_name()
73 is_module_attribute_func_match = (
74 isinstance(decorator, ast.Call)
75 and isinstance(decorator.func, ast.Attribute)
76 and decorator.func.attr == decorator_name
77 and isinstance(decorator.func.value, ast.Name)
78 and decorator.func.value.id == decorator_module
79 )
80 if is_name_match or is_module_attribute_match:
81 decorated_functions.append(
82 {
83 decorator_name_key: node.name,
84 "function_name": node.name,
85 "filepath": str(path),
86 }
87 )
88 if is_func_name_match or is_module_attribute_func_match:
89 name_kwarg_node = None
90 if TYPE_CHECKING:
91 assert isinstance(decorator, ast.Call)
92 for kw in decorator.keywords:
93 if kw.arg == "name":
94 name_kwarg_node = kw
95 break
96 if name_kwarg_node is not None and isinstance(
97 name_kwarg_node.value, ast.Constant
98 ):
99 decorated_fn_name = name_kwarg_node.value.value
100 else:
101 decorated_fn_name = node.name
102 decorated_functions.append(
103 {
104 decorator_name_key: decorated_fn_name,
105 "function_name": node.name,
106 "filepath": str(path),
107 }
108 )
109 return decorated_functions
112async def find_flow_functions_in_file(path: anyio.Path) -> list[DecoratedFnMetadata]: 1a
113 return await find_prefect_decorated_functions_in_file(path, "prefect", "flow")
116async def find_task_functions_in_file(path: anyio.Path) -> list[DecoratedFnMetadata]: 1a
117 return await find_prefect_decorated_functions_in_file(path, "prefect", "task")