From 04582972f6d794559d65483f5316ea5433defef3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Thu, 16 Apr 2026 17:46:17 -0300 Subject: [PATCH 1/2] fix: make signature aggregation asynchronous --- Cargo.lock | 1 + crates/blockchain/Cargo.toml | 1 + crates/blockchain/src/lib.rs | 294 +++++++++++++++++- crates/blockchain/src/metrics.rs | 10 + crates/blockchain/src/store.rs | 294 +++++++++--------- .../blockchain/tests/forkchoice_spectests.rs | 4 +- .../blockchain/tests/signature_spectests.rs | 2 +- 7 files changed, 452 insertions(+), 154 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1378eca1..ba59e2a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2083,6 +2083,7 @@ dependencies = [ "spawned-concurrency 0.5.0", "thiserror 2.0.18", "tokio", + "tokio-util", "tracing", ] diff --git a/crates/blockchain/Cargo.toml b/crates/blockchain/Cargo.toml index d3370e49..65c6ecf2 100644 --- a/crates/blockchain/Cargo.toml +++ b/crates/blockchain/Cargo.toml @@ -22,6 +22,7 @@ ethlambda-types.workspace = true spawned-concurrency.workspace = true tokio.workspace = true +tokio-util = { version = "0.7", default-features = false } rayon.workspace = true thiserror.workspace = true diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 9ee4ee8e..2d38f1bb 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -1,5 +1,5 @@ use std::collections::{HashMap, HashSet, VecDeque}; -use std::time::{Duration, SystemTime}; +use std::time::{Duration, Instant, SystemTime}; use ethlambda_network_api::{BlockChainToP2PRef, InitP2P}; use ethlambda_state_transition::is_proposer; @@ -12,14 +12,28 @@ use ethlambda_types::{ }; use crate::key_manager::ValidatorKeyPair; +use crate::store::{AggregatedGroupOutput, AggregationSnapshot}; use spawned_concurrency::actor; use spawned_concurrency::error::ActorError; +use spawned_concurrency::message::Message; use spawned_concurrency::protocol; use 
spawned_concurrency::tasks::{Actor, ActorRef, ActorStart, Context, Handler, send_after}; +use tokio_util::sync::CancellationToken; use tracing::{error, info, trace, warn}; use crate::store::StoreError; +/// Soft deadline for committee-signature aggregation measured from the +/// interval-2 tick. After this much wall time elapses, the actor signals the +/// worker to stop via its cancellation token. The 50 ms budget before the next +/// interval (interval 3 at +800 ms) is reserved for publishing any late-arriving +/// aggregates and for gossip propagation margin. +const AGGREGATION_DEADLINE: Duration = Duration::from_millis(750); +/// Upper bound we wait for a prior worker to exit if it is still running when +/// the next session is about to start. Reached only in pathological cases +/// (mismatched timers, stuck proofs); we warn before blocking. +const PRIOR_WORKER_JOIN_TIMEOUT: Duration = Duration::from_secs(2); + pub(crate) mod fork_choice_tree; pub mod key_manager; pub mod metrics; @@ -57,6 +71,7 @@ impl BlockChain { pending_blocks: HashMap::new(), is_aggregator, pending_block_parents: HashMap::new(), + current_aggregation: None, } .start(); let time_until_genesis = (SystemTime::UNIX_EPOCH + Duration::from_secs(genesis_time)) @@ -98,10 +113,70 @@ pub struct BlockChainServer { /// Whether this node acts as a committee aggregator. is_aggregator: bool, + + /// In-flight committee-signature aggregation, if any. Present only while a + /// worker started at the most recent interval 2 is still running or until + /// the next interval 2 takes over. + current_aggregation: Option, +} + +/// Tracks an in-flight off-thread aggregation worker so the actor can cancel, +/// join, and correlate incoming result messages with the right session. +struct AggregationSession { + /// Slot at which this session was started; used as a fencing id so we can + /// drop late-arriving messages from a prior session. 
+ session_id: u64, + /// Independent per-session token (not a child of any actor token); cancelled + /// at the deadline, when a newer session replaces this one, or on actor stop. + cancel: CancellationToken, + /// Handle to the `spawn_blocking` worker. Held so `stopped()` / new-session + /// start can await completion. + worker: tokio::task::JoinHandle<()>, + /// Kept alive so the timer is implicitly cancelled when the field is + /// replaced or the actor stops (see `spawned_concurrency::tasks::time`). + _deadline_timer: spawned_concurrency::tasks::TimerHandle, +} + +// ------------------------------------------------------------------------- +// Worker → actor messages and deadline self-message +// ------------------------------------------------------------------------- + +/// Fire-and-forget message from the aggregation worker: one successfully +/// produced aggregate. +pub struct AggregateProduced { + session_id: u64, + output: AggregatedGroupOutput, +} +impl Message for AggregateProduced { + type Result = (); +} + +/// Fire-and-forget message from the aggregation worker, sent after the worker +/// loop exits (either by running out of jobs or on cancellation). +pub struct AggregationDone { + session_id: u64, + groups_considered: usize, + groups_aggregated: usize, + total_raw_sigs: usize, + total_children: usize, + total_elapsed: Duration, + cancelled: bool, +} +impl Message for AggregationDone { + type Result = (); +} + +/// Self-message scheduled via `send_after` at interval-2 start. Fires the +/// worker's cancellation token so no new `try_aggregate` calls start. 
+pub struct AggregationDeadline { + session_id: u64, +} +impl Message for AggregationDeadline { + type Result = (); } impl BlockChainServer { - fn on_tick(&mut self, timestamp_ms: u64) { + async fn on_tick(&mut self, timestamp_ms: u64, ctx: &Context) { let genesis_time_ms = self.store.config().genesis_time * 1000; // Calculate current slot and interval from milliseconds @@ -127,19 +202,17 @@ impl BlockChainServer { .flatten(); // Tick the store first - this accepts attestations at interval 0 if we have a proposal - let new_aggregates = store::on_tick( + store::on_tick( &mut self.store, timestamp_ms, proposer_validator_id.is_some(), - self.is_aggregator, ); - if let Some(ref p2p) = self.p2p { - for aggregate in new_aggregates { - let _ = p2p - .publish_aggregated_attestation(aggregate) - .inspect_err(|err| error!(%err, "Failed to publish aggregated attestation")); - } + // Interval 2 on an aggregator: kick off the off-thread aggregation worker. + // The actor continues processing other messages while the worker runs; + // results stream back as `AggregateProduced` messages. + if interval == 2 && self.is_aggregator { + self.start_aggregation_session(slot, ctx).await; } // Now build and publish the block (after attestations have been accepted) @@ -158,6 +231,63 @@ impl BlockChainServer { metrics::update_head_slot(self.store.head_slot()); } + /// Kick off a committee-signature aggregation session: + /// 1. If a prior session is still running (pathological), warn and join it. + /// 2. Snapshot the aggregation inputs from the store. + /// 3. Spawn a `spawn_blocking` worker that streams results back as messages. + /// 4. Schedule the `AggregationDeadline` self-message at +750 ms. 
+ async fn start_aggregation_session(&mut self, slot: u64, ctx: &Context) { + if let Some(prior) = self.current_aggregation.take() { + prior.cancel.cancel(); + if !prior.worker.is_finished() { + warn!( + prior_session_id = prior.session_id, + new_session_id = slot, + "Prior aggregation worker still running at next session start; joining before proceeding" + ); + } + match tokio::time::timeout(PRIOR_WORKER_JOIN_TIMEOUT, prior.worker).await { + Ok(Ok(())) => {} + Ok(Err(err)) => warn!(?err, "Prior aggregation worker task ended abnormally"), + Err(_) => warn!( + timeout_secs = PRIOR_WORKER_JOIN_TIMEOUT.as_secs(), + "Timed out joining prior aggregation worker" + ), + } + } + + let Some(snapshot) = store::snapshot_aggregation_inputs(&self.store) else { + // No gossip sigs and no pending payloads — nothing to aggregate this slot. + return; + }; + + let session_id = slot; + // Independent token per session. Shutdown propagates via our + // #[stopped] hook which cancels any current session; the deadline + // timer cancels this specific session at +AGGREGATION_DEADLINE. + let cancel = CancellationToken::new(); + let actor_ref = ctx.actor_ref(); + + let worker_cancel = cancel.clone(); + let worker_actor = actor_ref.clone(); + let worker = tokio::task::spawn_blocking(move || { + run_aggregation_worker(snapshot, worker_actor, worker_cancel, session_id); + }); + + let deadline_timer = send_after( + AGGREGATION_DEADLINE, + ctx.clone(), + AggregationDeadline { session_id }, + ); + + self.current_aggregation = Some(AggregationSession { + session_id, + cancel, + worker, + _deadline_timer: deadline_timer, + }); + } + /// Returns the validator ID if any of our validators is the proposer for this slot. 
fn get_our_proposer(&self, slot: u64) -> Option { let head_state = self.store.head_state(); @@ -497,7 +627,7 @@ impl BlockChainServer { let timestamp = SystemTime::UNIX_EPOCH .elapsed() .expect("already past the unix epoch"); - self.on_tick(timestamp.as_millis() as u64); + self.on_tick(timestamp.as_millis() as u64, ctx).await; // Schedule the next tick at the next 800ms interval boundary let ms_since_epoch = timestamp.as_millis() as u64; let ms_to_next_interval = @@ -508,6 +638,32 @@ impl BlockChainServer { block_chain_protocol::Tick, ); } + + /// Actor lifecycle hook: wait for any in-flight aggregation worker to exit + /// before the actor is fully stopped. The worker's cancellation token is a + /// child of the actor's cancellation token, so by the time `stopped` runs + /// the worker has already been asked to exit; we only wait for its current + /// `try_aggregate` call to finish (bounded at PRIOR_WORKER_JOIN_TIMEOUT). + #[stopped] + async fn on_stopped(&mut self, _ctx: &Context) { + let Some(session) = self.current_aggregation.take() else { + return; + }; + session.cancel.cancel(); + match tokio::time::timeout(PRIOR_WORKER_JOIN_TIMEOUT, session.worker).await { + Ok(Ok(())) => { + info!( + session_id = session.session_id, + "Aggregation worker joined on shutdown" + ); + } + Ok(Err(err)) => warn!(?err, "Aggregation worker task ended abnormally on shutdown"), + Err(_) => warn!( + timeout_secs = PRIOR_WORKER_JOIN_TIMEOUT.as_secs(), + "Timed out joining aggregation worker on shutdown" + ), + } + } } // --- Manual Handler impls for network-api messages --- @@ -540,3 +696,119 @@ impl Handler for BlockChainServer { self.on_gossip_aggregated_attestation(msg.attestation); } } + +// ------------------------------------------------------------------------- +// Aggregation message handlers (worker → actor, actor → self for deadline) +// ------------------------------------------------------------------------- + +impl Handler for BlockChainServer { + async fn handle(&mut 
self, msg: AggregateProduced, _ctx: &Context) { + // Drop results from a prior session (or from an unexpected late worker). + // Current session may be None if the actor already cleaned it up; accept + // the message only when ids match. + let current = self.current_aggregation.as_ref().map(|s| s.session_id); + if current != Some(msg.session_id) { + trace!( + incoming_session_id = msg.session_id, + current_session_id = ?current, + "Dropping stale aggregate produced for non-current session" + ); + return; + } + + store::apply_aggregated_group(&mut self.store, &msg.output); + + if let Some(ref p2p) = self.p2p { + let aggregate = SignedAggregatedAttestation { + data: msg.output.hashed.data().clone(), + proof: msg.output.proof, + }; + let _ = p2p + .publish_aggregated_attestation(aggregate) + .inspect_err(|err| error!(%err, "Failed to publish aggregated attestation")); + } + } +} + +impl Handler for BlockChainServer { + async fn handle(&mut self, msg: AggregationDone, _ctx: &Context) { + // Record the metric observation here: the histogram measures one + // aggregation session end-to-end (snapshot to worker exit). 
+ metrics::observe_committee_signatures_aggregation(msg.total_elapsed); + + let aggregation_elapsed = msg.total_elapsed; + info!( + ?aggregation_elapsed, + session_id = msg.session_id, + groups_considered = msg.groups_considered, + groups_aggregated = msg.groups_aggregated, + total_raw_sigs = msg.total_raw_sigs, + total_children = msg.total_children, + cancelled = msg.cancelled, + aggregation_deadline_ms = AGGREGATION_DEADLINE.as_millis() as u64, + "Committee signatures aggregated" + ); + } +} + +impl Handler for BlockChainServer { + async fn handle(&mut self, msg: AggregationDeadline, _ctx: &Context) { + if let Some(session) = &self.current_aggregation + && session.session_id == msg.session_id + { + session.cancel.cancel(); + } + } +} + +// ------------------------------------------------------------------------- +// Worker loop — runs on a `spawn_blocking` thread, no store access. +// ------------------------------------------------------------------------- + +fn run_aggregation_worker( + snapshot: AggregationSnapshot, + actor: ActorRef, + cancel: CancellationToken, + session_id: u64, +) { + let start = Instant::now(); + let groups_considered = snapshot.groups_considered; + let mut groups_aggregated = 0usize; + let mut total_raw_sigs = 0usize; + let mut total_children = 0usize; + + for job in snapshot.jobs { + if cancel.is_cancelled() { + break; + } + + let job_raw_sigs = job.raw_ids.len(); + let job_children = job.children.len(); + + let Some(output) = store::aggregate_job(job) else { + continue; + }; + + groups_aggregated += 1; + total_raw_sigs += job_raw_sigs; + total_children += job_children; + + if actor + .send(AggregateProduced { session_id, output }) + .is_err() + { + // Actor is gone; no point producing more. 
+ break; + } + } + + let _ = actor.send(AggregationDone { + session_id, + groups_considered, + groups_aggregated, + total_raw_sigs, + total_children, + total_elapsed: start.elapsed(), + cancelled: cancel.is_cancelled(), + }); +} diff --git a/crates/blockchain/src/metrics.rs b/crates/blockchain/src/metrics.rs index e59c5b9a..c56ba042 100644 --- a/crates/blockchain/src/metrics.rs +++ b/crates/blockchain/src/metrics.rs @@ -519,6 +519,16 @@ pub fn time_committee_signatures_aggregation() -> TimingGuard { TimingGuard::new(&LEAN_COMMITTEE_SIGNATURES_AGGREGATION_TIME_SECONDS) } +/// Observe a committee signatures aggregation duration directly. +/// +/// Used by the off-thread aggregation worker: the duration is tracked in the +/// worker and reported back to the actor via an `AggregationDone` message, +/// where we observe it here rather than using a drop-guard across a thread +/// boundary. +pub fn observe_committee_signatures_aggregation(elapsed: std::time::Duration) { + LEAN_COMMITTEE_SIGNATURES_AGGREGATION_TIME_SECONDS.observe(elapsed.as_secs_f64()); +} + /// Update a table byte size gauge. 
pub fn update_table_bytes(table_name: &str, bytes: u64) { LEAN_TABLE_BYTES diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 04d3c0c6..f53b8866 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -11,7 +11,10 @@ use ethlambda_types::{ AggregatedAttestation, AggregationBits, Attestation, AttestationData, HashedAttestationData, SignedAggregatedAttestation, SignedAttestation, validator_indices, }, - block::{AggregatedAttestations, AggregatedSignatureProof, Block, BlockBody, SignedBlock}, + block::{ + AggregatedAttestations, AggregatedSignatureProof, Block, BlockBody, ByteListMiB, + SignedBlock, + }, checkpoint::Checkpoint, primitives::{H256, HashTreeRoot as _}, signature::{ValidatorPublicKey, ValidatorSignature}, @@ -121,47 +124,71 @@ fn update_safe_target(store: &mut Store) { store.set_safe_target(safe_target); } -/// Aggregate committee signatures at interval 2 using mixed aggregation. -/// -/// Iterates over the union of attestation data with gossip signatures OR pending -/// new payloads (`new.keys() | gossip_sigs.keys()` in the spec). For each entry: -/// -/// 1. **Selects** existing proofs from new/known payload buffers (greedy set-cover) -/// 2. **Fills** uncovered validators with raw gossip signatures -/// 3. **Aggregates** both children proofs and raw signatures in a single `xmss_aggregate` call +/// A single pre-prepared aggregation group. /// -/// This matches the spec's incremental proof-building strategy: previous proofs -/// are fed as children so only genuinely new signatures are aggregated from scratch, -/// keeping proof trees shallow and avoiding redundant cryptographic work. -/// -/// Results are inserted into the new (pending) payload buffer. They become -/// fork-choice-active after `accept_new_attestations` promotes them to known -/// at interval 0 (with proposal) or interval 4. 
-fn aggregate_committee_signatures(store: &mut Store) -> Vec { +/// Built on the actor thread from a store snapshot; consumed by an off-thread +/// worker that only needs to run the expensive `aggregate_mixed` call. Holding +/// this struct requires no store access. +pub struct AggregationJob { + pub hashed: HashedAttestationData, + pub data_root: H256, + pub slot: u64, + /// Pre-resolved `(participant_pubkeys, proof_data)` pairs for children + /// selected via greedy coverage. + pub children: Vec<(Vec, ByteListMiB)>, + pub accepted_child_ids: Vec, + pub raw_pubkeys: Vec, + pub raw_sigs: Vec, + pub raw_ids: Vec, + /// Gossip-signature keys to delete on successful aggregation. + pub keys_to_delete: Vec<(u64, H256)>, +} + +/// All input needed to run a session of committee-signature aggregation off-thread. +pub struct AggregationSnapshot { + pub jobs: Vec, + pub groups_considered: usize, +} + +/// Result of one successful aggregation group. Carried back to the actor thread +/// as a message payload so the store can be updated and gossip publish fired. +pub struct AggregatedGroupOutput { + pub hashed: HashedAttestationData, + pub proof: AggregatedSignatureProof, + pub participants: Vec, + pub raw_sigs_count: usize, + pub children_count: usize, + pub keys_to_delete: Vec<(u64, H256)>, +} + +/// Build a snapshot of everything needed to aggregate. Runs on the actor +/// thread, touches the store, does no heavy cryptography. Returns `None` when +/// there is nothing to aggregate so callers can avoid spawning an empty worker. 
+pub fn snapshot_aggregation_inputs(store: &Store) -> Option { let gossip_groups = store.iter_gossip_signatures(); let new_payload_keys = store.new_payload_keys(); if gossip_groups.is_empty() && new_payload_keys.is_empty() { - return Vec::new(); + return None; } - let _timing = metrics::time_committee_signatures_aggregation(); - - let mut new_aggregates: Vec = Vec::new(); let head_state = store.head_state(); let validators = &head_state.validators; - let mut keys_to_delete: Vec<(u64, H256)> = Vec::new(); - let mut payload_entries: Vec<(HashedAttestationData, AggregatedSignatureProof)> = Vec::new(); - let gossip_roots: HashSet = gossip_groups .iter() .map(|(hashed, _)| hashed.root()) .collect(); + let groups_considered = gossip_groups.len() + + new_payload_keys + .iter() + .filter(|(root, _)| !gossip_roots.contains(root)) + .count(); + + let mut jobs = Vec::with_capacity(groups_considered); + // --- Pass 1: attestation data with gossip signatures --- - // - // Each entry may also have existing proofs (new/known) that become children. for (hashed, validator_sigs) in &gossip_groups { let data_root = hashed.root(); let slot = hashed.data().slot; @@ -169,11 +196,9 @@ fn aggregate_committee_signatures(store: &mut Store) -> Vec Vec = validator_sigs + .iter() + .map(|(vid, _)| (*vid, data_root)) + .collect(); - new_aggregates.push(SignedAggregatedAttestation { - data: hashed.data().clone(), - proof: proof.clone(), + jobs.push(AggregationJob { + hashed: hashed.clone(), + data_root, + slot, + children, + accepted_child_ids, + raw_pubkeys, + raw_sigs, + raw_ids, + keys_to_delete, }); - payload_entries.push((hashed.clone(), proof)); - - // Delete all gossip sigs for this data: raw ones were consumed, - // covered ones are redundant (already captured in child proofs). 
- keys_to_delete.extend(validator_sigs.iter().map(|(vid, _)| (*vid, data_root))); - - metrics::inc_pq_sig_aggregated_signatures(); - metrics::inc_pq_sig_attestations_in_aggregated_signatures(all_ids.len() as u64); } // --- Pass 2: attestation data with new payloads but no gossip signatures --- - // - // Matches the `new.keys()` part of the spec's `new.keys() | gossip_sigs.keys()`. - // These entries have 0 raw signatures; they're only aggregated if 2+ existing - // proofs can be merged into one (pure recursive aggregation). for (data_root, att_data) in &new_payload_keys { if gossip_roots.contains(data_root) { continue; } - // Short-circuit: avoid cloning proofs when there aren't enough to merge. if store.proof_count_for_data(data_root) < 2 { continue; } @@ -241,61 +255,41 @@ fn aggregate_committee_signatures(store: &mut Store) -> Vec, - raw_sigs: Vec, - raw_ids: &[u64], - data_root: &H256, - slot: u64, - head_state: &State, -) -> Option<(AggregatedSignatureProof, Vec)> { - let validators = &head_state.validators; - - // Resolve each child's participant pubkeys. Skip children whose pubkeys - // can't be fully resolved: passing fewer pubkeys than the proof expects - // would produce an invalid aggregate. - let mut children_for_aggregation = Vec::with_capacity(child_proofs.len()); + validators: &[ethlambda_types::state::Validator], +) -> (Vec<(Vec, ByteListMiB)>, Vec) { + let mut children = Vec::with_capacity(child_proofs.len()); let mut accepted_child_ids: Vec = Vec::new(); + for proof in child_proofs { let participant_ids: Vec = proof.participant_indices().collect(); let child_pubkeys: Vec = participant_ids @@ -311,38 +305,66 @@ fn try_aggregate( continue; } accepted_child_ids.extend(&participant_ids); - children_for_aggregation.push((child_pubkeys, proof.proof_data.clone())); + children.push((child_pubkeys, proof.proof_data.clone())); } - // Re-check after potentially dropping children with unresolvable pubkeys. 
- if raw_ids.is_empty() && children_for_aggregation.len() < 2 { + (children, accepted_child_ids) +} + +/// Run the expensive `aggregate_mixed` call for a single prepared job. +/// +/// Pure function — no store access, safe to call from a `tokio::task::spawn_blocking` +/// worker. Returns `None` on cryptographic failure. +pub fn aggregate_job(job: AggregationJob) -> Option { + if job.raw_ids.is_empty() && job.children.len() < 2 { return None; } - let slot_u32: u32 = slot.try_into().expect("slot exceeds u32"); + let slot_u32: u32 = job.slot.try_into().expect("slot exceeds u32"); + let raw_sigs_count = job.raw_ids.len(); + let children_count = job.children.len(); + let proof_data = { let _timing = metrics::time_pq_sig_aggregated_signatures_building(); aggregate_mixed( - children_for_aggregation, - raw_pubkeys, - raw_sigs, - data_root, + job.children, + job.raw_pubkeys, + job.raw_sigs, + &job.data_root, slot_u32, ) } .inspect_err(|err| warn!(%err, "Failed to aggregate committee signatures")) .ok()?; - let mut all_ids: Vec = raw_ids.to_vec(); - all_ids.extend(&accepted_child_ids); - all_ids.sort_unstable(); - all_ids.dedup(); + let mut participants: Vec = job.raw_ids; + participants.extend(&job.accepted_child_ids); + participants.sort_unstable(); + participants.dedup(); + + let aggregation_bits = aggregation_bits_from_validator_indices(&participants); + + Some(AggregatedGroupOutput { + hashed: job.hashed, + proof: AggregatedSignatureProof::new(aggregation_bits, proof_data), + participants, + raw_sigs_count, + children_count, + keys_to_delete: job.keys_to_delete, + }) +} - let participants = aggregation_bits_from_validator_indices(&all_ids); - Some(( - AggregatedSignatureProof::new(participants, proof_data), - all_ids, - )) +/// Apply a worker-produced aggregate to the store: insert the new payload and +/// delete its consumed/redundant gossip signatures. Idempotent wrt the gossip +/// delete — absent keys are silently skipped. 
+pub fn apply_aggregated_group(store: &mut Store, output: &AggregatedGroupOutput) { + store.insert_new_aggregated_payload(output.hashed.clone(), output.proof.clone()); + metrics::update_latest_new_aggregated_payloads(store.new_aggregated_payloads_count()); + store.delete_gossip_signatures(&output.keys_to_delete); + metrics::update_gossip_signatures(store.gossip_signatures_count()); + + metrics::inc_pq_sig_aggregated_signatures(); + metrics::inc_pq_sig_attestations_in_aggregated_signatures(output.participants.len() as u64); } /// Greedy set-cover selection of proofs to maximize validator coverage. @@ -463,14 +485,7 @@ fn validate_attestation_data(store: &Store, data: &AttestationData) -> Result<() /// 800ms interval. Slot and interval-within-slot are derived as: /// slot = store.time() / INTERVALS_PER_SLOT /// interval = store.time() % INTERVALS_PER_SLOT -pub fn on_tick( - store: &mut Store, - timestamp_ms: u64, - has_proposal: bool, - is_aggregator: bool, -) -> Vec { - let mut new_aggregates: Vec = Vec::new(); - +pub fn on_tick(store: &mut Store, timestamp_ms: u64, has_proposal: bool) { // Convert UNIX timestamp (ms) to interval count since genesis let genesis_time_ms = store.config().genesis_time * 1000; let time_delta_ms = timestamp_ms.saturating_sub(genesis_time_ms); @@ -494,7 +509,11 @@ pub fn on_tick( let is_final_tick = store.time() == time; let should_signal_proposal = has_proposal && is_final_tick; - // NOTE: here we assume on_tick never skips intervals + // NOTE: here we assume on_tick never skips intervals. + // Interval 2 (committee-signature aggregation) is no longer handled here: + // the blockchain actor orchestrates the aggregation worker directly so + // the actor's message loop stays unblocked during the expensive XMSS + // proofs. See `BlockChainServer::start_aggregation_session` in `lib.rs`. 
match interval { 0 => { // Start of slot - process attestations if proposal exists @@ -506,10 +525,7 @@ pub fn on_tick( // Vote propagation — no action } 2 => { - // Aggregation interval - if is_aggregator { - new_aggregates.extend(aggregate_committee_signatures(store)); - } + // Aggregation is driven by the actor (off-thread); nothing to do here. } 3 => { // Update safe target for validators @@ -522,8 +538,6 @@ pub fn on_tick( _ => unreachable!("slots only have 5 intervals"), } } - - new_aggregates } /// Process a gossiped attestation with signature verification. @@ -980,7 +994,7 @@ fn get_proposal_head(store: &mut Store, slot: u64) -> H256 { let slot_time_ms = store.config().genesis_time * 1000 + slot * MILLISECONDS_PER_SLOT; // Advance time to current slot (ticking intervals) - on_tick(store, slot_time_ms, true, false); + on_tick(store, slot_time_ms, true); // Process any pending attestations before proposal accept_new_attestations(store, false); diff --git a/crates/blockchain/tests/forkchoice_spectests.rs b/crates/blockchain/tests/forkchoice_spectests.rs index e189d49d..b02b2f8b 100644 --- a/crates/blockchain/tests/forkchoice_spectests.rs +++ b/crates/blockchain/tests/forkchoice_spectests.rs @@ -81,7 +81,7 @@ fn run(path: &Path) -> datatest_stable::Result<()> { genesis_time * 1000 + signed_block.message.slot * MILLISECONDS_PER_SLOT; // NOTE: the has_proposal argument is set to true, following the spec - store::on_tick(&mut store, block_time_ms, true, false); + store::on_tick(&mut store, block_time_ms, true); let result = store::on_block_without_verification(&mut store, signed_block); match (result.is_ok(), step.valid) { @@ -106,7 +106,7 @@ fn run(path: &Path) -> datatest_stable::Result<()> { "tick" => { let timestamp_ms = step.time.expect("tick step missing time") * 1000; // NOTE: the has_proposal argument is set to false, following the spec - store::on_tick(&mut store, timestamp_ms, false, false); + store::on_tick(&mut store, timestamp_ms, false); } 
"attestation" => { let att_data = step diff --git a/crates/blockchain/tests/signature_spectests.rs b/crates/blockchain/tests/signature_spectests.rs index c136590c..e7c1a888 100644 --- a/crates/blockchain/tests/signature_spectests.rs +++ b/crates/blockchain/tests/signature_spectests.rs @@ -51,7 +51,7 @@ fn run(path: &Path) -> datatest_stable::Result<()> { // Advance time to the block's slot let block_time_ms = genesis_time * 1000 + signed_block.message.slot * MILLISECONDS_PER_SLOT; - store::on_tick(&mut st, block_time_ms, true, false); + store::on_tick(&mut st, block_time_ms, true); // Process the block (this includes signature verification) let result = store::on_block(&mut st, signed_block); From 0175fbf86849d43903a2115d079b2ee4f667a228 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Thu, 16 Apr 2026 18:42:36 -0300 Subject: [PATCH 2/2] refactor: simplify --- crates/blockchain/src/lib.rs | 33 ++---- crates/blockchain/src/metrics.rs | 14 +-- crates/blockchain/src/store.rs | 188 +++++++++++++++---------------- 3 files changed, 105 insertions(+), 130 deletions(-) diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 2d38f1bb..aacdc762 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -137,13 +137,8 @@ struct AggregationSession { _deadline_timer: spawned_concurrency::tasks::TimerHandle, } -// ------------------------------------------------------------------------- -// Worker → actor messages and deadline self-message -// ------------------------------------------------------------------------- - -/// Fire-and-forget message from the aggregation worker: one successfully -/// produced aggregate. -pub struct AggregateProduced { +/// One successful aggregate streamed back from the worker. 
+pub(crate) struct AggregateProduced { session_id: u64, output: AggregatedGroupOutput, } @@ -151,9 +146,8 @@ impl Message for AggregateProduced { type Result = (); } -/// Fire-and-forget message from the aggregation worker, sent after the worker -/// loop exits (either by running out of jobs or on cancellation). -pub struct AggregationDone { +/// Emitted by the worker after its loop exits (completion or cancellation). +pub(crate) struct AggregationDone { session_id: u64, groups_considered: usize, groups_aggregated: usize, @@ -166,9 +160,9 @@ impl Message for AggregationDone { type Result = (); } -/// Self-message scheduled via `send_after` at interval-2 start. Fires the -/// worker's cancellation token so no new `try_aggregate` calls start. -pub struct AggregationDeadline { +/// Self-message scheduled via `send_after` at interval-2 start. Cancels the +/// session's token so the worker stops starting new aggregations. +pub(crate) struct AggregationDeadline { session_id: u64, } impl Message for AggregationDeadline { @@ -208,9 +202,6 @@ impl BlockChainServer { proposer_validator_id.is_some(), ); - // Interval 2 on an aggregator: kick off the off-thread aggregation worker. - // The actor continues processing other messages while the worker runs; - // results stream back as `AggregateProduced` messages. if interval == 2 && self.is_aggregator { self.start_aggregation_session(slot, ctx).await; } @@ -640,10 +631,9 @@ impl BlockChainServer { } /// Actor lifecycle hook: wait for any in-flight aggregation worker to exit - /// before the actor is fully stopped. The worker's cancellation token is a - /// child of the actor's cancellation token, so by the time `stopped` runs - /// the worker has already been asked to exit; we only wait for its current - /// `try_aggregate` call to finish (bounded at PRIOR_WORKER_JOIN_TIMEOUT). + /// before the actor is fully stopped. 
We cancel the session's token and + /// wait up to PRIOR_WORKER_JOIN_TIMEOUT for the worker's current + /// `aggregate_job` call to finish (the proof itself cannot be interrupted). #[stopped] async fn on_stopped(&mut self, _ctx: &Context) { let Some(session) = self.current_aggregation.take() else { @@ -732,8 +722,7 @@ impl Handler for BlockChainServer { impl Handler for BlockChainServer { async fn handle(&mut self, msg: AggregationDone, _ctx: &Context) { - // Record the metric observation here: the histogram measures one - // aggregation session end-to-end (snapshot to worker exit). + store::finalize_aggregation_session(&self.store); metrics::observe_committee_signatures_aggregation(msg.total_elapsed); let aggregation_elapsed = msg.total_elapsed; diff --git a/crates/blockchain/src/metrics.rs b/crates/blockchain/src/metrics.rs index c56ba042..aa6e195d 100644 --- a/crates/blockchain/src/metrics.rs +++ b/crates/blockchain/src/metrics.rs @@ -514,17 +514,9 @@ pub fn time_pq_sig_aggregated_signatures_verification() -> TimingGuard { TimingGuard::new(&LEAN_PQ_SIG_AGGREGATED_SIGNATURES_VERIFICATION_TIME_SECONDS) } -/// Start timing committee signatures aggregation. Records duration when the guard is dropped. -pub fn time_committee_signatures_aggregation() -> TimingGuard { - TimingGuard::new(&LEAN_COMMITTEE_SIGNATURES_AGGREGATION_TIME_SECONDS) -} - -/// Observe a committee signatures aggregation duration directly. -/// -/// Used by the off-thread aggregation worker: the duration is tracked in the -/// worker and reported back to the actor via an `AggregationDone` message, -/// where we observe it here rather than using a drop-guard across a thread -/// boundary. +/// Observe committee-signature aggregation duration. Measured in the +/// off-thread worker and reported back via an `AggregationDone` message, so a +/// drop-guard that crosses the thread boundary is not appropriate here. 
pub fn observe_committee_signatures_aggregation(elapsed: std::time::Duration) { LEAN_COMMITTEE_SIGNATURES_AGGREGATION_TIME_SECONDS.observe(elapsed.as_secs_f64()); } diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index f53b8866..ec90383a 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -130,35 +130,32 @@ fn update_safe_target(store: &mut Store) { /// worker that only needs to run the expensive `aggregate_mixed` call. Holding /// this struct requires no store access. pub struct AggregationJob { - pub hashed: HashedAttestationData, - pub data_root: H256, - pub slot: u64, + pub(crate) hashed: HashedAttestationData, + pub(crate) slot: u64, /// Pre-resolved `(participant_pubkeys, proof_data)` pairs for children /// selected via greedy coverage. - pub children: Vec<(Vec, ByteListMiB)>, - pub accepted_child_ids: Vec, - pub raw_pubkeys: Vec, - pub raw_sigs: Vec, - pub raw_ids: Vec, + pub(crate) children: Vec<(Vec, ByteListMiB)>, + pub(crate) accepted_child_ids: Vec, + pub(crate) raw_pubkeys: Vec, + pub(crate) raw_sigs: Vec, + pub(crate) raw_ids: Vec, /// Gossip-signature keys to delete on successful aggregation. - pub keys_to_delete: Vec<(u64, H256)>, + pub(crate) keys_to_delete: Vec<(u64, H256)>, } /// All input needed to run a session of committee-signature aggregation off-thread. pub struct AggregationSnapshot { - pub jobs: Vec, - pub groups_considered: usize, + pub(crate) jobs: Vec, + pub(crate) groups_considered: usize, } /// Result of one successful aggregation group. Carried back to the actor thread /// as a message payload so the store can be updated and gossip publish fired. 
pub struct AggregatedGroupOutput { - pub hashed: HashedAttestationData, - pub proof: AggregatedSignatureProof, - pub participants: Vec, - pub raw_sigs_count: usize, - pub children_count: usize, - pub keys_to_delete: Vec<(u64, H256)>, + pub(crate) hashed: HashedAttestationData, + pub(crate) proof: AggregatedSignatureProof, + pub(crate) participants: Vec, + pub(crate) keys_to_delete: Vec<(u64, H256)>, } /// Build a snapshot of everything needed to aggregate. Runs on the actor @@ -188,95 +185,88 @@ pub fn snapshot_aggregation_inputs(store: &Store) -> Option let mut jobs = Vec::with_capacity(groups_considered); - // --- Pass 1: attestation data with gossip signatures --- + // Pass 1: attestation data with gossip signatures (may also reuse existing proofs as children). for (hashed, validator_sigs) in &gossip_groups { - let data_root = hashed.root(); - let slot = hashed.data().slot; - - let (new_proofs, known_proofs) = store.existing_proofs_for_data(&data_root); - let (child_proofs, covered) = select_proofs_greedily(&new_proofs, &known_proofs); - - let mut raw_sigs = Vec::new(); - let mut raw_pubkeys = Vec::new(); - let mut raw_ids = Vec::new(); - for (vid, sig) in validator_sigs { - if covered.contains(vid) { - continue; - } - let Some(validator) = validators.get(*vid as usize) else { - continue; - }; - let Ok(pubkey) = validator.get_attestation_pubkey() else { - continue; - }; - raw_sigs.push(sig.clone()); - raw_pubkeys.push(pubkey); - raw_ids.push(*vid); - } - - let (children, accepted_child_ids) = resolve_child_pubkeys(&child_proofs, validators); - - if raw_ids.is_empty() && children.len() < 2 { - continue; + if let Some(job) = build_job(store, validators, hashed.clone(), Some(validator_sigs)) { + jobs.push(job); } - - let keys_to_delete: Vec<(u64, H256)> = validator_sigs - .iter() - .map(|(vid, _)| (*vid, data_root)) - .collect(); - - jobs.push(AggregationJob { - hashed: hashed.clone(), - data_root, - slot, - children, - accepted_child_ids, - raw_pubkeys, - 
raw_sigs, - raw_ids, - keys_to_delete, - }); } - // --- Pass 2: attestation data with new payloads but no gossip signatures --- + // Pass 2: attestation data with pending proofs but no gossip signatures — pure recursive merge. for (data_root, att_data) in &new_payload_keys { if gossip_roots.contains(data_root) { continue; } - + // Cheap pre-check to skip the expensive `existing_proofs_for_data` clone when + // fewer than 2 proofs are present (merge needs at least 2). if store.proof_count_for_data(data_root) < 2 { continue; } - - let (new_proofs, known_proofs) = store.existing_proofs_for_data(data_root); - let (child_proofs, _covered) = select_proofs_greedily(&new_proofs, &known_proofs); - - if child_proofs.len() < 2 { - continue; + let hashed = HashedAttestationData::new(att_data.clone()); + if let Some(job) = build_job(store, validators, hashed, None) { + jobs.push(job); } + } - let (children, accepted_child_ids) = resolve_child_pubkeys(&child_proofs, validators); + Some(AggregationSnapshot { + jobs, + groups_considered, + }) +} - if children.len() < 2 { +/// Build one `AggregationJob` for a given attestation data. Returns `None` when +/// there is not enough material for a viable aggregation (no raw sigs and fewer +/// than two children). `validator_sigs` is `None` for Pass 2 (payload-only). 
+fn build_job( + store: &Store, + validators: &[ethlambda_types::state::Validator], + hashed: HashedAttestationData, + validator_sigs: Option<&[(u64, ValidatorSignature)]>, +) -> Option { + let data_root = hashed.root(); + let (new_proofs, known_proofs) = store.existing_proofs_for_data(&data_root); + let (child_proofs, covered) = select_proofs_greedily(&new_proofs, &known_proofs); + + let mut raw_sigs = Vec::new(); + let mut raw_pubkeys = Vec::new(); + let mut raw_ids = Vec::new(); + for (vid, sig) in validator_sigs.into_iter().flatten() { + if covered.contains(vid) { continue; } + let Some(validator) = validators.get(*vid as usize) else { + continue; + }; + let Ok(pubkey) = validator.get_attestation_pubkey() else { + continue; + }; + raw_sigs.push(sig.clone()); + raw_pubkeys.push(pubkey); + raw_ids.push(*vid); + } - jobs.push(AggregationJob { - hashed: HashedAttestationData::new(att_data.clone()), - data_root: *data_root, - slot: att_data.slot, - children, - accepted_child_ids, - raw_pubkeys: Vec::new(), - raw_sigs: Vec::new(), - raw_ids: Vec::new(), - keys_to_delete: Vec::new(), - }); + let (children, accepted_child_ids) = resolve_child_pubkeys(&child_proofs, validators); + + if raw_ids.is_empty() && children.len() < 2 { + return None; } - Some(AggregationSnapshot { - jobs, - groups_considered, + let keys_to_delete: Vec<(u64, H256)> = validator_sigs + .into_iter() + .flatten() + .map(|(vid, _)| (*vid, data_root)) + .collect(); + + let slot = hashed.data().slot; + Some(AggregationJob { + hashed, + slot, + children, + accepted_child_ids, + raw_pubkeys, + raw_sigs, + raw_ids, + keys_to_delete, }) } @@ -321,8 +311,7 @@ pub fn aggregate_job(job: AggregationJob) -> Option { } let slot_u32: u32 = job.slot.try_into().expect("slot exceeds u32"); - let raw_sigs_count = job.raw_ids.len(); - let children_count = job.children.len(); + let data_root = job.hashed.root(); let proof_data = { let _timing = metrics::time_pq_sig_aggregated_signatures_building(); @@ -330,7 +319,7 @@ 
pub fn aggregate_job(job: AggregationJob) -> Option { job.children, job.raw_pubkeys, job.raw_sigs, - &job.data_root, + &data_root, slot_u32, ) } @@ -348,25 +337,30 @@ pub fn aggregate_job(job: AggregationJob) -> Option { hashed: job.hashed, proof: AggregatedSignatureProof::new(aggregation_bits, proof_data), participants, - raw_sigs_count, - children_count, keys_to_delete: job.keys_to_delete, }) } -/// Apply a worker-produced aggregate to the store: insert the new payload and -/// delete its consumed/redundant gossip signatures. Idempotent wrt the gossip -/// delete — absent keys are silently skipped. +/// Apply a worker-produced aggregate to the store. Called per message on the +/// actor thread; gauge metrics that depend on total counts are batched into +/// `finalize_aggregation_session` so we pay one lock per session instead of +/// one per aggregate. Idempotent wrt the gossip delete. pub fn apply_aggregated_group(store: &mut Store, output: &AggregatedGroupOutput) { store.insert_new_aggregated_payload(output.hashed.clone(), output.proof.clone()); - metrics::update_latest_new_aggregated_payloads(store.new_aggregated_payloads_count()); store.delete_gossip_signatures(&output.keys_to_delete); - metrics::update_gossip_signatures(store.gossip_signatures_count()); metrics::inc_pq_sig_aggregated_signatures(); metrics::inc_pq_sig_attestations_in_aggregated_signatures(output.participants.len() as u64); } +/// End-of-session gauge refresh. Called once after the worker finishes so the +/// `lean_latest_new_aggregated_payloads` and `lean_gossip_signatures` gauges +/// settle on the final counts instead of being churned per aggregate. +pub fn finalize_aggregation_session(store: &Store) { + metrics::update_latest_new_aggregated_payloads(store.new_aggregated_payloads_count()); + metrics::update_gossip_signatures(store.gossip_signatures_count()); +} + /// Greedy set-cover selection of proofs to maximize validator coverage. 
/// /// Processes proof sets in priority order (new before known). Within each set,