
fix: make signature aggregation asynchronous#299

Open
MegaRedHand wants to merge 2 commits into main from asynchronous-signature-aggregation

Conversation

@MegaRedHand
Collaborator

Summary

  • Moves committee-signature aggregation off the BlockChainServer actor thread onto a tokio::task::spawn_blocking worker. The actor no longer blocks for the duration of the XMSS proofs (previously 400-1200 ms per slot).
  • Bounds aggregation with a CancellationToken fired by a 750 ms deadline (self-message via send_after). Aggregates produced inside the window are streamed back via AggregateProduced actor messages and published immediately; post-deadline aggregates still get applied locally and published.
  • #[stopped] lifecycle hook cancels the in-flight worker and joins it (bounded at 2 s) so node shutdown is clean; new-session start joins any straggler from the previous slot and warns if it hadn't finished yet.

Architecture

  • crates/blockchain/src/store.rs now exposes pure helpers: snapshot_aggregation_inputs, aggregate_job, apply_aggregated_group, finalize_aggregation_session. The old aggregate_committee_signatures / try_aggregate (single inline function) are gone.
  • store::on_tick no longer calls into aggregation — the actor drives it.
  • New messages (pub(crate)): AggregateProduced, AggregationDone, AggregationDeadline. Late messages are fenced by a session id (the slot number).
  • Per-aggregate work on the actor: insert into new_payloads, delete consumed gossip sigs, publish via gossipsub. End-of-session work (gauge refresh) is batched in AggregationDone to avoid 2N lock acquisitions per slot.

Observations from devnet runs

Under a 4-node devnet with three induced partitions (via docker pause) across 200 slots:

  • Finality lag: 3 slots (vs 5 under a previous synchronous cap)
  • Sessions cancelled at deadline: 24 / 203 (11.8%) — only when groups_considered > 1
  • Missed slots: 13, all inside partition windows
  • No "Prior aggregation worker still running" warnings — worker wind-down is clean
  • Chain recovers finalization after each partition

Test plan

  • cargo check --workspace
  • make fmt, make lint (clippy clean)
  • cargo test -p ethlambda-blockchain --test forkchoice_spectests — 77/77 pass
  • 50-slot local devnet with 2 ethlambda + 1 zeam + 1 grandine — chain finalizes normally
  • 200-slot local devnet with induced partitions — consensus safety preserved, liveness recovers between partitions
  • Review on CI

@github-actions

🤖 Claude Code Review



PR #299 · fix: make signature aggregation asynchronous

Overview

This PR moves committee-signature aggregation (400–1200 ms XMSS proofs) off the BlockChainServer actor thread onto a tokio::task::spawn_blocking worker. The actor now stays unblocked during the expensive cryptographic work. A 750 ms soft deadline cancels the token so the worker stops starting new groups; already-running proofs complete naturally. Results are streamed back via actor messages (AggregateProduced, AggregationDone) and fenced by a slot-number session ID.

The refactor is architecturally sound. The extraction of pure functions in store.rs (snapshot_aggregation_inputs, aggregate_job, apply_aggregated_group, finalize_aggregation_session) is clean and the store's threading contract is preserved.


Bug: AggregationDone lacks a session-ID fence

lib.rs, Handler<AggregationDone> — unlike AggregateProduced, this handler has no guard against stale messages from a prior session:

```rust
impl Handler<AggregationDone> for BlockChainServer {
    async fn handle(&mut self, msg: AggregationDone, _ctx: &Context<Self>) {
        store::finalize_aggregation_session(&self.store);          // no session check
        metrics::observe_committee_signatures_aggregation(msg.total_elapsed); // ditto
        info!(..., session_id = msg.session_id, ...);
    }
}
```

Scenario that triggers it:

  • Session N's worker hits the deadline, gets cancelled but is slow finishing its last XMSS proof.
  • start_aggregation_session times out on join, drops the handle, and session N+1 starts.
  • Session N+1 produces and applies its aggregates normally.
  • Late AggregationDone from session N arrives.
    • finalize_aggregation_session fires a second time, mid-session-N+1 — metrics gauges are refreshed prematurely but at least reflect current counts, so no data corruption.
    • observe_committee_signatures_aggregation records session N's elapsed time into the histogram while session N+1 is still running → one spurious extra histogram entry, distorting the aggregation-time metric.
    • The info! log prints session N's stats labelled with N's session_id, with no indication it's stale.

Suggested fix — add the same guard that AggregateProduced uses:

```rust
async fn handle(&mut self, msg: AggregationDone, _ctx: &Context<Self>) {
    let current = self.current_aggregation.as_ref().map(|s| s.session_id);
    if current != Some(msg.session_id) {
        return;
    }
    // ... rest unchanged
}
```

Performance: per-aggregate RocksDB writes replace a single batch

store.rs, apply_aggregated_group writes one payload per AggregateProduced message:

```rust
store.insert_new_aggregated_payload(output.hashed.clone(), output.proof.clone());
```

The original aggregate_committee_signatures called insert_new_aggregated_payloads_batch once for all groups in a slot. With N committees there are now N separate write transactions instead of one. In practice N is small (single digits for a small devnet), but it's a regression to note, especially under higher validator counts.


Minor: groups_considered overcounts eliminated jobs

In snapshot_aggregation_inputs:

```rust
let groups_considered = gossip_groups.len()
    + new_payload_keys.iter().filter(...).count();
```

This counts groups before build_job prunes them. jobs.len() can be less than groups_considered. The AggregationDone log reports groups_considered from the snapshot — which is the pre-filter count — so the difference between groups_considered and groups_aggregated in the log mixes two different filter stages (pre-build vs. post-build). Consider naming it groups_examined or emitting jobs.len() separately as jobs_prepared.


Minor: completed session held in current_aggregation until the next interval-2 tick

When AggregationDone arrives, self.current_aggregation is not cleared:

```rust
impl Handler<AggregationDone> for BlockChainServer {
    async fn handle(&mut self, msg: AggregationDone, _ctx: &Context<Self>) {
        // current_aggregation stays Some(finished_session)
    }
}
```

The finished JoinHandle, CancellationToken, and TimerHandle are retained for the remainder of the slot (up to ~3.2 s). The cost is tiny (small Arc overhead), but setting self.current_aggregation = None here would make the state machine more explicit: session is done when AggregationDone is handled.


Minor: pub structs with pub(crate) fields in store.rs

AggregationJob, AggregationSnapshot, and AggregatedGroupOutput are declared pub while all their fields are pub(crate). Since these types don't cross crate boundaries, pub(crate) on the struct declaration is more accurate.


Minor: Ignoring send error after early break in worker

```rust
for job in snapshot.jobs {
    if actor.send(AggregateProduced { ... }).is_err() {
        break;  // actor gone
    }
}
let _ = actor.send(AggregationDone { ... });  // also fails silently if actor gone
```

This is harmless — the let _ = correctly swallows the error — but a boolean tracking whether the actor was lost could avoid the redundant send. Low priority.


What looks good

  • Session fencing in AggregateProduced is correct and clearly written.
  • #[stopped] lifecycle hook properly cancels and joins the worker with a 2 s bound — clean shutdown path.
  • CancellationToken scoping is correct: the token is per-session and independent, so the #[stopped] hook doesn't accidentally cancel the next session's token.
  • snapshot_aggregation_inputs returns None when there's nothing to do, avoiding a no-op worker spawn.
  • Stale AggregationDeadline is safely handled via session ID even if TimerHandle drops don't cancel queued messages.
  • Metrics crossing the thread boundary via explicit observe_committee_signatures_aggregation(elapsed) rather than a drop-guard is the right call.
  • Spec tests updated cleanly; on_tick signature simplification is correct.

Automated review by Claude (Anthropic) · sonnet · custom prompt

@github-actions

🤖 Codex Code Review

Findings

  1. High: missed-interval catch-up can now skip committee-signature aggregation for an entire slot. In crates/blockchain/src/lib.rs, aggregation is started only when the final wall-clock interval is 2, after crates/blockchain/src/store.rs has already advanced store.time() through every missed interval in a loop. If the actor wakes up late and a single tick jumps from interval 1 to 3/4, the old code still aggregated during catch-up; this version does not, so that slot’s local aggregates are silently lost. I’d drive aggregation off the same interval-advance loop, or have store::on_tick() report that interval 2 was traversed.

  2. Medium: session fencing is only slot, which is not unique enough to reject stale same-slot worker output. crates/blockchain/src/lib.rs sets session_id = slot, and crates/blockchain/src/lib.rs accepts any AggregateProduced whose id matches the current session. If interval 2 is entered twice for the same slot, late messages from the cancelled worker are indistinguishable from the replacement worker and will still be applied. Using a monotonic per-session counter and clearing the session on crates/blockchain/src/lib.rs would make the fence robust.

No other obvious consensus or memory-safety regressions stood out in the diff.

I couldn’t run cargo test here because rustup tries to write under a read-only /home/runner/.rustup/tmp.


Automated review by OpenAI Codex · gpt-5.4 · custom prompt

@greptile-apps
Contributor

greptile-apps bot commented Apr 16, 2026

Greptile Summary

This PR moves committee-signature aggregation (XMSS proofs, 400–1200 ms each) off the BlockChainServer actor thread onto a tokio::task::spawn_blocking worker, bounded by a 750 ms CancellationToken deadline and a clean #[stopped] shutdown hook. Session fencing via slot-number session_id ensures stale AggregateProduced messages from a prior worker are discarded correctly.

  • P2: AggregationDone lacks the same session-id guard applied to AggregateProduced; in the documented pathological case a stale done-message can trigger a misplaced finalize_aggregation_session call and emit a confusing log, though no consensus state is affected.

Confidence Score: 5/5

Safe to merge; the one finding is a P2 metric/logging inconsistency with no consensus impact.

All remaining findings are P2: the missing session-id fence in AggregationDone affects metric gauge timing and log clarity in a rare pathological case (prior worker outliving its 2 s join timeout), but does not corrupt store state or affect fork-choice correctness. The core architecture — snapshot isolation, session fencing on AggregateProduced, CancellationToken deadline, clean #[stopped] shutdown — is sound and validated by the 200-slot partition devnet run.

crates/blockchain/src/lib.rs — AggregationDone handler (lines 723-741)

Important Files Changed

Filename Overview
crates/blockchain/src/lib.rs Core actor refactor: moves XMSS aggregation off the actor thread via spawn_blocking, adds session lifecycle (AggregationSession, three new message types, #[stopped] hook). One P2: AggregationDone handler is missing the session-id fence that AggregateProduced correctly applies.
crates/blockchain/src/store.rs Extracted pure snapshot/job/apply/finalize helpers for off-thread aggregation; removed inline aggregate_committee_signatures. on_tick interval-2 arm is now a no-op with a clear comment. Changes look correct.
crates/blockchain/src/metrics.rs Added observe_committee_signatures_aggregation using a Duration argument (instead of a drop-guard) to support cross-thread measurement. All existing metrics preserved.
crates/blockchain/Cargo.toml Adds tokio-util 0.7 (CancellationToken) with default-features = false. Only this crate uses tokio-util in the workspace; compiles cleanly per CI checks.

Flowchart

```mermaid
%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A["Interval 2 tick"] --> B["snapshot_aggregation_inputs()"]
    B -->|"None: nothing to do"| Z["Skip session"]
    B -->|"Some(snapshot)"| C["spawn_blocking worker"]
    C --> D["send_after 750ms AggregationDeadline"]
    C --> E["run_aggregation_worker loop"]
    E --> F{"cancel.is_cancelled?"}
    F -->|"Yes"| G["Break loop"]
    F -->|"No"| H["aggregate_job() XMSS proof"]
    H --> I["Send AggregateProduced to actor"]
    I --> J["apply_aggregated_group + publish P2P"]
    J --> E
    G --> K["Send AggregationDone to actor"]
    H --> K
    D --> L["AggregationDeadline fires"]
    L --> M["cancel.cancel()"]
    K --> N["finalize_aggregation_session gauge refresh"]
    O["#stopped hook"] --> P["cancel.cancel() + join worker 2s timeout"]
```
Inline comment on `crates/blockchain/src/lib.rs`, lines 723-741:
**`AggregationDone` missing session-id fence**

`AggregateProduced` guards against stale sessions (`current != Some(msg.session_id)` → drop), but `AggregationDone` has no such guard. In the documented pathological case — a prior worker that outlives its 2 s join timeout — it can arrive while session N+1 is actively aggregating. This would (a) call `finalize_aggregation_session` mid-session N+1, snapshotting a partially-applied gossip-signatures gauge, and (b) emit a "Committee signatures aggregated" log line attributed to the old session id, making the devnet trace misleading.

```suggestion
impl Handler<AggregationDone> for BlockChainServer {
    async fn handle(&mut self, msg: AggregationDone, _ctx: &Context<Self>) {
        let current = self.current_aggregation.as_ref().map(|s| s.session_id);
        if current != Some(msg.session_id) {
            trace!(
                incoming_session_id = msg.session_id,
                current_session_id = ?current,
                "Dropping stale aggregation done for non-current session"
            );
            return;
        }
        store::finalize_aggregation_session(&self.store);
        metrics::observe_committee_signatures_aggregation(msg.total_elapsed);

        let aggregation_elapsed = msg.total_elapsed;
        info!(
            ?aggregation_elapsed,
            session_id = msg.session_id,
            groups_considered = msg.groups_considered,
            groups_aggregated = msg.groups_aggregated,
            total_raw_sigs = msg.total_raw_sigs,
            total_children = msg.total_children,
            cancelled = msg.cancelled,
            aggregation_deadline_ms = AGGREGATION_DEADLINE.as_millis() as u64,
            "Committee signatures aggregated"
        );
    }
}
```


Last reviewed commit: "refactor: simplify"

Comment on lines +723 to +741 (`crates/blockchain/src/lib.rs`):

```rust
impl Handler<AggregationDone> for BlockChainServer {
    async fn handle(&mut self, msg: AggregationDone, _ctx: &Context<Self>) {
        store::finalize_aggregation_session(&self.store);
        metrics::observe_committee_signatures_aggregation(msg.total_elapsed);

        let aggregation_elapsed = msg.total_elapsed;
        info!(
            ?aggregation_elapsed,
            session_id = msg.session_id,
            groups_considered = msg.groups_considered,
            groups_aggregated = msg.groups_aggregated,
            total_raw_sigs = msg.total_raw_sigs,
            total_children = msg.total_children,
            cancelled = msg.cancelled,
            aggregation_deadline_ms = AGGREGATION_DEADLINE.as_millis() as u64,
            "Committee signatures aggregated"
        );
    }
}
```
