Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,14 @@ $ ./tools/sof-dump-status.py -p
apl
```

Example: merge multiple topology files for a single PCM/graph view
```
$ ./tools/tplgtool2.py --merge -d pcm,graph -D /tmp/ \
/path/to/function.tplg /path/to/feature.tplg
```
This writes a single merged graph (for example, `merged_2_tplgs.png`) and prints
duplicate warnings in merge mode when widget names, PCM IDs, or PCM names collide.

### test case result
| exit code | display | description |
| --------- | ------- | ---------------------- |
Expand Down
3 changes: 3 additions & 0 deletions tools/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ Topologies provide important information for testing.
standalone to prints a list of PCMs and generates a graph with more information than
the graph generated by the old tplgtoo.py. As of May 2024, the topology graph
displayed in SOF test results has been generated by this.
<br> Supports `--merge` to combine multiple `.tplg` files into one logical topology for
`-d pcm` and `-d graph` output. Merge mode also warns about duplicate widget names,
PCM IDs, and PCM names across the merged inputs.

* tplgtool.py
<br> Old toplogy parsing library. Still used by sof-tplgreader.py but new features
Expand Down
88 changes: 88 additions & 0 deletions tools/test_tplgtool2_merge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#!/usr/bin/env python3

import pathlib
import sys
import unittest

sys.path.insert(0, str(pathlib.Path(__file__).resolve().parent))
import tplgtool2


class _FakeGroupedTplg:
def __init__(self, widget_names, pcm_pairs):
self.widget_list = [{"widget": {"name": name}} for name in widget_names]
self.pcm_list = [{"pcm_id": pcm_id, "pcm_name": pcm_name} for pcm_id, pcm_name in pcm_pairs]
self.graph_list = []


class _FakeGroupedTplgWithGraph:
def __init__(self, widgets, edges):
self.widget_list = [
{"widget": {"name": name, "sname": sname}}
for name, sname in widgets
]
self.graph_list = [
{"source": source, "sink": sink}
for source, sink in edges
]


class TestTplgtool2MergeWarnings(unittest.TestCase):
def test_no_duplicate_warnings(self):
fake = _FakeGroupedTplg(
widget_names=["BUF1.0", "PGA1.0"],
pcm_pairs=[(0, "Speaker"), (1, "Mic")],
)

warnings = tplgtool2.get_merge_duplicate_warnings(fake)

self.assertEqual(warnings, [])

def test_duplicate_warnings_include_widget_pcm_id_and_name(self):
fake = _FakeGroupedTplg(
widget_names=["BUF1.0", "BUF1.0", "PGA1.0"],
pcm_pairs=[(0, "Speaker"), (0, "Headset"), (2, "Speaker")],
)

warnings = tplgtool2.get_merge_duplicate_warnings(fake)

self.assertEqual(len(warnings), 3)
self.assertTrue(any("duplicate widget names found" in w and "BUF1.0" in w for w in warnings))
self.assertTrue(any("duplicate PCM IDs found" in w and "0" in w for w in warnings))
self.assertTrue(any("duplicate PCM names found" in w and "Speaker" in w for w in warnings))


class TestTplgtool2MergeSources(unittest.TestCase):
def test_collect_graph_name_sources_collects_widget_sname_and_edges(self):
fake = _FakeGroupedTplgWithGraph(
widgets=[("BUF1.0", "BUF_A"), ("PGA1.0", "")],
edges=[("BUF_A", "PGA1.0"), ("EXTERNAL_SRC", "BUF_A")],
)

sources = tplgtool2.collect_graph_name_sources(fake, "topo_a")

self.assertEqual(sources["BUF1.0"], {"topo_a"})
self.assertEqual(sources["BUF_A"], {"topo_a"})
self.assertEqual(sources["PGA1.0"], {"topo_a"})
self.assertEqual(sources["EXTERNAL_SRC"], {"topo_a"})

def test_collect_graph_name_sources_accumulates_multiple_files(self):
topo_a = _FakeGroupedTplgWithGraph(
widgets=[("BUF1.0", "BUF_SHARED")],
edges=[("BUF_SHARED", "PGA1.0")],
)
topo_b = _FakeGroupedTplgWithGraph(
widgets=[("BUF2.0", "BUF_SHARED")],
edges=[("BUF_SHARED", "PGA2.0")],
)

sources = tplgtool2.collect_graph_name_sources(topo_a, "topo_a")
sources = tplgtool2.collect_graph_name_sources(topo_b, "topo_b", sources)

self.assertEqual(sources["BUF_SHARED"], {"topo_a", "topo_b"})
self.assertEqual(sources["BUF1.0"], {"topo_a"})
self.assertEqual(sources["BUF2.0"], {"topo_b"})


if __name__ == "__main__":
unittest.main()
150 changes: 138 additions & 12 deletions tools/tplgtool2.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import re
import sys
import typing
from collections import defaultdict
from collections import Counter, defaultdict
from construct import this, Container, ListContainer, Struct, Switch, Array, Bytes, GreedyBytes, GreedyRange, FocusedSeq, Pass, Padded, Padding, Prefixed, Flag, Byte, Int16ul, Int32ul, Int64ul, Terminated
from dataclasses import dataclass
from functools import cached_property, partial
Expand Down Expand Up @@ -787,7 +787,11 @@ def _build_nodes_names_in_graph(graph_list: "list[Container]", nodes_dict: "dict
names = {}
for edge in graph_list:
for ename in [edge["source"], edge["sink"]]:
node = nodes_dict[ename]
node = nodes_dict.get(ename)
if node is None:
# Some feature topologies reference graph endpoints not present in
# DAPM widgets. Keep these endpoints as-is instead of failing.
continue
for name in [node["widget"]["name"], node["widget"]["sname"]]:
if name != ename:
names[name] = ename
Expand All @@ -811,6 +815,17 @@ def _build_edges(graph_list: "list[Container]") -> "tuple[dict[str, list[str]],
backward_edge[edge["sink"]].append(edge["source"])
return forward_edge, backward_edge

@staticmethod
def _build_unknown_nodes(graph_list: "list[Container]", nodes_dict: "dict[str, Container]") -> "set[str]":
"Collect graph nodes that do not map to any widget name/sname."
unknown = set()
for edge in graph_list:
if edge["source"] not in nodes_dict:
unknown.add(edge["source"])
if edge["sink"] not in nodes_dict:
unknown.add(edge["sink"])
return unknown

@staticmethod
def _build_leaves(node_names: "list[str]", forward_edges: "dict[str, list[str]]", backward_edges: "dict[str, list[str]]") -> "tuple[set[str], set[str], set[str]]":
r"""Build leaves.
Expand All @@ -836,15 +851,24 @@ def _build_leaves(node_names: "list[str]", forward_edges: "dict[str, list[str]]"
tails.add(node)
return isolated, heads, tails

def __init__(self, grouped_tplg: GroupedTplg):
def __init__(
self,
grouped_tplg: GroupedTplg,
merged_node_sources: typing.Optional["dict[str, set[str]]"] = None,
):
"Build graph from grouped topology data."
self.errors = 0 # non fatal errors
self.show_core = 'auto'
self.without_nodeinfo = False
self._tplg = grouped_tplg
self._merged_node_sources = merged_node_sources or {}
self._nodes_dict = TplgGraph._build_nodes_dict(grouped_tplg.widget_list)
self._nodes_names_in_graph = TplgGraph._build_nodes_names_in_graph(grouped_tplg.graph_list, self._nodes_dict)
self._forward_edges, self._backward_edges = TplgGraph._build_edges(grouped_tplg.graph_list)
self._unknown_nodes = TplgGraph._build_unknown_nodes(grouped_tplg.graph_list, self._nodes_dict)
if self._unknown_nodes:
unknown_nodes = ", ".join(sorted(self._unknown_nodes))
print(f"WARNING: graph-only node(s) without matching widget: {unknown_nodes}", file=sys.stderr)
self._isolated, self._heads, self._tails = TplgGraph._build_leaves(map(self.node_name_in_graph, grouped_tplg.widget_list), self._forward_edges, self._backward_edges)

def _node_name_in_graph_from_name(self, name: str) -> str:
Expand Down Expand Up @@ -945,16 +969,46 @@ def _display_node_attrs(self, name: str, widget: Container):
if GroupedTplg.is_virtual_widget(widget):
attr['style'] = "dotted"
attr['color'] = 'blue'

source_labels = self._merged_node_sources.get(name)
if source_labels:
source_text = ','.join(sorted(source_labels))
label_prefix = 'connect' if len(source_labels) > 1 else 'from'
sublabel2 += f'<BR ALIGN="CENTER"/><SUB>{label_prefix}: {source_text}</SUB>'
attr['label'] = f'<{display_name}{sublabel}{sublabel2}>'
if len(source_labels) > 1:
# Make merge-collision nodes stand out in merged graphs.
attr['color'] = 'crimson'
attr['penwidth'] = '2'
return attr

def _display_edge_attr(self, edge: Container):
attr = {}
if GroupedTplg.is_virtual_widget(self._nodes_dict[edge["source"]])\
or GroupedTplg.is_virtual_widget(self._nodes_dict[edge["sink"]]):
source_widget = self._nodes_dict.get(edge["source"])
sink_widget = self._nodes_dict.get(edge["sink"])
if source_widget is None or sink_widget is None:
return attr
if GroupedTplg.is_virtual_widget(source_widget) \
or GroupedTplg.is_virtual_widget(sink_widget):
attr['style'] = "dotted"
attr['color'] = 'blue'
return attr

def _display_unknown_node_attrs(self, name: str):
"Style graph-only nodes so missing widgets are visible in rendered graph."
attr = {
'shape': 'box',
'style': 'dashed',
'color': 'orangered3',
'fontcolor': 'orangered3',
}
if self._forward_edges[name] and self._backward_edges[name]:
kind = 'graph-only junction'
else:
kind = 'graph-only endpoint'
attr['label'] = f'<{name}<BR ALIGN="CENTER"/><SUB>{kind}</SUB>>'
return attr

def draw(self, outfile: str, outdir: str = '.', file_format: str = "png", live_view: bool = False):
r"""Draw graph and write it to file.

Expand All @@ -978,6 +1032,8 @@ def draw(self, outfile: str, outdir: str = '.', file_format: str = "png", live_v
from tempfile import gettempdir

graph = Digraph("Topology Graph", format=file_format)
for name in self._unknown_nodes:
graph.node(name, **self._display_unknown_node_attrs(name))
for node in self._tplg.widget_list:
name = self.node_name_in_graph(node)
if name not in self._isolated: # skip isolated nodes.
Expand Down Expand Up @@ -1088,7 +1144,7 @@ def _find_connected_comp(self, node_name: str, name_predicate) -> "list[Containe
acc = set()
self._find_connected_node_recursively(self._forward_edges, node_name, name_predicate, acc)
self._find_connected_node_recursively(self._backward_edges, node_name, name_predicate, acc)
return [self._nodes_dict[name] for name in acc]
return [self._nodes_dict[name] for name in acc if name in self._nodes_dict]

def find_connected_comp(self, ref_node: Container, predicate) -> "list[Container]":
r"""Find specified components connected to `ref_node`.
Expand Down Expand Up @@ -1127,7 +1183,7 @@ def find_connected_comp(self, ref_node: Container, predicate) -> "list[Container
"""
return self._find_connected_comp(
self.node_name_in_graph(ref_node),
lambda name: predicate(self._nodes_dict[name]))
lambda name: name in self._nodes_dict and predicate(self._nodes_dict[name]))

def find_comp_for_pcm(self, pcm: Container, prefix: str) -> "list[list[Container]]":
r"""Find specified components for PCM.
Expand Down Expand Up @@ -1267,6 +1323,49 @@ def has_wname_prefix(widget):
# Double-negation: "no_wname false" => prefix
return not no_wname_prefix

def _find_duplicates(values) -> "list[typing.Any]":
"Return sorted values that appear more than once."
return sorted([value for value, count in Counter(values).items() if count > 1])

def collect_graph_name_sources(
tplg: GroupedTplg,
source_label: str,
acc: typing.Optional["dict[str, set[str]]"] = None,
) -> "dict[str, set[str]]":
"Collect graph node names and map each node name to source topology labels."
if acc is None:
acc = defaultdict(set)

for widget in tplg.widget_list:
acc[widget["widget"]["name"]].add(source_label)
sname = widget["widget"]["sname"]
if sname:
acc[sname].add(source_label)

for edge in tplg.graph_list:
acc[edge["source"]].add(source_label)
acc[edge["sink"]].add(source_label)

return acc

def get_merge_duplicate_warnings(tplg: GroupedTplg) -> "list[str]":
"Return warning messages for likely conflicts in merged topologies."
warnings = []

dup_widget_names = _find_duplicates(widget["widget"]["name"] for widget in tplg.widget_list)
if dup_widget_names:
warnings.append(f"duplicate widget names found: {dup_widget_names}")

dup_pcm_ids = _find_duplicates(pcm["pcm_id"] for pcm in tplg.pcm_list)
if dup_pcm_ids:
warnings.append(f"duplicate PCM IDs found: {dup_pcm_ids}")

dup_pcm_names = _find_duplicates(pcm["pcm_name"] for pcm in tplg.pcm_list)
if dup_pcm_names:
warnings.append(f"duplicate PCM names found: {dup_pcm_names}")

return warnings


if __name__ == "__main__":
from pathlib import Path
Expand All @@ -1283,6 +1382,8 @@ def parse_cmdline():
'if multiple information types are wanted, use "," to separate them, eg, `-d pcm,graph`')
parser.add_argument('filenames', nargs='+', type=str, help="""topology filenames or single pattern argument depending on
--tplgroot, see below. To pass multiple patterns use "," to separate them.""")
parser.add_argument('-m', '--merge', action='store_true',
help='merge multiple input topology files into one combined topology before dumping/graphing')
# The below options are used to control generated graph
parser.add_argument('-D', '--directory', type=str, default=".", help="output directory for generated graph")
parser.add_argument('-F', '--format', type=str, default="png", help="output format for generated graph, defaults to 'png'."
Expand Down Expand Up @@ -1355,17 +1456,42 @@ def main():
files = [ Path(f) for f in cmd_args.filenames ]

errors = 0
for f in files:
tplg = GroupedTplg(tplgFormat.parse_file(f))
assert set(dump_types) <= set(supported_dump), f"unsupported type in {dump_types}"
assert set(dump_types) <= set(supported_dump), f"unsupported type in {dump_types}"

if cmd_args.merge:
merged_raw_tplg = []
merged_node_sources = defaultdict(set)
for f in files:
raw_tplg = tplgFormat.parse_file(f)
merged_raw_tplg.extend(raw_tplg)
per_file_tplg = GroupedTplg(raw_tplg)
collect_graph_name_sources(per_file_tplg, f.name, merged_node_sources)

tplg = GroupedTplg(merged_raw_tplg)
for warning in get_merge_duplicate_warnings(tplg):
print(f"WARNING: {warning}", file=sys.stderr)

if 'pcm' in dump_types:
tplg.print_pcm_info()
if 'graph' in dump_types:
graph = TplgGraph(tplg)
graph = TplgGraph(tplg, merged_node_sources=merged_node_sources)
graph.show_core = cmd_args.show_core
graph.without_nodeinfo = cmd_args.without_nodeinfo
graph.draw(f.stem, outdir=cmd_args.directory, file_format=cmd_args.format, live_view=cmd_args.live_view)
graph_name = f"merged_{len(files)}_tplgs"
graph.draw(graph_name, outdir=cmd_args.directory, file_format=cmd_args.format,
live_view=cmd_args.live_view)
errors += graph.errors
else:
for f in files:
tplg = GroupedTplg(tplgFormat.parse_file(f))
if 'pcm' in dump_types:
tplg.print_pcm_info()
if 'graph' in dump_types:
graph = TplgGraph(tplg)
graph.show_core = cmd_args.show_core
graph.without_nodeinfo = cmd_args.without_nodeinfo
graph.draw(f.stem, outdir=cmd_args.directory, file_format=cmd_args.format, live_view=cmd_args.live_view)
errors += graph.errors

return errors

Expand Down
Loading