Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
8e0e71f
feat(truapi): add testing API and versioned wiring
pgherveou Jun 30, 2026
dea080a
fixup! feat(truapi): add testing API and versioned wiring
pgherveou Jul 1, 2026
3409152
fixup! feat(truapi): add testing API and versioned wiring
pgherveou Jul 1, 2026
0569aaf
fixup! feat(truapi): add testing API and versioned wiring
pgherveou Jul 2, 2026
8c93e99
feat(truapi-platform): add host capability traits
pgherveou Jun 30, 2026
97d0b79
fixup! feat(truapi-platform): add host capability traits
pgherveou Jul 2, 2026
d2e3674
fixup! feat(truapi-platform): add host capability traits
pgherveou Jul 3, 2026
c6f888c
feat(truapi-codegen): emit Rust dispatcher, wire table, and host call…
pgherveou Jun 30, 2026
a37abae
fixup! feat(truapi-codegen): emit Rust dispatcher, wire table, and ho…
pgherveou Jul 2, 2026
99d1b7e
fixup! feat(truapi-codegen): emit Rust dispatcher, wire table, and ho…
pgherveou Jul 2, 2026
1c46353
fixup! feat(truapi-codegen): emit Rust dispatcher, wire table, and ho…
pgherveou Jul 2, 2026
2c52262
fixup! feat(truapi-codegen): emit Rust dispatcher, wire table, and ho…
pgherveou Jul 3, 2026
7bd57b8
fixup! feat(truapi-codegen): emit Rust dispatcher, wire table, and ho…
pgherveou Jul 3, 2026
6964df3
feat(truapi-server): add host logic primitives
pgherveou Jul 1, 2026
c8c4595
fixup! feat(truapi-server): add host logic primitives
pgherveou Jul 1, 2026
88cf1a9
fixup! feat(truapi-server): add host logic primitives
pgherveou Jul 2, 2026
2d88bdf
fixup! feat(truapi-server): add host logic primitives
pgherveou Jul 3, 2026
e1f6790
feat(truapi-server): add wire and chain infrastructure
pgherveou Jul 1, 2026
d959726
fixup! feat(truapi-server): add wire and chain infrastructure
pgherveou Jul 3, 2026
fa642c1
fixup! feat(truapi-server): add host logic primitives
pgherveou Jul 3, 2026
e4d9a0d
fixup! feat(truapi-server): add wire and chain infrastructure
pgherveou Jul 3, 2026
dab5395
Merge origin/main into rust-core/04-codegen
pgherveou Jul 3, 2026
15e1594
Merge branch 'rust-core/04-codegen' into rust-core/04a-server-host-logic
pgherveou Jul 3, 2026
54f0b20
docs(host-logic): link host-spec sections
pgherveou Jul 3, 2026
2da3da5
Merge branch 'rust-core/04a-server-host-logic' into rust-core/04b-ser…
pgherveou Jul 3, 2026
cda27e1
fixup! feat(truapi-server): add wire and chain infrastructure
pgherveou Jul 3, 2026
1291de5
Merge branch 'main' into rust-core/04b-server-wire-chain
pgherveou Jul 3, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,589 changes: 1,589 additions & 0 deletions rust/crates/truapi-server/src/chain_runtime.rs

Large diffs are not rendered by default.

272 changes: 272 additions & 0 deletions rust/crates/truapi-server/src/dispatcher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
//! Request dispatcher.
//!
//! Routes incoming frames to the appropriate trait method based on the
//! numeric wire discriminant. The handler set is registered by the
//! auto-generated [`crate::generated::dispatcher::register`] function; this
//! module provides the framework that owns the registration tables and the
//! routing logic.

use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use futures::future::LocalBoxFuture;
use tracing::instrument;

use crate::frame::{Payload, ProtocolMessage};
use crate::generated::wire_table::{RequestFrameIds, SubscriptionFrameIds};
use crate::subscription::{Spawner, SubscriptionManager, SubscriptionStream};
use crate::transport::Transport;

/// A handler for a request-response method. The returned future is not
/// required to be `Send` because the truapi trait uses `async fn`, whose
/// auto-Send-ness is not guaranteed. The `request_id` is the per-frame
/// identifier; handlers thread it into the `CallContext` so trait methods
/// can correlate logs/cancellation with the originating request. On the
/// error path handlers return the complete SCALE-encoded response payload.
pub type RequestHandler =
Arc<dyn Fn(String, Vec<u8>) -> LocalBoxFuture<'static, Result<Vec<u8>, Vec<u8>>> + Send + Sync>;

/// A handler for a subscription method. On the error path the handler returns
/// the complete SCALE-encoded `_interrupt` payload.
pub type SubscriptionHandler = Arc<
dyn Fn(String, Vec<u8>) -> LocalBoxFuture<'static, Result<SubscriptionStream, Vec<u8>>>
+ Send
+ Sync,
>;

/// A registered request handler plus the discriminants it replies on.
pub struct RequestEntry {
ids: RequestFrameIds,
handler: RequestHandler,
}

/// A registered subscription handler plus the discriminants its frames carry.
pub struct SubscriptionEntry {
ids: SubscriptionFrameIds,
handler: SubscriptionHandler,
}

/// Routes incoming protocol messages to registered handlers, keyed on the
/// numeric wire discriminant.
pub struct Dispatcher {
by_request: HashMap<u8, RequestEntry>,
by_start: HashMap<u8, SubscriptionEntry>,
stop_ids: HashSet<u8>,
subscriptions: SubscriptionManager,
}

impl Dispatcher {
/// Construct a dispatcher whose subscriptions are driven on `spawner`.
pub fn new(spawner: Spawner) -> Self {
Self {
by_request: HashMap::new(),
by_start: HashMap::new(),
stop_ids: HashSet::new(),
subscriptions: SubscriptionManager::new(spawner),
}
}

/// Register a request-response handler, keyed on `ids.request_id`. Returns
/// the previously registered entry if any; callers (the generated
/// `dispatcher::register`) should treat `Some` as a programming error
/// since each request id must own exactly one handler.
pub fn on_request<F>(&mut self, ids: RequestFrameIds, handler: F) -> Option<RequestEntry>
where
F: Fn(String, Vec<u8>) -> LocalBoxFuture<'static, Result<Vec<u8>, Vec<u8>>>
+ Send
+ Sync
+ 'static,
{
self.by_request.insert(
ids.request_id,
RequestEntry {
ids,
handler: Arc::new(handler),
},
)
}

/// Register a subscription handler, keyed on `ids.start_id`, and record
/// `ids.stop_id` so a matching `_stop` frame tears the subscription down.
/// Returns the previously registered entry if any.
pub fn on_subscription<F>(
&mut self,
ids: SubscriptionFrameIds,
handler: F,
) -> Option<SubscriptionEntry>
where
F: Fn(String, Vec<u8>) -> LocalBoxFuture<'static, Result<SubscriptionStream, Vec<u8>>>
+ Send
+ Sync
+ 'static,
{
self.stop_ids.insert(ids.stop_id);
self.by_start.insert(
ids.start_id,
SubscriptionEntry {
ids,
handler: Arc::new(handler),
},
)
}

/// Process an incoming protocol message, sending any responses or
/// subscription frames through `transport`. A discriminant with no
/// registered handler is dropped.
#[instrument(skip_all, fields(runtime.method = "dispatcher.dispatch"))]
pub async fn dispatch(&self, message: ProtocolMessage, transport: Arc<dyn Transport>) {
let id = message.payload.id;

if let Some(entry) = self.by_request.get(&id) {
let request_id = message.request_id.clone();
let value = (entry.handler)(request_id, message.payload.value)
.await
.unwrap_or_else(|value| value);
transport.send(ProtocolMessage {
request_id: message.request_id,
payload: Payload {
id: entry.ids.response_id,
value,
},
});
} else if let Some(entry) = self.by_start.get(&id) {
// Reserve the slot before awaiting the handler so a `_stop`
// arriving while the handler resolves cancels the pending
// subscription instead of racing the registration.
let token = self.subscriptions.reserve(message.request_id.clone());
let request_id = message.request_id.clone();
match (entry.handler)(request_id, message.payload.value).await {
Ok(stream) => {
self.subscriptions.activate(
token,
entry.ids.receive_id,
entry.ids.interrupt_id,
stream,
transport,
);
}
Err(err_bytes) => {
self.subscriptions.cancel_reservation(token);
transport.send(ProtocolMessage {
request_id: message.request_id,
payload: Payload {
id: entry.ids.interrupt_id,
value: err_bytes,
},
});
}
}
} else if self.stop_ids.contains(&id) {
self.subscriptions.handle_stop(&message.request_id);
}
// Unknown discriminant: drop. Response / receive / interrupt frames are
// handled by the client side and never registered here.
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::sync::Mutex;

fn test_spawner() -> Spawner {
#[cfg(not(target_arch = "wasm32"))]
{
crate::subscription::thread_per_subscription_spawner()
}
#[cfg(target_arch = "wasm32")]
{
Arc::new(futures::executor::block_on)
}
}

#[derive(Default)]
struct RecordingTransport {
sent: Mutex<Vec<ProtocolMessage>>,
}

impl RecordingTransport {
fn sent(&self) -> Vec<ProtocolMessage> {
self.sent.lock().unwrap().clone()
}
}

impl Transport for RecordingTransport {
fn send(&self, message: ProtocolMessage) {
self.sent.lock().unwrap().push(message);
}
fn on_message(
&self,
_handler: Box<dyn Fn(ProtocolMessage) + Send + Sync>,
) -> Box<dyn FnOnce()> {
Box::new(|| {})
}
}

fn make_frame(id: u8, value: Vec<u8>) -> ProtocolMessage {
ProtocolMessage {
request_id: "p:1".into(),
payload: Payload { id, value },
}
}

/// A frame whose discriminant has no registered handler is dropped: no
/// response, no interrupt. (In production `register` registers every wire
/// method, so this only happens for malformed or client-bound ids.)
#[test]
fn dispatch_unregistered_id_sends_nothing() {
let dispatcher = Dispatcher::new(test_spawner());
let transport = Arc::new(RecordingTransport::default());
let transport_dyn: Arc<dyn Transport> = transport.clone();
let frame = make_frame(250, Vec::new());
futures::executor::block_on(dispatcher.dispatch(frame, transport_dyn));
assert!(
transport.sent().is_empty(),
"an unregistered discriminant must produce no frame"
);
}

/// A handler error already owns the complete response payload. The
/// dispatcher only routes it to the registered response id.
#[test]
fn dispatch_request_handler_error_emits_response_payload() {
let mut dispatcher = Dispatcher::new(test_spawner());
let ids = RequestFrameIds {
request_id: 200,
response_id: 201,
};
dispatcher.on_request(ids, |_request_id, _bytes| {
Box::pin(async move { Err(vec![9, 8, 7]) })
});
let transport = Arc::new(RecordingTransport::default());
let frame = make_frame(200, Vec::new());
futures::executor::block_on(dispatcher.dispatch(frame, transport.clone()));
let sent = transport.sent();
assert_eq!(sent.len(), 1, "exactly one response expected");
assert_eq!(sent[0].payload.id, 201);
assert_eq!(sent[0].payload.value, vec![9, 8, 7]);
}

/// Registering two handlers under the same key must not silently
/// overwrite. The contract chosen here is "loud": `on_request`
/// returns the previous handler, so callers can detect collisions.
#[test]
fn register_request_twice_returns_previous_handler() {
let mut dispatcher = Dispatcher::new(test_spawner());
let ids = RequestFrameIds {
request_id: 200,
response_id: 201,
};
let prev = dispatcher.on_request(ids, |_request_id, _bytes| {
Box::pin(async move { Ok(Vec::new()) })
});
assert!(prev.is_none(), "first registration has no predecessor");
let prev = dispatcher.on_request(ids, |_request_id, _bytes| {
Box::pin(async move { Ok(Vec::new()) })
});
assert!(
prev.is_some(),
"second registration must return the previous handler"
);
}
}
Loading