From 585550238629391e481dfabe61140b340eca186e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Thu, 18 Jun 2026 16:07:15 -0300 Subject: [PATCH 1/6] feat(blockchain): pre-build proposer block one interval early Proposers do heavy leanVM work (compact + merge_type_1s_into_type_2, ~2s) synchronously at interval 0 today. This starts that work at the previous slot's interval 4 on spawn_blocking workers (Phase A build, actor-side XMSS sign, Phase B merge), stores the result, and publishes it at interval 0 with a head/justified revalidation; on any mismatch or miss it falls back to the existing synchronous build, so there is no regression. Also fixes a finalization-breaking bug surfaced on devnet: the prebuild captured its head via get_proposal_head, which is not read-only (it ticks the store clock). Called one interval early it advanced the clock prematurely and stalled finalization. Capture the head read-only via store.head() instead. --- crates/blockchain/src/lib.rs | 239 ++++++++++++++++++++++++++ crates/blockchain/src/metrics.rs | 58 +++++++ crates/blockchain/src/prebuild.rs | 274 ++++++++++++++++++++++++++++++ crates/blockchain/src/store.rs | 2 +- 4 files changed, 572 insertions(+), 1 deletion(-) create mode 100644 crates/blockchain/src/prebuild.rs diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index ab1b4b4f..4684c963 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -18,6 +18,10 @@ use crate::aggregation::{ AggregationSession, PRIOR_WORKER_JOIN_TIMEOUT, run_aggregation_worker, }; use crate::key_manager::ValidatorKeyPair; +use crate::prebuild::{ + PrebuildBuilt, PrebuildMergeInputs, PrebuildReady, PrebuildSession, PrebuildSnapshot, + PreparedBlock, prebuilt_block_is_usable, run_prebuild_build_worker, run_prebuild_merge_worker, +}; use spawned_concurrency::actor; use spawned_concurrency::error::ActorError; use spawned_concurrency::protocol; @@ -33,6 +37,7 @@ pub(crate) mod coverage; pub(crate) mod fork_choice_tree; pub mod key_manager; pub mod metrics; +pub(crate) mod prebuild; pub mod reaggregate; pub mod store; @@ -151,6 +156,8 @@ impl BlockChain { aggregator, pending_block_parents: HashMap::new(), current_aggregation: None, + current_prebuild: None, + prepared_block: None, last_tick_instant: None, attestation_committee_count, pre_merge_coverage: None, @@ -206,6 +213,12 @@ pub struct BlockChainServer { /// the next interval 2 takes over. current_aggregation: Option, + /// In-flight proposer pre-build started at the previous slot's interval 4. + current_prebuild: Option, + /// Completed pre-built block awaiting publication at its proposal slot's + /// interval 0. Cleared on use, on staleness, or when superseded. + prepared_block: Option, + /// Last tick instant for measuring interval duration. last_tick_instant: Option, @@ -328,6 +341,19 @@ impl BlockChainServer { } } + // Interval 4: if one of our validators proposes the NEXT slot, kick off + // an off-thread pre-build so the heavy leanVM work runs in the ~1600ms + // window up to that slot's interval 0 instead of inside interval 0. + if interval == 4 { + let next_slot = slot + 1; + let next_proposer = self + .sync_status + .gate_proposer(self.get_our_proposer(next_slot)); + if let Some(validator_id) = next_proposer { + self.start_prebuild_session(next_slot, validator_id, ctx); + } + } + // Now build and publish the block (after attestations have been accepted) if let Some(validator_id) = proposer_validator_id { self.propose_block(slot, validator_id); @@ -421,6 +447,53 @@ impl BlockChainServer { }); } + /// Snapshot block-build inputs and spawn the Phase A pre-build worker. + fn start_prebuild_session(&mut self, slot: u64, validator_id: u64, ctx: &Context) { + // Supersede any earlier in-flight pre-build (e.g. a missed slot). + if let Some(prior) = self.current_prebuild.take() { + prior.cancel.cancel(); + } + // A stale prepared block from an earlier slot must not survive into a + // new pre-build round. + self.prepared_block = None; + + // Build against the current canonical head, read-only. We must NOT use + // `get_proposal_head` here: it ticks the store to `slot` time and accepts + // attestations, which one interval early would advance the clock + // prematurely (skewing finalization) and diverge the captured head from + // the interval-0 state (making every prebuilt block stale). The + // interval-4 promote has already run in `store::on_tick` this tick, so + // `store.head()` reflects the latest accepted attestations. + let parent_root = self.store.head(); + let Some(head_state) = self.store.get_state(&parent_root) else { + warn!(%slot, "Pre-build skipped: missing head state"); + return; + }; + + let snapshot = PrebuildSnapshot { + slot, + validator_id, + parent_root, + head_state, + known_block_roots: self.store.get_block_roots(), + aggregated_payloads: self.store.known_aggregated_payloads(), + }; + + let cancel = CancellationToken::new(); + let worker_actor = ctx.actor_ref().clone(); + let worker_cancel = cancel.clone(); + let worker = tokio::task::spawn_blocking(move || { + run_prebuild_build_worker(snapshot, worker_actor, worker_cancel); + }); + + self.current_prebuild = Some(PrebuildSession { + session_id: slot, + cancel, + worker, + }); + info!(%slot, %validator_id, "Started proposer pre-build (phase A)"); + } + /// Returns the validator ID if any of our validators is the proposer for this slot. fn get_our_proposer(&self, slot: u64) -> Option { let head_state = self.store.head_state(); @@ -483,6 +556,42 @@ impl BlockChainServer { fn propose_block(&mut self, slot: u64, validator_id: u64) { info!(%slot, %validator_id, "We are the proposer for this slot"); + // Fast path: publish a block pre-built at the previous slot's interval 4, + // if it is still valid against the live head and justified checkpoint. + if let Some(prepared) = self.prepared_block.take() { + let live_head = store::get_proposal_head(&mut self.store, slot); + let store_justified_slot = self.store.latest_justified().slot; + if prebuilt_block_is_usable( + &prepared, + slot, + validator_id, + live_head, + store_justified_slot, + ) { + metrics::inc_prebuild_hit(); + let signed_block = prepared.signed_block; + if let Err(err) = self.process_block(signed_block.clone()) { + error!(%slot, %validator_id, %err, "Failed to process pre-built block; rebuilding"); + metrics::inc_block_building_failures(); + // Fall through to the synchronous build below. + } else { + metrics::inc_block_building_success(); + if let Some(ref p2p) = self.p2p { + let _ = p2p.publish_block(signed_block).inspect_err( + |err| error!(%slot, %validator_id, %err, "Failed to publish pre-built block"), + ); + } + info!(%slot, %validator_id, "Published pre-built block"); + return; + } + } else { + metrics::inc_prebuild_stale(); + info!(%slot, %validator_id, "Discarding stale pre-built block; rebuilding"); + } + } else { + metrics::inc_prebuild_miss(); + } + let _timing = metrics::time_block_building(); // Build the block with attestation signatures @@ -944,6 +1053,9 @@ impl BlockChainServer { /// `aggregate_job` call to finish (the proof itself cannot be interrupted). #[stopped] async fn on_stopped(&mut self, _ctx: &Context) { + if let Some(prebuild) = self.current_prebuild.take() { + prebuild.cancel.cancel(); + } let Some(session) = self.current_aggregation.take() else { return; }; @@ -1058,6 +1170,133 @@ impl Handler for BlockChainServer { } } +// ------------------------------------------------------------------------- +// Pre-build message handlers (worker → actor) +// ------------------------------------------------------------------------- + +impl Handler for BlockChainServer { + async fn handle(&mut self, msg: PrebuildBuilt, ctx: &Context) { + // Fence stale Phase A results. + let current = self.current_prebuild.as_ref().map(|s| s.session_id); + if current != Some(msg.session_id) { + trace!( + incoming = msg.session_id, + ?current, + "Dropping stale PrebuildBuilt" + ); + return; + } + + let slot = msg.session_id; + let validator_id = msg.validator_id; + + // Non-regression guard (leanSpec #595); re-checked at publish time. + let built_justified_slot = msg.post_checkpoints.justified.slot; + let block_root = msg.block.hash_tree_root(); + + // Sign on the actor — XMSS keys are stateful and single-owner. + let Ok(proposer_signature) = self + .key_manager + .sign_block_root(validator_id, slot as u32, &block_root) + .inspect_err(|err| error!(%slot, %validator_id, %err, "Pre-build sign failed")) + else { + self.current_prebuild = None; + return; + }; + + // Resolve pubkeys for the proposer and each attestation Type-1. + let head_state = self.store.head_state(); + let validators = &head_state.validators; + let Some(proposer_validator) = validators.get(validator_id as usize) else { + error!(%slot, %validator_id, "Pre-build proposer index out of range"); + self.current_prebuild = None; + return; + }; + let Ok(proposer_pubkey) = proposer_validator.get_proposal_pubkey().inspect_err( + |err| error!(%slot, %validator_id, %err, "Pre-build proposer pubkey decode failed"), + ) else { + self.current_prebuild = None; + return; + }; + let Ok(proposer_validator_signature) = ValidatorSignature::from_bytes(&proposer_signature) + .inspect_err( + |err| error!(%slot, %validator_id, %err, "Pre-build proposer signature decode failed"), + ) + else { + self.current_prebuild = None; + return; + }; + + let mut attestation_merge_inputs: Vec<(Vec, ByteList512KiB)> = + Vec::with_capacity(msg.type_one_proofs.len()); + let mut failed = false; + for t1 in &msg.type_one_proofs { + let mut pubkeys = Vec::new(); + for vid in t1.participant_indices() { + match validators + .get(vid as usize) + .and_then(|v| v.get_attestation_pubkey().ok()) + { + Some(pk) => pubkeys.push(pk), + None => { + error!(%slot, vid, "Pre-build participant pubkey resolution failed"); + failed = true; + break; + } + } + } + if failed { + break; + } + attestation_merge_inputs.push((pubkeys, t1.proof.clone())); + } + if failed { + self.current_prebuild = None; + return; + } + + let inputs = PrebuildMergeInputs { + slot, + validator_id, + parent_root: msg.parent_root, + built_justified_slot, + block: msg.block, + block_root, + proposer_pubkey, + proposer_signature: proposer_validator_signature, + attestation_merge_inputs, + }; + + // Phase B: heavy leanVM merge off-thread. Reuse the session's cancel + // token so a superseding round or shutdown stops it; results are fenced + // by slot in the PrebuildReady handler. + let Some(cancel) = self.current_prebuild.as_ref().map(|s| s.cancel.clone()) else { + return; + }; + let worker_actor = ctx.actor_ref().clone(); + tokio::task::spawn_blocking(move || { + run_prebuild_merge_worker(inputs, worker_actor, cancel); + }); + info!(%slot, %validator_id, "Pre-build signed; merging (phase B)"); + } +} + +impl Handler for BlockChainServer { + async fn handle(&mut self, msg: PrebuildReady, _ctx: &Context) { + let current = self.current_prebuild.as_ref().map(|s| s.session_id); + if current != Some(msg.session_id) { + trace!( + incoming = msg.session_id, + ?current, + "Dropping stale PrebuildReady" + ); + return; + } + info!(slot = msg.session_id, "Proposer pre-build ready"); + self.prepared_block = Some(msg.prepared); + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/blockchain/src/metrics.rs b/crates/blockchain/src/metrics.rs index 28e510f5..8ebe5221 100644 --- a/crates/blockchain/src/metrics.rs +++ b/crates/blockchain/src/metrics.rs @@ -873,6 +873,64 @@ pub fn observe_block_proposal_phase(phase: &str, elapsed: Duration) { .observe(elapsed.as_secs_f64()); } +/// Observe a proposer pre-build phase ("build" or "merge") duration. These run +/// off-thread at the previous slot's interval 4, so the histogram reaches well +/// past the 800ms interval budget. +pub fn observe_prebuild_phase(phase: &str, elapsed: Duration) { + static LEAN_BLOCK_PREBUILD_PHASE_SECONDS: std::sync::LazyLock = + std::sync::LazyLock::new(|| { + register_histogram_vec!( + "lean_block_prebuild_phase_seconds", + "Proposer pre-build phase duration: build (off-thread build_block) and merge \ + (off-thread Type-1 → Type-2 aggregation).", + &["phase"], + vec![0.01, 0.05, 0.1, 0.25, 0.5, 0.75, 1.0, 2.0, 4.0, 8.0] + ) + .unwrap() + }); + LEAN_BLOCK_PREBUILD_PHASE_SECONDS + .with_label_values(&[phase]) + .observe(elapsed.as_secs_f64()); +} + +/// Increment when a block pre-built at the previous slot's interval 4 was +/// published at interval 0 (the optimization paying off). +pub fn inc_prebuild_hit() { + static C: std::sync::LazyLock = std::sync::LazyLock::new(|| { + register_int_counter!( + "lean_block_prebuild_hit_total", + "Pre-built blocks published at interval 0." + ) + .unwrap() + }); + C.inc(); +} + +/// Increment when no pre-built block was ready at interval 0 (synchronous build). +pub fn inc_prebuild_miss() { + static C: std::sync::LazyLock = std::sync::LazyLock::new(|| { + register_int_counter!( + "lean_block_prebuild_miss_total", + "Proposals with no pre-built block ready (fell back to synchronous build)." + ) + .unwrap() + }); + C.inc(); +} + +/// Increment when a pre-built block existed but was stale at interval 0 +/// (head moved or justified advanced), forcing a synchronous rebuild. +pub fn inc_prebuild_stale() { + static C: std::sync::LazyLock = std::sync::LazyLock::new(|| { + register_int_counter!( + "lean_block_prebuild_stale_total", + "Pre-built blocks discarded as stale at interval 0." + ) + .unwrap() + }); + C.inc(); +} + /// Increment the completed block-proposal attestation selection runs counter. pub fn inc_block_proposal_attestation_builds() { LEAN_BLOCK_PROPOSAL_ATTESTATION_BUILDS_TOTAL.inc(); diff --git a/crates/blockchain/src/prebuild.rs b/crates/blockchain/src/prebuild.rs new file mode 100644 index 00000000..10d1fe89 --- /dev/null +++ b/crates/blockchain/src/prebuild.rs @@ -0,0 +1,274 @@ +//! Proposer pre-build: build and aggregate the next slot's block one interval +//! early on worker threads, then publish it at interval 0. +//! +//! Mirrors `aggregation.rs`: the actor stays on its message loop while +//! `spawn_blocking` workers run the expensive leanVM proofs and stream results +//! back as messages. Signing sits on the actor between the two worker phases +//! because XMSS keys are stateful (see `key_manager::sign_block_root`). + +use std::collections::{HashMap, HashSet}; + +use ethlambda_types::{ + attestation::AttestationData, + block::{Block, ByteList512KiB, MultiMessageAggregate, SignedBlock, TypeOneMultiSignature}, + primitives::H256, + signature::{ValidatorPublicKey, ValidatorSignature}, + state::State, +}; +use spawned_concurrency::message::Message; +use spawned_concurrency::tasks::ActorRef; +use tokio_util::sync::CancellationToken; +use tracing::{error, warn}; + +use crate::block_builder::{PostBlockCheckpoints, build_block}; +use crate::metrics; + +/// A block fully built and signed ahead of its proposal slot, awaiting +/// publication at interval 0. +pub(crate) struct PreparedBlock { + /// Proposal slot this block targets. + pub(crate) slot: u64, + /// Validator that will propose it. + pub(crate) validator_id: u64, + /// Head the block was built on. Must still be the canonical head at + /// publish time, or a late block / reorg has invalidated it. + pub(crate) parent_root: H256, + /// Justified slot the build closed over. Per leanSpec #595 the published + /// block must not lag the store's justified checkpoint; if the store's + /// justified slot advanced past this between build and publish, fall back. + pub(crate) built_justified_slot: u64, + /// Fully assembled block + Type-2 proof, ready to process and publish. + pub(crate) signed_block: SignedBlock, +} + +/// Decide whether a prepared block is still safe to publish at interval 0. +/// +/// Pure so it can be unit-tested without an actor or store. +pub(crate) fn prebuilt_block_is_usable( + prepared: &PreparedBlock, + proposal_slot: u64, + proposer_id: u64, + live_head: H256, + store_justified_slot: u64, +) -> bool { + prepared.slot == proposal_slot + && prepared.validator_id == proposer_id + && prepared.parent_root == live_head + && prepared.built_justified_slot >= store_justified_slot +} + +/// Tracks the in-flight pre-build so the actor can fence stale results by slot +/// and cancel on shutdown / supersession. Mirrors `AggregationSession`. +pub(crate) struct PrebuildSession { + /// Proposal slot; used as the fencing id. + pub(crate) session_id: u64, + pub(crate) cancel: CancellationToken, + #[allow(dead_code)] // held so the actor owns the worker handle for its lifetime + pub(crate) worker: tokio::task::JoinHandle<()>, +} + +/// Owned inputs for Phase A — captured on the actor thread, no store access. +pub(crate) struct PrebuildSnapshot { + pub(crate) slot: u64, + pub(crate) validator_id: u64, + pub(crate) parent_root: H256, + pub(crate) head_state: State, + pub(crate) known_block_roots: HashSet, + pub(crate) aggregated_payloads: HashMap)>, +} + +/// Owned inputs for Phase B. +pub(crate) struct PrebuildMergeInputs { + pub(crate) slot: u64, + pub(crate) validator_id: u64, + pub(crate) parent_root: H256, + pub(crate) built_justified_slot: u64, + pub(crate) block: Block, + pub(crate) block_root: H256, + /// Proposer's proposal pubkey, decoded on the actor. + pub(crate) proposer_pubkey: ValidatorPublicKey, + /// Proposer's raw XMSS signature over `block_root`, decoded on the actor. + pub(crate) proposer_signature: ValidatorSignature, + /// `(participant_pubkeys, proof_bytes)` for each attestation Type-1, + /// resolved on the actor (needs the validator registry). + pub(crate) attestation_merge_inputs: Vec<(Vec, ByteList512KiB)>, +} + +/// Phase A result: block is built (so block_root is known), not yet signed. +pub(crate) struct PrebuildBuilt { + pub(crate) session_id: u64, + pub(crate) validator_id: u64, + pub(crate) parent_root: H256, + pub(crate) block: Block, + pub(crate) type_one_proofs: Vec, + pub(crate) post_checkpoints: PostBlockCheckpoints, +} +impl Message for PrebuildBuilt { + type Result = (); +} + +/// Phase B result: block fully assembled with its Type-2 proof. +pub(crate) struct PrebuildReady { + pub(crate) session_id: u64, + pub(crate) prepared: PreparedBlock, +} +impl Message for PrebuildReady { + type Result = (); +} + +/// Phase A: build the block off-thread, send `PrebuildBuilt` back. +pub(crate) fn run_prebuild_build_worker( + snapshot: PrebuildSnapshot, + actor: ActorRef, + cancel: CancellationToken, +) { + if cancel.is_cancelled() { + return; + } + let start = std::time::Instant::now(); + let result = build_block( + &snapshot.head_state, + snapshot.slot, + snapshot.validator_id, + snapshot.parent_root, + &snapshot.known_block_roots, + &snapshot.aggregated_payloads, + ); + metrics::observe_prebuild_phase("build", start.elapsed()); + + let (block, type_one_proofs, post_checkpoints) = match result { + Ok(v) => v, + Err(err) => { + warn!(slot = snapshot.slot, %err, "Pre-build phase A failed"); + return; + } + }; + + let _ = actor.send(PrebuildBuilt { + session_id: snapshot.slot, + validator_id: snapshot.validator_id, + parent_root: snapshot.parent_root, + block, + type_one_proofs, + post_checkpoints, + }); +} + +/// Phase B: wrap proposer sig as Type-1, merge all Type-1s into the Type-2, +/// send `PrebuildReady`. +pub(crate) fn run_prebuild_merge_worker( + inputs: PrebuildMergeInputs, + actor: ActorRef, + cancel: CancellationToken, +) { + if cancel.is_cancelled() { + return; + } + let start = std::time::Instant::now(); + + let proposer_proof_bytes = match ethlambda_crypto::aggregate_signatures( + vec![inputs.proposer_pubkey.clone()], + vec![inputs.proposer_signature], + &inputs.block_root, + inputs.slot as u32, + ) { + Ok(b) => b, + Err(err) => { + error!(slot = inputs.slot, %err, "Pre-build wrap failed"); + return; + } + }; + + let mut merge_inputs = inputs.attestation_merge_inputs; + merge_inputs.push((vec![inputs.proposer_pubkey], proposer_proof_bytes)); + + let merged_bytes = match ethlambda_crypto::merge_type_1s_into_type_2(merge_inputs) { + Ok(b) => b, + Err(err) => { + error!(slot = inputs.slot, %err, "Pre-build merge failed"); + return; + } + }; + let proof = match MultiMessageAggregate::from_bytes(merged_bytes.iter().as_slice()) { + Ok(p) => p, + Err(err) => { + error!(slot = inputs.slot, %err, "Pre-build aggregate decode failed"); + return; + } + }; + metrics::observe_prebuild_phase("merge", start.elapsed()); + + let prepared = PreparedBlock { + slot: inputs.slot, + validator_id: inputs.validator_id, + parent_root: inputs.parent_root, + built_justified_slot: inputs.built_justified_slot, + signed_block: SignedBlock { + message: inputs.block, + proof, + }, + }; + let _ = actor.send(PrebuildReady { + session_id: inputs.slot, + prepared, + }); +} + +#[cfg(test)] +mod tests { + use super::*; + use ethlambda_types::block::BlockBody; + + fn root(b: u8) -> H256 { + H256::from_slice(&[b; 32]) + } + + fn dummy_signed_block() -> SignedBlock { + SignedBlock { + message: Block { + slot: 0, + proposer_index: 0, + parent_root: H256::ZERO, + state_root: H256::ZERO, + body: BlockBody::default(), + }, + proof: MultiMessageAggregate::default(), + } + } + + fn prepared(slot: u64, vid: u64, parent: H256, just: u64) -> PreparedBlock { + PreparedBlock { + slot, + validator_id: vid, + parent_root: parent, + built_justified_slot: just, + signed_block: dummy_signed_block(), + } + } + + #[test] + fn usable_when_head_and_justified_match() { + let p = prepared(10, 3, root(0xAB), 7); + assert!(prebuilt_block_is_usable(&p, 10, 3, root(0xAB), 7)); + } + + #[test] + fn unusable_when_head_moved() { + let p = prepared(10, 3, root(0xAB), 7); + assert!(!prebuilt_block_is_usable(&p, 10, 3, root(0xCD), 7)); + } + + #[test] + fn unusable_when_justified_advanced_past_build() { + let p = prepared(10, 3, root(0xAB), 7); + // store justified is now 8 > 7 → would regress justification. + assert!(!prebuilt_block_is_usable(&p, 10, 3, root(0xAB), 8)); + } + + #[test] + fn unusable_for_wrong_slot_or_proposer() { + let p = prepared(10, 3, root(0xAB), 7); + assert!(!prebuilt_block_is_usable(&p, 11, 3, root(0xAB), 7)); + assert!(!prebuilt_block_is_usable(&p, 10, 4, root(0xAB), 7)); + } +} diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 07c278d8..4fd35f81 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -719,7 +719,7 @@ pub fn produce_attestation_data(store: &Store, slot: u64) -> AttestationData { /// /// Ensures store is up-to-date and processes any pending attestations /// before returning the canonical head. -fn get_proposal_head(store: &mut Store, slot: u64) -> H256 { +pub fn get_proposal_head(store: &mut Store, slot: u64) -> H256 { // Calculate time corresponding to this slot let slot_time_ms = store.config().genesis_time * 1000 + slot * MILLISECONDS_PER_SLOT; From 12f34c1edda89a9a0899d4e2afc3d19459a003d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Thu, 18 Jun 2026 16:18:13 -0300 Subject: [PATCH 2/6] refactor(blockchain): build the pre-built block synchronously on the actor Replace the two-phase off-thread pre-build (spawn_blocking workers + PrebuildBuilt/PrebuildReady messages + session/cancel machinery) with a single synchronous build on the actor at the previous slot's interval 4. Removing the worker hop eliminates the timing race: the block is fully assembled before the interval-0 tick can consume it, so nearly every proposal is a hit instead of racing the merge against the 800ms window. Shared assemble_signed_block / process_and_publish_block helpers back both the synchronous proposal path and the pre-build. Blocking the actor during the build is acceptable: it has no other consensus-critical duty between interval 4 and the next slot. Devnet (4 nodes, 1 aggregator): finalization healthy (fin lag ~3), ~near-100% prebuild hit rate, zero skipped ticks, zero errors. --- crates/blockchain/src/lib.rs | 319 +++++++++++------------------- crates/blockchain/src/metrics.rs | 20 -- crates/blockchain/src/prebuild.rs | 190 +----------------- 3 files changed, 122 insertions(+), 407 deletions(-) diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 4684c963..d503d8c3 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -8,7 +8,7 @@ use ethlambda_types::{ ShortRoot, aggregator::AggregatorController, attestation::{SignedAggregatedAttestation, SignedAttestation}, - block::{ByteList512KiB, MultiMessageAggregate, SignedBlock}, + block::{Block, ByteList512KiB, MultiMessageAggregate, SignedBlock, TypeOneMultiSignature}, primitives::{H256, HashTreeRoot as _}, signature::{ValidatorPublicKey, ValidatorSignature}, }; @@ -17,11 +17,9 @@ use crate::aggregation::{ AGGREGATION_DEADLINE, AggregateProduced, AggregationDeadline, AggregationDone, AggregationSession, PRIOR_WORKER_JOIN_TIMEOUT, run_aggregation_worker, }; +use crate::block_builder::build_block; use crate::key_manager::ValidatorKeyPair; -use crate::prebuild::{ - PrebuildBuilt, PrebuildMergeInputs, PrebuildReady, PrebuildSession, PrebuildSnapshot, - PreparedBlock, prebuilt_block_is_usable, run_prebuild_build_worker, run_prebuild_merge_worker, -}; +use crate::prebuild::{PreparedBlock, prebuilt_block_is_usable}; use spawned_concurrency::actor; use spawned_concurrency::error::ActorError; use spawned_concurrency::protocol; @@ -156,7 +154,6 @@ impl BlockChain { aggregator, pending_block_parents: HashMap::new(), current_aggregation: None, - current_prebuild: None, prepared_block: None, last_tick_instant: None, attestation_committee_count, @@ -213,10 +210,9 @@ pub struct BlockChainServer { /// the next interval 2 takes over. current_aggregation: Option, - /// In-flight proposer pre-build started at the previous slot's interval 4. - current_prebuild: Option, - /// Completed pre-built block awaiting publication at its proposal slot's - /// interval 0. Cleared on use, on staleness, or when superseded. + /// Block built synchronously at the previous slot's interval 4, awaiting + /// publication at this proposal slot's interval 0. Cleared on use, on + /// staleness, or when superseded by the next interval-4 build. prepared_block: Option, /// Last tick instant for measuring interval duration. @@ -341,16 +337,16 @@ impl BlockChainServer { } } - // Interval 4: if one of our validators proposes the NEXT slot, kick off - // an off-thread pre-build so the heavy leanVM work runs in the ~1600ms - // window up to that slot's interval 0 instead of inside interval 0. + // Interval 4: if one of our validators proposes the NEXT slot, build its + // block now (synchronously, blocking the actor) so the heavy leanVM work + // is done before interval 0 and the proposer only has to publish. if interval == 4 { let next_slot = slot + 1; let next_proposer = self .sync_status .gate_proposer(self.get_our_proposer(next_slot)); if let Some(validator_id) = next_proposer { - self.start_prebuild_session(next_slot, validator_id, ctx); + self.prebuild_block(next_slot, validator_id); } } @@ -447,16 +443,15 @@ impl BlockChainServer { }); } - /// Snapshot block-build inputs and spawn the Phase A pre-build worker. - fn start_prebuild_session(&mut self, slot: u64, validator_id: u64, ctx: &Context) { - // Supersede any earlier in-flight pre-build (e.g. a missed slot). - if let Some(prior) = self.current_prebuild.take() { - prior.cancel.cancel(); - } - // A stale prepared block from an earlier slot must not survive into a - // new pre-build round. - self.prepared_block = None; - + /// Build the next slot's block synchronously and stash it for publication + /// at interval 0. + /// + /// Runs on the actor thread, blocking it for the duration of the build + /// (the expensive part is the leanVM Type-1 → Type-2 merge). That is + /// acceptable here: between interval 4 and the next slot the actor has no + /// other consensus-critical duty, and a prepared block lets the proposer + /// publish at interval 0 without paying the build cost then. + fn prebuild_block(&mut self, slot: u64, validator_id: u64) { // Build against the current canonical head, read-only. We must NOT use // `get_proposal_head` here: it ticks the store to `slot` time and accepts // attestations, which one interval early would advance the clock @@ -469,29 +464,47 @@ impl BlockChainServer { warn!(%slot, "Pre-build skipped: missing head state"); return; }; + let known_block_roots = self.store.get_block_roots(); + let aggregated_payloads = self.store.known_aggregated_payloads(); - let snapshot = PrebuildSnapshot { + let _timing = metrics::time_block_building(); + let (block, type_one_proofs, post_checkpoints) = match build_block( + &head_state, slot, validator_id, parent_root, - head_state, - known_block_roots: self.store.get_block_roots(), - aggregated_payloads: self.store.known_aggregated_payloads(), + &known_block_roots, + &aggregated_payloads, + ) { + Ok(built) => built, + Err(err) => { + warn!(%slot, %validator_id, %err, "Pre-build failed"); + metrics::inc_block_building_failures(); + return; + } }; + let built_justified_slot = post_checkpoints.justified.slot; - let cancel = CancellationToken::new(); - let worker_actor = ctx.actor_ref().clone(); - let worker_cancel = cancel.clone(); - let worker = tokio::task::spawn_blocking(move || { - run_prebuild_build_worker(snapshot, worker_actor, worker_cancel); - }); + coverage::emit_proposal_coverage( + &self.store, + self.attestation_committee_count, + block.body.attestations.iter(), + ); - self.current_prebuild = Some(PrebuildSession { - session_id: slot, - cancel, - worker, + let Some(signed_block) = + self.assemble_signed_block(slot, validator_id, block, type_one_proofs) + else { + return; + }; + + self.prepared_block = Some(PreparedBlock { + slot, + validator_id, + parent_root, + built_justified_slot, + signed_block, }); - info!(%slot, %validator_id, "Started proposer pre-build (phase A)"); + info!(%slot, %validator_id, "Pre-built block ready"); } /// Returns the validator ID if any of our validators is the proposer for this slot. @@ -569,21 +582,15 @@ impl BlockChainServer { store_justified_slot, ) { metrics::inc_prebuild_hit(); - let signed_block = prepared.signed_block; - if let Err(err) = self.process_block(signed_block.clone()) { - error!(%slot, %validator_id, %err, "Failed to process pre-built block; rebuilding"); - metrics::inc_block_building_failures(); - // Fall through to the synchronous build below. - } else { - metrics::inc_block_building_success(); - if let Some(ref p2p) = self.p2p { - let _ = p2p.publish_block(signed_block).inspect_err( - |err| error!(%slot, %validator_id, %err, "Failed to publish pre-built block"), - ); - } - info!(%slot, %validator_id, "Published pre-built block"); + if self.process_and_publish_block( + slot, + validator_id, + prepared.signed_block, + "Published pre-built block", + ) { return; } + // Import failed; fall through to a fresh synchronous build. } else { metrics::inc_prebuild_stale(); info!(%slot, %validator_id, "Discarding stale pre-built block; rebuilding"); @@ -609,26 +616,46 @@ impl BlockChainServer { block.body.attestations.iter(), ); - // Sign the block root with the proposal key + let Some(signed_block) = + self.assemble_signed_block(slot, validator_id, block, type_one_proofs) + else { + return; + }; + self.process_and_publish_block(slot, validator_id, signed_block, "Published block"); + } + + /// Sign the block root and merge every Type-1 proof (attestations plus the + /// proposer's own signature) into the block's single Type-2 proof. + /// + /// Shared by the synchronous proposal path and `prebuild_block`. Returns + /// `None` on any signing/aggregation failure (already logged and counted). + fn assemble_signed_block( + &mut self, + slot: u64, + validator_id: u64, + block: Block, + type_one_proofs: Vec, + ) -> Option { + // Sign the block root with the proposal key. let block_root = block.hash_tree_root(); - let Ok(proposer_signature) = self + let proposer_signature = self .key_manager .sign_block_root(validator_id, slot as u32, &block_root) .inspect_err(|err| error!(%slot, %validator_id, %err, "Failed to sign block root")) - else { - metrics::inc_block_building_failures(); - return; - }; + .ok() + .or_else(|| { + metrics::inc_block_building_failures(); + None + })?; - // Assemble SignedBlock: wrap the proposer's raw XMSS signature into a - // singleton Type-1 SNARK, then merge it with every attestation Type-1 - // into the block's single Type-2 proof. + // Wrap the proposer's raw XMSS signature into a singleton Type-1 SNARK, + // then merge it with every attestation Type-1 into the single Type-2. let head_state = self.store.head_state(); let validators = &head_state.validators; let Some(proposer_validator) = validators.get(validator_id as usize) else { error!(%slot, %validator_id, "Proposer index out of range when assembling block"); metrics::inc_block_building_failures(); - return; + return None; }; // Decode the proposer's proposal pubkey once and reuse it both for the @@ -637,7 +664,7 @@ impl BlockChainServer { |err| error!(%slot, %validator_id, %err, "Failed to decode proposer proposal pubkey"), ) else { metrics::inc_block_building_failures(); - return; + return None; }; let Ok(proposer_validator_signature) = @@ -646,7 +673,7 @@ impl BlockChainServer { }) else { metrics::inc_block_building_failures(); - return; + return None; }; let Ok(proposer_proof_bytes) = ethlambda_crypto::aggregate_signatures( vec![proposer_pubkey.clone()], @@ -658,7 +685,7 @@ impl BlockChainServer { |err| error!(%slot, %validator_id, %err, "Failed to wrap proposer signature as Type-1"), ) else { metrics::inc_block_building_failures(); - return; + return None; }; let mut merge_inputs: Vec<(Vec, ByteList512KiB)> = @@ -688,7 +715,7 @@ impl BlockChainServer { } if resolve_failed { metrics::inc_block_building_failures(); - return; + return None; } merge_inputs.push((vec![proposer_pubkey], proposer_proof_bytes)); @@ -701,7 +728,7 @@ impl BlockChainServer { Err(err) => { error!(%slot, %validator_id, %err, "Failed to merge Type-1s into Type-2"); metrics::inc_block_building_failures(); - return; + return None; } }; let proof = match MultiMessageAggregate::from_bytes(merged_bytes.iter().as_slice()) { @@ -709,33 +736,41 @@ impl BlockChainServer { Err(err) => { error!(%slot, %validator_id, %err, "Failed to build multi-message aggregate"); metrics::inc_block_building_failures(); - return; + return None; } }; - // `type_one_proofs` is no longer needed past this point. - drop(type_one_proofs); - let signed_block = SignedBlock { + Some(SignedBlock { message: block, proof, - }; + }) + } - // Process the block locally before publishing + /// Import a freshly built block locally, then publish it to gossip. Returns + /// `true` on successful import; on failure logs, counts it, and returns + /// `false` so the caller can fall back to a fresh build. + fn process_and_publish_block( + &mut self, + slot: u64, + validator_id: u64, + signed_block: SignedBlock, + published_msg: &'static str, + ) -> bool { if let Err(err) = self.process_block(signed_block.clone()) { error!(%slot, %validator_id, %err, "Failed to process built block"); metrics::inc_block_building_failures(); - return; - }; + return false; + } metrics::inc_block_building_success(); - // Publish to gossip network if let Some(ref p2p) = self.p2p { let _ = p2p .publish_block(signed_block) .inspect_err(|err| error!(%slot, %validator_id, %err, "Failed to publish block")); } - info!(%slot, %validator_id, "Published block"); + info!(%slot, %validator_id, "{}", published_msg); + true } /// Run block import and refresh metrics. @@ -1053,9 +1088,6 @@ impl BlockChainServer { /// `aggregate_job` call to finish (the proof itself cannot be interrupted). #[stopped] async fn on_stopped(&mut self, _ctx: &Context) { - if let Some(prebuild) = self.current_prebuild.take() { - prebuild.cancel.cancel(); - } let Some(session) = self.current_aggregation.take() else { return; }; @@ -1170,133 +1202,6 @@ impl Handler for BlockChainServer { } } -// ------------------------------------------------------------------------- -// Pre-build message handlers (worker → actor) -// ------------------------------------------------------------------------- - -impl Handler for BlockChainServer { - async fn handle(&mut self, msg: PrebuildBuilt, ctx: &Context) { - // Fence stale Phase A results. - let current = self.current_prebuild.as_ref().map(|s| s.session_id); - if current != Some(msg.session_id) { - trace!( - incoming = msg.session_id, - ?current, - "Dropping stale PrebuildBuilt" - ); - return; - } - - let slot = msg.session_id; - let validator_id = msg.validator_id; - - // Non-regression guard (leanSpec #595); re-checked at publish time. - let built_justified_slot = msg.post_checkpoints.justified.slot; - let block_root = msg.block.hash_tree_root(); - - // Sign on the actor — XMSS keys are stateful and single-owner. - let Ok(proposer_signature) = self - .key_manager - .sign_block_root(validator_id, slot as u32, &block_root) - .inspect_err(|err| error!(%slot, %validator_id, %err, "Pre-build sign failed")) - else { - self.current_prebuild = None; - return; - }; - - // Resolve pubkeys for the proposer and each attestation Type-1. - let head_state = self.store.head_state(); - let validators = &head_state.validators; - let Some(proposer_validator) = validators.get(validator_id as usize) else { - error!(%slot, %validator_id, "Pre-build proposer index out of range"); - self.current_prebuild = None; - return; - }; - let Ok(proposer_pubkey) = proposer_validator.get_proposal_pubkey().inspect_err( - |err| error!(%slot, %validator_id, %err, "Pre-build proposer pubkey decode failed"), - ) else { - self.current_prebuild = None; - return; - }; - let Ok(proposer_validator_signature) = ValidatorSignature::from_bytes(&proposer_signature) - .inspect_err( - |err| error!(%slot, %validator_id, %err, "Pre-build proposer signature decode failed"), - ) - else { - self.current_prebuild = None; - return; - }; - - let mut attestation_merge_inputs: Vec<(Vec, ByteList512KiB)> = - Vec::with_capacity(msg.type_one_proofs.len()); - let mut failed = false; - for t1 in &msg.type_one_proofs { - let mut pubkeys = Vec::new(); - for vid in t1.participant_indices() { - match validators - .get(vid as usize) - .and_then(|v| v.get_attestation_pubkey().ok()) - { - Some(pk) => pubkeys.push(pk), - None => { - error!(%slot, vid, "Pre-build participant pubkey resolution failed"); - failed = true; - break; - } - } - } - if failed { - break; - } - attestation_merge_inputs.push((pubkeys, t1.proof.clone())); - } - if failed { - self.current_prebuild = None; - return; - } - - let inputs = PrebuildMergeInputs { - slot, - validator_id, - parent_root: msg.parent_root, - built_justified_slot, - block: msg.block, - block_root, - proposer_pubkey, - proposer_signature: proposer_validator_signature, - attestation_merge_inputs, - }; - - // Phase B: heavy leanVM merge off-thread. Reuse the session's cancel - // token so a superseding round or shutdown stops it; results are fenced - // by slot in the PrebuildReady handler. - let Some(cancel) = self.current_prebuild.as_ref().map(|s| s.cancel.clone()) else { - return; - }; - let worker_actor = ctx.actor_ref().clone(); - tokio::task::spawn_blocking(move || { - run_prebuild_merge_worker(inputs, worker_actor, cancel); - }); - info!(%slot, %validator_id, "Pre-build signed; merging (phase B)"); - } -} - -impl Handler for BlockChainServer { - async fn handle(&mut self, msg: PrebuildReady, _ctx: &Context) { - let current = self.current_prebuild.as_ref().map(|s| s.session_id); - if current != Some(msg.session_id) { - trace!( - incoming = msg.session_id, - ?current, - "Dropping stale PrebuildReady" - ); - return; - } - info!(slot = msg.session_id, "Proposer pre-build ready"); - self.prepared_block = Some(msg.prepared); - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/crates/blockchain/src/metrics.rs b/crates/blockchain/src/metrics.rs index 8ebe5221..7280efa5 100644 --- a/crates/blockchain/src/metrics.rs +++ b/crates/blockchain/src/metrics.rs @@ -873,26 +873,6 @@ pub fn observe_block_proposal_phase(phase: &str, elapsed: Duration) { .observe(elapsed.as_secs_f64()); } -/// Observe a proposer pre-build phase ("build" or "merge") duration. These run -/// off-thread at the previous slot's interval 4, so the histogram reaches well -/// past the 800ms interval budget. -pub fn observe_prebuild_phase(phase: &str, elapsed: Duration) { - static LEAN_BLOCK_PREBUILD_PHASE_SECONDS: std::sync::LazyLock = - std::sync::LazyLock::new(|| { - register_histogram_vec!( - "lean_block_prebuild_phase_seconds", - "Proposer pre-build phase duration: build (off-thread build_block) and merge \ - (off-thread Type-1 → Type-2 aggregation).", - &["phase"], - vec![0.01, 0.05, 0.1, 0.25, 0.5, 0.75, 1.0, 2.0, 4.0, 8.0] - ) - .unwrap() - }); - LEAN_BLOCK_PREBUILD_PHASE_SECONDS - .with_label_values(&[phase]) - .observe(elapsed.as_secs_f64()); -} - /// Increment when a block pre-built at the previous slot's interval 4 was /// published at interval 0 (the optimization paying off). pub fn inc_prebuild_hit() { diff --git a/crates/blockchain/src/prebuild.rs b/crates/blockchain/src/prebuild.rs index 10d1fe89..c159d622 100644 --- a/crates/blockchain/src/prebuild.rs +++ b/crates/blockchain/src/prebuild.rs @@ -1,27 +1,14 @@ -//! Proposer pre-build: build and aggregate the next slot's block one interval -//! early on worker threads, then publish it at interval 0. +//! Proposer pre-build: build the next slot's block one interval early so the +//! heavy leanVM work (compact + Type-1 → Type-2 merge) is already done by the +//! time the proposer publishes at interval 0. //! -//! Mirrors `aggregation.rs`: the actor stays on its message loop while -//! `spawn_blocking` workers run the expensive leanVM proofs and stream results -//! back as messages. Signing sits on the actor between the two worker phases -//! because XMSS keys are stateful (see `key_manager::sign_block_root`). +//! The build runs synchronously on the blockchain actor at the previous slot's +//! interval 4 (see `BlockChainServer::prebuild_block`). This module holds the +//! resulting [`PreparedBlock`] and the pure predicate that decides, at +//! interval 0, whether it is still safe to publish. -use std::collections::{HashMap, HashSet}; - -use ethlambda_types::{ - attestation::AttestationData, - block::{Block, ByteList512KiB, MultiMessageAggregate, SignedBlock, TypeOneMultiSignature}, - primitives::H256, - signature::{ValidatorPublicKey, ValidatorSignature}, - state::State, -}; -use spawned_concurrency::message::Message; -use spawned_concurrency::tasks::ActorRef; -use tokio_util::sync::CancellationToken; -use tracing::{error, warn}; - -use crate::block_builder::{PostBlockCheckpoints, build_block}; -use crate::metrics; +use ethlambda_types::block::SignedBlock; +use ethlambda_types::primitives::H256; /// A block fully built and signed ahead of its proposal slot, awaiting /// publication at interval 0. @@ -57,167 +44,10 @@ pub(crate) fn prebuilt_block_is_usable( && prepared.built_justified_slot >= store_justified_slot } -/// Tracks the in-flight pre-build so the actor can fence stale results by slot -/// and cancel on shutdown / supersession. Mirrors `AggregationSession`. -pub(crate) struct PrebuildSession { - /// Proposal slot; used as the fencing id. - pub(crate) session_id: u64, - pub(crate) cancel: CancellationToken, - #[allow(dead_code)] // held so the actor owns the worker handle for its lifetime - pub(crate) worker: tokio::task::JoinHandle<()>, -} - -/// Owned inputs for Phase A — captured on the actor thread, no store access. -pub(crate) struct PrebuildSnapshot { - pub(crate) slot: u64, - pub(crate) validator_id: u64, - pub(crate) parent_root: H256, - pub(crate) head_state: State, - pub(crate) known_block_roots: HashSet, - pub(crate) aggregated_payloads: HashMap)>, -} - -/// Owned inputs for Phase B. -pub(crate) struct PrebuildMergeInputs { - pub(crate) slot: u64, - pub(crate) validator_id: u64, - pub(crate) parent_root: H256, - pub(crate) built_justified_slot: u64, - pub(crate) block: Block, - pub(crate) block_root: H256, - /// Proposer's proposal pubkey, decoded on the actor. - pub(crate) proposer_pubkey: ValidatorPublicKey, - /// Proposer's raw XMSS signature over `block_root`, decoded on the actor. - pub(crate) proposer_signature: ValidatorSignature, - /// `(participant_pubkeys, proof_bytes)` for each attestation Type-1, - /// resolved on the actor (needs the validator registry). - pub(crate) attestation_merge_inputs: Vec<(Vec, ByteList512KiB)>, -} - -/// Phase A result: block is built (so block_root is known), not yet signed. -pub(crate) struct PrebuildBuilt { - pub(crate) session_id: u64, - pub(crate) validator_id: u64, - pub(crate) parent_root: H256, - pub(crate) block: Block, - pub(crate) type_one_proofs: Vec, - pub(crate) post_checkpoints: PostBlockCheckpoints, -} -impl Message for PrebuildBuilt { - type Result = (); -} - -/// Phase B result: block fully assembled with its Type-2 proof. -pub(crate) struct PrebuildReady { - pub(crate) session_id: u64, - pub(crate) prepared: PreparedBlock, -} -impl Message for PrebuildReady { - type Result = (); -} - -/// Phase A: build the block off-thread, send `PrebuildBuilt` back. -pub(crate) fn run_prebuild_build_worker( - snapshot: PrebuildSnapshot, - actor: ActorRef, - cancel: CancellationToken, -) { - if cancel.is_cancelled() { - return; - } - let start = std::time::Instant::now(); - let result = build_block( - &snapshot.head_state, - snapshot.slot, - snapshot.validator_id, - snapshot.parent_root, - &snapshot.known_block_roots, - &snapshot.aggregated_payloads, - ); - metrics::observe_prebuild_phase("build", start.elapsed()); - - let (block, type_one_proofs, post_checkpoints) = match result { - Ok(v) => v, - Err(err) => { - warn!(slot = snapshot.slot, %err, "Pre-build phase A failed"); - return; - } - }; - - let _ = actor.send(PrebuildBuilt { - session_id: snapshot.slot, - validator_id: snapshot.validator_id, - parent_root: snapshot.parent_root, - block, - type_one_proofs, - post_checkpoints, - }); -} - -/// Phase B: wrap proposer sig as Type-1, merge all Type-1s into the Type-2, -/// send `PrebuildReady`. -pub(crate) fn run_prebuild_merge_worker( - inputs: PrebuildMergeInputs, - actor: ActorRef, - cancel: CancellationToken, -) { - if cancel.is_cancelled() { - return; - } - let start = std::time::Instant::now(); - - let proposer_proof_bytes = match ethlambda_crypto::aggregate_signatures( - vec![inputs.proposer_pubkey.clone()], - vec![inputs.proposer_signature], - &inputs.block_root, - inputs.slot as u32, - ) { - Ok(b) => b, - Err(err) => { - error!(slot = inputs.slot, %err, "Pre-build wrap failed"); - return; - } - }; - - let mut merge_inputs = inputs.attestation_merge_inputs; - merge_inputs.push((vec![inputs.proposer_pubkey], proposer_proof_bytes)); - - let merged_bytes = match ethlambda_crypto::merge_type_1s_into_type_2(merge_inputs) { - Ok(b) => b, - Err(err) => { - error!(slot = inputs.slot, %err, "Pre-build merge failed"); - return; - } - }; - let proof = match MultiMessageAggregate::from_bytes(merged_bytes.iter().as_slice()) { - Ok(p) => p, - Err(err) => { - error!(slot = inputs.slot, %err, "Pre-build aggregate decode failed"); - return; - } - }; - metrics::observe_prebuild_phase("merge", start.elapsed()); - - let prepared = PreparedBlock { - slot: inputs.slot, - validator_id: inputs.validator_id, - parent_root: inputs.parent_root, - built_justified_slot: inputs.built_justified_slot, - signed_block: SignedBlock { - message: inputs.block, - proof, - }, - }; - let _ = actor.send(PrebuildReady { - session_id: inputs.slot, - prepared, - }); -} - #[cfg(test)] mod tests { use super::*; - use ethlambda_types::block::BlockBody; + use ethlambda_types::block::{Block, BlockBody, MultiMessageAggregate}; fn root(b: u8) -> H256 { H256::from_slice(&[b; 32]) From 09ced2a468c10e842406e637c3db61574835214f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Thu, 18 Jun 2026 16:58:46 -0300 Subject: [PATCH 3/6] refactor(blockchain): drop prebuild outcome metrics, colocate types in block_builder Remove the hit/miss/stale pre-build counters (block_building_time and the proposal phase histograms already cover the timing), and move PreparedBlock and prebuilt_block_is_usable from the standalone prebuild module into block_builder alongside PostBlockCheckpoints, which is where the rest of the block-building types live. --- crates/blockchain/src/block_builder.rs | 95 +++++++++++++++++++++- crates/blockchain/src/lib.rs | 8 +- crates/blockchain/src/metrics.rs | 38 --------- crates/blockchain/src/prebuild.rs | 104 ------------------------- 4 files changed, 95 insertions(+), 150 deletions(-) delete mode 100644 crates/blockchain/src/prebuild.rs diff --git a/crates/blockchain/src/block_builder.rs b/crates/blockchain/src/block_builder.rs index 21cbbbef..e087ed4d 100644 --- a/crates/blockchain/src/block_builder.rs +++ b/crates/blockchain/src/block_builder.rs @@ -22,7 +22,7 @@ use ethlambda_state_transition::{ use ethlambda_types::{ ShortRoot, attestation::{AggregatedAttestation, AggregationBits, AttestationData}, - block::{AggregatedAttestations, Block, BlockBody, TypeOneMultiSignature}, + block::{AggregatedAttestations, Block, BlockBody, SignedBlock, TypeOneMultiSignature}, checkpoint::Checkpoint, primitives::{H256, HashTreeRoot as _}, state::{JustifiedSlots, State}, @@ -42,6 +42,40 @@ pub struct PostBlockCheckpoints { pub finalized: Checkpoint, } +/// A block built ahead of its proposal slot (at the previous slot's interval 4) +/// and signed, awaiting publication at interval 0. +pub(crate) struct PreparedBlock { + /// Proposal slot this block targets. + pub(crate) slot: u64, + /// Validator that will propose it. + pub(crate) validator_id: u64, + /// Head the block was built on. Must still be the canonical head at + /// publish time, or a late block / reorg has invalidated it. + pub(crate) parent_root: H256, + /// Justified slot the build closed over. Per leanSpec #595 the published + /// block must not lag the store's justified checkpoint; if the store's + /// justified slot advanced past this between build and publish, fall back. + pub(crate) built_justified_slot: u64, + /// Fully assembled block + Type-2 proof, ready to process and publish. + pub(crate) signed_block: SignedBlock, +} + +/// Decide whether a prepared block is still safe to publish at interval 0. +/// +/// Pure so it can be unit-tested without an actor or store. +pub(crate) fn prebuilt_block_is_usable( + prepared: &PreparedBlock, + proposal_slot: u64, + proposer_id: u64, + live_head: H256, + store_justified_slot: u64, +) -> bool { + prepared.slot == proposal_slot + && prepared.validator_id == proposer_id + && prepared.parent_root == live_head + && prepared.built_justified_slot >= store_justified_slot +} + /// Build a valid block on top of this state. /// /// Selects attestations via `select_attestations`, compacts duplicate @@ -1307,3 +1341,62 @@ mod tests { assert_eq!(covered, HashSet::from([0, 1, 2, 3])); } } + +#[cfg(test)] +mod prebuild_tests { + use super::*; + use ethlambda_types::block::{BlockBody, MultiMessageAggregate}; + + fn root(b: u8) -> H256 { + H256::from_slice(&[b; 32]) + } + + fn dummy_signed_block() -> SignedBlock { + SignedBlock { + message: Block { + slot: 0, + proposer_index: 0, + parent_root: H256::ZERO, + state_root: H256::ZERO, + body: BlockBody::default(), + }, + proof: MultiMessageAggregate::default(), + } + } + + fn prepared(slot: u64, vid: u64, parent: H256, just: u64) -> PreparedBlock { + PreparedBlock { + slot, + validator_id: vid, + parent_root: parent, + built_justified_slot: just, + signed_block: dummy_signed_block(), + } + } + + #[test] + fn usable_when_head_and_justified_match() { + let p = prepared(10, 3, root(0xAB), 7); + assert!(prebuilt_block_is_usable(&p, 10, 3, root(0xAB), 7)); + } + + #[test] + fn unusable_when_head_moved() { + let p = prepared(10, 3, root(0xAB), 7); + assert!(!prebuilt_block_is_usable(&p, 10, 3, root(0xCD), 7)); + } + + #[test] + fn unusable_when_justified_advanced_past_build() { + let p = prepared(10, 3, root(0xAB), 7); + // store justified is now 8 > 7 → would regress justification. + assert!(!prebuilt_block_is_usable(&p, 10, 3, root(0xAB), 8)); + } + + #[test] + fn unusable_for_wrong_slot_or_proposer() { + let p = prepared(10, 3, root(0xAB), 7); + assert!(!prebuilt_block_is_usable(&p, 11, 3, root(0xAB), 7)); + assert!(!prebuilt_block_is_usable(&p, 10, 4, root(0xAB), 7)); + } +} diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index d503d8c3..d27a2a30 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -17,9 +17,8 @@ use crate::aggregation::{ AGGREGATION_DEADLINE, AggregateProduced, AggregationDeadline, AggregationDone, AggregationSession, PRIOR_WORKER_JOIN_TIMEOUT, run_aggregation_worker, }; -use crate::block_builder::build_block; +use crate::block_builder::{PreparedBlock, build_block, prebuilt_block_is_usable}; use crate::key_manager::ValidatorKeyPair; -use crate::prebuild::{PreparedBlock, prebuilt_block_is_usable}; use spawned_concurrency::actor; use spawned_concurrency::error::ActorError; use spawned_concurrency::protocol; @@ -35,7 +34,6 @@ pub(crate) mod coverage; pub(crate) mod fork_choice_tree; pub mod key_manager; pub mod metrics; -pub(crate) mod prebuild; pub mod reaggregate; pub mod store; @@ -581,7 +579,6 @@ impl BlockChainServer { live_head, store_justified_slot, ) { - metrics::inc_prebuild_hit(); if self.process_and_publish_block( slot, validator_id, @@ -592,11 +589,8 @@ impl BlockChainServer { } // Import failed; fall through to a fresh synchronous build. } else { - metrics::inc_prebuild_stale(); info!(%slot, %validator_id, "Discarding stale pre-built block; rebuilding"); } - } else { - metrics::inc_prebuild_miss(); } let _timing = metrics::time_block_building(); diff --git a/crates/blockchain/src/metrics.rs b/crates/blockchain/src/metrics.rs index 7280efa5..28e510f5 100644 --- a/crates/blockchain/src/metrics.rs +++ b/crates/blockchain/src/metrics.rs @@ -873,44 +873,6 @@ pub fn observe_block_proposal_phase(phase: &str, elapsed: Duration) { .observe(elapsed.as_secs_f64()); } -/// Increment when a block pre-built at the previous slot's interval 4 was -/// published at interval 0 (the optimization paying off). -pub fn inc_prebuild_hit() { - static C: std::sync::LazyLock = std::sync::LazyLock::new(|| { - register_int_counter!( - "lean_block_prebuild_hit_total", - "Pre-built blocks published at interval 0." - ) - .unwrap() - }); - C.inc(); -} - -/// Increment when no pre-built block was ready at interval 0 (synchronous build). -pub fn inc_prebuild_miss() { - static C: std::sync::LazyLock = std::sync::LazyLock::new(|| { - register_int_counter!( - "lean_block_prebuild_miss_total", - "Proposals with no pre-built block ready (fell back to synchronous build)." - ) - .unwrap() - }); - C.inc(); -} - -/// Increment when a pre-built block existed but was stale at interval 0 -/// (head moved or justified advanced), forcing a synchronous rebuild. -pub fn inc_prebuild_stale() { - static C: std::sync::LazyLock = std::sync::LazyLock::new(|| { - register_int_counter!( - "lean_block_prebuild_stale_total", - "Pre-built blocks discarded as stale at interval 0." - ) - .unwrap() - }); - C.inc(); -} - /// Increment the completed block-proposal attestation selection runs counter. pub fn inc_block_proposal_attestation_builds() { LEAN_BLOCK_PROPOSAL_ATTESTATION_BUILDS_TOTAL.inc(); diff --git a/crates/blockchain/src/prebuild.rs b/crates/blockchain/src/prebuild.rs deleted file mode 100644 index c159d622..00000000 --- a/crates/blockchain/src/prebuild.rs +++ /dev/null @@ -1,104 +0,0 @@ -//! Proposer pre-build: build the next slot's block one interval early so the -//! heavy leanVM work (compact + Type-1 → Type-2 merge) is already done by the -//! time the proposer publishes at interval 0. -//! -//! The build runs synchronously on the blockchain actor at the previous slot's -//! interval 4 (see `BlockChainServer::prebuild_block`). This module holds the -//! resulting [`PreparedBlock`] and the pure predicate that decides, at -//! interval 0, whether it is still safe to publish. - -use ethlambda_types::block::SignedBlock; -use ethlambda_types::primitives::H256; - -/// A block fully built and signed ahead of its proposal slot, awaiting -/// publication at interval 0. -pub(crate) struct PreparedBlock { - /// Proposal slot this block targets. - pub(crate) slot: u64, - /// Validator that will propose it. - pub(crate) validator_id: u64, - /// Head the block was built on. Must still be the canonical head at - /// publish time, or a late block / reorg has invalidated it. - pub(crate) parent_root: H256, - /// Justified slot the build closed over. Per leanSpec #595 the published - /// block must not lag the store's justified checkpoint; if the store's - /// justified slot advanced past this between build and publish, fall back. - pub(crate) built_justified_slot: u64, - /// Fully assembled block + Type-2 proof, ready to process and publish. - pub(crate) signed_block: SignedBlock, -} - -/// Decide whether a prepared block is still safe to publish at interval 0. -/// -/// Pure so it can be unit-tested without an actor or store. -pub(crate) fn prebuilt_block_is_usable( - prepared: &PreparedBlock, - proposal_slot: u64, - proposer_id: u64, - live_head: H256, - store_justified_slot: u64, -) -> bool { - prepared.slot == proposal_slot - && prepared.validator_id == proposer_id - && prepared.parent_root == live_head - && prepared.built_justified_slot >= store_justified_slot -} - -#[cfg(test)] -mod tests { - use super::*; - use ethlambda_types::block::{Block, BlockBody, MultiMessageAggregate}; - - fn root(b: u8) -> H256 { - H256::from_slice(&[b; 32]) - } - - fn dummy_signed_block() -> SignedBlock { - SignedBlock { - message: Block { - slot: 0, - proposer_index: 0, - parent_root: H256::ZERO, - state_root: H256::ZERO, - body: BlockBody::default(), - }, - proof: MultiMessageAggregate::default(), - } - } - - fn prepared(slot: u64, vid: u64, parent: H256, just: u64) -> PreparedBlock { - PreparedBlock { - slot, - validator_id: vid, - parent_root: parent, - built_justified_slot: just, - signed_block: dummy_signed_block(), - } - } - - #[test] - fn usable_when_head_and_justified_match() { - let p = prepared(10, 3, root(0xAB), 7); - assert!(prebuilt_block_is_usable(&p, 10, 3, root(0xAB), 7)); - } - - #[test] - fn unusable_when_head_moved() { - let p = prepared(10, 3, root(0xAB), 7); - assert!(!prebuilt_block_is_usable(&p, 10, 3, root(0xCD), 7)); - } - - #[test] - fn unusable_when_justified_advanced_past_build() { - let p = prepared(10, 3, root(0xAB), 7); - // store justified is now 8 > 7 → would regress justification. - assert!(!prebuilt_block_is_usable(&p, 10, 3, root(0xAB), 8)); - } - - #[test] - fn unusable_for_wrong_slot_or_proposer() { - let p = prepared(10, 3, root(0xAB), 7); - assert!(!prebuilt_block_is_usable(&p, 11, 3, root(0xAB), 7)); - assert!(!prebuilt_block_is_usable(&p, 10, 4, root(0xAB), 7)); - } -} From 7474c15d2122c7f3afa45d3f9743e53fd5d54f3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Fri, 19 Jun 2026 17:56:39 -0300 Subject: [PATCH 4/6] refactor: group on_tick conditionals by interval --- crates/blockchain/src/lib.rs | 105 +++++++++++++-------------- crates/blockchain/src/sync_status.rs | 32 -------- 2 files changed, 51 insertions(+), 86 deletions(-) diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 7beaa62d..efe432f4 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -243,67 +243,23 @@ impl BlockChainServer { let scheduled_proposer = (interval == 0 && slot > 0) .then(|| self.get_our_proposer(slot)) .flatten(); - let proposer_validator_id = self.sync_status.gate_proposer(scheduled_proposer); - - if let Some(validator_id) = scheduled_proposer - && proposer_validator_id.is_none() - { - info!(%slot, %validator_id, "Skipping block proposal while syncing"); - } - - // Snapshot the pre-merge `new_payloads` set at the end-of-slot promote - // (interval 4), so the post-block report for this round sees its - // "timely" cohort just before it is promoted out of `new_payloads`. - // - // Only interval 4 — not the proposer's interval-0 promote. By interval 0 - // the round's votes have already been promoted at the previous slot's - // interval 4; `new_payloads` then holds only stragglers, and snapshotting - // them here would overwrite the good interval-4 snapshot the report still - // needs (those stragglers surface in the `late` section instead). Skip - // empty snapshots so a missed round keeps the last set we saw. Pure - // observability. - if interval == 4 - && let Some(snapshot) = coverage::snapshot_new_payloads(&self.store) - { - self.pre_merge_coverage = Some(snapshot); - } + let is_proposer = scheduled_proposer.is_some(); // Tick the store first - this accepts attestations at interval 0 if we have a proposal - store::on_tick( - &mut self.store, - timestamp_ms, - proposer_validator_id.is_some(), - ); + store::on_tick(&mut self.store, timestamp_ms, is_proposer); - if interval == 2 { - if is_aggregator { - coverage::emit_agg_start_new_coverage( - &self.store, - self.attestation_committee_count, - ); - self.start_aggregation_session(slot, ctx).await; - } else { - metrics::inc_aggregator_skipped_not_aggregator(); - } - } + // ==== interval 0 ==== - // Interval 4: if one of our validators proposes the NEXT slot, build its - // block now (synchronously, blocking the actor) so the heavy leanVM work - // is done before interval 0 and the proposer only has to publish. - if interval == 4 { - let next_slot = slot + 1; - let next_proposer = self - .sync_status - .gate_proposer(self.get_our_proposer(next_slot)); - if let Some(validator_id) = next_proposer { - self.prebuild_block(next_slot, validator_id); + // Now build and publish the block (after attestations have been accepted) + if let Some(validator_id) = scheduled_proposer { + if self.sync_status.duties_allowed() { + self.propose_block(slot, validator_id); + } else { + info!(%slot, %validator_id, "Skipping block proposal while syncing"); } } - // Now build and publish the block (after attestations have been accepted) - if let Some(validator_id) = proposer_validator_id { - self.propose_block(slot, validator_id); - } + // ==== interval 1 ==== // Produce attestations at interval 1 (all validators including proposer). // Reuse the same snapshot so self-delivery decisions match the rest @@ -328,6 +284,47 @@ impl BlockChainServer { } } + // ==== interval 2 ==== + + if interval == 2 { + if is_aggregator { + coverage::emit_agg_start_new_coverage( + &self.store, + self.attestation_committee_count, + ); + self.start_aggregation_session(slot, ctx).await; + } else { + metrics::inc_aggregator_skipped_not_aggregator(); + } + } + + // ==== interval 3 ==== + + // Interval 3 is handled inside [store::on_tick] + + // ==== interval 4 ==== + + if interval == 4 { + // Snapshot the pre-merge `new_payloads` set at the end-of-slot promote + // (interval 4), so the post-block report for this round sees its + // "timely" cohort just before it is promoted out of `new_payloads`. + if let Some(snapshot) = coverage::snapshot_new_payloads(&self.store) { + self.pre_merge_coverage = Some(snapshot); + } + + // If one of our validators proposes the NEXT slot, build its block + // now (synchronously, blocking the actor) so the heavy leanVM work + // is done before interval 0 and the proposer only has to publish. + let next_slot = slot + 1; + let next_proposer = self + .get_our_proposer(next_slot) + .filter(|_| self.sync_status.duties_allowed()); + + if let Some(validator_id) = next_proposer { + self.prebuild_block(next_slot, validator_id); + } + } + // Update safe target slot metric (updated by store.on_tick at interval 3) metrics::update_safe_target_slot(self.store.safe_target_slot()); // Update head slot metric (head may change when attestations are promoted at intervals 0/4) diff --git a/crates/blockchain/src/sync_status.rs b/crates/blockchain/src/sync_status.rs index e065d4ec..48b20a97 100644 --- a/crates/blockchain/src/sync_status.rs +++ b/crates/blockchain/src/sync_status.rs @@ -45,10 +45,6 @@ impl SyncStatusTracker { pub(crate) fn duties_allowed(&self) -> bool { !self.syncing } - - pub(crate) fn gate_proposer(&self, proposer: Option) -> Option { - proposer.filter(|_| self.duties_allowed()) - } } #[cfg(test)] @@ -112,32 +108,4 @@ mod tests { assert_eq!(tracker.update(15, 20, 20), SyncStatus::Synced); } - - #[test] - fn syncing_gates_proposals_and_attestations() { - let mut tracker = SyncStatusTracker::default(); - tracker.update(20, 0, 20); - - assert!(!tracker.duties_allowed()); - assert_eq!(tracker.gate_proposer(Some(3)), None); - } - - #[test] - fn caught_up_node_allows_proposals_and_attestations() { - let mut tracker = SyncStatusTracker::default(); - tracker.update(20, 0, 20); - tracker.update(20, 18, 20); - - assert!(tracker.duties_allowed()); - assert_eq!(tracker.gate_proposer(Some(3)), Some(3)); - } - - #[test] - fn network_stall_keeps_proposals_and_attestations_enabled() { - let mut tracker = SyncStatusTracker::default(); - tracker.update(100, 0, 0); - - assert!(tracker.duties_allowed()); - assert_eq!(tracker.gate_proposer(Some(3)), Some(3)); - } } From c23279d7e4ffe32c1cf3130ae50f2b24cc8fbe43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Fri, 19 Jun 2026 18:25:55 -0300 Subject: [PATCH 5/6] refactor(blockchain): share one block-build core across proposal paths Extract store::produce_block_on_head(head_root) and have both produce_block_with_signatures (ticking head, interval 0) and the new actor helper build_signed_block (read-only head, interval-4 pre-build) delegate to it. This removes the duplicated build sequence, restores the leanSpec #595 justified-divergence and proposer-auth checks plus the payload metric on the pre-build path, and makes the no-clock-tick requirement structural (the head is a parameter, the fn never ticks) rather than a comment. Also: resolve the proposal head once in propose_block (was calling the store-ticking get_proposal_head twice on the stale-fallback path), drop a redundant State clone, flatten the fast-path branch, and unify the sign-failure handling with its siblings. get_proposal_head is now pub(crate) with its tick side-effect documented. --- crates/blockchain/src/lib.rs | 148 +++++++++++++++------------------ crates/blockchain/src/store.rs | 35 ++++++-- 2 files changed, 92 insertions(+), 91 deletions(-) diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index efe432f4..89be84e3 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -17,7 +17,7 @@ use crate::aggregation::{ AGGREGATION_DEADLINE, AggregateProduced, AggregationDeadline, AggregationDone, AggregationSession, PRIOR_WORKER_JOIN_TIMEOUT, run_aggregation_worker, }; -use crate::block_builder::{PreparedBlock, build_block, prebuilt_block_is_usable}; +use crate::block_builder::{PreparedBlock, prebuilt_block_is_usable}; use crate::key_manager::ValidatorKeyPair; use crate::sync_status::SyncStatusTracker; use spawned_concurrency::actor; @@ -399,47 +399,16 @@ impl BlockChainServer { /// other consensus-critical duty, and a prepared block lets the proposer /// publish at interval 0 without paying the build cost then. fn prebuild_block(&mut self, slot: u64, validator_id: u64) { - // Build against the current canonical head, read-only. We must NOT use - // `get_proposal_head` here: it ticks the store to `slot` time and accepts - // attestations, which one interval early would advance the clock - // prematurely (skewing finalization) and diverge the captured head from - // the interval-0 state (making every prebuilt block stale). The - // interval-4 promote has already run in `store::on_tick` this tick, so - // `store.head()` reflects the latest accepted attestations. + // Build against the current canonical head, READ-ONLY. We must not use + // `get_proposal_head` here: it ticks the store to `slot` time one interval + // early, which would skew finalization and diverge the captured head from + // the interval-0 state (making every prebuilt block stale). The interval-4 + // promote has already run in `store::on_tick` this tick, so `store.head()` + // reflects the latest accepted attestations. let parent_root = self.store.head(); - let Some(head_state) = self.store.get_state(&parent_root) else { - warn!(%slot, "Pre-build skipped: missing head state"); - return; - }; - let known_block_roots = self.store.get_block_roots(); - let aggregated_payloads = self.store.known_aggregated_payloads(); - - let _timing = metrics::time_block_building(); - let (block, type_one_proofs, post_checkpoints) = match build_block( - &head_state, - slot, - validator_id, - parent_root, - &known_block_roots, - &aggregated_payloads, - ) { - Ok(built) => built, - Err(err) => { - warn!(%slot, %validator_id, %err, "Pre-build failed"); - metrics::inc_block_building_failures(); - return; - } - }; - let built_justified_slot = post_checkpoints.justified.slot; - - coverage::emit_proposal_coverage( - &self.store, - self.attestation_committee_count, - block.body.attestations.iter(), - ); - let Some(signed_block) = - self.assemble_signed_block(slot, validator_id, block, type_one_proofs) + let Some((signed_block, built_justified_slot)) = + self.build_signed_block(slot, validator_id, parent_root) else { return; }; @@ -454,6 +423,41 @@ impl BlockChainServer { info!(%slot, %validator_id, "Pre-built block ready"); } + /// Build the block on `head_root` and assemble it into a `SignedBlock`. + /// + /// Shared by the interval-0 proposal path and the interval-4 pre-build; the + /// only difference between callers is how `head_root` is resolved (ticking + /// `get_proposal_head` vs read-only `store.head()`). Returns the signed block + /// and the justified slot it closed over, or `None` on any build/sign + /// failure (already logged and counted). + fn build_signed_block( + &mut self, + slot: u64, + validator_id: u64, + head_root: H256, + ) -> Option<(SignedBlock, u64)> { + let _timing = metrics::time_block_building(); + let (block, type_one_proofs, post_checkpoints) = + match store::produce_block_on_head(&mut self.store, slot, validator_id, head_root) { + Ok(built) => built, + Err(err) => { + error!(%slot, %validator_id, %err, "Failed to build block"); + metrics::inc_block_building_failures(); + return None; + } + }; + + coverage::emit_proposal_coverage( + &self.store, + self.attestation_committee_count, + block.body.attestations.iter(), + ); + + let signed_block = + self.assemble_signed_block(slot, validator_id, block, type_one_proofs)?; + Some((signed_block, post_checkpoints.justified.slot)) + } + /// Returns the validator ID if any of our validators is the proposer for this slot. fn get_our_proposer(&self, slot: u64) -> Option { let head_state = self.store.head_state(); @@ -516,55 +520,36 @@ impl BlockChainServer { fn propose_block(&mut self, slot: u64, validator_id: u64) { info!(%slot, %validator_id, "We are the proposer for this slot"); + // Resolve the canonical head once. This ticks the store to `slot` and + // accepts pending attestations, so both the pre-built-block revalidation + // and a fresh build below see the same interval-0 state. + let head_root = store::get_proposal_head(&mut self.store, slot); + // Fast path: publish a block pre-built at the previous slot's interval 4, // if it is still valid against the live head and justified checkpoint. if let Some(prepared) = self.prepared_block.take() { - let live_head = store::get_proposal_head(&mut self.store, slot); let store_justified_slot = self.store.latest_justified().slot; if prebuilt_block_is_usable( &prepared, slot, validator_id, - live_head, + head_root, store_justified_slot, + ) && self.process_and_publish_block( + slot, + validator_id, + prepared.signed_block, + "Published pre-built block", ) { - if self.process_and_publish_block( - slot, - validator_id, - prepared.signed_block, - "Published pre-built block", - ) { - return; - } - // Import failed; fall through to a fresh synchronous build. - } else { - info!(%slot, %validator_id, "Discarding stale pre-built block; rebuilding"); + return; } + // Stale, or import failed: fall through to a fresh synchronous build. + info!(%slot, %validator_id, "Pre-built block unusable; rebuilding"); } - let _timing = metrics::time_block_building(); - - // Build the block with attestation signatures - let Ok((block, type_one_proofs, _post_checkpoints)) = - store::produce_block_with_signatures(&mut self.store, slot, validator_id) - .inspect_err(|err| error!(%slot, %validator_id, %err, "Failed to build block")) - else { - metrics::inc_block_building_failures(); - return; - }; - - coverage::emit_proposal_coverage( - &self.store, - self.attestation_committee_count, - block.body.attestations.iter(), - ); - - let Some(signed_block) = - self.assemble_signed_block(slot, validator_id, block, type_one_proofs) - else { - return; - }; - self.process_and_publish_block(slot, validator_id, signed_block, "Published block"); + if let Some((signed_block, _)) = self.build_signed_block(slot, validator_id, head_root) { + self.process_and_publish_block(slot, validator_id, signed_block, "Published block"); + } } /// Sign the block root and merge every Type-1 proof (attestations plus the @@ -581,15 +566,14 @@ impl BlockChainServer { ) -> Option { // Sign the block root with the proposal key. let block_root = block.hash_tree_root(); - let proposer_signature = self + let Ok(proposer_signature) = self .key_manager .sign_block_root(validator_id, slot as u32, &block_root) .inspect_err(|err| error!(%slot, %validator_id, %err, "Failed to sign block root")) - .ok() - .or_else(|| { - metrics::inc_block_building_failures(); - None - })?; + else { + metrics::inc_block_building_failures(); + return None; + }; // Wrap the proposer's raw XMSS signature into a singleton Type-1 SNARK, // then merge it with every attestation Type-1 into the single Type-2. diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 7890bb08..54dc8e0a 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -727,9 +727,11 @@ pub fn produce_attestation_data(store: &Store, slot: u64) -> AttestationData { /// Get the head for block proposal at the given slot. /// -/// Ensures store is up-to-date and processes any pending attestations -/// before returning the canonical head. -pub fn get_proposal_head(store: &mut Store, slot: u64) -> H256 { +/// NOT read-only: advances the store clock to `slot` and promotes pending +/// attestations before returning the canonical head. Use only at interval 0 +/// (the proposal tick); callers that must not move the clock should read +/// [`Store::head`] directly. +pub(crate) fn get_proposal_head(store: &mut Store, slot: u64) -> H256 { // Calculate time corresponding to this slot let slot_time_ms = store.config().genesis_time * 1000 + slot * MILLISECONDS_PER_SLOT; @@ -742,24 +744,39 @@ pub fn get_proposal_head(store: &mut Store, slot: u64) -> H256 { store.head() } -/// Produce a block and per-aggregated-attestation signature payloads for the target slot. +/// Produce a block and its signature payloads, resolving the head via +/// [`get_proposal_head`] (which advances the store clock to `slot`). /// -/// Returns the finalized block and attestation signature payloads aligned -/// with `block.body.attestations`. +/// Use at interval 0. To build against an already-known head without ticking +/// the clock (e.g. a pre-build one interval early), call [`produce_block_on_head`]. pub fn produce_block_with_signatures( store: &mut Store, slot: u64, validator_index: u64, ) -> Result<(Block, Vec, PostBlockCheckpoints), StoreError> { - // Get parent block and state to build upon let head_root = get_proposal_head(store, slot); + produce_block_on_head(store, slot, validator_index, head_root) +} + +/// Produce a block and per-aggregated-attestation signature payloads on top of +/// `head_root`, without moving the store clock. +/// +/// Returns the block and attestation signature payloads aligned with +/// `block.body.attestations`. Shared by the interval-0 proposal path and the +/// interval-4 pre-build; the only difference between them is how `head_root` is +/// resolved (ticking vs read-only). +pub(crate) fn produce_block_on_head( + store: &mut Store, + slot: u64, + validator_index: u64, + head_root: H256, +) -> Result<(Block, Vec, PostBlockCheckpoints), StoreError> { let head_state = store .get_state(&head_root) .ok_or(StoreError::MissingParentState { parent_root: head_root, slot, - })? - .clone(); + })?; // Validate proposer authorization for this slot let num_validators = head_state.validators.len() as u64; From 752796f983875b034291702a474e2ed758a27707 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Fri, 19 Jun 2026 19:06:24 -0300 Subject: [PATCH 6/6] docs: add code ticks to comment --- crates/blockchain/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 89be84e3..6c181f5d 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -300,7 +300,7 @@ impl BlockChainServer { // ==== interval 3 ==== - // Interval 3 is handled inside [store::on_tick] + // Interval 3 is handled inside [`store::on_tick`] // ==== interval 4 ====