Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/blockchain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ ethlambda-types.workspace = true
spawned-concurrency.workspace = true

tokio.workspace = true
tokio-util = { version = "0.7", default-features = false }

rayon.workspace = true
thiserror.workspace = true
Expand Down
283 changes: 272 additions & 11 deletions crates/blockchain/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::collections::{HashMap, HashSet, VecDeque};
use std::time::{Duration, SystemTime};
use std::time::{Duration, Instant, SystemTime};

use ethlambda_network_api::{BlockChainToP2PRef, InitP2P};
use ethlambda_state_transition::is_proposer;
Expand All @@ -12,14 +12,28 @@ use ethlambda_types::{
};

use crate::key_manager::ValidatorKeyPair;
use crate::store::{AggregatedGroupOutput, AggregationSnapshot};
use spawned_concurrency::actor;
use spawned_concurrency::error::ActorError;
use spawned_concurrency::message::Message;
use spawned_concurrency::protocol;
use spawned_concurrency::tasks::{Actor, ActorRef, ActorStart, Context, Handler, send_after};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, trace, warn};

use crate::store::StoreError;

/// Soft deadline for committee-signature aggregation, measured from the
/// interval-2 tick. Once this much wall time elapses, the actor's
/// `AggregationDeadline` self-message cancels the session token so the worker
/// stops starting new aggregations. The remaining 50 ms before the next
/// interval (interval 3 at +800 ms) is reserved for publishing any
/// late-arriving aggregates and as gossip-propagation margin.
const AGGREGATION_DEADLINE: Duration = Duration::from_millis(750);
/// Upper bound on how long we wait (both at shutdown and when a new session
/// starts) for a prior worker to exit. Reached only in pathological cases
/// (mismatched timers, stuck proofs); we warn before blocking on the join.
const PRIOR_WORKER_JOIN_TIMEOUT: Duration = Duration::from_secs(2);

pub(crate) mod fork_choice_tree;
pub mod key_manager;
pub mod metrics;
Expand Down Expand Up @@ -57,6 +71,7 @@ impl BlockChain {
pending_blocks: HashMap::new(),
is_aggregator,
pending_block_parents: HashMap::new(),
current_aggregation: None,
}
.start();
let time_until_genesis = (SystemTime::UNIX_EPOCH + Duration::from_secs(genesis_time))
Expand Down Expand Up @@ -98,10 +113,64 @@ pub struct BlockChainServer {

/// Whether this node acts as a committee aggregator.
is_aggregator: bool,

/// In-flight committee-signature aggregation, if any. Present only while a
/// worker started at the most recent interval 2 is still running or until
/// the next interval 2 takes over.
current_aggregation: Option<AggregationSession>,
}

/// Tracks an in-flight off-thread aggregation worker so the actor can cancel,
/// join, and correlate incoming result messages with the right session.
struct AggregationSession {
    /// Slot at which this session was started; used as a fencing id so we can
    /// drop late-arriving messages from a prior session.
    session_id: u64,
    /// Per-session token (created fresh in `start_aggregation_session`, not a
    /// child of an actor-wide token). Cancelled by the `AggregationDeadline`
    /// self-message at +AGGREGATION_DEADLINE, or by the `#[stopped]` hook /
    /// next-session takeover.
    cancel: CancellationToken,
    /// Handle to the `spawn_blocking` worker. Held so `stopped()` / new-session
    /// start can await completion.
    worker: tokio::task::JoinHandle<()>,
    /// Kept alive so the timer is implicitly cancelled when the field is
    /// replaced or the actor stops (see `spawned_concurrency::tasks::time`).
    _deadline_timer: spawned_concurrency::tasks::TimerHandle,
}

/// One successful aggregate streamed back from the worker while it is still
/// running; the actor applies it to the store and publishes it immediately.
pub(crate) struct AggregateProduced {
    /// Fencing id (the slot the worker was started at); the handler drops the
    /// message unless it matches the actor's current session.
    session_id: u64,
    /// Aggregated group ready to apply and publish (carries the hashed data
    /// and the aggregation proof).
    output: AggregatedGroupOutput,
}
impl Message for AggregateProduced {
    type Result = ();
}

/// Emitted by the worker exactly once, after its loop exits (normal completion
/// or cancellation). Carries the session summary used for metrics and logging.
pub(crate) struct AggregationDone {
    /// Fencing id (slot) of the session that produced this summary.
    session_id: u64,
    /// Number of candidate groups in the snapshot handed to the worker.
    groups_considered: usize,
    /// Number of groups for which `aggregate_job` produced an output.
    groups_aggregated: usize,
    /// Sum of raw signature ids across successfully aggregated jobs.
    total_raw_sigs: usize,
    /// Sum of child aggregates across successfully aggregated jobs.
    total_children: usize,
    /// Wall time from worker start to loop exit.
    total_elapsed: Duration,
    /// Whether the cancellation token was set when the worker finished.
    cancelled: bool,
}
impl Message for AggregationDone {
    type Result = ();
}

/// Self-message scheduled via `send_after` at interval-2 start. When handled,
/// it cancels the matching session's token so the worker stops starting new
/// aggregations; a stale deadline (session already replaced) is a no-op.
pub(crate) struct AggregationDeadline {
    /// Fencing id of the session this deadline belongs to.
    session_id: u64,
}
impl Message for AggregationDeadline {
    type Result = ();
}

impl BlockChainServer {
fn on_tick(&mut self, timestamp_ms: u64) {
async fn on_tick(&mut self, timestamp_ms: u64, ctx: &Context<Self>) {
let genesis_time_ms = self.store.config().genesis_time * 1000;

// Calculate current slot and interval from milliseconds
Expand All @@ -127,19 +196,14 @@ impl BlockChainServer {
.flatten();

// Tick the store first - this accepts attestations at interval 0 if we have a proposal
let new_aggregates = store::on_tick(
store::on_tick(
&mut self.store,
timestamp_ms,
proposer_validator_id.is_some(),
self.is_aggregator,
);

if let Some(ref p2p) = self.p2p {
for aggregate in new_aggregates {
let _ = p2p
.publish_aggregated_attestation(aggregate)
.inspect_err(|err| error!(%err, "Failed to publish aggregated attestation"));
}
if interval == 2 && self.is_aggregator {
self.start_aggregation_session(slot, ctx).await;
}

// Now build and publish the block (after attestations have been accepted)
Expand All @@ -158,6 +222,63 @@ impl BlockChainServer {
metrics::update_head_slot(self.store.head_slot());
}

/// Kick off a committee-signature aggregation session:
/// 1. If a prior session is still present, cancel it and join its worker
///    (warning first if it is pathologically still running).
/// 2. Snapshot the aggregation inputs from the store.
/// 3. Spawn a `spawn_blocking` worker that streams results back as messages.
/// 4. Schedule the `AggregationDeadline` self-message at +AGGREGATION_DEADLINE.
async fn start_aggregation_session(&mut self, slot: u64, ctx: &Context<Self>) {
    if let Some(prior) = self.current_aggregation.take() {
        // Cancel before joining so a still-running worker exits at its next
        // cancellation check instead of draining the whole snapshot.
        prior.cancel.cancel();
        if !prior.worker.is_finished() {
            warn!(
                prior_session_id = prior.session_id,
                new_session_id = slot,
                "Prior aggregation worker still running at next session start; joining before proceeding"
            );
        }
        // NOTE: this blocks the actor for up to PRIOR_WORKER_JOIN_TIMEOUT in
        // the pathological case; an in-flight proof cannot be interrupted.
        match tokio::time::timeout(PRIOR_WORKER_JOIN_TIMEOUT, prior.worker).await {
            Ok(Ok(())) => {}
            Ok(Err(err)) => warn!(?err, "Prior aggregation worker task ended abnormally"),
            Err(_) => warn!(
                timeout_secs = PRIOR_WORKER_JOIN_TIMEOUT.as_secs(),
                "Timed out joining prior aggregation worker"
            ),
        }
    }

    let Some(snapshot) = store::snapshot_aggregation_inputs(&self.store) else {
        // No gossip sigs and no pending payloads — nothing to aggregate this slot.
        return;
    };

    // The slot doubles as the fencing id for all messages of this session.
    let session_id = slot;
    // Independent token per session. Shutdown propagates via our
    // #[stopped] hook which cancels any current session; the deadline
    // timer cancels this specific session at +AGGREGATION_DEADLINE.
    let cancel = CancellationToken::new();
    let actor_ref = ctx.actor_ref();

    // Clones move into the blocking worker; the originals stay in the session.
    let worker_cancel = cancel.clone();
    let worker_actor = actor_ref.clone();
    let worker = tokio::task::spawn_blocking(move || {
        run_aggregation_worker(snapshot, worker_actor, worker_cancel, session_id);
    });

    let deadline_timer = send_after(
        AGGREGATION_DEADLINE,
        ctx.clone(),
        AggregationDeadline { session_id },
    );

    self.current_aggregation = Some(AggregationSession {
        session_id,
        cancel,
        worker,
        _deadline_timer: deadline_timer,
    });
}

/// Returns the validator ID if any of our validators is the proposer for this slot.
fn get_our_proposer(&self, slot: u64) -> Option<u64> {
let head_state = self.store.head_state();
Expand Down Expand Up @@ -497,7 +618,7 @@ impl BlockChainServer {
let timestamp = SystemTime::UNIX_EPOCH
.elapsed()
.expect("already past the unix epoch");
self.on_tick(timestamp.as_millis() as u64);
self.on_tick(timestamp.as_millis() as u64, ctx).await;
// Schedule the next tick at the next 800ms interval boundary
let ms_since_epoch = timestamp.as_millis() as u64;
let ms_to_next_interval =
Expand All @@ -508,6 +629,31 @@ impl BlockChainServer {
block_chain_protocol::Tick,
);
}

/// Actor lifecycle hook: wait for any in-flight aggregation worker to exit
/// before the actor is fully stopped. We cancel the session's token and wait
/// up to PRIOR_WORKER_JOIN_TIMEOUT for the worker's current `aggregate_job`
/// call to finish (the proof itself cannot be interrupted). On timeout the
/// worker is left detached and we only warn.
#[stopped]
async fn on_stopped(&mut self, _ctx: &Context<Self>) {
    let Some(session) = self.current_aggregation.take() else {
        return;
    };
    session.cancel.cancel();
    match tokio::time::timeout(PRIOR_WORKER_JOIN_TIMEOUT, session.worker).await {
        Ok(Ok(())) => {
            info!(
                session_id = session.session_id,
                "Aggregation worker joined on shutdown"
            );
        }
        Ok(Err(err)) => warn!(?err, "Aggregation worker task ended abnormally on shutdown"),
        Err(_) => warn!(
            timeout_secs = PRIOR_WORKER_JOIN_TIMEOUT.as_secs(),
            "Timed out joining aggregation worker on shutdown"
        ),
    }
}
}

// --- Manual Handler impls for network-api messages ---
Expand Down Expand Up @@ -540,3 +686,118 @@ impl Handler<NewAggregatedAttestation> for BlockChainServer {
self.on_gossip_aggregated_attestation(msg.attestation);
}
}

// -------------------------------------------------------------------------
// Aggregation message handlers (worker → actor, actor → self for deadline)
// -------------------------------------------------------------------------

impl Handler<AggregateProduced> for BlockChainServer {
async fn handle(&mut self, msg: AggregateProduced, _ctx: &Context<Self>) {
// Drop results from a prior session (or from an unexpected late worker).
// Current session may be None if the actor already cleaned it up; accept
// the message only when ids match.
let current = self.current_aggregation.as_ref().map(|s| s.session_id);
if current != Some(msg.session_id) {
trace!(
incoming_session_id = msg.session_id,
current_session_id = ?current,
"Dropping stale aggregate produced for non-current session"
);
return;
}

store::apply_aggregated_group(&mut self.store, &msg.output);

if let Some(ref p2p) = self.p2p {
let aggregate = SignedAggregatedAttestation {
data: msg.output.hashed.data().clone(),
proof: msg.output.proof,
};
let _ = p2p
.publish_aggregated_attestation(aggregate)
.inspect_err(|err| error!(%err, "Failed to publish aggregated attestation"));
}
}
}

impl Handler<AggregationDone> for BlockChainServer {
async fn handle(&mut self, msg: AggregationDone, _ctx: &Context<Self>) {
store::finalize_aggregation_session(&self.store);
metrics::observe_committee_signatures_aggregation(msg.total_elapsed);

let aggregation_elapsed = msg.total_elapsed;
info!(
?aggregation_elapsed,
session_id = msg.session_id,
groups_considered = msg.groups_considered,
groups_aggregated = msg.groups_aggregated,
total_raw_sigs = msg.total_raw_sigs,
total_children = msg.total_children,
cancelled = msg.cancelled,
aggregation_deadline_ms = AGGREGATION_DEADLINE.as_millis() as u64,
"Committee signatures aggregated"
);
}
}
Comment on lines +723 to +741
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 AggregationDone missing session-id fence

AggregateProduced guards against stale sessions (current != Some(msg.session_id) → drop), but AggregationDone has no such guard. In the documented pathological case — a prior worker that outlives its 2 s join timeout — it can arrive while session N+1 is actively aggregating. This would (a) call finalize_aggregation_session mid-session N+1, snapshotting a partially-applied gossip-signatures gauge, and (b) emit a "Committee signatures aggregated" log line attributed to the old session id, making the devnet trace misleading.

Suggested change
impl Handler<AggregationDone> for BlockChainServer {
async fn handle(&mut self, msg: AggregationDone, _ctx: &Context<Self>) {
store::finalize_aggregation_session(&self.store);
metrics::observe_committee_signatures_aggregation(msg.total_elapsed);
let aggregation_elapsed = msg.total_elapsed;
info!(
?aggregation_elapsed,
session_id = msg.session_id,
groups_considered = msg.groups_considered,
groups_aggregated = msg.groups_aggregated,
total_raw_sigs = msg.total_raw_sigs,
total_children = msg.total_children,
cancelled = msg.cancelled,
aggregation_deadline_ms = AGGREGATION_DEADLINE.as_millis() as u64,
"Committee signatures aggregated"
);
}
}
impl Handler<AggregationDone> for BlockChainServer {
async fn handle(&mut self, msg: AggregationDone, _ctx: &Context<Self>) {
let current = self.current_aggregation.as_ref().map(|s| s.session_id);
if current != Some(msg.session_id) {
trace!(
incoming_session_id = msg.session_id,
current_session_id = ?current,
"Dropping stale aggregation done for non-current session"
);
return;
}
store::finalize_aggregation_session(&self.store);
metrics::observe_committee_signatures_aggregation(msg.total_elapsed);
let aggregation_elapsed = msg.total_elapsed;
info!(
?aggregation_elapsed,
session_id = msg.session_id,
groups_considered = msg.groups_considered,
groups_aggregated = msg.groups_aggregated,
total_raw_sigs = msg.total_raw_sigs,
total_children = msg.total_children,
cancelled = msg.cancelled,
aggregation_deadline_ms = AGGREGATION_DEADLINE.as_millis() as u64,
"Committee signatures aggregated"
);
}
}
Prompt To Fix With AI
This is a comment left during a code review.
Path: crates/blockchain/src/lib.rs
Line: 723-741

Comment:
**`AggregationDone` missing session-id fence**

`AggregateProduced` guards against stale sessions (`current != Some(msg.session_id)` → drop), but `AggregationDone` has no such guard. In the documented pathological case — a prior worker that outlives its 2 s join timeout — it can arrive while session N+1 is actively aggregating. This would (a) call `finalize_aggregation_session` mid-session N+1, snapshotting a partially-applied gossip-signatures gauge, and (b) emit a "Committee signatures aggregated" log line attributed to the old session id, making the devnet trace misleading.

```suggestion
impl Handler<AggregationDone> for BlockChainServer {
    async fn handle(&mut self, msg: AggregationDone, _ctx: &Context<Self>) {
        let current = self.current_aggregation.as_ref().map(|s| s.session_id);
        if current != Some(msg.session_id) {
            trace!(
                incoming_session_id = msg.session_id,
                current_session_id = ?current,
                "Dropping stale aggregation done for non-current session"
            );
            return;
        }
        store::finalize_aggregation_session(&self.store);
        metrics::observe_committee_signatures_aggregation(msg.total_elapsed);

        let aggregation_elapsed = msg.total_elapsed;
        info!(
            ?aggregation_elapsed,
            session_id = msg.session_id,
            groups_considered = msg.groups_considered,
            groups_aggregated = msg.groups_aggregated,
            total_raw_sigs = msg.total_raw_sigs,
            total_children = msg.total_children,
            cancelled = msg.cancelled,
            aggregation_deadline_ms = AGGREGATION_DEADLINE.as_millis() as u64,
            "Committee signatures aggregated"
        );
    }
}
```

How can I resolve this? If you propose a fix, please make it concise.


impl Handler<AggregationDeadline> for BlockChainServer {
    /// Deadline fired for a session: cancel its token so the worker stops
    /// starting new aggregations. A deadline for a session that is no longer
    /// current (already replaced or cleaned up) is silently ignored.
    async fn handle(&mut self, msg: AggregationDeadline, _ctx: &Context<Self>) {
        match self.current_aggregation.as_ref() {
            Some(session) if session.session_id == msg.session_id => {
                session.cancel.cancel();
            }
            _ => {}
        }
    }
}

// -------------------------------------------------------------------------
// Worker loop — runs on a `spawn_blocking` thread, no store access.
// -------------------------------------------------------------------------

/// Aggregation worker loop. Runs on a `spawn_blocking` thread with no store
/// access: it consumes the snapshot, streams each successful aggregate back to
/// the actor as `AggregateProduced`, and always finishes with a single
/// `AggregationDone` summary (even when cancelled or when the actor is gone).
fn run_aggregation_worker(
    snapshot: AggregationSnapshot,
    actor: ActorRef<BlockChainServer>,
    cancel: CancellationToken,
    session_id: u64,
) {
    let started_at = Instant::now();
    let groups_considered = snapshot.groups_considered;
    let mut aggregated = 0usize;
    let mut raw_sigs = 0usize;
    let mut children = 0usize;

    for job in snapshot.jobs {
        // Cancellation is checked between jobs; an in-flight job finishes.
        if cancel.is_cancelled() {
            break;
        }

        // Capture per-job counts before `aggregate_job` consumes the job.
        let job_raw = job.raw_ids.len();
        let job_kids = job.children.len();

        match store::aggregate_job(job) {
            None => continue,
            Some(output) => {
                aggregated += 1;
                raw_sigs += job_raw;
                children += job_kids;

                if actor
                    .send(AggregateProduced { session_id, output })
                    .is_err()
                {
                    // Actor is gone; no point producing more.
                    break;
                }
            }
        }
    }

    let _ = actor.send(AggregationDone {
        session_id,
        groups_considered,
        groups_aggregated: aggregated,
        total_raw_sigs: raw_sigs,
        total_children: children,
        total_elapsed: started_at.elapsed(),
        cancelled: cancel.is_cancelled(),
    });
}
8 changes: 5 additions & 3 deletions crates/blockchain/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,9 +514,11 @@ pub fn time_pq_sig_aggregated_signatures_verification() -> TimingGuard {
TimingGuard::new(&LEAN_PQ_SIG_AGGREGATED_SIGNATURES_VERIFICATION_TIME_SECONDS)
}

/// Start timing committee signatures aggregation. Records duration when the guard is dropped.
pub fn time_committee_signatures_aggregation() -> TimingGuard {
TimingGuard::new(&LEAN_COMMITTEE_SIGNATURES_AGGREGATION_TIME_SECONDS)
/// Observe a committee-signature aggregation duration into the
/// `LEAN_COMMITTEE_SIGNATURES_AGGREGATION_TIME_SECONDS` histogram. The
/// duration is measured in the off-thread worker and reported back via an
/// `AggregationDone` message, so a drop-guard that crosses the thread
/// boundary is not appropriate here.
pub fn observe_committee_signatures_aggregation(elapsed: std::time::Duration) {
    LEAN_COMMITTEE_SIGNATURES_AGGREGATION_TIME_SECONDS.observe(elapsed.as_secs_f64());
}

/// Update a table byte size gauge.
Expand Down
Loading
Loading