Coverage for /usr/local/lib/python3.12/site-packages/prefect/utilities/_ast.py: 18%

54 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1import ast 1a

2import math 1a

3from typing import TYPE_CHECKING, Any, Literal 1a

4 

5import anyio 1a

6from typing_extensions import TypeAlias 1a

7 

8from prefect.logging.loggers import get_logger 1a

9from prefect.settings import get_current_settings 1a

10from prefect.utilities.asyncutils import LazySemaphore 1a

11from prefect.utilities.filesystem import get_open_file_limit 1a

12 

# Cap the number of files this module holds open concurrently at half of the
# open file limit, so that other actors can still open files.
OPEN_FILE_SEMAPHORE = LazySemaphore(
    lambda: math.floor(get_open_file_limit() * 0.5)
)

16 

# Metadata describing one decorated function found in a file. A TypedDict
# would be more precise, but the type checker would then need to be convinced
# that only Literal["flow_name", "task_name"] keys are being provided.
DecoratedFnMetadata: TypeAlias = dict[str, Any]

21 

22 

def _is_decorator_reference(
    node: ast.expr, decorator_module: str, decorator_name: str
) -> bool:
    """Return True if *node* is ``decorator_name`` or ``decorator_module.decorator_name``."""
    if isinstance(node, ast.Name):
        return node.id == decorator_name
    return (
        isinstance(node, ast.Attribute)
        and node.attr == decorator_name
        and isinstance(node.value, ast.Name)
        and node.value.id == decorator_module
    )


def _name_from_decorator_call(call: ast.Call, default: str) -> str:
    """Return the string literal passed as ``name=`` to *call*, else *default*."""
    for kw in call.keywords:
        if kw.arg == "name":
            value = kw.value
            # Only a string literal can be resolved statically. Variables,
            # f-strings and non-string constants such as ``name=None`` fall
            # back to the default.
            if isinstance(value, ast.Constant) and isinstance(value.value, str):
                return value.value
            break
    return default


async def find_prefect_decorated_functions_in_file(
    path: anyio.Path, decorator_module: str, decorator_name: Literal["flow", "task"]
) -> list[DecoratedFnMetadata]:
    """Statically find functions in a file that carry a given decorator.

    The file is parsed with ``ast`` (it is never imported or executed) and
    every sync or async function definition, at any nesting level, is checked
    for one of these decorator forms:

    - ``@decorator_name``
    - ``@decorator_name(...)``
    - ``@decorator_module.decorator_name``
    - ``@decorator_module.decorator_name(...)``

    Args:
        path: The file to inspect.
        decorator_module: The module name expected in the attribute forms.
        decorator_name: The decorator to look for.

    Returns:
        One dict per matching decorator with the keys
        ``"<decorator_name>_name"``, ``"function_name"`` and ``"filepath"``.
        The first key holds the string literal passed as ``name=`` in the
        call forms, or the function's own name when there is none. An empty
        list is returned if the file cannot be opened, read, or parsed.
    """
    logger = get_logger()
    decorator_name_key = f"{decorator_name}_name"
    decorated_functions: list[DecoratedFnMetadata] = []

    # The semaphore bounds how many files are open at the same time.
    async with OPEN_FILE_SEMAPHORE:
        try:
            async with await anyio.open_file(path) as f:
                try:
                    tree = ast.parse(await f.read())
                except SyntaxError:
                    if get_current_settings().debug_mode:
                        logger.debug(
                            f"Could not parse {path} as a Python file. Skipping."
                        )
                    return decorated_functions
        except Exception as exc:
            # Best effort: an unreadable file is skipped, not treated as fatal.
            if get_current_settings().debug_mode:
                logger.debug(f"Could not open {path}: {exc}. Skipping.")
            return decorated_functions

    for node in ast.walk(tree):
        if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            continue

        for decorator in node.decorator_list:
            if isinstance(decorator, ast.Call):
                # handles @decorator_name() and @decorator_module.decorator_name()
                if not _is_decorator_reference(
                    decorator.func, decorator_module, decorator_name
                ):
                    continue
                decorated_fn_name = _name_from_decorator_call(decorator, node.name)
            elif _is_decorator_reference(decorator, decorator_module, decorator_name):
                # handles @decorator_name and @decorator_module.decorator_name
                decorated_fn_name = node.name
            else:
                continue

            decorated_functions.append(
                {
                    decorator_name_key: decorated_fn_name,
                    "function_name": node.name,
                    "filepath": str(path),
                }
            )

    return decorated_functions

110 

111 

async def find_flow_functions_in_file(path: anyio.Path) -> list[DecoratedFnMetadata]:
    """Return metadata for functions in *path* decorated with ``flow`` / ``prefect.flow``."""
    flow_functions = await find_prefect_decorated_functions_in_file(
        path,
        decorator_module="prefect",
        decorator_name="flow",
    )
    return flow_functions

114 

115 

async def find_task_functions_in_file(path: anyio.Path) -> list[DecoratedFnMetadata]:
    """Return metadata for functions in *path* decorated with ``task`` / ``prefect.task``."""
    task_functions = await find_prefect_decorated_functions_in_file(
        path,
        decorator_module="prefect",
        decorator_name="task",
    )
    return task_functions