Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions pkg/topology/kademlia/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ const (
defaultShortRetry = 10 * time.Second
defaultTimeToRetry = 2 * defaultShortRetry
defaultPruneWakeup = 5 * time.Minute
defaultBroadcastBinSize = 2
defaultBroadcastBinSize = 6
)

var (
Expand Down Expand Up @@ -1042,6 +1042,12 @@ func (k *Kad) Announce(ctx context.Context, peer swarm.Address, fullnode bool) e
depth := k.neighborhoodDepth()
isNeighbor := swarm.Proximity(peer.Bytes(), k.base.Bytes()) >= depth

if isNeighbor {
k.metrics.AnnounceIsNeighborTotal.WithLabelValues("true").Inc()
} else {
k.metrics.AnnounceIsNeighborTotal.WithLabelValues("false").Inc()
}

outer:
for bin := range swarm.MaxBins {

Expand All @@ -1052,11 +1058,15 @@ outer:

if bin >= depth && isNeighbor {
connectedPeers = k.binPeers(bin, false) // broadcast all neighborhood peers
k.recordAnnounceBinSelection("full", len(connectedPeers), len(connectedPeers))
} else {
connectedPeers, err = randomSubset(k.binPeers(bin, true), k.opt.BroadcastBinSize)
binPeers := k.binPeers(bin, true)
connectedPeers, err = randomSubset(binPeers, k.opt.BroadcastBinSize)
if err != nil {
k.metrics.AnnounceErrorsTotal.WithLabelValues("random_subset").Inc()
return err
}
k.recordAnnounceBinSelection("subset", len(binPeers), len(connectedPeers))
}

for _, connectedPeer := range connectedPeers {
Expand Down Expand Up @@ -1101,15 +1111,26 @@ outer:
default:
}

k.metrics.AnnouncePeersSentToNewPeer.Observe(float64(len(addrs)))

err := k.discovery.BroadcastPeers(ctx, peer, addrs...)
if err != nil {
k.metrics.AnnounceErrorsTotal.WithLabelValues("broadcast_to_new").Inc()
k.logger.Error(err, "could not broadcast to peer", "peer_address", peer)
_ = k.p2p.Disconnect(peer, "failed broadcasting to peer")
}

return err
}

func (k *Kad) recordAnnounceBinSelection(mode string, available, selected int) {
if available == 0 {
return
}
k.metrics.AnnounceBinPeersAvailable.WithLabelValues(mode).Observe(float64(available))
k.metrics.AnnounceBinPeersSelected.WithLabelValues(mode).Observe(float64(selected))
}

// AnnounceTo announces a selected peer to another.
func (k *Kad) AnnounceTo(ctx context.Context, addressee, peer swarm.Address, fullnode bool) error {
if !fullnode {
Expand Down
51 changes: 51 additions & 0 deletions pkg/topology/kademlia/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ type metrics struct {
Blocklist prometheus.Counter
ReachabilityStatus *prometheus.GaugeVec
PeersReachabilityStatus *prometheus.GaugeVec

AnnounceIsNeighborTotal *prometheus.CounterVec
AnnounceBinPeersAvailable *prometheus.HistogramVec
AnnounceBinPeersSelected *prometheus.HistogramVec
AnnouncePeersSentToNewPeer prometheus.Histogram
AnnounceErrorsTotal *prometheus.CounterVec
}

// newMetrics is a convenient constructor for creating new metrics.
Expand Down Expand Up @@ -164,6 +170,51 @@ func newMetrics() metrics {
},
[]string{"peers_reachability_status"},
),
AnnounceIsNeighborTotal: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "announce_is_neighbor_total",
Help: "Number of peer announce operations. The is_neighbor label is one of: true, false.",
},
[]string{"is_neighbor"},
),
AnnounceBinPeersAvailable: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "announce_bin_peers_available",
Help: "Number of connected peers available in a bin before announce selection. The mode label is one of: full, subset.",
Buckets: []float64{1, 2, 3, 4, 5, 6, 8, 10, 12, 15, 18, 25, 32},
},
[]string{"mode"},
),
AnnounceBinPeersSelected: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "announce_bin_peers_selected",
Help: "Number of peers selected from a bin during announce. The mode label is one of: full, subset.",
Buckets: []float64{1, 2, 3, 4, 5, 6, 8, 10, 12, 15, 18, 25, 32},
},
[]string{"mode"},
),
AnnouncePeersSentToNewPeer: prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "announce_peers_sent_to_new_peer",
Help: "Number of existing peers sent to a newly connected peer in a single announce.",
Buckets: []float64{1, 2, 5, 10, 15, 20, 30, 40, 50, 75, 100, 150, 200},
}),
AnnounceErrorsTotal: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "announce_errors_total",
Help: "Number of announce errors. The reason label is one of: random_subset, broadcast_to_new.",
},
[]string{"reason"},
),
}
}

Expand Down
Loading