diff --git a/crates/aingle_cortex/src/cluster_init.rs b/crates/aingle_cortex/src/cluster_init.rs index 20bcfa74..cd652013 100644 --- a/crates/aingle_cortex/src/cluster_init.rs +++ b/crates/aingle_cortex/src/cluster_init.rs @@ -77,8 +77,10 @@ impl ClusterConfig { } "--cluster-peers" => { if i + 1 < args.len() { - cfg.peers = - args[i + 1].split(',').map(|s| s.trim().to_string()).collect(); + cfg.peers = args[i + 1] + .split(',') + .map(|s| s.trim().to_string()) + .collect(); i += 1; } } @@ -353,6 +355,134 @@ pub fn build_tls_server_config( Ok(config) } +/// Ensure the Semantic DAG is enabled and ready for *signed* writes. +/// +/// Enables the DAG on the graph (persistent when `db_path` is a real path, +/// in-memory otherwise), initializes genesis, sets the DAG author from the +/// cluster node id when present, and loads/generates the Ed25519 signing key. +/// +/// This lives in the library (not only in `main.rs`) so that library consumers +/// and the cluster integration tests — which call [`init_cluster`] directly, +/// bypassing `main()` — get a fully-initialized, signable DAG. Without it, +/// cluster DAG writes are emitted unsigned and rejected with +/// "DagAction rejected: missing Ed25519 signature". +#[cfg(feature = "dag")] +pub async fn ensure_dag_ready(state: &mut crate::state::AppState, db_path: Option<&str>) { + // Enable DAG on the GraphDB (persistent for Sled, in-memory otherwise). + { + let mut graph = state.graph.write().await; + match db_path { + Some(p) if p != ":memory:" => { + graph.enable_dag_persistent(p).unwrap_or_else(|e| { + panic!( + "Failed to enable persistent DAG at '{}': {e}. \ + Refusing to start with volatile DAG — fix the storage path or permissions.", + p + ); + }); + tracing::info!("DAG persistence enabled (Sled)"); + } + _ => { + tracing::warn!("DAG using in-memory backend — data will NOT survive restarts"); + graph.enable_dag(); + } + } + let triple_count = graph.count(); + if let Some(dag_store) = graph.dag_store() { + match dag_store.init_or_migrate(triple_count) { + Ok(genesis_hash) => { + tracing::info!( + hash = %genesis_hash, + triples = triple_count, + "DAG initialized (genesis)" + ); + } + Err(e) => { + tracing::error!("DAG initialization failed: {e}"); + } + } + } + } + + // Set DAG author from cluster node ID (no-op when not in cluster mode). + if let Some(node_id) = state.cluster_node_id { + state.dag_author = Some(aingle_graph::NodeId::named(&format!("node:{}", node_id))); + } + + // Initialize the Ed25519 signing key for DAG actions. + // Persistent nodes reuse the `node.key` seed next to the DB; in-memory + // nodes generate an ephemeral key. + let key = match db_path { + Some(p) if p != ":memory:" => { + let key_path = std::path::Path::new(p) + .parent() + .unwrap_or(std::path::Path::new(".")) + .join("node.key"); + if key_path.exists() { + match std::fs::read(&key_path) { + Ok(seed) if seed.len() == 32 => { + let mut arr = [0u8; 32]; + arr.copy_from_slice(&seed); + Some(aingle_graph::dag::DagSigningKey::from_seed(&arr)) + } + _ => None, + } + } else { + let key = aingle_graph::dag::DagSigningKey::generate(); + let seed = key.seed(); + if let Some(parent) = key_path.parent() { + std::fs::create_dir_all(parent).ok(); + } + #[cfg(unix)] + { + use std::io::Write; + use std::os::unix::fs::OpenOptionsExt; + match std::fs::OpenOptions::new() + .create(true) + .write(true) + .truncate(true) + .mode(0o600) + .open(&key_path) + { + Ok(mut f) => { + if let Err(e) = f.write_all(&seed).and_then(|_| f.sync_all()) { + tracing::error!("Failed to persist DAG signing key: {e}"); + } + } + Err(e) => { + tracing::error!( + "Failed to open DAG key file {}: {e}", + key_path.display() + ); + } + } + } + #[cfg(not(unix))] + { + if let Err(e) = std::fs::write(&key_path, &seed) { + tracing::error!("Failed to persist DAG signing key: {e}"); + } + } + Some(key) + } + } + _ => { + // In-memory mode: generate ephemeral key. + Some(aingle_graph::dag::DagSigningKey::generate()) + } + }; + + if let Some(ref k) = key { + tracing::info!( + public_key = %k.public_key_hex(), + "DAG signing key loaded (Ed25519)" + ); + } + state.dag_signing_key = key.map(std::sync::Arc::new); + + tracing::info!("Semantic DAG v0.6.0 enabled"); +} + /// Initialize the Raft cluster on a `CortexServer`. /// /// This sets up the WAL, state machine, network factory, and Raft instance. @@ -401,10 +531,7 @@ pub async fn init_cluster( ) .await; - let rpc_sender = std::sync::Arc::new(HttpRaftRpcSender::new( - config.secret.clone(), - config.tls, - )); + let rpc_sender = std::sync::Arc::new(HttpRaftRpcSender::new(config.secret.clone(), config.tls)); let network = aingle_raft::network::CortexNetworkFactory::new(resolver, rpc_sender); let raft_config = openraft::Config { @@ -525,8 +652,7 @@ pub async fn init_cluster( break; } let base = std::time::Duration::from_secs(2u64.pow(attempt.min(5))); - let jitter = - std::time::Duration::from_millis(rand::random::() % 1000); + let jitter = std::time::Duration::from_millis(rand::random::() % 1000); let backoff = base + jitter; tracing::warn!(attempt, "Join failed, retrying in {:?}", backoff); tokio::time::sleep(backoff).await; @@ -536,12 +662,9 @@ pub async fn init_cluster( // Set up TLS server config if cluster TLS is enabled if config.tls { - let tls_config = build_tls_server_config( - config.tls_cert.as_deref(), - config.tls_key.as_deref(), - )?; - server.state_mut().tls_server_config = - Some(std::sync::Arc::new(tls_config)); + let tls_config = + build_tls_server_config(config.tls_cert.as_deref(), config.tls_key.as_deref())?; + server.state_mut().tls_server_config = Some(std::sync::Arc::new(tls_config)); tracing::info!("Cluster TLS enabled for inter-node communication"); } @@ -549,5 +672,15 @@ pub async fn init_cluster( server.state_mut().cluster_node_id = Some(node_id); tracing::info!(node_id, "Raft consensus initialized"); + // Enable + sign the DAG on this node. The binary's `main()` does this for + // standalone mode; cluster nodes (including library/test callers that reach + // here without going through `main()`) must do it too, otherwise DAG writes + // are unsigned and rejected by the Raft state machine. + #[cfg(feature = "dag")] + { + let db_path = server.config().db_path.clone(); + ensure_dag_ready(server.state_mut(), db_path.as_deref()).await; + } + Ok(()) } diff --git a/crates/aingle_cortex/src/main.rs b/crates/aingle_cortex/src/main.rs index 02a4fe07..b017594a 100644 --- a/crates/aingle_cortex/src/main.rs +++ b/crates/aingle_cortex/src/main.rs @@ -151,124 +151,12 @@ async fn main() -> Result<(), Box> { ); } - // Initialize DAG if enabled: enable DAG on the graph, create genesis if needed + // Initialize the Semantic DAG (enable, genesis, author, signing key). + // In cluster mode this was already done inside `init_cluster`, so only the + // standalone path needs to do it here — avoiding a double initialization. #[cfg(feature = "dag")] - { - let state = server.state_mut(); - - // Enable DAG on the GraphDB (persistent for Sled, in-memory otherwise) - { - let mut graph = state.graph.write().await; - match &db_path { - Some(p) if p != ":memory:" => { - graph.enable_dag_persistent(p).unwrap_or_else(|e| { - panic!( - "Failed to enable persistent DAG at '{}': {e}. \ - Refusing to start with volatile DAG — fix the storage path or permissions.", - p - ); - }); - tracing::info!("DAG persistence enabled (Sled)"); - } - _ => { - tracing::warn!("DAG using in-memory backend — data will NOT survive restarts"); - graph.enable_dag(); - } - } - let triple_count = graph.count(); - if let Some(dag_store) = graph.dag_store() { - match dag_store.init_or_migrate(triple_count) { - Ok(genesis_hash) => { - tracing::info!( - hash = %genesis_hash, - triples = triple_count, - "DAG initialized (genesis)" - ); - } - Err(e) => { - tracing::error!("DAG initialization failed: {e}"); - } - } - } - } - - // Set DAG author from cluster node ID - #[cfg(feature = "cluster")] - if let Some(node_id) = state.cluster_node_id { - state.dag_author = Some(aingle_graph::NodeId::named(&format!("node:{}", node_id))); - } - - // Initialize Ed25519 signing key for DAG actions. - // Reuses the same node.key seed as P2P identity (deterministic). - { - let key = match &db_path { - Some(p) if p != ":memory:" => { - let key_path = std::path::Path::new(p) - .parent() - .unwrap_or(std::path::Path::new(".")) - .join("node.key"); - if key_path.exists() { - match std::fs::read(&key_path) { - Ok(seed) if seed.len() == 32 => { - let mut arr = [0u8; 32]; - arr.copy_from_slice(&seed); - Some(aingle_graph::dag::DagSigningKey::from_seed(&arr)) - } - _ => None, - } - } else { - // Generate new key and persist - let key = aingle_graph::dag::DagSigningKey::generate(); - let seed = key.seed(); - if let Some(parent) = key_path.parent() { - std::fs::create_dir_all(parent).ok(); - } - #[cfg(unix)] - { - use std::io::Write; - use std::os::unix::fs::OpenOptionsExt; - match std::fs::OpenOptions::new() - .create(true) - .write(true) - .truncate(true) - .mode(0o600) - .open(&key_path) - { - Ok(mut f) => { - if let Err(e) = f.write_all(&seed).and_then(|_| f.sync_all()) { - tracing::error!("Failed to persist DAG signing key: {e}"); - } - } - Err(e) => { - tracing::error!("Failed to open DAG key file {}: {e}", key_path.display()); - } - } - } - #[cfg(not(unix))] - { - if let Err(e) = std::fs::write(&key_path, &seed) { - tracing::error!("Failed to persist DAG signing key: {e}"); - } - } - Some(key) - } - } - _ => { - // In-memory mode: generate ephemeral key - Some(aingle_graph::dag::DagSigningKey::generate()) - } - }; - - if let Some(ref k) = key { - tracing::info!( - public_key = %k.public_key_hex(), - "DAG signing key loaded (Ed25519)" - ); - } - state.dag_signing_key = key.map(std::sync::Arc::new); - } - - tracing::info!("Semantic DAG v0.6.0 enabled"); + if !cluster_config.enabled { + aingle_cortex::cluster_init::ensure_dag_ready(server.state_mut(), db_path.as_deref()).await; } // Spawn periodic flush task if enabled @@ -277,8 +165,7 @@ async fn main() -> Result<(), Box> { let flush_dir = snapshot_dir.clone(); let interval_secs = flush_interval_secs; tokio::spawn(async move { - let mut interval = - tokio::time::interval(std::time::Duration::from_secs(interval_secs)); + let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_secs)); interval.tick().await; // skip immediate tick loop { interval.tick().await; @@ -289,10 +176,7 @@ async fn main() -> Result<(), Box> { } } }); - tracing::info!( - interval_secs = interval_secs, - "Periodic auto-flush enabled" - ); + tracing::info!(interval_secs = interval_secs, "Periodic auto-flush enabled"); } // Keep a reference to the state for shutdown flush @@ -347,12 +231,7 @@ async fn main() -> Result<(), Box> { #[cfg(feature = "cluster")] if let Some(ref raft) = state_for_shutdown.raft { tracing::info!("Shutting down Raft..."); - match tokio::time::timeout( - std::time::Duration::from_secs(10), - raft.shutdown(), - ) - .await - { + match tokio::time::timeout(std::time::Duration::from_secs(10), raft.shutdown()).await { Ok(Ok(())) => tracing::info!("Raft shut down gracefully"), Ok(Err(e)) => tracing::error!("Raft shutdown error: {e}"), Err(_) => tracing::error!("Raft shutdown timed out after 10s"), @@ -385,7 +264,9 @@ fn print_help() { println!(" -h, --host Host to bind to (default: 127.0.0.1)"); println!(" -p, --port Port to listen on (default: 19090)"); println!(" --public Bind to all interfaces (0.0.0.0)"); - println!(" --db Path to graph database (default: ~/.aingle/cortex/graph.sled)"); + println!( + " --db Path to graph database (default: ~/.aingle/cortex/graph.sled)" + ); println!(" --memory Use volatile in-memory storage (no persistence)"); println!(" --flush-interval Periodic flush interval in seconds (default: 300, 0=off)"); println!(" -V, --version Print version and exit"); @@ -403,7 +284,9 @@ fn print_help() { println!(" --cluster-node-id Unique node ID (u64, required)"); println!(" --cluster-peers Comma-separated peer REST addresses"); println!(" --cluster-wal-dir WAL directory (default: wal/)"); - println!(" --cluster-secret Shared secret for internal RPC auth (min 16 bytes)"); + println!( + " --cluster-secret Shared secret for internal RPC auth (min 16 bytes)" + ); println!(" --cluster-tls Enable TLS for inter-node communication"); println!(" --cluster-tls-cert TLS certificate PEM file"); println!(" --cluster-tls-key TLS private key PEM file"); diff --git a/crates/aingle_cortex/src/server.rs b/crates/aingle_cortex/src/server.rs index 3e909514..588682ab 100644 --- a/crates/aingle_cortex/src/server.rs +++ b/crates/aingle_cortex/src/server.rs @@ -7,10 +7,10 @@ use crate::error::Result; use crate::rest; use crate::state::AppState; +use axum::extract::DefaultBodyLimit; use axum::Router; use std::net::SocketAddr; use std::path::PathBuf; -use axum::extract::DefaultBodyLimit; use tower_http::cors::CorsLayer; use tower_http::trace::TraceLayer; use tracing::info; @@ -126,6 +126,11 @@ impl CortexServer { &mut self.state } + /// Returns a reference to the server configuration. + pub fn config(&self) -> &CortexConfig { + &self.config + } + /// Builds the `axum` router, combining all API routes and middleware. pub fn build_router(&self) -> Router { let mut app: Router = Router::new(); @@ -174,7 +179,7 @@ impl CortexServer { // CORS layer — only enabled with explicit origin whitelist. let app = if !self.config.cors_allowed_origins.is_empty() { - use tower_http::cors::{Any, AllowOrigin}; + use tower_http::cors::{AllowOrigin, Any}; let cors = if self.config.cors_allowed_origins == ["*"] { // Development-only wildcard @@ -220,8 +225,7 @@ impl CortexServer { if let Some(ref tls_config) = self.state.tls_server_config { info!("Starting Córtex API server on https://{}", addr); - let tls_acceptor = - tokio_rustls::TlsAcceptor::from(tls_config.clone()); + let tls_acceptor = tokio_rustls::TlsAcceptor::from(tls_config.clone()); let tcp_listener = tokio::net::TcpListener::bind(addr).await?; let tls_listener = TlsListener { inner: tcp_listener, @@ -266,19 +270,15 @@ impl CortexServer { if let Some(ref tls_config) = self.state.tls_server_config { info!("Starting Córtex API server on https://{}", addr); - let tls_acceptor = - tokio_rustls::TlsAcceptor::from(tls_config.clone()); + let tls_acceptor = tokio_rustls::TlsAcceptor::from(tls_config.clone()); let tcp_listener = tokio::net::TcpListener::bind(addr).await?; let tls_listener = TlsListener { inner: tcp_listener, acceptor: tls_acceptor, }; - axum::serve( - tls_listener, - router.into_make_service(), - ) - .with_graceful_shutdown(shutdown_signal) - .await?; + axum::serve(tls_listener, router.into_make_service()) + .with_graceful_shutdown(shutdown_signal) + .await?; info!("Córtex API server stopped"); return Ok(()); diff --git a/crates/aingle_cortex/tests/cluster_integration_test.rs b/crates/aingle_cortex/tests/cluster_integration_test.rs index 49695534..8afd6629 100644 --- a/crates/aingle_cortex/tests/cluster_integration_test.rs +++ b/crates/aingle_cortex/tests/cluster_integration_test.rs @@ -9,7 +9,7 @@ #![cfg(feature = "cluster")] -use aingle_cortex::cluster_init::{ClusterConfig, init_cluster}; +use aingle_cortex::cluster_init::{init_cluster, ClusterConfig}; use aingle_cortex::{CortexConfig, CortexServer}; use std::time::Duration; use tokio::time::sleep; @@ -27,7 +27,10 @@ async fn boot_node( peers: Vec, secret: &str, wal_dir: &str, -) -> (tokio::task::JoinHandle<()>, tokio::sync::watch::Sender) { +) -> ( + tokio::task::JoinHandle<()>, + tokio::sync::watch::Sender, +) { let mut config = CortexConfig::default() .with_host("127.0.0.1") .with_port(port); @@ -71,7 +74,12 @@ async fn boot_node( } /// Gracefully shut down nodes with enough time for Raft to stop. -async fn shutdown_nodes(nodes: Vec<(tokio::task::JoinHandle<()>, tokio::sync::watch::Sender)>) { +async fn shutdown_nodes( + nodes: Vec<( + tokio::task::JoinHandle<()>, + tokio::sync::watch::Sender, + )>, +) { for (_, tx) in &nodes { tx.send(true).ok(); } @@ -105,6 +113,35 @@ async fn wait_for_leader(port: u16, timeout_secs: u64) -> Option { None } +/// Poll a node until a triple with the given subject is visible (replicated), +/// or the timeout elapses. Returns true if the triple appeared. +async fn wait_for_triple(port: u16, subject: &str, timeout_secs: u64) -> bool { + let client = reqwest::Client::new(); + let deadline = tokio::time::Instant::now() + Duration::from_secs(timeout_secs); + + while tokio::time::Instant::now() < deadline { + if let Ok(resp) = client + .get(format!( + "http://127.0.0.1:{port}/api/v1/triples?subject={subject}" + )) + .send() + .await + { + if resp.status().is_success() { + if let Ok(body) = resp.json::().await { + if let Some(triples) = body["triples"].as_array() { + if !triples.is_empty() { + return true; + } + } + } + } + } + sleep(Duration::from_millis(250)).await; + } + false +} + #[tokio::test] async fn test_single_node_cluster_bootstrap() { let tmp = tempfile::tempdir().unwrap(); @@ -159,17 +196,14 @@ async fn test_single_node_cluster_bootstrap() { resp.status() ); let body: serde_json::Value = resp.json().await.unwrap(); - let triples = body["triples"].as_array().expect("triples field should be an array"); - assert!( - !triples.is_empty(), - "Should find the triple we just wrote" - ); + let triples = body["triples"] + .as_array() + .expect("triples field should be an array"); + assert!(!triples.is_empty(), "Should find the triple we just wrote"); // Verify cluster members endpoint let resp = client - .get(format!( - "http://127.0.0.1:{port}/api/v1/cluster/members" - )) + .get(format!("http://127.0.0.1:{port}/api/v1/cluster/members")) .send() .await .unwrap(); @@ -200,14 +234,7 @@ async fn test_three_node_cluster_replication() { let wal3 = tmp.path().join("wal3"); // Boot node 1 as bootstrap (no peers) - let (h1, tx1) = boot_node( - 1, - port1, - vec![], - secret, - wal1.to_str().unwrap(), - ) - .await; + let (h1, tx1) = boot_node(1, port1, vec![], secret, wal1.to_str().unwrap()).await; // Wait for Raft election (timeout_min=1500ms, give 2s + buffer) sleep(Duration::from_secs(2)).await; @@ -242,9 +269,7 @@ async fn test_three_node_cluster_replication() { // Verify members from the leader let client = reqwest::Client::new(); let resp = client - .get(format!( - "http://127.0.0.1:{port1}/api/v1/cluster/members" - )) + .get(format!("http://127.0.0.1:{port1}/api/v1/cluster/members")) .send() .await .unwrap(); @@ -273,39 +298,15 @@ async fn test_three_node_cluster_replication() { "Write to leader should succeed: {status}" ); - // Wait for replication - sleep(Duration::from_secs(2)).await; - - // Read from follower node 2 — the triple should be replicated - let resp = client - .get(format!( - "http://127.0.0.1:{port2}/api/v1/triples?subject=cluster_test" - )) - .send() - .await - .unwrap(); - assert!(resp.status().is_success(), "Read from node 2 failed: {}", resp.status()); - let body: serde_json::Value = resp.json().await.unwrap(); - let triples = body["triples"].as_array().expect("triples field should be an array"); + // Wait for replication to each follower (poll, don't assume a fixed delay — + // Raft apply latency varies, especially for debug builds on busy machines). assert!( - !triples.is_empty(), - "Follower (node 2) should have the replicated triple" + wait_for_triple(port2, "cluster_test", 15).await, + "Follower (node 2) should have the replicated triple within 15s" ); - - // Also verify from follower node 3 - let resp = client - .get(format!( - "http://127.0.0.1:{port3}/api/v1/triples?subject=cluster_test" - )) - .send() - .await - .unwrap(); - assert!(resp.status().is_success(), "Read from node 3 failed: {}", resp.status()); - let body: serde_json::Value = resp.json().await.unwrap(); - let triples = body["triples"].as_array().expect("triples field should be an array"); assert!( - !triples.is_empty(), - "Follower (node 3) should have the replicated triple" + wait_for_triple(port3, "cluster_test", 15).await, + "Follower (node 3) should have the replicated triple within 15s" ); // Shutdown all nodes @@ -333,25 +334,32 @@ async fn test_cluster_wal_stats() { // Check WAL stats endpoint let resp = client - .get(format!( - "http://127.0.0.1:{port}/api/v1/cluster/wal/stats" - )) + .get(format!("http://127.0.0.1:{port}/api/v1/cluster/wal/stats")) .send() .await .unwrap(); - assert!(resp.status().is_success(), "WAL stats failed: {}", resp.status()); + assert!( + resp.status().is_success(), + "WAL stats failed: {}", + resp.status() + ); let stats: serde_json::Value = resp.json().await.unwrap(); - assert!(stats["segment_count"].is_number(), "segment_count should be a number: {stats}"); + assert!( + stats["segment_count"].is_number(), + "segment_count should be a number: {stats}" + ); // Verify WAL integrity let resp = client - .post(format!( - "http://127.0.0.1:{port}/api/v1/cluster/wal/verify" - )) + .post(format!("http://127.0.0.1:{port}/api/v1/cluster/wal/verify")) .send() .await .unwrap(); - assert!(resp.status().is_success(), "WAL verify failed: {}", resp.status()); + assert!( + resp.status().is_success(), + "WAL verify failed: {}", + resp.status() + ); let verify: serde_json::Value = resp.json().await.unwrap(); assert_eq!(verify["valid"], true, "WAL should be valid: {verify}"); diff --git a/crates/aingle_graph/src/dag/store.rs b/crates/aingle_graph/src/dag/store.rs index 3694df68..b1e8a09f 100644 --- a/crates/aingle_graph/src/dag/store.rs +++ b/crates/aingle_graph/src/dag/store.rs @@ -172,7 +172,10 @@ impl DagStore { affected_idx.entry(triple_id).or_default().push(hash_bytes); } for subject_hash in extract_subject_hashes(&action.payload) { - subject_idx.entry(subject_hash).or_default().push(hash_bytes); + subject_idx + .entry(subject_hash) + .or_default() + .push(hash_bytes); } action_count += 1; } else { @@ -259,10 +262,9 @@ impl DagStore { // Update affected triple index { - let mut idx = self - .affected_index - .write() - .map_err(|_| crate::Error::Storage("DagStore affected index lock poisoned".into()))?; + let mut idx = self.affected_index.write().map_err(|_| { + crate::Error::Storage("DagStore affected index lock poisoned".into()) + })?; for triple_id in extract_affected_triple_ids(&action.payload) { idx.entry(triple_id).or_default().push(hash.0); } @@ -270,10 +272,9 @@ impl DagStore { // Update subject index { - let mut idx = self - .subject_index - .write() - .map_err(|_| crate::Error::Storage("DagStore subject index lock poisoned".into()))?; + let mut idx = self.subject_index.write().map_err(|_| { + crate::Error::Storage("DagStore subject index lock poisoned".into()) + })?; for subject_hash in extract_subject_hashes(&action.payload) { idx.entry(subject_hash).or_default().push(hash.0); } @@ -461,12 +462,21 @@ impl DagStore { return Ok(tips.into_iter().next().unwrap_or(DagActionHash([0; 32]))); } - // Create genesis action + // Create genesis action. + // + // The genesis is intentionally DETERMINISTIC: fixed author, seq, payload + // and a fixed (epoch) timestamp. This guarantees that every fresh node + // computes the *same* genesis hash, which is required for cluster + // replication — a follower must be able to validate that a replicated + // action's parent (the leader's genesis) exists in its own DAG. A + // wall-clock `now()` timestamp here would make each node's genesis hash + // diverge, so cross-node parent validation in `put` would fail. let genesis = DagAction { parents: vec![], author: NodeId::named("aingle:system"), seq: 0, - timestamp: chrono::Utc::now(), + timestamp: chrono::DateTime::from_timestamp(0, 0) + .expect("unix epoch is a valid timestamp"), payload: DagPayload::Genesis { triple_count, description: "Migration from v0.5.0".into(), @@ -531,10 +541,9 @@ impl DagStore { // Update affected triple index { - let mut idx = self - .affected_index - .write() - .map_err(|_| crate::Error::Storage("DagStore affected index lock poisoned".into()))?; + let mut idx = self.affected_index.write().map_err(|_| { + crate::Error::Storage("DagStore affected index lock poisoned".into()) + })?; for triple_id in extract_affected_triple_ids(&action.payload) { idx.entry(triple_id).or_default().push(hash.0); } @@ -542,10 +551,9 @@ impl DagStore { // Update subject index { - let mut idx = self - .subject_index - .write() - .map_err(|_| crate::Error::Storage("DagStore subject index lock poisoned".into()))?; + let mut idx = self.subject_index.write().map_err(|_| { + crate::Error::Storage("DagStore subject index lock poisoned".into()) + })?; for subject_hash in extract_subject_hashes(&action.payload) { idx.entry(subject_hash).or_default().push(hash.0); } @@ -568,10 +576,7 @@ impl DagStore { /// Given the remote's tips, finds all actions in our DAG that are /// ancestors of our tips but NOT ancestors of the remote's tips. /// Returns them in topological order (roots first). - pub fn compute_missing( - &self, - remote_tips: &[DagActionHash], - ) -> crate::Result> { + pub fn compute_missing(&self, remote_tips: &[DagActionHash]) -> crate::Result> { // Our full ancestor set (from our tips) let our_tips = self.tips()?; let mut our_ancestors: HashSet<[u8; 32]> = HashSet::new(); @@ -887,8 +892,7 @@ impl DagStore { /// Collect action hashes older than `seconds` ago (excluding tips). fn collect_older_than(&self, seconds: u64) -> crate::Result> { - let cutoff = chrono::Utc::now() - - chrono::Duration::seconds(seconds as i64); + let cutoff = chrono::Utc::now() - chrono::Duration::seconds(seconds as i64); let entries = self.backend.scan_prefix(ACTION_PREFIX)?; let tips = self .tips @@ -1199,7 +1203,10 @@ mod tests { assert!(genesis.is_genesis()); assert!(matches!( genesis.payload, - DagPayload::Genesis { triple_count: 100, .. } + DagPayload::Genesis { + triple_count: 100, + .. + } )); // Second call returns existing tip @@ -1243,7 +1250,7 @@ mod tests { // CRITICAL: the triple ID computed from a DagPayload must match // the TripleId::from_triple() in the graph's triple store. // If these diverge, history lookups by triple ID silently fail. - use crate::{Triple, TripleId, Predicate, Value}; + use crate::{Predicate, Triple, TripleId, Value}; let subject = "user:alice"; let predicate = "knows"; @@ -1274,7 +1281,7 @@ mod tests { fn test_history_matches_real_triple_id() { // End-to-end: insert via DagStore, then look up history using // the same triple ID that GraphDB.insert() would produce. - use crate::{Triple, TripleId, Predicate, Value}; + use crate::{Predicate, Triple, TripleId, Value}; let store = DagStore::new(); let action = make_action(1, vec![]); @@ -1300,7 +1307,11 @@ mod tests { // Pruning tests // ======================================================================= - fn make_action_at(seq: u64, parents: Vec, ts: chrono::DateTime) -> DagAction { + fn make_action_at( + seq: u64, + parents: Vec, + ts: chrono::DateTime, + ) -> DagAction { DagAction { parents, author: NodeId::named("node:1"), @@ -1451,7 +1462,10 @@ mod tests { assert!(result.checkpoint_hash.is_some()); // Checkpoint action was created - let cp = store.get(&result.checkpoint_hash.unwrap()).unwrap().unwrap(); + let cp = store + .get(&result.checkpoint_hash.unwrap()) + .unwrap() + .unwrap(); assert!(matches!(cp.payload, DagPayload::Compact { .. })); // +1 for the checkpoint assert_eq!(store.action_count(), 2); // 1 retained + 1 checkpoint @@ -1694,7 +1708,10 @@ mod tests { assert!(genesis.is_genesis()); assert!(matches!( genesis.payload, - DagPayload::Genesis { triple_count: 42, .. } + DagPayload::Genesis { + triple_count: 42, + .. + } )); } }