From 31864f7a3e2caf1a98cb6c26cc18f6b59876c699 Mon Sep 17 00:00:00 2001 From: sbackend Date: Mon, 29 Jun 2026 10:50:48 +0200 Subject: [PATCH 1/3] fix: increase defaultBroadcastBinSize --- pkg/topology/kademlia/kademlia.go | 36 ++++++++++++++- pkg/topology/kademlia/metrics.go | 77 +++++++++++++++++++++++++++++++ 2 files changed, 111 insertions(+), 2 deletions(-) diff --git a/pkg/topology/kademlia/kademlia.go b/pkg/topology/kademlia/kademlia.go index 57c4c41a950..cac4a3fa628 100644 --- a/pkg/topology/kademlia/kademlia.go +++ b/pkg/topology/kademlia/kademlia.go @@ -57,7 +57,7 @@ const ( defaultShortRetry = 10 * time.Second defaultTimeToRetry = 2 * defaultShortRetry defaultPruneWakeup = 5 * time.Minute - defaultBroadcastBinSize = 2 + defaultBroadcastBinSize = 6 ) var ( @@ -1042,6 +1042,15 @@ func (k *Kad) Announce(ctx context.Context, peer swarm.Address, fullnode bool) e depth := k.neighborhoodDepth() isNeighbor := swarm.Proximity(peer.Bytes(), k.base.Bytes()) >= depth + k.metrics.AnnounceTotal.Inc() + if isNeighbor { + k.metrics.AnnounceIsNeighborTotal.WithLabelValues("true").Inc() + } else { + k.metrics.AnnounceIsNeighborTotal.WithLabelValues("false").Inc() + } + + var outgoingGossip int + outer: for bin := range swarm.MaxBins { @@ -1052,11 +1061,18 @@ outer: if bin >= depth && isNeighbor { connectedPeers = k.binPeers(bin, false) // broadcast all neighborhood peers + k.recordAnnounceBinSelection("full", len(connectedPeers), len(connectedPeers)) } else { - connectedPeers, err = randomSubset(k.binPeers(bin, true), k.opt.BroadcastBinSize) + binPeers := k.binPeers(bin, true) + if len(binPeers) == 0 { + continue + } + connectedPeers, err = randomSubset(binPeers, k.opt.BroadcastBinSize) if err != nil { + k.metrics.AnnounceErrorsTotal.WithLabelValues("random_subset").Inc() return err } + k.recordAnnounceBinSelection("subset", len(binPeers), len(connectedPeers)) } for _, connectedPeer := range connectedPeers { @@ -1079,6 +1095,7 @@ outer: break outer default: } + outgoingGossip++ go func(connectedPeer swarm.Address) { // Create a new deadline ctx to prevent goroutine pile up cCtx, cCancel := context.WithTimeout(k.bgBroadcastCtx, time.Minute) @@ -1091,6 +1108,8 @@ outer: } } + k.metrics.AnnounceOutgoingPeerGossipTotal.Add(float64(outgoingGossip)) + if len(addrs) == 0 { return nil } @@ -1101,8 +1120,11 @@ outer: default: } + k.metrics.AnnouncePeersSentToNewPeer.Observe(float64(len(addrs))) + err := k.discovery.BroadcastPeers(ctx, peer, addrs...) if err != nil { + k.metrics.AnnounceErrorsTotal.WithLabelValues("broadcast_to_new").Inc() k.logger.Error(err, "could not broadcast to peer", "peer_address", peer) _ = k.p2p.Disconnect(peer, "failed broadcasting to peer") } @@ -1110,6 +1132,16 @@ outer: return err } +func (k *Kad) recordAnnounceBinSelection(mode string, available, selected int) { + if available == 0 { + return + } + k.metrics.AnnounceBinSelectionTotal.WithLabelValues(mode).Inc() + k.metrics.AnnounceBinPeersAvailable.Observe(float64(available)) + k.metrics.AnnounceBinPeersSelected.Observe(float64(selected)) + k.metrics.AnnounceBinCoverageRatio.Observe(float64(selected) / float64(available)) +} + // AnnounceTo announces a selected peer to another. func (k *Kad) AnnounceTo(ctx context.Context, addressee, peer swarm.Address, fullnode bool) error { if !fullnode { diff --git a/pkg/topology/kademlia/metrics.go b/pkg/topology/kademlia/metrics.go index 7fc9a53751e..376b0dfbcf1 100644 --- a/pkg/topology/kademlia/metrics.go +++ b/pkg/topology/kademlia/metrics.go @@ -31,6 +31,16 @@ type metrics struct { Blocklist prometheus.Counter ReachabilityStatus *prometheus.GaugeVec PeersReachabilityStatus *prometheus.GaugeVec + + AnnounceTotal prometheus.Counter + AnnounceIsNeighborTotal *prometheus.CounterVec + AnnounceBinSelectionTotal *prometheus.CounterVec + AnnounceBinPeersAvailable prometheus.Histogram + AnnounceBinPeersSelected prometheus.Histogram + AnnounceBinCoverageRatio prometheus.Histogram + AnnouncePeersSentToNewPeer prometheus.Histogram + AnnounceOutgoingPeerGossipTotal prometheus.Counter + AnnounceErrorsTotal *prometheus.CounterVec } // newMetrics is a convenient constructor for creating new metrics. @@ -164,6 +174,73 @@ func newMetrics() metrics { }, []string{"peers_reachability_status"}, ), + AnnounceTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "announce_total", + Help: "Number of peer announce operations.", + }), + AnnounceIsNeighborTotal: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "announce_is_neighbor_total", + Help: "Number of announce operations for neighbor peers.", + }, + []string{"is_neighbor"}, + ), + AnnounceBinSelectionTotal: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "announce_bin_selection_total", + Help: "Number of bin peer selections during announce. The mode label is one of: full, subset.", + }, + []string{"mode"}, + ), + AnnounceBinPeersAvailable: prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "announce_bin_peers_available", + Help: "Number of connected peers available in a bin before announce selection.", + Buckets: []float64{1, 2, 3, 4, 5, 6, 8, 10, 12, 15, 18, 25, 32}, + }), + AnnounceBinPeersSelected: prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "announce_bin_peers_selected", + Help: "Number of peers selected from a bin during announce.", + Buckets: []float64{1, 2, 3, 4, 5, 6, 8, 10, 12, 15, 18, 25, 32}, + }), + AnnounceBinCoverageRatio: prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "announce_bin_coverage_ratio", + Help: "Ratio of selected peers to available peers in a bin during announce.", + Buckets: []float64{0.1, 0.2, 0.33, 0.5, 0.66, 0.75, 0.8, 0.9, 1.0}, + }), + AnnouncePeersSentToNewPeer: prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "announce_peers_sent_to_new_peer", + Help: "Number of existing peers sent to a newly connected peer in a single announce.", + Buckets: []float64{1, 2, 5, 10, 15, 20, 30, 40, 50, 75, 100, 150, 200}, + }), + AnnounceOutgoingPeerGossipTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "announce_outgoing_peer_gossip_total", + Help: "Number of outgoing peer gossip messages enqueued to existing peers during announce.", + }), + AnnounceErrorsTotal: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "announce_errors_total", + Help: "Number of announce errors. The reason label is one of: random_subset, broadcast_to_new.", + }, + []string{"reason"}, + ), } } From c807c3a1e8c418783307c4eeb812a86cfab6421d Mon Sep 17 00:00:00 2001 From: sbackend Date: Mon, 29 Jun 2026 15:04:54 +0200 Subject: [PATCH 2/3] fix: clean up --- pkg/topology/kademlia/kademlia.go | 12 +----- pkg/topology/kademlia/metrics.go | 68 ++++++++++--------------------- 2 files changed, 23 insertions(+), 57 deletions(-) diff --git a/pkg/topology/kademlia/kademlia.go b/pkg/topology/kademlia/kademlia.go index cac4a3fa628..f90a4b8150e 100644 --- a/pkg/topology/kademlia/kademlia.go +++ b/pkg/topology/kademlia/kademlia.go @@ -1042,15 +1042,12 @@ func (k *Kad) Announce(ctx context.Context, peer swarm.Address, fullnode bool) e depth := k.neighborhoodDepth() isNeighbor := swarm.Proximity(peer.Bytes(), k.base.Bytes()) >= depth - k.metrics.AnnounceTotal.Inc() if isNeighbor { k.metrics.AnnounceIsNeighborTotal.WithLabelValues("true").Inc() } else { k.metrics.AnnounceIsNeighborTotal.WithLabelValues("false").Inc() } - var outgoingGossip int - outer: for bin := range swarm.MaxBins { @@ -1095,7 +1092,6 @@ outer: break outer default: } - outgoingGossip++ go func(connectedPeer swarm.Address) { // Create a new deadline ctx to prevent goroutine pile up cCtx, cCancel := context.WithTimeout(k.bgBroadcastCtx, time.Minute) @@ -1108,8 +1104,6 @@ outer: } } - k.metrics.AnnounceOutgoingPeerGossipTotal.Add(float64(outgoingGossip)) - if len(addrs) == 0 { return nil } @@ -1136,10 +1130,8 @@ func (k *Kad) recordAnnounceBinSelection(mode string, available, selected int) { if available == 0 { return } - k.metrics.AnnounceBinSelectionTotal.WithLabelValues(mode).Inc() - k.metrics.AnnounceBinPeersAvailable.Observe(float64(available)) - k.metrics.AnnounceBinPeersSelected.Observe(float64(selected)) - k.metrics.AnnounceBinCoverageRatio.Observe(float64(selected) / float64(available)) + k.metrics.AnnounceBinPeersAvailable.WithLabelValues(mode).Observe(float64(available)) + k.metrics.AnnounceBinPeersSelected.WithLabelValues(mode).Observe(float64(selected)) } // AnnounceTo announces a selected peer to another. diff --git a/pkg/topology/kademlia/metrics.go b/pkg/topology/kademlia/metrics.go index 376b0dfbcf1..60f17de3449 100644 --- a/pkg/topology/kademlia/metrics.go +++ b/pkg/topology/kademlia/metrics.go @@ -32,15 +32,11 @@ type metrics struct { ReachabilityStatus *prometheus.GaugeVec PeersReachabilityStatus *prometheus.GaugeVec - AnnounceTotal prometheus.Counter - AnnounceIsNeighborTotal *prometheus.CounterVec - AnnounceBinSelectionTotal *prometheus.CounterVec - AnnounceBinPeersAvailable prometheus.Histogram - AnnounceBinPeersSelected prometheus.Histogram - AnnounceBinCoverageRatio prometheus.Histogram - AnnouncePeersSentToNewPeer prometheus.Histogram - AnnounceOutgoingPeerGossipTotal prometheus.Counter - AnnounceErrorsTotal *prometheus.CounterVec + AnnounceIsNeighborTotal *prometheus.CounterVec + AnnounceBinPeersAvailable *prometheus.HistogramVec + AnnounceBinPeersSelected *prometheus.HistogramVec + AnnouncePeersSentToNewPeer prometheus.Histogram + AnnounceErrorsTotal *prometheus.CounterVec } // newMetrics is a convenient constructor for creating new metrics. @@ -174,51 +170,35 @@ func newMetrics() metrics { }, []string{"peers_reachability_status"}, ), - AnnounceTotal: prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: m.Namespace, - Subsystem: subsystem, - Name: "announce_total", - Help: "Number of peer announce operations.", - }), AnnounceIsNeighborTotal: prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: m.Namespace, Subsystem: subsystem, Name: "announce_is_neighbor_total", - Help: "Number of announce operations for neighbor peers.", + Help: "Number of peer announce operations. The is_neighbor label is one of: true, false.", }, []string{"is_neighbor"}, ), - AnnounceBinSelectionTotal: prometheus.NewCounterVec( - prometheus.CounterOpts{ + AnnounceBinPeersAvailable: prometheus.NewHistogramVec( + prometheus.HistogramOpts{ Namespace: m.Namespace, Subsystem: subsystem, - Name: "announce_bin_selection_total", - Help: "Number of bin peer selections during announce. The mode label is one of: full, subset.", + Name: "announce_bin_peers_available", + Help: "Number of connected peers available in a bin before announce selection. The mode label is one of: full, subset.", + Buckets: []float64{1, 2, 3, 4, 5, 6, 8, 10, 12, 15, 18, 25, 32}, + }, + []string{"mode"}, + ), + AnnounceBinPeersSelected: prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "announce_bin_peers_selected", + Help: "Number of peers selected from a bin during announce. The mode label is one of: full, subset.", + Buckets: []float64{1, 2, 3, 4, 5, 6, 8, 10, 12, 15, 18, 25, 32}, }, []string{"mode"}, ), - AnnounceBinPeersAvailable: prometheus.NewHistogram(prometheus.HistogramOpts{ - Namespace: m.Namespace, - Subsystem: subsystem, - Name: "announce_bin_peers_available", - Help: "Number of connected peers available in a bin before announce selection.", - Buckets: []float64{1, 2, 3, 4, 5, 6, 8, 10, 12, 15, 18, 25, 32}, - }), - AnnounceBinPeersSelected: prometheus.NewHistogram(prometheus.HistogramOpts{ - Namespace: m.Namespace, - Subsystem: subsystem, - Name: "announce_bin_peers_selected", - Help: "Number of peers selected from a bin during announce.", - Buckets: []float64{1, 2, 3, 4, 5, 6, 8, 10, 12, 15, 18, 25, 32}, - }), - AnnounceBinCoverageRatio: prometheus.NewHistogram(prometheus.HistogramOpts{ - Namespace: m.Namespace, - Subsystem: subsystem, - Name: "announce_bin_coverage_ratio", - Help: "Ratio of selected peers to available peers in a bin during announce.", - Buckets: []float64{0.1, 0.2, 0.33, 0.5, 0.66, 0.75, 0.8, 0.9, 1.0}, - }), AnnouncePeersSentToNewPeer: prometheus.NewHistogram(prometheus.HistogramOpts{ Namespace: m.Namespace, Subsystem: subsystem, @@ -226,12 +206,6 @@ func newMetrics() metrics { Help: "Number of existing peers sent to a newly connected peer in a single announce.", Buckets: []float64{1, 2, 5, 10, 15, 20, 30, 40, 50, 75, 100, 150, 200}, }), - AnnounceOutgoingPeerGossipTotal: prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: m.Namespace, - Subsystem: subsystem, - Name: "announce_outgoing_peer_gossip_total", - Help: "Number of outgoing peer gossip messages enqueued to existing peers during announce.", - }), AnnounceErrorsTotal: prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: m.Namespace, From 36d1dce1e2fd949b64feb5ce12abf0731e5819cc Mon Sep 17 00:00:00 2001 From: sbackend Date: Mon, 29 Jun 2026 15:08:43 +0200 Subject: [PATCH 3/3] fix: clean up 2 --- pkg/topology/kademlia/kademlia.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/pkg/topology/kademlia/kademlia.go b/pkg/topology/kademlia/kademlia.go index f90a4b8150e..a4ca13bc216 100644 --- a/pkg/topology/kademlia/kademlia.go +++ b/pkg/topology/kademlia/kademlia.go @@ -1061,9 +1061,6 @@ outer: k.recordAnnounceBinSelection("full", len(connectedPeers), len(connectedPeers)) } else { binPeers := k.binPeers(bin, true) - if len(binPeers) == 0 { - continue - } connectedPeers, err = randomSubset(binPeers, k.opt.BroadcastBinSize) if err != nil { k.metrics.AnnounceErrorsTotal.WithLabelValues("random_subset").Inc()