Coverage for /usr/local/lib/python3.12/site-packages/prefect/utilities/visualization.py: 0%
86 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1"""
2Utilities for working with Flow.visualize()
3"""
5from collections.abc import Coroutine
6from functools import partial
7from typing import Any, Literal, Optional, Union, overload
9import graphviz # type: ignore # no typing stubs available
10from typing_extensions import Self
12from prefect._internal.concurrency.api import from_async
class FlowVisualizationError(Exception):
    """Generic failure while building or rendering a flow's visualization.

    Raised by `build_task_dependencies` and `visualize_task_dependencies`
    for any error they do not map to a more specific exception.
    """
class VisualizationUnsupportedError(Exception):
    """Signals that visualization is not supported for the requested operation.

    NOTE(review): nothing in this module raises it; presumably it is raised by
    callers elsewhere in the package -- confirm before documenting further.
    """
class TaskVizTrackerState:
    """Module-level holder for the tracker that is currently active, if any."""

    # Assigned by TaskVizTracker when it is entered/exited as a context manager;
    # None means no tracker is currently published.
    current: Optional["TaskVizTracker"] = None
class GraphvizImportError(Exception):
    """Raised by `build_task_dependencies` when an ImportError surfaces while
    the graph is being constructed."""
class GraphvizExecutableNotFoundError(Exception):
    """Raised by `visualize_task_dependencies` when rendering fails with
    `graphviz.backend.ExecutableNotFound`."""
def get_task_viz_tracker() -> Optional["TaskVizTracker"]:
    """Return the tracker stored on `TaskVizTrackerState.current` (None if unset)."""
    active_tracker = TaskVizTrackerState.current
    return active_tracker
@overload
def track_viz_task(
    is_async: Literal[True],
    task_name: str,
    parameters: dict[str, Any],
    viz_return_value: Optional[Any] = None,
) -> Coroutine[Any, Any, Any]: ...


@overload
def track_viz_task(
    is_async: Literal[False],
    task_name: str,
    parameters: dict[str, Any],
    viz_return_value: Optional[Any] = None,
) -> Any: ...


def track_viz_task(
    is_async: bool,
    task_name: str,
    parameters: dict[str, Any],
    viz_return_value: Optional[Any] = None,
) -> Union[Coroutine[Any, Any, Any], Any]:
    """
    Record a task call for visualization, in sync or async flavour.

    Parameters:
        - is_async (bool): When falsy, `_track_viz_task` is called directly and
            its result returned. When truthy, the same call is wrapped in a
            `partial` and handed to `from_async.wait_for_call_in_loop_thread`,
            whose return value (annotated here as a coroutine) is returned.
        - task_name (str): Forwarded to `_track_viz_task`.
        - parameters (dict[str, Any]): Forwarded to `_track_viz_task`.
        - viz_return_value (Optional[Any]): Forwarded to `_track_viz_task`.

    Returns:
        - The tracking result for sync callers, otherwise the object produced
          by `from_async.wait_for_call_in_loop_thread`.
    """
    # Sync callers get the tracking result immediately.
    if not is_async:
        return _track_viz_task(task_name, parameters, viz_return_value)

    # Async callers receive whatever the loop-thread helper returns for the
    # deferred tracking call.
    deferred_tracking = partial(
        _track_viz_task, task_name, parameters, viz_return_value
    )
    return from_async.wait_for_call_in_loop_thread(deferred_tracking)
def _track_viz_task(
    task_name: str,
    parameters: dict[str, Any],
    viz_return_value: Optional[Any] = None,
) -> Any:
    """
    Register one task call on the active tracker and return a stand-in result.

    Parameters:
        - task_name (str): Name given to the new `VizTask`.
        - parameters (dict[str, Any]): Arguments of the task call. Values that
            are `VizTask` instances, or whose `id()` is registered in the
            tracker's `object_id_to_task`, become upstream dependencies.
        - viz_return_value (Optional[Any]): Value to return in place of the
            `VizTask`. Only `None` means "not provided"; falsy values such as
            `0`, `False`, `""` or `[]` are honoured.

    Returns:
        - `None` when no tracker is active.
        - `viz_return_value` when it is not `None` (after linking it to the
          new task so later uses of that object can be traced back).
        - Otherwise the newly created `VizTask`.
    """
    task_run_tracker = get_task_viz_tracker()
    if not task_run_tracker:
        return None

    upstream_tasks: list[VizTask] = []
    for value in parameters.values():
        if isinstance(value, VizTask):
            upstream_tasks.append(value)
        # if it's an object that we've already seen,
        # we can use the object id to find if there is a trackable task
        # if so, add it to the upstream tasks
        elif id(value) in task_run_tracker.object_id_to_task:
            upstream_tasks.append(task_run_tracker.object_id_to_task[id(value)])

    viz_task = VizTask(
        name=task_name,
        upstream_tasks=upstream_tasks,
    )
    task_run_tracker.add_task(viz_task)

    # Compare against None rather than truthiness: a configured return value
    # of 0 / False / "" / [] must still be returned to the flow.
    if viz_return_value is not None:
        task_run_tracker.link_viz_return_value_to_viz_task(
            viz_return_value, viz_task
        )
        return viz_return_value

    return viz_task
class VizTask:
    """A node of the visualization graph: a task name plus its upstream tasks."""

    def __init__(
        self,
        name: str,
        upstream_tasks: Optional[list["VizTask"]] = None,
    ):
        """
        Parameters:
            - name (str): Label of the task.
            - upstream_tasks (Optional[list[VizTask]]): Tasks this one depends
                on. A missing or empty value is replaced by a new empty list.
        """
        self.name = name
        # `or []` hands every instance its own list when nothing usable is given.
        self.upstream_tasks: list[VizTask] = upstream_tasks or []
114class TaskVizTracker:
115 def __init__(self):
116 self.tasks: list[VizTask] = []
117 self.dynamic_task_counter: dict[str, int] = {}
118 self.object_id_to_task: dict[int, VizTask] = {}
120 def add_task(self, task: VizTask) -> None:
121 if task.name not in self.dynamic_task_counter:
122 self.dynamic_task_counter[task.name] = 0
123 else:
124 self.dynamic_task_counter[task.name] += 1
126 task.name = f"{task.name}-{self.dynamic_task_counter[task.name]}"
127 self.tasks.append(task)
129 def __enter__(self) -> Self:
130 TaskVizTrackerState.current = self
131 return self
133 def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
134 TaskVizTrackerState.current = None
136 def link_viz_return_value_to_viz_task(
137 self, viz_return_value: Any, viz_task: VizTask
138 ) -> None:
139 """
140 We cannot track booleans, Ellipsis, None, NotImplemented, or the integers from -5 to 256
141 because they are singletons.
142 """
143 from prefect.utilities.engine import UNTRACKABLE_TYPES
145 if (type(viz_return_value) in UNTRACKABLE_TYPES) or (
146 isinstance(viz_return_value, int) and (-5 <= viz_return_value <= 256)
147 ):
148 return
149 self.object_id_to_task[id(viz_return_value)] = viz_task
def build_task_dependencies(task_run_tracker: TaskVizTracker) -> graphviz.Digraph:
    """
    Constructs a Graphviz directed graph object that represents the dependencies
    between tasks in the given TaskVizTracker.

    Every tracked task becomes a node, and every upstream task contributes an
    edge pointing from the upstream task to the task that depends on it.

    Parameters:
        - task_run_tracker (TaskVizTracker): An object containing tasks and their
            dependencies.

    Returns:
        - graphviz.Digraph: A directed graph object depicting the relationships and
            dependencies between tasks.

    Raises:
        - GraphvizImportError: If there's an ImportError related to graphviz.
        - FlowVisualizationError: If there's any other error during the visualization
            process or if return values of tasks are directly accessed without
            specifying a `viz_return_value`. The original exception is attached
            as `__cause__`.
    """
    try:
        g = graphviz.Digraph()
        for task in task_run_tracker.tasks:
            g.node(task.name)  # type: ignore[reportUnknownMemberType]
            for upstream in task.upstream_tasks:
                g.edge(upstream.name, task.name)  # type: ignore[reportUnknownMemberType]
        return g
    except ImportError as exc:
        raise GraphvizImportError from exc
    except Exception as exc:
        # Chain explicitly so the root cause is reported as the direct cause.
        raise FlowVisualizationError(
            "Something went wrong building the flow's visualization."
            " If you're interacting with the return value of a task"
            " directly inside of your flow, you must set a `viz_return_value`"
            ", for example `@task(viz_return_value=[1, 2, 3])`."
        ) from exc
def visualize_task_dependencies(graph: graphviz.Digraph, flow_run_name: str) -> None:
    """
    Renders and displays a Graphviz directed graph representing task dependencies.

    The graph is rendered in PNG format and saved with the name specified by
    flow_run_name. After rendering, the visualization is opened and displayed.

    Parameters:
        - graph (graphviz.Digraph): The directed graph object to visualize.
        - flow_run_name (str): The name to use when saving the rendered graph image.

    Raises:
        - GraphvizExecutableNotFoundError: If Graphviz isn't found on the system.
        - FlowVisualizationError: If there's any other error during the visualization
            process or if return values of tasks are directly accessed without
            specifying a `viz_return_value`. The original exception is attached
            as `__cause__`.
    """
    try:
        graph.render(filename=flow_run_name, view=True, format="png", cleanup=True)  # type: ignore[reportUnknownMemberType]
    except graphviz.backend.ExecutableNotFound as exc:
        msg = (
            "It appears you do not have Graphviz installed, or it is not on your "
            "PATH. Please install Graphviz from http://www.graphviz.org/download/. "
            "Note: Just installing the `graphviz` python package is not "
            "sufficient."
        )
        raise GraphvizExecutableNotFoundError(msg) from exc
    except Exception as exc:
        # Chain explicitly so the root cause is reported as the direct cause.
        raise FlowVisualizationError(
            "Something went wrong building the flow's visualization."
            " If you're interacting with the return value of a task"
            " directly inside of your flow, you must set a `viz_return_value`"
            ", for example `@task(viz_return_value=[1, 2, 3])`."
        ) from exc