Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion crates/blockchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ impl BlockChain {
is_aggregator: bool,
) -> BlockChain {
metrics::set_is_aggregator(is_aggregator);
metrics::set_node_sync_status(metrics::SyncStatus::Idle);
let genesis_time = store.config().genesis_time;
let key_manager = key_manager::KeyManager::new(validator_keys);
let handle = BlockChainServer {
Expand Down Expand Up @@ -217,11 +218,14 @@ impl BlockChainServer {
fn propose_block(&mut self, slot: u64, validator_id: u64) {
info!(%slot, %validator_id, "We are the proposer for this slot");

let _timing = metrics::time_block_building();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Timing scope wider than metric name implies

_timing is dropped at the end of propose_block, so lean_block_building_time_seconds captures not only block construction and signing but also the full state transition inside process_block() (which includes SSZ processing, fork-choice updates, and storage writes) and the gossip publish_block call. The nested lean_block_building_payload_aggregation_time_seconds already isolates the build_block() phase, so the outer metric's scope may surprise operators trying to attribute latency — the description in the metric string says "Time taken to build a block" but it measures the entire proposal pipeline.

Consider either tightening the timing scope to end before process_block / publish_block, or updating the metric description to "Time taken to propose a block (build + sign + process + publish)" to make the breadth explicit.

Prompt To Fix With AI
This is a comment left during a code review.
Path: crates/blockchain/src/lib.rs
Line: 221

Comment:
**Timing scope wider than metric name implies**

`_timing` is dropped at the end of `propose_block`, so `lean_block_building_time_seconds` captures not only block construction and signing but also the full state transition inside `process_block()` (which includes SSZ processing, fork-choice updates, and storage writes) and the gossip `publish_block` call. The nested `lean_block_building_payload_aggregation_time_seconds` already isolates the `build_block()` phase, so the outer metric's scope may surprise operators trying to attribute latency — the description in the metric string says "Time taken to build a block" but it measures the entire proposal pipeline.

Consider either tightening the timing scope to end before `process_block` / `publish_block`, or updating the metric description to `"Time taken to propose a block (build + sign + process + publish)"` to make the breadth explicit.

How can I resolve this? If you propose a fix, please make it concise.


// Build the block with attestation signatures
let Ok((block, attestation_signatures, _post_checkpoints)) =
store::produce_block_with_signatures(&mut self.store, slot, validator_id)
.inspect_err(|err| error!(%slot, %validator_id, %err, "Failed to build block"))
else {
metrics::inc_block_building_failures();
return;
};

Expand All @@ -232,6 +236,7 @@ impl BlockChainServer {
.sign_block_root(validator_id, slot as u32, &block_root)
.inspect_err(|err| error!(%slot, %validator_id, %err, "Failed to sign block root"))
else {
metrics::inc_block_building_failures();
return;
};

Expand All @@ -249,9 +254,12 @@ impl BlockChainServer {
// Process the block locally before publishing
if let Err(err) = self.process_block(signed_block.clone()) {
error!(%slot, %validator_id, %err, "Failed to process built block");
metrics::inc_block_building_failures();
return;
};

metrics::inc_block_building_success();

// Publish to gossip network
if let Some(ref p2p) = self.p2p {
let _ = p2p
Expand All @@ -264,10 +272,21 @@ impl BlockChainServer {

fn process_block(&mut self, signed_block: SignedBlock) -> Result<(), StoreError> {
store::on_block(&mut self.store, signed_block)?;
metrics::update_head_slot(self.store.head_slot());
let head_slot = self.store.head_slot();
metrics::update_head_slot(head_slot);
metrics::update_latest_justified_slot(self.store.latest_justified().slot);
metrics::update_latest_finalized_slot(self.store.latest_finalized().slot);
metrics::update_validators_count(self.key_manager.validator_ids().len() as u64);

// Update sync status based on head slot vs wall clock slot
let current_slot = self.store.time() / INTERVALS_PER_SLOT;
let status = if head_slot >= current_slot {
metrics::SyncStatus::Synced
} else {
metrics::SyncStatus::Syncing
};
metrics::set_node_sync_status(status);

for table in ALL_TABLES {
metrics::update_table_bytes(table.name(), self.store.estimate_table_bytes(table));
}
Expand Down
116 changes: 115 additions & 1 deletion crates/blockchain/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ static LEAN_COMMITTEE_SIGNATURES_AGGREGATION_TIME_SECONDS: std::sync::LazyLock<H
register_histogram!(
"lean_committee_signatures_aggregation_time_seconds",
"Time taken to aggregate committee signatures",
vec![0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 0.75, 1.0]
vec![0.05, 0.1, 0.25, 0.5, 0.75, 1.0, 2.0, 3.0, 4.0]
)
.unwrap()
});
Expand All @@ -276,6 +276,77 @@ static LEAN_FORK_CHOICE_REORG_DEPTH: std::sync::LazyLock<Histogram> =
.unwrap()
});

// --- Block Production ---

static LEAN_BLOCK_AGGREGATED_PAYLOADS: std::sync::LazyLock<Histogram> =
std::sync::LazyLock::new(|| {
register_histogram!(
"lean_block_aggregated_payloads",
"Number of aggregated_payloads in a block",
vec![1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0]
)
.unwrap()
});

static LEAN_BLOCK_BUILDING_PAYLOAD_AGGREGATION_TIME_SECONDS: std::sync::LazyLock<Histogram> =
std::sync::LazyLock::new(|| {
register_histogram!(
"lean_block_building_payload_aggregation_time_seconds",
"Time taken to build aggregated_payloads during block building",
vec![0.1, 0.25, 0.5, 0.75, 1.0, 2.0, 3.0, 4.0]
)
.unwrap()
});

static LEAN_BLOCK_BUILDING_TIME_SECONDS: std::sync::LazyLock<Histogram> =
std::sync::LazyLock::new(|| {
register_histogram!(
"lean_block_building_time_seconds",
"Time taken to build a block",
vec![0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 0.75, 1.0]
)
.unwrap()
});

static LEAN_BLOCK_BUILDING_SUCCESS_TOTAL: std::sync::LazyLock<IntCounter> =
std::sync::LazyLock::new(|| {
register_int_counter!(
"lean_block_building_success_total",
"Successful block builds"
)
.unwrap()
});

static LEAN_BLOCK_BUILDING_FAILURES_TOTAL: std::sync::LazyLock<IntCounter> =
std::sync::LazyLock::new(|| {
register_int_counter!("lean_block_building_failures_total", "Failed block builds").unwrap()
});

// --- Sync Status ---

/// Node synchronization status.
pub enum SyncStatus {
Idle,
Syncing,
Synced,
}

impl SyncStatus {
fn as_str(&self) -> &'static str {
match self {
SyncStatus::Idle => "idle",
SyncStatus::Syncing => "syncing",
SyncStatus::Synced => "synced",
}
}

const ALL: &[&str] = &["idle", "syncing", "synced"];
}

static LEAN_NODE_SYNC_STATUS: std::sync::LazyLock<IntGaugeVec> = std::sync::LazyLock::new(|| {
register_int_gauge_vec!("lean_node_sync_status", "Node sync status", &["status"]).unwrap()
});

// --- Initialization ---

/// Register all metrics with the Prometheus registry so they appear in `/metrics` from startup.
Expand Down Expand Up @@ -315,6 +386,14 @@ pub fn init() {
std::sync::LazyLock::force(&LEAN_PQ_SIG_AGGREGATED_SIGNATURES_VERIFICATION_TIME_SECONDS);
std::sync::LazyLock::force(&LEAN_COMMITTEE_SIGNATURES_AGGREGATION_TIME_SECONDS);
std::sync::LazyLock::force(&LEAN_FORK_CHOICE_REORG_DEPTH);
// Block production
std::sync::LazyLock::force(&LEAN_BLOCK_AGGREGATED_PAYLOADS);
std::sync::LazyLock::force(&LEAN_BLOCK_BUILDING_PAYLOAD_AGGREGATION_TIME_SECONDS);
std::sync::LazyLock::force(&LEAN_BLOCK_BUILDING_TIME_SECONDS);
std::sync::LazyLock::force(&LEAN_BLOCK_BUILDING_SUCCESS_TOTAL);
std::sync::LazyLock::force(&LEAN_BLOCK_BUILDING_FAILURES_TOTAL);
// Sync status
std::sync::LazyLock::force(&LEAN_NODE_SYNC_STATUS);
}

// --- Public API ---
Expand Down Expand Up @@ -476,3 +555,38 @@ pub fn set_attestation_committee_count(count: u64) {
pub fn observe_fork_choice_reorg_depth(depth: u64) {
LEAN_FORK_CHOICE_REORG_DEPTH.observe(depth as f64);
}

/// Observe the number of aggregated payloads in a built block.
pub fn observe_block_aggregated_payloads(count: usize) {
LEAN_BLOCK_AGGREGATED_PAYLOADS.observe(count as f64);
}

/// Start timing payload aggregation during block building. Records duration when the guard is dropped.
pub fn time_block_building_payload_aggregation() -> TimingGuard {
TimingGuard::new(&LEAN_BLOCK_BUILDING_PAYLOAD_AGGREGATION_TIME_SECONDS)
}

/// Start timing block building. Records duration when the guard is dropped.
pub fn time_block_building() -> TimingGuard {
TimingGuard::new(&LEAN_BLOCK_BUILDING_TIME_SECONDS)
}

/// Increment the successful block builds counter.
pub fn inc_block_building_success() {
LEAN_BLOCK_BUILDING_SUCCESS_TOTAL.inc();
}

/// Increment the failed block builds counter.
pub fn inc_block_building_failures() {
LEAN_BLOCK_BUILDING_FAILURES_TOTAL.inc();
}

/// Set the node sync status. Sets the given status label to 1 and all others to 0.
pub fn set_node_sync_status(status: SyncStatus) {
let active = status.as_str();
for label in SyncStatus::ALL {
LEAN_NODE_SYNC_STATUS
.with_label_values(&[label])
.set(i64::from(*label == active));
}
}
21 changes: 13 additions & 8 deletions crates/blockchain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1015,14 +1015,19 @@ pub fn produce_block_with_signatures(

let known_block_roots = store.get_block_roots();

let (block, signatures, post_checkpoints) = build_block(
&head_state,
slot,
validator_index,
head_root,
&known_block_roots,
&aggregated_payloads,
)?;
let (block, signatures, post_checkpoints) = {
let _timing = metrics::time_block_building_payload_aggregation();
build_block(
&head_state,
slot,
validator_index,
head_root,
&known_block_roots,
&aggregated_payloads,
)?
};

metrics::observe_block_aggregated_payloads(signatures.len());

Ok((block, signatures, post_checkpoints))
}
Expand Down
5 changes: 4 additions & 1 deletion crates/net/p2p/src/gossipsub/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use super::{
attestation_subnet_topic,
},
};
use crate::P2PServer;
use crate::{P2PServer, metrics};

pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) {
let Event::Message {
Expand All @@ -34,6 +34,7 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) {
else {
return;
};
metrics::observe_gossip_block_size(uncompressed_data.len());

let Ok(signed_block) = SignedBlock::from_ssz_bytes(&uncompressed_data)
.inspect_err(|err| error!(?err, "Failed to decode gossipped block"))
Expand Down Expand Up @@ -65,6 +66,7 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) {
else {
return;
};
metrics::observe_gossip_aggregation_size(uncompressed_data.len());

let Ok(aggregation) = SignedAggregatedAttestation::from_ssz_bytes(&uncompressed_data)
.inspect_err(|err| error!(?err, "Failed to decode gossipped aggregation"))
Expand Down Expand Up @@ -94,6 +96,7 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) {
else {
return;
};
metrics::observe_gossip_attestation_size(uncompressed_data.len());

let Ok(signed_attestation) = SignedAttestation::from_ssz_bytes(&uncompressed_data)
.inspect_err(|err| error!(?err, "Failed to decode gossipped attestation"))
Expand Down
62 changes: 62 additions & 0 deletions crates/net/p2p/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,68 @@ static LEAN_PEER_DISCONNECTION_EVENTS_TOTAL: LazyLock<IntCounterVec> = LazyLock:
.unwrap()
});

// --- Gossip Message Size Histograms ---

static LEAN_GOSSIP_BLOCK_SIZE_BYTES: LazyLock<Histogram> = LazyLock::new(|| {
register_histogram!(
"lean_gossip_block_size_bytes",
"Bytes size of a gossip block message",
vec![
10_000.0,
50_000.0,
100_000.0,
250_000.0,
500_000.0,
1_000_000.0,
2_000_000.0,
5_000_000.0
]
)
.unwrap()
});

static LEAN_GOSSIP_ATTESTATION_SIZE_BYTES: LazyLock<Histogram> = LazyLock::new(|| {
register_histogram!(
"lean_gossip_attestation_size_bytes",
"Bytes size of a gossip attestation message",
vec![512.0, 1024.0, 2048.0, 4096.0, 8192.0, 16384.0]
)
.unwrap()
});

static LEAN_GOSSIP_AGGREGATION_SIZE_BYTES: LazyLock<Histogram> = LazyLock::new(|| {
register_histogram!(
"lean_gossip_aggregation_size_bytes",
"Bytes size of a gossip aggregated attestation message",
vec![
1024.0,
4096.0,
16384.0,
65536.0,
131_072.0,
262_144.0,
524_288.0,
1_048_576.0
]
)
.unwrap()
});

/// Observe the size of a gossip block message.
pub fn observe_gossip_block_size(bytes: usize) {
LEAN_GOSSIP_BLOCK_SIZE_BYTES.observe(bytes as f64);
}

/// Observe the size of a gossip attestation message.
pub fn observe_gossip_attestation_size(bytes: usize) {
LEAN_GOSSIP_ATTESTATION_SIZE_BYTES.observe(bytes as f64);
}

/// Observe the size of a gossip aggregated attestation message.
pub fn observe_gossip_aggregation_size(bytes: usize) {
LEAN_GOSSIP_AGGREGATION_SIZE_BYTES.observe(bytes as f64);
}

/// Set the attestation committee subnet gauge.
pub fn set_attestation_committee_subnet(subnet_id: u64) {
static LEAN_ATTESTATION_COMMITTEE_SUBNET: LazyLock<IntGauge> = LazyLock::new(|| {
Expand Down
Loading