Coverage for /usr/local/lib/python3.12/site-packages/prefect/utilities/visualization.py: 0%

86 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1""" 

2Utilities for working with Flow.visualize() 

3""" 

4 

5from collections.abc import Coroutine 

6from functools import partial 

7from typing import Any, Literal, Optional, Union, overload 

8 

9import graphviz # type: ignore # no typing stubs available 

10from typing_extensions import Self 

11 

12from prefect._internal.concurrency.api import from_async 

13 

14 

class FlowVisualizationError(Exception):
    """Raised when building or rendering a flow's visualization fails."""

17 

18 

class VisualizationUnsupportedError(Exception):
    """Signals that visualization is not supported for some operation.

    NOTE(review): this module only defines the exception; the conditions
    under which it is raised live at the call sites -- confirm there.
    """

21 

22 

class TaskVizTrackerState:
    """Holder for the tracker that is currently active.

    `TaskVizTracker.__enter__` stores the tracker here, `TaskVizTracker.__exit__`
    resets the slot to None, and `get_task_viz_tracker` reads it.
    """

    # The active tracker, or None when no tracker context is open.
    current: Optional["TaskVizTracker"] = None

25 

26 

class GraphvizImportError(Exception):
    """Raised by `build_task_dependencies` when an ImportError occurs while building the graph."""

29 

30 

class GraphvizExecutableNotFoundError(Exception):
    """Raised by `visualize_task_dependencies` when the Graphviz executable cannot be found."""

33 

34 

def get_task_viz_tracker() -> Optional["TaskVizTracker"]:
    """Return the tracker stored on `TaskVizTrackerState.current`, or None if none is set."""
    active_tracker = TaskVizTrackerState.current
    return active_tracker

37 

38 

@overload
def track_viz_task(
    is_async: Literal[True],
    task_name: str,
    parameters: dict[str, Any],
    viz_return_value: Optional[Any] = None,
) -> Coroutine[Any, Any, Any]: ...


@overload
def track_viz_task(
    is_async: Literal[False],
    task_name: str,
    parameters: dict[str, Any],
    viz_return_value: Optional[Any] = None,
) -> Any: ...


def track_viz_task(
    is_async: bool,
    task_name: str,
    parameters: dict[str, Any],
    viz_return_value: Optional[Any] = None,
) -> Union[Coroutine[Any, Any, Any], Any]:
    """
    Record a task call for visualization, in sync or async flavor.

    Parameters:
    - is_async (bool): Selects the calling convention (see Returns).
    - task_name (str): Name under which the task is tracked.
    - parameters (dict[str, Any]): The arguments the task was called with.
    - viz_return_value (Optional[Any]): Placeholder value forwarded to
        `_track_viz_task`.

    Returns:
    - When `is_async` is false, the result of `_track_viz_task` directly.
        Otherwise, whatever `from_async.wait_for_call_in_loop_thread` returns
        for the deferred call (annotated as a coroutine producing the result).
    """
    # Synchronous callers get the tracking result right away.
    if not is_async:
        return _track_viz_task(task_name, parameters, viz_return_value)

    # Async callers: package the same call and hand it to the loop thread.
    deferred_call = partial(_track_viz_task, task_name, parameters, viz_return_value)
    return from_async.wait_for_call_in_loop_thread(deferred_call)

70 

71 

def _track_viz_task(
    task_name: str,
    parameters: dict[str, Any],
    viz_return_value: Optional[Any] = None,
) -> Any:
    """
    Register a task call with the active tracker and return its stand-in value.

    Parameters:
    - task_name (str): Name of the task being tracked.
    - parameters (dict[str, Any]): Arguments the task was called with. Values
        that are `VizTask` objects, or objects previously linked to a
        `VizTask` by identity, become upstream dependencies.
    - viz_return_value (Optional[Any]): Placeholder to return in place of a
        real task result. `None` means "not provided".

    Returns:
    - `None` when no tracker is active.
    - `viz_return_value` when one was provided (it is first linked to the new
        `VizTask` so later uses of it can be traced back).
    - Otherwise the newly created `VizTask`.
    """
    task_run_tracker = get_task_viz_tracker()
    if task_run_tracker is None:
        return None

    upstream_tasks: list[VizTask] = []
    for value in parameters.values():
        if isinstance(value, VizTask):
            upstream_tasks.append(value)
            continue
        # If it's an object that we've already seen, its object id tells us
        # which tracked task produced it; add that task as an upstream.
        known_task = task_run_tracker.object_id_to_task.get(id(value))
        if known_task is not None:
            upstream_tasks.append(known_task)

    viz_task = VizTask(
        name=task_name,
        upstream_tasks=upstream_tasks,
    )
    task_run_tracker.add_task(viz_task)

    # Compare against None rather than relying on truthiness, so explicitly
    # provided falsy placeholders (0, "", [], False, ...) are still honored
    # instead of being silently replaced by the VizTask.
    if viz_return_value is not None:
        task_run_tracker.link_viz_return_value_to_viz_task(
            viz_return_value, viz_task
        )
        return viz_return_value

    return viz_task

102 

103 

class VizTask:
    """A tracked task for visualization: a name plus the tasks it depends on."""

    def __init__(
        self,
        name: str,
        upstream_tasks: Optional[list["VizTask"]] = None,
    ):
        """
        Parameters:
        - name (str): Name of the task.
        - upstream_tasks (Optional[list[VizTask]]): Tasks this one depends on.
            A missing or empty value yields a fresh empty list.
        """
        self.upstream_tasks: list[VizTask] = upstream_tasks or []
        self.name = name

112 

113 

class TaskVizTracker:
    """
    Collects the `VizTask` objects recorded while a tracker context is open.

    Used as a context manager: entering publishes the tracker on
    `TaskVizTrackerState.current`, exiting resets that slot to None.
    """

    def __init__(self):
        # Tasks in the order they were added.
        self.tasks: list[VizTask] = []
        # Last suffix handed out per base task name.
        self.dynamic_task_counter: dict[str, int] = {}
        # Maps id() of a linked return value to the task that produced it.
        self.object_id_to_task: dict[int, VizTask] = {}

    def add_task(self, task: VizTask) -> None:
        """Append `task`, renaming it to `<name>-<n>` where n counts prior uses of the name."""
        base_name = task.name
        # First occurrence gets suffix 0; every repeat gets the next integer.
        occurrence = self.dynamic_task_counter.get(base_name, -1) + 1
        self.dynamic_task_counter[base_name] = occurrence

        task.name = f"{base_name}-{occurrence}"
        self.tasks.append(task)

    def __enter__(self) -> Self:
        """Make this tracker the current one and return it."""
        TaskVizTrackerState.current = self
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Clear the current tracker; exceptions are not suppressed."""
        TaskVizTrackerState.current = None

    def link_viz_return_value_to_viz_task(
        self, viz_return_value: Any, viz_task: VizTask
    ) -> None:
        """
        Remember which task produced `viz_return_value`, keyed by object id.

        We cannot track booleans, Ellipsis, None, NotImplemented, or the integers
        from -5 to 256 because they are singletons, so those values are skipped.
        """
        from prefect.utilities.engine import UNTRACKABLE_TYPES

        if type(viz_return_value) in UNTRACKABLE_TYPES:
            return
        if isinstance(viz_return_value, int) and -5 <= viz_return_value <= 256:
            return
        self.object_id_to_task[id(viz_return_value)] = viz_task

150 

151 

def build_task_dependencies(task_run_tracker: TaskVizTracker) -> graphviz.Digraph:
    """
    Constructs a Graphviz directed graph object that represents the dependencies
    between tasks in the given TaskVizTracker.

    Each tracked task becomes a node, and an edge is drawn from every upstream
    task to the task that depends on it.

    Parameters:
    - task_run_tracker (TaskVizTracker): An object containing tasks and their
        dependencies.

    Returns:
    - graphviz.Digraph: A directed graph object depicting the relationships and
        dependencies between tasks.

    Raises:
    - GraphvizImportError: If there's an ImportError related to graphviz.
    - FlowVisualizationError: If there's any other error during the visualization
        process or if return values of tasks are directly accessed without
        specifying a `viz_return_value`. The underlying exception is attached
        as `__cause__`.
    """
    try:
        g = graphviz.Digraph()
        for task in task_run_tracker.tasks:
            g.node(task.name)  # type: ignore[reportUnknownMemberType]
            for upstream in task.upstream_tasks:
                g.edge(upstream.name, task.name)  # type: ignore[reportUnknownMemberType]
        return g
    except ImportError as exc:
        raise GraphvizImportError from exc
    except Exception as exc:
        # Chain explicitly so the root cause is reported as the direct cause
        # rather than as an incidental "during handling" context.
        raise FlowVisualizationError(
            "Something went wrong building the flow's visualization."
            " If you're interacting with the return value of a task"
            " directly inside of your flow, you must set a `viz_return_value`"
            ", for example `@task(viz_return_value=[1, 2, 3])`."
        ) from exc

187 

188 

def visualize_task_dependencies(graph: graphviz.Digraph, flow_run_name: str) -> None:
    """
    Renders and displays a Graphviz directed graph representing task dependencies.

    The graph is rendered in PNG format and saved with the name specified by
    flow_run_name. After rendering, the visualization is opened and displayed.

    Parameters:
    - graph (graphviz.Digraph): The directed graph object to visualize.
    - flow_run_name (str): The name to use when saving the rendered graph image.

    Raises:
    - GraphvizExecutableNotFoundError: If Graphviz isn't found on the system.
    - FlowVisualizationError: If there's any other error during the visualization
        process or if return values of tasks are directly accessed without
        specifying a `viz_return_value`. The underlying exception is attached
        as `__cause__`.
    """
    try:
        graph.render(filename=flow_run_name, view=True, format="png", cleanup=True)  # type: ignore[reportUnknownMemberType]
    except graphviz.backend.ExecutableNotFound as exc:
        msg = (
            "It appears you do not have Graphviz installed, or it is not on your "
            "PATH. Please install Graphviz from http://www.graphviz.org/download/. "
            "Note: Just installing the `graphviz` python package is not "
            "sufficient."
        )
        raise GraphvizExecutableNotFoundError(msg) from exc
    except Exception as exc:
        # Chain explicitly so the root cause is reported as the direct cause
        # rather than as an incidental "during handling" context.
        raise FlowVisualizationError(
            "Something went wrong building the flow's visualization."
            " If you're interacting with the return value of a task"
            " directly inside of your flow, you must set a `viz_return_value`"
            ", for example `@task(viz_return_value=[1, 2, 3])`."
        ) from exc