diff --git a/crates/blockchain/src/block_builder.rs b/crates/blockchain/src/block_builder.rs index 21cbbbef..e087ed4d 100644 --- a/crates/blockchain/src/block_builder.rs +++ b/crates/blockchain/src/block_builder.rs @@ -22,7 +22,7 @@ use ethlambda_state_transition::{ use ethlambda_types::{ ShortRoot, attestation::{AggregatedAttestation, AggregationBits, AttestationData}, - block::{AggregatedAttestations, Block, BlockBody, TypeOneMultiSignature}, + block::{AggregatedAttestations, Block, BlockBody, SignedBlock, TypeOneMultiSignature}, checkpoint::Checkpoint, primitives::{H256, HashTreeRoot as _}, state::{JustifiedSlots, State}, @@ -42,6 +42,40 @@ pub struct PostBlockCheckpoints { pub finalized: Checkpoint, } +/// A block built ahead of its proposal slot (at the previous slot's interval 4) +/// and signed, awaiting publication at interval 0. +pub(crate) struct PreparedBlock { + /// Proposal slot this block targets. + pub(crate) slot: u64, + /// Validator that will propose it. + pub(crate) validator_id: u64, + /// Head the block was built on. Must still be the canonical head at + /// publish time, or a late block / reorg has invalidated it. + pub(crate) parent_root: H256, + /// Justified slot the build closed over. Per leanSpec #595 the published + /// block must not lag the store's justified checkpoint; if the store's + /// justified slot advanced past this between build and publish, fall back. + pub(crate) built_justified_slot: u64, + /// Fully assembled block + Type-2 proof, ready to process and publish. + pub(crate) signed_block: SignedBlock, +} + +/// Decide whether a prepared block is still safe to publish at interval 0. +/// +/// Pure so it can be unit-tested without an actor or store. +pub(crate) fn prebuilt_block_is_usable( + prepared: &PreparedBlock, + proposal_slot: u64, + proposer_id: u64, + live_head: H256, + store_justified_slot: u64, +) -> bool { + prepared.slot == proposal_slot + && prepared.validator_id == proposer_id + && prepared.parent_root == live_head + && prepared.built_justified_slot >= store_justified_slot +} + /// Build a valid block on top of this state. /// /// Selects attestations via `select_attestations`, compacts duplicate @@ -1307,3 +1341,62 @@ mod tests { assert_eq!(covered, HashSet::from([0, 1, 2, 3])); } } + +#[cfg(test)] +mod prebuild_tests { + use super::*; + use ethlambda_types::block::{BlockBody, MultiMessageAggregate}; + + fn root(b: u8) -> H256 { + H256::from_slice(&[b; 32]) + } + + fn dummy_signed_block() -> SignedBlock { + SignedBlock { + message: Block { + slot: 0, + proposer_index: 0, + parent_root: H256::ZERO, + state_root: H256::ZERO, + body: BlockBody::default(), + }, + proof: MultiMessageAggregate::default(), + } + } + + fn prepared(slot: u64, vid: u64, parent: H256, just: u64) -> PreparedBlock { + PreparedBlock { + slot, + validator_id: vid, + parent_root: parent, + built_justified_slot: just, + signed_block: dummy_signed_block(), + } + } + + #[test] + fn usable_when_head_and_justified_match() { + let p = prepared(10, 3, root(0xAB), 7); + assert!(prebuilt_block_is_usable(&p, 10, 3, root(0xAB), 7)); + } + + #[test] + fn unusable_when_head_moved() { + let p = prepared(10, 3, root(0xAB), 7); + assert!(!prebuilt_block_is_usable(&p, 10, 3, root(0xCD), 7)); + } + + #[test] + fn unusable_when_justified_advanced_past_build() { + let p = prepared(10, 3, root(0xAB), 7); + // store justified is now 8 > 7 → would regress justification. + assert!(!prebuilt_block_is_usable(&p, 10, 3, root(0xAB), 8)); + } + + #[test] + fn unusable_for_wrong_slot_or_proposer() { + let p = prepared(10, 3, root(0xAB), 7); + assert!(!prebuilt_block_is_usable(&p, 11, 3, root(0xAB), 7)); + assert!(!prebuilt_block_is_usable(&p, 10, 4, root(0xAB), 7)); + } +} diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index a09ea07a..6c181f5d 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -8,7 +8,7 @@ use ethlambda_types::{ ShortRoot, aggregator::AggregatorController, attestation::{SignedAggregatedAttestation, SignedAttestation}, - block::{ByteList512KiB, MultiMessageAggregate, SignedBlock}, + block::{Block, ByteList512KiB, MultiMessageAggregate, SignedBlock, TypeOneMultiSignature}, primitives::{H256, HashTreeRoot as _}, signature::{ValidatorPublicKey, ValidatorSignature}, }; @@ -17,6 +17,7 @@ use crate::aggregation::{ AGGREGATION_DEADLINE, AggregateProduced, AggregationDeadline, AggregationDone, AggregationSession, PRIOR_WORKER_JOIN_TIMEOUT, run_aggregation_worker, }; +use crate::block_builder::{PreparedBlock, prebuilt_block_is_usable}; use crate::key_manager::ValidatorKeyPair; use crate::sync_status::SyncStatusTracker; use spawned_concurrency::actor; @@ -103,6 +104,7 @@ impl BlockChain { aggregator, pending_block_parents: HashMap::new(), current_aggregation: None, + prepared_block: None, last_tick_instant: None, attestation_committee_count, pre_merge_coverage: None, @@ -158,6 +160,11 @@ pub struct BlockChainServer { /// the next interval 2 takes over. current_aggregation: Option, + /// Block built synchronously at the previous slot's interval 4, awaiting + /// publication at this proposal slot's interval 0. Cleared on use, on + /// staleness, or when superseded by the next interval-4 build. + prepared_block: Option, + /// Last tick instant for measuring interval duration. last_tick_instant: Option, @@ -236,54 +243,23 @@ impl BlockChainServer { let scheduled_proposer = (interval == 0 && slot > 0) .then(|| self.get_our_proposer(slot)) .flatten(); - let proposer_validator_id = self.sync_status.gate_proposer(scheduled_proposer); - - if let Some(validator_id) = scheduled_proposer - && proposer_validator_id.is_none() - { - info!(%slot, %validator_id, "Skipping block proposal while syncing"); - } - - // Snapshot the pre-merge `new_payloads` set at the end-of-slot promote - // (interval 4), so the post-block report for this round sees its - // "timely" cohort just before it is promoted out of `new_payloads`. - // - // Only interval 4 — not the proposer's interval-0 promote. By interval 0 - // the round's votes have already been promoted at the previous slot's - // interval 4; `new_payloads` then holds only stragglers, and snapshotting - // them here would overwrite the good interval-4 snapshot the report still - // needs (those stragglers surface in the `late` section instead). Skip - // empty snapshots so a missed round keeps the last set we saw. Pure - // observability. - if interval == 4 - && let Some(snapshot) = coverage::snapshot_new_payloads(&self.store) - { - self.pre_merge_coverage = Some(snapshot); - } + let is_proposer = scheduled_proposer.is_some(); // Tick the store first - this accepts attestations at interval 0 if we have a proposal - store::on_tick( - &mut self.store, - timestamp_ms, - proposer_validator_id.is_some(), - ); + store::on_tick(&mut self.store, timestamp_ms, is_proposer); - if interval == 2 { - if is_aggregator { - coverage::emit_agg_start_new_coverage( - &self.store, - self.attestation_committee_count, - ); - self.start_aggregation_session(slot, ctx).await; + // ==== interval 0 ==== + + // Now build and publish the block (after attestations have been accepted) + if let Some(validator_id) = scheduled_proposer { + if self.sync_status.duties_allowed() { + self.propose_block(slot, validator_id); } else { - metrics::inc_aggregator_skipped_not_aggregator(); + info!(%slot, %validator_id, "Skipping block proposal while syncing"); } } - // Now build and publish the block (after attestations have been accepted) - if let Some(validator_id) = proposer_validator_id { - self.propose_block(slot, validator_id); - } + // ==== interval 1 ==== // Produce attestations at interval 1 (all validators including proposer). // Reuse the same snapshot so self-delivery decisions match the rest @@ -308,6 +284,47 @@ impl BlockChainServer { } } + // ==== interval 2 ==== + + if interval == 2 { + if is_aggregator { + coverage::emit_agg_start_new_coverage( + &self.store, + self.attestation_committee_count, + ); + self.start_aggregation_session(slot, ctx).await; + } else { + metrics::inc_aggregator_skipped_not_aggregator(); + } + } + + // ==== interval 3 ==== + + // Interval 3 is handled inside [`store::on_tick`] + + // ==== interval 4 ==== + + if interval == 4 { + // Snapshot the pre-merge `new_payloads` set at the end-of-slot promote + // (interval 4), so the post-block report for this round sees its + // "timely" cohort just before it is promoted out of `new_payloads`. + if let Some(snapshot) = coverage::snapshot_new_payloads(&self.store) { + self.pre_merge_coverage = Some(snapshot); + } + + // If one of our validators proposes the NEXT slot, build its block + // now (synchronously, blocking the actor) so the heavy leanVM work + // is done before interval 0 and the proposer only has to publish. + let next_slot = slot + 1; + let next_proposer = self + .get_our_proposer(next_slot) + .filter(|_| self.sync_status.duties_allowed()); + + if let Some(validator_id) = next_proposer { + self.prebuild_block(next_slot, validator_id); + } + } + // Update safe target slot metric (updated by store.on_tick at interval 3) metrics::update_safe_target_slot(self.store.safe_target_slot()); // Update head slot metric (head may change when attestations are promoted at intervals 0/4) @@ -373,6 +390,74 @@ impl BlockChainServer { }); } + /// Build the next slot's block synchronously and stash it for publication + /// at interval 0. + /// + /// Runs on the actor thread, blocking it for the duration of the build + /// (the expensive part is the leanVM Type-1 → Type-2 merge). That is + /// acceptable here: between interval 4 and the next slot the actor has no + /// other consensus-critical duty, and a prepared block lets the proposer + /// publish at interval 0 without paying the build cost then. + fn prebuild_block(&mut self, slot: u64, validator_id: u64) { + // Build against the current canonical head, READ-ONLY. We must not use + // `get_proposal_head` here: it ticks the store to `slot` time one interval + // early, which would skew finalization and diverge the captured head from + // the interval-0 state (making every prebuilt block stale). The interval-4 + // promote has already run in `store::on_tick` this tick, so `store.head()` + // reflects the latest accepted attestations. + let parent_root = self.store.head(); + + let Some((signed_block, built_justified_slot)) = + self.build_signed_block(slot, validator_id, parent_root) + else { + return; + }; + + self.prepared_block = Some(PreparedBlock { + slot, + validator_id, + parent_root, + built_justified_slot, + signed_block, + }); + info!(%slot, %validator_id, "Pre-built block ready"); + } + + /// Build the block on `head_root` and assemble it into a `SignedBlock`. + /// + /// Shared by the interval-0 proposal path and the interval-4 pre-build; the + /// only difference between callers is how `head_root` is resolved (ticking + /// `get_proposal_head` vs read-only `store.head()`). Returns the signed block + /// and the justified slot it closed over, or `None` on any build/sign + /// failure (already logged and counted). + fn build_signed_block( + &mut self, + slot: u64, + validator_id: u64, + head_root: H256, + ) -> Option<(SignedBlock, u64)> { + let _timing = metrics::time_block_building(); + let (block, type_one_proofs, post_checkpoints) = + match store::produce_block_on_head(&mut self.store, slot, validator_id, head_root) { + Ok(built) => built, + Err(err) => { + error!(%slot, %validator_id, %err, "Failed to build block"); + metrics::inc_block_building_failures(); + return None; + } + }; + + coverage::emit_proposal_coverage( + &self.store, + self.attestation_committee_count, + block.body.attestations.iter(), + ); + + let signed_block = + self.assemble_signed_block(slot, validator_id, block, type_one_proofs)?; + Some((signed_block, post_checkpoints.justified.slot)) + } + /// Returns the validator ID if any of our validators is the proposer for this slot. fn get_our_proposer(&self, slot: u64) -> Option { let head_state = self.store.head_state(); @@ -435,24 +520,51 @@ impl BlockChainServer { fn propose_block(&mut self, slot: u64, validator_id: u64) { info!(%slot, %validator_id, "We are the proposer for this slot"); - let _timing = metrics::time_block_building(); - - // Build the block with attestation signatures - let Ok((block, type_one_proofs, _post_checkpoints)) = - store::produce_block_with_signatures(&mut self.store, slot, validator_id) - .inspect_err(|err| error!(%slot, %validator_id, %err, "Failed to build block")) - else { - metrics::inc_block_building_failures(); - return; - }; + // Resolve the canonical head once. This ticks the store to `slot` and + // accepts pending attestations, so both the pre-built-block revalidation + // and a fresh build below see the same interval-0 state. + let head_root = store::get_proposal_head(&mut self.store, slot); + + // Fast path: publish a block pre-built at the previous slot's interval 4, + // if it is still valid against the live head and justified checkpoint. + if let Some(prepared) = self.prepared_block.take() { + let store_justified_slot = self.store.latest_justified().slot; + if prebuilt_block_is_usable( + &prepared, + slot, + validator_id, + head_root, + store_justified_slot, + ) && self.process_and_publish_block( + slot, + validator_id, + prepared.signed_block, + "Published pre-built block", + ) { + return; + } + // Stale, or import failed: fall through to a fresh synchronous build. + info!(%slot, %validator_id, "Pre-built block unusable; rebuilding"); + } - coverage::emit_proposal_coverage( - &self.store, - self.attestation_committee_count, - block.body.attestations.iter(), - ); + if let Some((signed_block, _)) = self.build_signed_block(slot, validator_id, head_root) { + self.process_and_publish_block(slot, validator_id, signed_block, "Published block"); + } + } - // Sign the block root with the proposal key + /// Sign the block root and merge every Type-1 proof (attestations plus the + /// proposer's own signature) into the block's single Type-2 proof. + /// + /// Shared by the synchronous proposal path and `prebuild_block`. Returns + /// `None` on any signing/aggregation failure (already logged and counted). + fn assemble_signed_block( + &mut self, + slot: u64, + validator_id: u64, + block: Block, + type_one_proofs: Vec, + ) -> Option { + // Sign the block root with the proposal key. let block_root = block.hash_tree_root(); let Ok(proposer_signature) = self .key_manager @@ -460,18 +572,17 @@ impl BlockChainServer { .inspect_err(|err| error!(%slot, %validator_id, %err, "Failed to sign block root")) else { metrics::inc_block_building_failures(); - return; + return None; }; - // Assemble SignedBlock: wrap the proposer's raw XMSS signature into a - // singleton Type-1 SNARK, then merge it with every attestation Type-1 - // into the block's single Type-2 proof. + // Wrap the proposer's raw XMSS signature into a singleton Type-1 SNARK, + // then merge it with every attestation Type-1 into the single Type-2. let head_state = self.store.head_state(); let validators = &head_state.validators; let Some(proposer_validator) = validators.get(validator_id as usize) else { error!(%slot, %validator_id, "Proposer index out of range when assembling block"); metrics::inc_block_building_failures(); - return; + return None; }; // Decode the proposer's proposal pubkey once and reuse it both for the @@ -480,7 +591,7 @@ impl BlockChainServer { |err| error!(%slot, %validator_id, %err, "Failed to decode proposer proposal pubkey"), ) else { metrics::inc_block_building_failures(); - return; + return None; }; let Ok(proposer_validator_signature) = @@ -489,7 +600,7 @@ impl BlockChainServer { }) else { metrics::inc_block_building_failures(); - return; + return None; }; let Ok(proposer_proof_bytes) = ethlambda_crypto::aggregate_signatures( vec![proposer_pubkey.clone()], @@ -501,7 +612,7 @@ impl BlockChainServer { |err| error!(%slot, %validator_id, %err, "Failed to wrap proposer signature as Type-1"), ) else { metrics::inc_block_building_failures(); - return; + return None; }; let mut merge_inputs: Vec<(Vec, ByteList512KiB)> = @@ -531,7 +642,7 @@ impl BlockChainServer { } if resolve_failed { metrics::inc_block_building_failures(); - return; + return None; } merge_inputs.push((vec![proposer_pubkey], proposer_proof_bytes)); @@ -544,7 +655,7 @@ impl BlockChainServer { Err(err) => { error!(%slot, %validator_id, %err, "Failed to merge Type-1s into Type-2"); metrics::inc_block_building_failures(); - return; + return None; } }; let proof = match MultiMessageAggregate::from_bytes(merged_bytes.iter().as_slice()) { @@ -552,33 +663,41 @@ impl BlockChainServer { Err(err) => { error!(%slot, %validator_id, %err, "Failed to build multi-message aggregate"); metrics::inc_block_building_failures(); - return; + return None; } }; - // `type_one_proofs` is no longer needed past this point. - drop(type_one_proofs); - let signed_block = SignedBlock { + Some(SignedBlock { message: block, proof, - }; + }) + } - // Process the block locally before publishing + /// Import a freshly built block locally, then publish it to gossip. Returns + /// `true` on successful import; on failure logs, counts it, and returns + /// `false` so the caller can fall back to a fresh build. + fn process_and_publish_block( + &mut self, + slot: u64, + validator_id: u64, + signed_block: SignedBlock, + published_msg: &'static str, + ) -> bool { if let Err(err) = self.process_block(signed_block.clone()) { error!(%slot, %validator_id, %err, "Failed to process built block"); metrics::inc_block_building_failures(); - return; - }; + return false; + } metrics::inc_block_building_success(); - // Publish to gossip network if let Some(ref p2p) = self.p2p { let _ = p2p .publish_block(signed_block) .inspect_err(|err| error!(%slot, %validator_id, %err, "Failed to publish block")); } - info!(%slot, %validator_id, "Published block"); + info!(%slot, %validator_id, "{}", published_msg); + true } /// Run block import and refresh metrics. diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 79faa1bd..54dc8e0a 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -727,9 +727,11 @@ pub fn produce_attestation_data(store: &Store, slot: u64) -> AttestationData { /// Get the head for block proposal at the given slot. /// -/// Ensures store is up-to-date and processes any pending attestations -/// before returning the canonical head. -fn get_proposal_head(store: &mut Store, slot: u64) -> H256 { +/// NOT read-only: advances the store clock to `slot` and promotes pending +/// attestations before returning the canonical head. Use only at interval 0 +/// (the proposal tick); callers that must not move the clock should read +/// [`Store::head`] directly. +pub(crate) fn get_proposal_head(store: &mut Store, slot: u64) -> H256 { // Calculate time corresponding to this slot let slot_time_ms = store.config().genesis_time * 1000 + slot * MILLISECONDS_PER_SLOT; @@ -742,24 +744,39 @@ fn get_proposal_head(store: &mut Store, slot: u64) -> H256 { store.head() } -/// Produce a block and per-aggregated-attestation signature payloads for the target slot. +/// Produce a block and its signature payloads, resolving the head via +/// [`get_proposal_head`] (which advances the store clock to `slot`). /// -/// Returns the finalized block and attestation signature payloads aligned -/// with `block.body.attestations`. +/// Use at interval 0. To build against an already-known head without ticking +/// the clock (e.g. a pre-build one interval early), call [`produce_block_on_head`]. pub fn produce_block_with_signatures( store: &mut Store, slot: u64, validator_index: u64, ) -> Result<(Block, Vec, PostBlockCheckpoints), StoreError> { - // Get parent block and state to build upon let head_root = get_proposal_head(store, slot); + produce_block_on_head(store, slot, validator_index, head_root) +} + +/// Produce a block and per-aggregated-attestation signature payloads on top of +/// `head_root`, without moving the store clock. +/// +/// Returns the block and attestation signature payloads aligned with +/// `block.body.attestations`. Shared by the interval-0 proposal path and the +/// interval-4 pre-build; the only difference between them is how `head_root` is +/// resolved (ticking vs read-only). +pub(crate) fn produce_block_on_head( + store: &mut Store, + slot: u64, + validator_index: u64, + head_root: H256, +) -> Result<(Block, Vec, PostBlockCheckpoints), StoreError> { let head_state = store .get_state(&head_root) .ok_or(StoreError::MissingParentState { parent_root: head_root, slot, - })? - .clone(); + })?; // Validate proposer authorization for this slot let num_validators = head_state.validators.len() as u64; diff --git a/crates/blockchain/src/sync_status.rs b/crates/blockchain/src/sync_status.rs index e065d4ec..48b20a97 100644 --- a/crates/blockchain/src/sync_status.rs +++ b/crates/blockchain/src/sync_status.rs @@ -45,10 +45,6 @@ impl SyncStatusTracker { pub(crate) fn duties_allowed(&self) -> bool { !self.syncing } - - pub(crate) fn gate_proposer(&self, proposer: Option) -> Option { - proposer.filter(|_| self.duties_allowed()) - } } #[cfg(test)] @@ -112,32 +108,4 @@ mod tests { assert_eq!(tracker.update(15, 20, 20), SyncStatus::Synced); } - - #[test] - fn syncing_gates_proposals_and_attestations() { - let mut tracker = SyncStatusTracker::default(); - tracker.update(20, 0, 20); - - assert!(!tracker.duties_allowed()); - assert_eq!(tracker.gate_proposer(Some(3)), None); - } - - #[test] - fn caught_up_node_allows_proposals_and_attestations() { - let mut tracker = SyncStatusTracker::default(); - tracker.update(20, 0, 20); - tracker.update(20, 18, 20); - - assert!(tracker.duties_allowed()); - assert_eq!(tracker.gate_proposer(Some(3)), Some(3)); - } - - #[test] - fn network_stall_keeps_proposals_and_attestations_enabled() { - let mut tracker = SyncStatusTracker::default(); - tracker.update(100, 0, 0); - - assert!(tracker.duties_allowed()); - assert_eq!(tracker.gate_proposer(Some(3)), Some(3)); - } }