diff --git a/README.md b/README.md
index 1407f152..c0e3dc9d 100644
--- a/README.md
+++ b/README.md
@@ -88,6 +88,14 @@ $ ./tools/sof-dump-status.py -p
apl
```
+Example: merge multiple topology files for a single PCM/graph view
+```
+$ ./tools/tplgtool2.py --merge -d pcm,graph -D /tmp/ \
+ /path/to/function.tplg /path/to/feature.tplg
+```
+This writes a single merged graph (for example, `merged_2_tplgs.png`) and prints
+duplicate warnings in merge mode when widget names, PCM IDs, or PCM names collide.
+
### test case result
| exit code | display | description |
| --------- | ------- | ---------------------- |
diff --git a/tools/README.md b/tools/README.md
index 1925c918..da675d35 100644
--- a/tools/README.md
+++ b/tools/README.md
@@ -52,6 +52,9 @@ Topologies provide important information for testing.
standalone to prints a list of PCMs and generates a graph with more information than
the graph generated by the old tplgtoo.py. As of May 2024, the topology graph
displayed in SOF test results has been generated by this.
+
Supports `--merge` to combine multiple `.tplg` files into one logical topology for
+ `-d pcm` and `-d graph` output. Merge mode also warns about duplicate widget names,
+ PCM IDs, and PCM names across the merged inputs.
* tplgtool.py
Old toplogy parsing library. Still used by sof-tplgreader.py but new features
diff --git a/tools/test_tplgtool2_merge.py b/tools/test_tplgtool2_merge.py
new file mode 100644
index 00000000..1de9f5e0
--- /dev/null
+++ b/tools/test_tplgtool2_merge.py
@@ -0,0 +1,88 @@
+#!/usr/bin/env python3
+
+import pathlib
+import sys
+import unittest
+
+sys.path.insert(0, str(pathlib.Path(__file__).resolve().parent))
+import tplgtool2
+
+
+class _FakeGroupedTplg:
+ def __init__(self, widget_names, pcm_pairs):
+ self.widget_list = [{"widget": {"name": name}} for name in widget_names]
+ self.pcm_list = [{"pcm_id": pcm_id, "pcm_name": pcm_name} for pcm_id, pcm_name in pcm_pairs]
+ self.graph_list = []
+
+
+class _FakeGroupedTplgWithGraph:
+ def __init__(self, widgets, edges):
+ self.widget_list = [
+ {"widget": {"name": name, "sname": sname}}
+ for name, sname in widgets
+ ]
+ self.graph_list = [
+ {"source": source, "sink": sink}
+ for source, sink in edges
+ ]
+
+
+class TestTplgtool2MergeWarnings(unittest.TestCase):
+ def test_no_duplicate_warnings(self):
+ fake = _FakeGroupedTplg(
+ widget_names=["BUF1.0", "PGA1.0"],
+ pcm_pairs=[(0, "Speaker"), (1, "Mic")],
+ )
+
+ warnings = tplgtool2.get_merge_duplicate_warnings(fake)
+
+ self.assertEqual(warnings, [])
+
+ def test_duplicate_warnings_include_widget_pcm_id_and_name(self):
+ fake = _FakeGroupedTplg(
+ widget_names=["BUF1.0", "BUF1.0", "PGA1.0"],
+ pcm_pairs=[(0, "Speaker"), (0, "Headset"), (2, "Speaker")],
+ )
+
+ warnings = tplgtool2.get_merge_duplicate_warnings(fake)
+
+ self.assertEqual(len(warnings), 3)
+ self.assertTrue(any("duplicate widget names found" in w and "BUF1.0" in w for w in warnings))
+ self.assertTrue(any("duplicate PCM IDs found" in w and "0" in w for w in warnings))
+ self.assertTrue(any("duplicate PCM names found" in w and "Speaker" in w for w in warnings))
+
+
+class TestTplgtool2MergeSources(unittest.TestCase):
+ def test_collect_graph_name_sources_collects_widget_sname_and_edges(self):
+ fake = _FakeGroupedTplgWithGraph(
+ widgets=[("BUF1.0", "BUF_A"), ("PGA1.0", "")],
+ edges=[("BUF_A", "PGA1.0"), ("EXTERNAL_SRC", "BUF_A")],
+ )
+
+ sources = tplgtool2.collect_graph_name_sources(fake, "topo_a")
+
+ self.assertEqual(sources["BUF1.0"], {"topo_a"})
+ self.assertEqual(sources["BUF_A"], {"topo_a"})
+ self.assertEqual(sources["PGA1.0"], {"topo_a"})
+ self.assertEqual(sources["EXTERNAL_SRC"], {"topo_a"})
+
+ def test_collect_graph_name_sources_accumulates_multiple_files(self):
+ topo_a = _FakeGroupedTplgWithGraph(
+ widgets=[("BUF1.0", "BUF_SHARED")],
+ edges=[("BUF_SHARED", "PGA1.0")],
+ )
+ topo_b = _FakeGroupedTplgWithGraph(
+ widgets=[("BUF2.0", "BUF_SHARED")],
+ edges=[("BUF_SHARED", "PGA2.0")],
+ )
+
+ sources = tplgtool2.collect_graph_name_sources(topo_a, "topo_a")
+ sources = tplgtool2.collect_graph_name_sources(topo_b, "topo_b", sources)
+
+ self.assertEqual(sources["BUF_SHARED"], {"topo_a", "topo_b"})
+ self.assertEqual(sources["BUF1.0"], {"topo_a"})
+ self.assertEqual(sources["BUF2.0"], {"topo_b"})
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tools/tplgtool2.py b/tools/tplgtool2.py
index 0adfb998..0ae0e072 100755
--- a/tools/tplgtool2.py
+++ b/tools/tplgtool2.py
@@ -7,7 +7,7 @@
import re
import sys
import typing
-from collections import defaultdict
+from collections import Counter, defaultdict
from construct import this, Container, ListContainer, Struct, Switch, Array, Bytes, GreedyBytes, GreedyRange, FocusedSeq, Pass, Padded, Padding, Prefixed, Flag, Byte, Int16ul, Int32ul, Int64ul, Terminated
from dataclasses import dataclass
from functools import cached_property, partial
@@ -787,7 +787,11 @@ def _build_nodes_names_in_graph(graph_list: "list[Container]", nodes_dict: "dict
names = {}
for edge in graph_list:
for ename in [edge["source"], edge["sink"]]:
- node = nodes_dict[ename]
+ node = nodes_dict.get(ename)
+ if node is None:
+ # Some feature topologies reference graph endpoints not present in
+ # DAPM widgets. Keep these endpoints as-is instead of failing.
+ continue
for name in [node["widget"]["name"], node["widget"]["sname"]]:
if name != ename:
names[name] = ename
@@ -811,6 +815,17 @@ def _build_edges(graph_list: "list[Container]") -> "tuple[dict[str, list[str]],
backward_edge[edge["sink"]].append(edge["source"])
return forward_edge, backward_edge
+ @staticmethod
+ def _build_unknown_nodes(graph_list: "list[Container]", nodes_dict: "dict[str, Container]") -> "set[str]":
+ "Collect graph nodes that do not map to any widget name/sname."
+ unknown = set()
+ for edge in graph_list:
+ if edge["source"] not in nodes_dict:
+ unknown.add(edge["source"])
+ if edge["sink"] not in nodes_dict:
+ unknown.add(edge["sink"])
+ return unknown
+
@staticmethod
def _build_leaves(node_names: "list[str]", forward_edges: "dict[str, list[str]]", backward_edges: "dict[str, list[str]]") -> "tuple[set[str], set[str], set[str]]":
r"""Build leaves.
@@ -836,15 +851,24 @@ def _build_leaves(node_names: "list[str]", forward_edges: "dict[str, list[str]]"
tails.add(node)
return isolated, heads, tails
- def __init__(self, grouped_tplg: GroupedTplg):
+ def __init__(
+ self,
+ grouped_tplg: GroupedTplg,
+ merged_node_sources: typing.Optional["dict[str, set[str]]"] = None,
+ ):
"Build graph from grouped topology data."
self.errors = 0 # non fatal errors
self.show_core = 'auto'
self.without_nodeinfo = False
self._tplg = grouped_tplg
+ self._merged_node_sources = merged_node_sources or {}
self._nodes_dict = TplgGraph._build_nodes_dict(grouped_tplg.widget_list)
self._nodes_names_in_graph = TplgGraph._build_nodes_names_in_graph(grouped_tplg.graph_list, self._nodes_dict)
self._forward_edges, self._backward_edges = TplgGraph._build_edges(grouped_tplg.graph_list)
+ self._unknown_nodes = TplgGraph._build_unknown_nodes(grouped_tplg.graph_list, self._nodes_dict)
+ if self._unknown_nodes:
+ unknown_nodes = ", ".join(sorted(self._unknown_nodes))
+ print(f"WARNING: graph-only node(s) without matching widget: {unknown_nodes}", file=sys.stderr)
self._isolated, self._heads, self._tails = TplgGraph._build_leaves(map(self.node_name_in_graph, grouped_tplg.widget_list), self._forward_edges, self._backward_edges)
def _node_name_in_graph_from_name(self, name: str) -> str:
@@ -945,16 +969,46 @@ def _display_node_attrs(self, name: str, widget: Container):
if GroupedTplg.is_virtual_widget(widget):
attr['style'] = "dotted"
attr['color'] = 'blue'
+
+ source_labels = self._merged_node_sources.get(name)
+ if source_labels:
+ source_text = ','.join(sorted(source_labels))
+ label_prefix = 'connect' if len(source_labels) > 1 else 'from'
+ sublabel2 += f'
{label_prefix}: {source_text}'
+ attr['label'] = f'<{display_name}{sublabel}{sublabel2}>'
+ if len(source_labels) > 1:
+ # Make merge-collision nodes stand out in merged graphs.
+ attr['color'] = 'crimson'
+ attr['penwidth'] = '2'
return attr
def _display_edge_attr(self, edge: Container):
attr = {}
- if GroupedTplg.is_virtual_widget(self._nodes_dict[edge["source"]])\
- or GroupedTplg.is_virtual_widget(self._nodes_dict[edge["sink"]]):
+ source_widget = self._nodes_dict.get(edge["source"])
+ sink_widget = self._nodes_dict.get(edge["sink"])
+ if source_widget is None or sink_widget is None:
+ return attr
+ if GroupedTplg.is_virtual_widget(source_widget) \
+ or GroupedTplg.is_virtual_widget(sink_widget):
attr['style'] = "dotted"
attr['color'] = 'blue'
return attr
+ def _display_unknown_node_attrs(self, name: str):
+ "Style graph-only nodes so missing widgets are visible in rendered graph."
+ attr = {
+ 'shape': 'box',
+ 'style': 'dashed',
+ 'color': 'orangered3',
+ 'fontcolor': 'orangered3',
+ }
+ if self._forward_edges[name] and self._backward_edges[name]:
+ kind = 'graph-only junction'
+ else:
+ kind = 'graph-only endpoint'
+ attr['label'] = f'<{name}
{kind}>'
+ return attr
+
def draw(self, outfile: str, outdir: str = '.', file_format: str = "png", live_view: bool = False):
r"""Draw graph and write it to file.
@@ -978,6 +1032,8 @@ def draw(self, outfile: str, outdir: str = '.', file_format: str = "png", live_v
from tempfile import gettempdir
graph = Digraph("Topology Graph", format=file_format)
+ for name in self._unknown_nodes:
+ graph.node(name, **self._display_unknown_node_attrs(name))
for node in self._tplg.widget_list:
name = self.node_name_in_graph(node)
if name not in self._isolated: # skip isolated nodes.
@@ -1088,7 +1144,7 @@ def _find_connected_comp(self, node_name: str, name_predicate) -> "list[Containe
acc = set()
self._find_connected_node_recursively(self._forward_edges, node_name, name_predicate, acc)
self._find_connected_node_recursively(self._backward_edges, node_name, name_predicate, acc)
- return [self._nodes_dict[name] for name in acc]
+ return [self._nodes_dict[name] for name in acc if name in self._nodes_dict]
def find_connected_comp(self, ref_node: Container, predicate) -> "list[Container]":
r"""Find specified components connected to `ref_node`.
@@ -1127,7 +1183,7 @@ def find_connected_comp(self, ref_node: Container, predicate) -> "list[Container
"""
return self._find_connected_comp(
self.node_name_in_graph(ref_node),
- lambda name: predicate(self._nodes_dict[name]))
+ lambda name: name in self._nodes_dict and predicate(self._nodes_dict[name]))
def find_comp_for_pcm(self, pcm: Container, prefix: str) -> "list[list[Container]]":
r"""Find specified components for PCM.
@@ -1267,6 +1323,49 @@ def has_wname_prefix(widget):
# Double-negation: "no_wname false" => prefix
return not no_wname_prefix
+def _find_duplicates(values) -> "list[typing.Any]":
+ "Return sorted values that appear more than once."
+ return sorted([value for value, count in Counter(values).items() if count > 1])
+
+def collect_graph_name_sources(
+ tplg: GroupedTplg,
+ source_label: str,
+ acc: typing.Optional["dict[str, set[str]]"] = None,
+) -> "dict[str, set[str]]":
+ "Collect graph node names and map each node name to source topology labels."
+ if acc is None:
+ acc = defaultdict(set)
+
+ for widget in tplg.widget_list:
+ acc[widget["widget"]["name"]].add(source_label)
+ sname = widget["widget"]["sname"]
+ if sname:
+ acc[sname].add(source_label)
+
+ for edge in tplg.graph_list:
+ acc[edge["source"]].add(source_label)
+ acc[edge["sink"]].add(source_label)
+
+ return acc
+
+def get_merge_duplicate_warnings(tplg: GroupedTplg) -> "list[str]":
+ "Return warning messages for likely conflicts in merged topologies."
+ warnings = []
+
+ dup_widget_names = _find_duplicates(widget["widget"]["name"] for widget in tplg.widget_list)
+ if dup_widget_names:
+ warnings.append(f"duplicate widget names found: {dup_widget_names}")
+
+ dup_pcm_ids = _find_duplicates(pcm["pcm_id"] for pcm in tplg.pcm_list)
+ if dup_pcm_ids:
+ warnings.append(f"duplicate PCM IDs found: {dup_pcm_ids}")
+
+ dup_pcm_names = _find_duplicates(pcm["pcm_name"] for pcm in tplg.pcm_list)
+ if dup_pcm_names:
+ warnings.append(f"duplicate PCM names found: {dup_pcm_names}")
+
+ return warnings
+
if __name__ == "__main__":
from pathlib import Path
@@ -1283,6 +1382,8 @@ def parse_cmdline():
'if multiple information types are wanted, use "," to separate them, eg, `-d pcm,graph`')
parser.add_argument('filenames', nargs='+', type=str, help="""topology filenames or single pattern argument depending on
--tplgroot, see below. To pass multiple patterns use "," to separate them.""")
+ parser.add_argument('-m', '--merge', action='store_true',
+ help='merge multiple input topology files into one combined topology before dumping/graphing')
# The below options are used to control generated graph
parser.add_argument('-D', '--directory', type=str, default=".", help="output directory for generated graph")
parser.add_argument('-F', '--format', type=str, default="png", help="output format for generated graph, defaults to 'png'."
@@ -1355,17 +1456,42 @@ def main():
files = [ Path(f) for f in cmd_args.filenames ]
errors = 0
- for f in files:
- tplg = GroupedTplg(tplgFormat.parse_file(f))
- assert set(dump_types) <= set(supported_dump), f"unsupported type in {dump_types}"
+ assert set(dump_types) <= set(supported_dump), f"unsupported type in {dump_types}"
+
+ if cmd_args.merge:
+ merged_raw_tplg = []
+ merged_node_sources = defaultdict(set)
+ for f in files:
+ raw_tplg = tplgFormat.parse_file(f)
+ merged_raw_tplg.extend(raw_tplg)
+ per_file_tplg = GroupedTplg(raw_tplg)
+ collect_graph_name_sources(per_file_tplg, f.name, merged_node_sources)
+
+ tplg = GroupedTplg(merged_raw_tplg)
+ for warning in get_merge_duplicate_warnings(tplg):
+ print(f"WARNING: {warning}", file=sys.stderr)
+
if 'pcm' in dump_types:
tplg.print_pcm_info()
if 'graph' in dump_types:
- graph = TplgGraph(tplg)
+ graph = TplgGraph(tplg, merged_node_sources=merged_node_sources)
graph.show_core = cmd_args.show_core
graph.without_nodeinfo = cmd_args.without_nodeinfo
- graph.draw(f.stem, outdir=cmd_args.directory, file_format=cmd_args.format, live_view=cmd_args.live_view)
+ graph_name = f"merged_{len(files)}_tplgs"
+ graph.draw(graph_name, outdir=cmd_args.directory, file_format=cmd_args.format,
+ live_view=cmd_args.live_view)
errors += graph.errors
+ else:
+ for f in files:
+ tplg = GroupedTplg(tplgFormat.parse_file(f))
+ if 'pcm' in dump_types:
+ tplg.print_pcm_info()
+ if 'graph' in dump_types:
+ graph = TplgGraph(tplg)
+ graph.show_core = cmd_args.show_core
+ graph.without_nodeinfo = cmd_args.without_nodeinfo
+ graph.draw(f.stem, outdir=cmd_args.directory, file_format=cmd_args.format, live_view=cmd_args.live_view)
+ errors += graph.errors
return errors