
perf: replace ArrayList consumer wheel with LongMap for O(1) keyed removal#3063

Open. He-Pin wants to merge 3 commits into main from optimize/broadcast-hub-wheel-map

@He-Pin (Member) commented Jun 13, 2026

Motivation

BroadcastHub's findAndRemoveConsumer used ArrayList.removeIf, which is O(k) per event and allocates a lambda on every call. In high-fan-out scenarios (thousands of consumers clustered in the same wheel slot), this creates a producer backpressure bottleneck: the head can only advance once the head slot is empty, and draining a large slot requires k linear scans, each of O(k) cost.
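The cost of draining one slot can be written out as a rough estimate. It assumes, as a simplification, that each removal scans every entry still in the slot, so the j-th remaining entry count contributes j visits:

$$
\sum_{j=1}^{k} j = \frac{k(k+1)}{2} = O(k^2)
$$

Under that assumption, a slot holding k = 2000 consumers needs about 2000 · 2001 / 2 = 2,001,000 element visits to drain, compared with 2000 keyed lookups for a map-based slot.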

Inspired by akkadotnet/akka.net#8264 which replaced ImmutableList<Consumer>[] with Dictionary<long, Consumer>[].

Modification

Replace Array[java.util.ArrayList[Consumer]] with Array[LongMap[Consumer]] keyed by Consumer.id:

  • O(1) keyed removal: LongMap.getOrNull + -= (two primitive hash lookups) replaces ArrayList.removeIf (O(k) linear scan with lambda allocation)
  • Zero heap allocation per cycle: getOrNull returns raw V (not Option[V]), avoiding Some/None allocation on the hot path
  • Zero Long boxing: LongMap stores primitive long keys in contiguous open-addressing arrays, unlike HashMap[Long, _] which boxes to java.lang.Long
  • Lazy slot allocation: empty slots are null (no backing map), reducing baseline memory from ~40 bytes × bufferSize×2 to 0
  • GC-friendly release: when a slot drains to zero consumers, the LongMap reference is nulled for immediate GC
  • Null guards: Advance/NeedWakeup handlers now guard against findAndRemoveConsumer returning null, preventing latent NPE
  • Skipped empty slots: onUpstreamFailure and wakeupIdx skip null (empty) slots during iteration
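The following is a minimal sketch of the slot layout described in the list above, not the actual BroadcastHub code. `Consumer`, `ConsumerWheel`, `add`, `findAndRemove`, and `isSlotEmpty` are hypothetical names chosen for illustration; only `LongMap`, `getOrNull`, and `-=` come from the description.

```scala
import scala.collection.mutable.LongMap

// Hypothetical stand-in for the hub's internal consumer record.
final class Consumer(val id: Long)

// Hypothetical wheel: one lazily allocated LongMap per slot; null means "empty".
final class ConsumerWheel(slots: Int) {
  // Reference arrays start out filled with null, so no map exists until needed.
  private val wheel = new Array[LongMap[Consumer]](slots)

  def add(slot: Int, consumer: Consumer): Unit = {
    var map = wheel(slot)
    if (map eq null) {
      map = new LongMap[Consumer]()
      wheel(slot) = map
    }
    map.update(consumer.id, consumer)
  }

  // Returns the removed consumer, or null when the slot or the id is absent.
  // Callers must null-check the result, mirroring the null guards listed above.
  def findAndRemove(slot: Int, id: Long): Consumer = {
    val map = wheel(slot)
    if (map eq null) null
    else {
      val found = map.getOrNull(id) // raw value, no Option wrapper
      if (found ne null) {
        map -= id
        // Release the drained map so it becomes eligible for GC.
        if (map.isEmpty) wheel(slot) = null
      }
      found
    }
  }

  def isSlotEmpty(slot: Int): Boolean = wheel(slot) eq null
}
```

Iteration over the wheel (for example on upstream failure) would then skip any index for which `isSlotEmpty` is true.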

Result

| Operation | Before (ArrayList) | After (LongMap) |
|---|---|---|
| Remove by ID | O(k) linear scan + lambda alloc | O(1), zero alloc |
| Add | O(1) amortized | O(1) amortized |
| Slot empty check | O(1) | O(1) (null short-circuit) |
| Empty slot memory | ~40 bytes × bufferSize×2 | 0 (null) |
| GC pressure per cycle | Long boxing + lambda + Entry | 0 |

No public API changes. Binary compatibility preserved.

Benchmark Results

Lockstep broadcast throughput (elements/sec, higher is better). 100K elements, 2 warmup + 3 measured runs, Apple M-series. Run with BroadcastHubBenchRunner.
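The measurement scheme (warmup runs discarded, then measured runs) can be sketched as below. This is not the actual BroadcastHubBenchRunner: `ThroughputSketch` and `measure` are hypothetical names, and averaging the measured runs is an assumption, since the description does not say how the runs are aggregated.

```scala
// Hypothetical harness illustrating "N warmup + M measured runs".
object ThroughputSketch {
  // `run` processes the given number of elements; the result is elements/sec.
  def measure(elements: Int, warmup: Int, measured: Int)(run: Int => Unit): Double = {
    // Warmup runs give the JIT a chance to compile the hot path; results are discarded.
    var i = 0
    while (i < warmup) { run(elements); i += 1 }

    // Each measured run is timed on its own.
    val rates = Array.fill(measured) {
      val start = System.nanoTime()
      run(elements)
      val elapsedNs = math.max(1L, System.nanoTime() - start)
      elements.toDouble * 1e9 / elapsedNs
    }
    // Assumption: report the arithmetic mean of the measured runs.
    rates.sum / rates.length
  }
}
```

A call such as `ThroughputSketch.measure(100000, 2, 3)(n => runBroadcast(n))` would match the stated configuration, where `runBroadcast` is a placeholder for the workload under test.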

Buffer=64 (128 wheel slots, maximum clustering):

| Consumers | ArrayList (old), elem/s | LongMap (new), elem/s | Speedup |
|---|---|---|---|
| 64 | 305,657 | 296,756 | 0.97x |
| 256 | 72,446 | 76,075 | 1.05x |
| 1000 | 13,070 | 19,737 | 1.51x |
| 2000 | 4,348 | 10,223 | 2.35x |

Buffer=256 (512 wheel slots, moderate clustering):

| Consumers | ArrayList (old), elem/s | LongMap (new), elem/s | Speedup |
|---|---|---|---|
| 64 | 1,099,345 | 1,148,340 | 1.04x |
| 256 | 197,676 | 271,505 | 1.37x |
| 1000 | 27,804 | 70,727 | 2.54x |
| 2000 | 7,943 | 33,717 | 4.24x |
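
The Speedup column appears to be the ratio of new to old throughput; that reading is an assumption, but it matches the tabulated values. For the two 2000-consumer rows:

$$
\text{speedup} = \frac{\text{LongMap throughput}}{\text{ArrayList throughput}}, \qquad
\frac{10{,}223}{4{,}348} \approx 2.35, \qquad
\frac{33{,}717}{7{,}943} \approx 4.24
$$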

Key observations:

  • 2.35x–4.24x speedup at 2000 consumers, with the gap widening as consumer count increases, consistent with the O(k) linear scan being the dominant bottleneck
  • Crossover between 64 and 256 consumers at buffer=64: at 64 consumers ArrayList's simpler access pattern is marginally faster (about 3%), while from 256 consumers up LongMap's O(1) removal dominates; at buffer=256 LongMap is ahead at every measured consumer count
  • Graceful degradation: throughput scales inversely with consumer count, with no cliff-like collapse that O(k²) slot contention would cause
  • Low variance: StdDev is <2% for most configurations, indicating stable, deterministic performance
  • Buffer size effect: larger buffers spread consumers across more wheel slots, yet the speedup at 2000 consumers is larger at buffer=256 (4.24x) than at buffer=64 (2.35x), which suggests the optimization matters most when the absolute consumer count is high, even with more slots available

Tests

  • sbt "stream-tests/Test/testOnly *HubSpec" → 50 passed, 0 failed (includes 2 new high-consumer tests)
  • sbt "++3.3.8; stream/compile" → success
  • sbt "stream/mimaReportBinaryIssues" → no issues
  • sbt "bench-jmh/compile" → success
  • sbt "bench-jmh/runMain org.apache.pekko.stream.BroadcastHubBenchRunner" → completed (results above, both old and new code)
  • sbt "bench-jmh/headerCreateAll" → clean
  • scalafmt --mode diff-ref=origin/main → clean
  • git diff --check → no whitespace errors

References

Inspired by akkadotnet/akka.net#8264. Pekko uses scala.collection.mutable.LongMap instead of Dictionary/HashMap for zero boxing on Long keys and contiguous open-addressing memory layout.

…moval

Motivation:
BroadcastHub's findAndRemoveConsumer used ArrayList.removeIf which is
O(k) per event with lambda allocation on every call. In high-fan-out
scenarios (thousands of consumers clustered in the same wheel slot),
this creates a producer backpressure bottleneck: the head can only
advance after the head slot is empty, and draining a large slot
requires k linear scans each of O(k) cost.

Modification:
Replace Array[java.util.ArrayList[Consumer]] with Array[LongMap[Consumer]]
keyed by Consumer.id. Slots are lazily allocated (null = empty) and
released to null when drained, eliminating baseline memory for empty
slots and enabling GC of drained LongMaps.

Hot path uses getOrNull + -= (two primitive hash lookups) instead of
remove (which would allocate Option), achieving zero heap allocation
per add/remove cycle. No Long boxing since LongMap stores primitive
long keys.

Adds null guards in Advance/NeedWakeup event handlers to prevent
latent NPE when findAndRemoveConsumer returns null. Updates
onUpstreamFailure and wakeupIdx to skip null (empty) slots.

Result:
Consumer add/remove is O(1) with zero Long boxing and zero Option
allocation. High-consumer lockstep scenarios see dramatically reduced
producer backpressure from wheel slot contention. Memory for empty
wheel slots drops from ~40 bytes per ArrayList to 0 (null).

Tests:
- sbt "stream-tests/Test/testOnly *HubSpec" → 50 passed, 0 failed
- sbt "++3.3.8; stream/compile" → success
- sbt "stream/mimaReportBinaryIssues" → no issues
- sbt "bench-jmh/compile" → success

References:
Inspired by akkadotnet/akka.net#8264 (Dictionary-based consumer wheel).
Pekko uses scala.collection.mutable.LongMap instead of HashMap for
zero boxing on Long keys and contiguous open-addressing memory layout.
@He-Pin He-Pin marked this pull request as draft June 13, 2026 22:08
@He-Pin He-Pin added the t:stream Pekko Streams label Jun 13, 2026
@He-Pin He-Pin added this to the 2.0.0-M4 milestone Jun 13, 2026
@He-Pin He-Pin marked this pull request as ready for review June 13, 2026 22:11
He-Pin added 2 commits June 14, 2026 06:37
Adds BroadcastHubBenchRunner for direct measurement of consumer wheel
throughput under high-fan-out scenarios, bypassing JMH infrastructure
classpath issues in the bench-jmh module.

Measures lockstep broadcast throughput at 4 consumer counts (64, 256,
1000, 2000) across 2 buffer sizes (64, 256) with 2 warmup + 3 measured
runs per configuration.

Results on Apple M-series (elements/sec, higher is better):

Buffer=64 (128 wheel slots, max clustering):
  64 consumers:    296,756 elem/s
  256 consumers:    76,075 elem/s
  1000 consumers:   19,737 elem/s
  2000 consumers:   10,223 elem/s

Buffer=256 (512 wheel slots, moderate clustering):
  64 consumers:  1,148,340 elem/s
  256 consumers:   271,505 elem/s
  1000 consumers:   70,727 elem/s
  2000 consumers:   33,717 elem/s

Throughput degrades gracefully with consumer count, demonstrating the
O(1) LongMap removal holds up under high per-slot contention.

Tests:
- sbt "bench-jmh/compile" → success
- sbt "bench-jmh/runMain org.apache.pekko.stream.BroadcastHubBenchRunner" → completed

References:
Refs #3063
…sults

Run headerCreateAll for the new benchmark runner file.

Comparison benchmark results (old ArrayList vs new LongMap):

Buffer=64 (128 wheel slots):
  Consumers  ArrayList(elem/s)  LongMap(elem/s)  Speedup
  64         305,657            296,756          0.97x
  256         72,446             76,075          1.05x
  1000        13,070             19,737          1.51x
  2000         4,348             10,223          2.35x

Buffer=256 (512 wheel slots):
  Consumers  ArrayList(elem/s)  LongMap(elem/s)  Speedup
  64       1,099,345          1,148,340          1.04x
  256        197,676            271,505          1.37x
  1000        27,804             70,727          2.54x
  2000         7,943             33,717          4.24x

The LongMap optimization provides 2.35x-4.24x speedup at 2000 consumers,
with the gap widening as consumer count increases — confirming the O(k)
linear scan was the dominant bottleneck.

Tests:
- sbt "bench-jmh/headerCreateAll" → header created
- sbt "bench-jmh/compile" → success

References:
Refs #3063