-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsim.py
More file actions
176 lines (147 loc) · 5.74 KB
/
sim.py
File metadata and controls
176 lines (147 loc) · 5.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
from dataclasses import dataclass, field
from typing import Optional
import math
from .core.memory import MemoryHierarchy, MemorySimulator, MemLevel
from .core.compute import ComputeUnitConfig, ComputeSimulator, ComputeArch
from .core.roofline import RooflineModel, RooflinePoint, build_roofline
from .core.workload import WorkloadSpec, WorkloadKind
@dataclass
class HardwareConfig:
    """A named hardware target: one memory hierarchy plus one compute unit."""

    # Label passed to ``build_roofline`` by ``roofline_model``.
    name: str
    # Memory hierarchy; ``roofline_model`` reads ``memory.levels`` as a
    # mapping whose values expose ``bandwidth_gb_s``.
    memory: MemoryHierarchy
    # Compute unit; ``roofline_model`` calls ``compute.effective_gflops``.
    compute: ComputeUnitConfig

    def roofline_model(self, dtype: str = "fp32") -> RooflineModel:
        """Build the roofline model of this hardware for ``dtype``.

        The compute ceiling is ``compute.effective_gflops(dtype) / 1000``
        (GFLOP/s scaled to TFLOP/s). The bandwidth ceiling is the largest
        ``bandwidth_gb_s`` found among ``memory.levels``.

        Args:
            dtype: Data-type name forwarded to ``compute.effective_gflops``.

        Returns:
            The model returned by ``build_roofline``, labelled with ``name``.

        Raises:
            ValueError: If the memory hierarchy defines no levels, so no
                bandwidth ceiling exists.
        """
        peak_tflops = self.compute.effective_gflops(dtype) / 1000

        # Use the highest-bandwidth memory level as the bandwidth ceiling.
        # Only the maximum is needed, so take it directly instead of sorting
        # every level; ``default=None`` lets an empty hierarchy be reported
        # explicitly rather than failing on an index lookup.
        peak_bw = max(
            (level.bandwidth_gb_s for level in self.memory.levels.values()),
            default=None,
        )
        if peak_bw is None:
            raise ValueError(
                f"hardware {self.name!r} has no memory levels; "
                "cannot determine a bandwidth ceiling"
            )

        return build_roofline(peak_tflops, peak_bw, label=self.name)
@dataclass
class WorkloadResult:
    """Record of one workload simulated on one hardware configuration.

    Instances are built by ``Simulator.run_workload``; the comments below
    describe where that method takes each value from.
    """

    # --- identification ---
    workload_label: str  # ``workload.label``
    workload_kind: WorkloadKind  # ``workload.kind``
    hardware_name: str  # ``name`` of the simulated HardwareConfig
    dtype: str  # the dtype override if given, else ``workload.dtype``

    # --- workload size ---
    flops: int  # ``workload.flop_count()``
    bytes_accessed: int  # ``workload.bytes_accessed()``
    arithmetic_intensity: float  # ``workload.arithmetic_intensity()``

    # --- compute simulation output ---
    runtime_ms: float  # runtime in milliseconds reported by the compute simulator
    compute_cycles: float  # ``cycles`` from the compute simulation result
    memory_stall_cycles: float  # ``stall_cycles`` from the memory simulation result
    compute_utilization: float  # ``utilization`` from the compute simulation result
    effective_tflops: float  # ``effective_tflops`` from the compute simulation result
    # Bottleneck label from the compute simulation result;
    # ``SimulationRun.by_bottleneck`` groups on it under "compute" / "memory".
    bottleneck: str

    # --- memory simulation output ---
    cache_hit_rate: float  # ``hit_rate`` from the memory simulation result
    bw_utilization: float  # ``bandwidth_utilization`` from the memory simulation result

    # --- roofline ---
    roofline: RooflinePoint  # point returned by the roofline model's ``evaluate``
@dataclass
class SimulationRun:
    """Collected workload results for a single hardware configuration."""

    # Hardware the results were produced on.
    hardware: HardwareConfig
    # Results in the order they were added.
    results: list[WorkloadResult] = field(default_factory=list)

    def add(self, r: WorkloadResult) -> None:
        """Append one workload result to this run."""
        self.results.append(r)

    def total_runtime_ms(self) -> float:
        """Return the sum of ``runtime_ms`` over all results (0 when empty)."""
        return sum(r.runtime_ms for r in self.results)

    def by_bottleneck(self) -> dict[str, list[WorkloadResult]]:
        """Group results by their ``bottleneck`` label.

        Returns:
            A dict that always contains the keys ``"compute"`` and
            ``"memory"`` (possibly with empty lists). Any other label
            encountered gets its own bucket instead of raising ``KeyError``.
        """
        out: dict[str, list[WorkloadResult]] = {"compute": [], "memory": []}
        for r in self.results:
            # setdefault keeps the two standard buckets and tolerates labels
            # beyond them without failing the whole grouping.
            out.setdefault(r.bottleneck, []).append(r)
        return out

    def average_utilization(self) -> float:
        """Return the mean ``compute_utilization``, or 0.0 with no results."""
        if not self.results:
            return 0.0
        return sum(r.compute_utilization for r in self.results) / len(self.results)
class Simulator:
    """Runs workloads against one hardware configuration.

    Holds a ``MemorySimulator`` built from ``hardware.memory`` and a
    ``ComputeSimulator`` built from ``hardware.compute``.
    """

    def __init__(self, hardware: HardwareConfig):
        """Create the memory and compute simulators for ``hardware``."""
        self.hw = hardware
        self.mem_sim = MemorySimulator(hardware.memory)
        self.cmp_sim = ComputeSimulator(hardware.compute)

    def run_workload(
        self,
        workload: WorkloadSpec,
        dtype_override: Optional[str] = None,
        tiling_efficiency: float = 0.85,
    ) -> WorkloadResult:
        """Simulate a single workload and collect its metrics.

        Args:
            workload: The workload to simulate.
            dtype_override: Data type to use instead of ``workload.dtype``;
                a falsy value (``None`` or ``""``) keeps the workload's own.
            tiling_efficiency: Efficiency factor passed to the compute
                simulator. Ignored when the hardware's compute arch is
                ``ComputeArch.SYSTOLIC``, where it is derived from the
                workload by ``_systolic_tile_efficiency``.

        Returns:
            A ``WorkloadResult`` combining the memory simulation, the compute
            simulation and the roofline evaluation.
        """
        dtype = dtype_override or workload.dtype
        pattern = workload.memory_access_pattern()
        flops = workload.flop_count()
        nbytes = workload.bytes_accessed()

        # Simulate memory subsystem (reset first, before every workload).
        self.mem_sim.reset()
        mem_result = self.mem_sim.simulate_access(nbytes, pattern)

        # Tiling / systolic efficiency adjustment.
        if self.hw.compute.arch == ComputeArch.SYSTOLIC:
            eff = self._systolic_tile_efficiency(workload)
        else:
            eff = tiling_efficiency

        # Simulate compute, feeding in the stalls found by the memory pass.
        cmp_result = self.cmp_sim.simulate(
            flops=flops,
            memory_stall_cycles=mem_result.stall_cycles,
            dtype=dtype,
            tiling_efficiency=eff,
        )

        # Roofline evaluation; the model is given the runtime in seconds.
        rf_model = self.hw.roofline_model(dtype)
        runtime_s = cmp_result.runtime_ms * 1e-3
        rf_point = rf_model.evaluate(flops, nbytes, runtime_s, workload.label)

        return WorkloadResult(
            workload_label=workload.label,
            workload_kind=workload.kind,
            hardware_name=self.hw.name,
            dtype=dtype,
            flops=flops,
            bytes_accessed=nbytes,
            arithmetic_intensity=workload.arithmetic_intensity(),
            runtime_ms=cmp_result.runtime_ms,
            compute_cycles=cmp_result.cycles,
            memory_stall_cycles=mem_result.stall_cycles,
            compute_utilization=cmp_result.utilization,
            effective_tflops=cmp_result.effective_tflops,
            bottleneck=cmp_result.bottleneck,
            cache_hit_rate=mem_result.hit_rate,
            bw_utilization=mem_result.bandwidth_utilization,
            roofline=rf_point,
        )

    def run_suite(
        self,
        workloads: list[WorkloadSpec],
        dtype_override: Optional[str] = None,
        tiling_efficiency: float = 0.85,
    ) -> SimulationRun:
        """Simulate every workload in order and gather the results.

        Args:
            workloads: Workloads to simulate, in order.
            dtype_override: Forwarded to ``run_workload`` for each workload.
            tiling_efficiency: Forwarded to ``run_workload`` for each
                workload; the default matches ``run_workload``'s own.

        Returns:
            A ``SimulationRun`` for this simulator's hardware holding one
            result per workload.
        """
        run = SimulationRun(hardware=self.hw)
        for wl in workloads:
            run.add(
                self.run_workload(
                    wl,
                    dtype_override=dtype_override,
                    tiling_efficiency=tiling_efficiency,
                )
            )
        return run

    def _systolic_tile_efficiency(self, workload: WorkloadSpec) -> float:
        """Return the tiling efficiency to use for ``workload`` on a systolic array.

        Matrix multiplies and attention workloads are sized through
        ``ComputeSimulator.systolic_array_efficiency``; any other workload
        type gets a fixed 0.80.
        """
        from .core.workload import MatMulWorkload, AttentionWorkload

        if isinstance(workload, MatMulWorkload):
            eff = self.cmp_sim.systolic_array_efficiency(
                workload.M, workload.N, workload.K
            )
            # GPU systolic arrays (CUDA tensor cores) tile efficiently via
            # the CUDA runtime at large sizes — don't double-penalize.
            # NOTE(review): run_workload only calls this method when the arch
            # is already SYSTOLIC, so on that path this check always passes
            # and the floor applies to every systolic target — confirm that
            # a GPU-specific condition was not intended here.
            if self.hw.compute.arch == ComputeArch.SYSTOLIC:
                min_eff = 0.95 if workload.M >= 512 else 0.80
                return max(eff, min_eff)
            return eff

        if isinstance(workload, AttentionWorkload):
            p = workload.params
            S, D = p["seq_len"], p["head_dim"]
            # Sized as an S x S x D problem.
            return self.cmp_sim.systolic_array_efficiency(S, S, D)

        return 0.80
def compare_hardware(
    workloads: list[WorkloadSpec],
    hardware_configs: list[HardwareConfig],
    dtype_override: Optional[str] = None,
) -> dict[str, SimulationRun]:
    """Run the same workload suite on each hardware configuration.

    A fresh ``Simulator`` is created per configuration. The returned dict is
    keyed by each configuration's ``name``; if two configurations share a
    name, the later one's run replaces the earlier one's.
    """
    return {
        config.name: Simulator(config).run_suite(
            workloads, dtype_override=dtype_override
        )
        for config in hardware_configs
    }