Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
2f8546c
Node signature module
mskrzypkows Apr 10, 2026
fafb04a
Merge branch 'main' of github.com:NethermindEth/pluto into dkg_nodesigs
mskrzypkows Apr 13, 2026
2e54694
clippy
mskrzypkows Apr 13, 2026
128fd2b
fixed too long locking the channel
mskrzypkows Apr 13, 2026
dab0eb3
Changed CallbackFn to be async, so I can wait for the watched lock hash
mskrzypkows Apr 13, 2026
6cbabbd
unneeded pub mod
mskrzypkows Apr 14, 2026
ff87e98
Merge branch 'main' of github.com:NethermindEth/pluto into dkg_nodesigs
mskrzypkows Apr 14, 2026
074d370
review corrections
mskrzypkows Apr 14, 2026
154798c
clippy
mskrzypkows Apr 14, 2026
6afcb49
Merge branch 'main' into dkg_nodesigs
mskrzypkows Apr 15, 2026
3d87424
review corrections
mskrzypkows Apr 15, 2026
ec5a684
cancelation token forwarded to the receive function
mskrzypkows Apr 15, 2026
55c3193
Merge branch 'dkg_nodesigs' of github.com:NethermindEth/pluto into dk…
mskrzypkows Apr 15, 2026
ab22f96
Merge branch 'main' of github.com:NethermindEth/pluto into dkg_nodesigs
mskrzypkows Apr 15, 2026
1f24e96
Merge branch 'main' into dkg_nodesigs
mskrzypkows Apr 16, 2026
152f981
Merge branch 'main' into dkg_nodesigs
mskrzypkows Apr 16, 2026
ba43c99
Verificaton of sender peer id
mskrzypkows Apr 17, 2026
72c61fb
Merge branch 'main' of github.com:NethermindEth/pluto into dkg_nodesigs
mskrzypkows Apr 17, 2026
405f2e8
NONE_DATA for signature when the key is not provided
mskrzypkows Apr 17, 2026
75451bb
Merge branch 'dkg_nodesigs' of github.com:NethermindEth/pluto into dk…
mskrzypkows Apr 17, 2026
454c17e
Merge branch 'main' into dkg_nodesigs
mskrzypkows Apr 18, 2026
6452217
Merge branch 'main' into dkg_nodesigs
mskrzypkows Apr 18, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/dkg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ thiserror.workspace = true
libp2p.workspace = true
futures.workspace = true
tokio.workspace = true
tokio-util.workspace = true
sha2.workspace = true
tracing.workspace = true
either.workspace = true
Expand All @@ -35,6 +36,7 @@ pluto-build-proto.workspace = true

[dev-dependencies]
anyhow.workspace = true
test-case.workspace = true
clap.workspace = true
hex.workspace = true
pluto-cluster = { workspace = true, features = ["test-cluster"] }
Expand Down
18 changes: 10 additions & 8 deletions crates/dkg/examples/bcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,14 +299,16 @@ async fn register_message(component: &Component, local_node_number: u32) -> bcas
Ok(())
}),
Box::new(move |peer_id, received_msg_id, msg| {
info!(
local_node = local_node_number,
sender = %peer_id,
msg_id = received_msg_id,
msg = ?msg,
"Received broadcast"
);
Ok(())
Box::pin(async move {
info!(
local_node = local_node_number,
sender = %peer_id,
msg_id = received_msg_id,
msg = ?msg,
"Received broadcast"
);
Ok(())
})
}),
)
.await
Expand Down
21 changes: 12 additions & 9 deletions crates/dkg/src/bcast/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -585,15 +585,18 @@ mod tests {
"timestamp",
Box::new(|_peer_id, _msg| Ok(())),
Box::new(move |peer_id, msg_id, msg| {
receipt_tx
.send(Receipt {
target: node_index,
source: peer_id,
msg_id: msg_id.to_string(),
seconds: msg.seconds,
})
.map_err(|_| Error::ReceiptChannelClosed)?;
Ok(())
let receipt_tx = receipt_tx.clone();
Box::pin(async move {
receipt_tx
.send(Receipt {
target: node_index,
source: peer_id,
msg_id,
seconds: msg.seconds,
})
.map_err(|_| Error::ReceiptChannelClosed)?;
Ok(())
})
}),
)
.await
Expand Down
28 changes: 21 additions & 7 deletions crates/dkg/src/bcast/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use std::{collections::HashMap, sync::Arc};

use futures::future::BoxFuture;
use libp2p::PeerId;
use prost::{Message, Name};
use prost_types::Any;
Expand All @@ -13,7 +14,12 @@ use super::error::{Error, Result};
pub type CheckFn<M> = Box<dyn Fn(PeerId, &M) -> Result<()> + Send + Sync + 'static>;

/// Typed message callback invoked for validated broadcast messages.
pub type CallbackFn<M> = Box<dyn Fn(PeerId, &str, M) -> Result<()> + Send + Sync + 'static>;
///
/// The returned future is awaited by the inbound message handler, allowing
/// the callback to perform async operations (e.g. waiting for state that
/// becomes available later).
pub type CallbackFn<M> =
Box<dyn Fn(PeerId, String, M) -> BoxFuture<'static, Result<()>> + Send + Sync + 'static>;

pub(crate) type Registry = Arc<RwLock<HashMap<String, Arc<dyn RegisteredMessage>>>>;

Expand All @@ -33,7 +39,8 @@ pub(crate) trait RegisteredMessage: Send + Sync {
fn check(&self, peer_id: PeerId, any: &Any) -> Result<()>;

/// Dispatches the incoming wrapped protobuf message to the typed callback.
fn callback(&self, peer_id: PeerId, msg_id: &str, any: &Any) -> Result<()>;
fn callback(&self, peer_id: PeerId, msg_id: String, any: Any)
-> BoxFuture<'static, Result<()>>;
}

struct TypedRegistration<M> {
Expand All @@ -50,9 +57,16 @@ where
(self.check)(peer_id, &message)
}

fn callback(&self, peer_id: PeerId, msg_id: &str, any: &Any) -> Result<()> {
let message = any.to_msg::<M>()?;
(self.callback)(peer_id, msg_id, message)
fn callback(
&self,
peer_id: PeerId,
msg_id: String,
any: Any,
) -> BoxFuture<'static, Result<()>> {
match any.to_msg::<M>() {
Ok(message) => (self.callback)(peer_id, msg_id, message),
Err(e) => Box::pin(async move { Err(e.into()) }),
}
}
}

Expand Down Expand Up @@ -138,7 +152,7 @@ mod tests {
.register_message::<prost_types::Timestamp>(
"timestamp",
Box::new(|_, _| Ok(())),
Box::new(|_, _, _| Ok(())),
Box::new(|_, _, _| Box::pin(async { Ok(()) })),
)
.await
.unwrap();
Expand All @@ -147,7 +161,7 @@ mod tests {
.register_message::<prost_types::Timestamp>(
"timestamp",
Box::new(|_, _| Ok(())),
Box::new(|_, _, _| Ok(())),
Box::new(|_, _, _| Box::pin(async { Ok(()) })),
)
.await
.unwrap_err();
Expand Down
32 changes: 32 additions & 0 deletions crates/dkg/src/bcast/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,25 @@ impl Failure {
}
}

/// Peer IDs involved in an [`Error::InvalidSenderPeerIndex`] error.
#[derive(Debug)]
pub struct SenderPeerMismatch {
/// The peer ID of the actual sender.
pub sender: PeerId,
/// The peer ID expected at the claimed index.
pub expected: PeerId,
}

impl fmt::Display for SenderPeerMismatch {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"sender peer ID ({}) does not match claimed peer index {}",
self.sender, self.expected
)
}
}

/// User-facing reliable-broadcast error.
#[derive(Debug, thiserror::Error)]
pub enum Error {
Expand Down Expand Up @@ -95,6 +114,15 @@ pub enum Error {
#[error("invalid signature for peer {0}")]
InvalidSignature(PeerId),

/// The peer index in the message is out of range or matches the local node.
#[error("invalid peer index: {0}")]
InvalidPeerIndex(PeerId),

/// The sender's peer index in the message does not match the sender's
/// actual index.
#[error("{0}")]
InvalidSenderPeerIndex(Box<SenderPeerMismatch>),

/// The repeated hash for the same `(peer, msg_id)` differed.
#[error("duplicate id with mismatching hash")]
DuplicateMismatchingHash,
Expand All @@ -119,6 +147,10 @@ pub enum Error {
#[error("receipt channel closed")]
ReceiptChannelClosed,

/// The operation was cancelled.
#[error("cancelled")]
Cancelled,

/// A required message body field was absent.
#[error("missing protobuf field: {0}")]
MissingField(&'static str),
Expand Down
2 changes: 1 addition & 1 deletion crates/dkg/src/bcast/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ async fn handle_inbound_msg(
.ok_or_else(|| Error::UnknownMessageId(message.id.clone()))?
};

handler.callback(peer_id, &message.id, &any)?;
handler.callback(peer_id, message.id, any).await?;
stream.close().await?;
Ok(())
}
2 changes: 1 addition & 1 deletion crates/dkg/src/bcast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ mod protocol;

pub use behaviour::{Behaviour, Event};
pub use component::{CallbackFn, CheckFn, Component};
pub use error::{Error, Failure, Result};
pub use error::{Error, Failure, Result, SenderPeerMismatch};

/// The request-response protocol used to gather peer signatures.
pub const SIG_PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/charon/dkg/bcast/1.0.0/sig");
Expand Down
3 changes: 3 additions & 0 deletions crates/dkg/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,8 @@ pub mod disk;
/// Main DKG protocol implementation.
pub mod dkg;

/// Node signature exchange over the lock hash.
pub mod nodesigs;

/// Shares distributed to each node in the cluster.
pub mod share;
Loading
Loading