Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 147 additions & 14 deletions crates/aingle_cortex/src/cluster_init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,10 @@ impl ClusterConfig {
}
"--cluster-peers" => {
if i + 1 < args.len() {
cfg.peers =
args[i + 1].split(',').map(|s| s.trim().to_string()).collect();
cfg.peers = args[i + 1]
.split(',')
.map(|s| s.trim().to_string())
.collect();
i += 1;
}
}
Expand Down Expand Up @@ -353,6 +355,134 @@ pub fn build_tls_server_config(
Ok(config)
}

/// Ensure the Semantic DAG is enabled and ready for *signed* writes.
///
/// Enables the DAG on the graph (persistent when `db_path` is a real path,
/// in-memory otherwise), initializes genesis, sets the DAG author from the
/// cluster node id when present, and loads/generates the Ed25519 signing key.
///
/// This lives in the library (not only in `main.rs`) so that library consumers
/// and the cluster integration tests — which call [`init_cluster`] directly,
/// bypassing `main()` — get a fully-initialized, signable DAG. Without it,
/// cluster DAG writes are emitted unsigned and rejected with
/// "DagAction rejected: missing Ed25519 signature".
///
/// # Arguments
///
/// * `state` - application state; its graph, `dag_author` and
///   `dag_signing_key` are updated in place.
/// * `db_path` - database location. `None` or `":memory:"` selects the
///   in-memory DAG backend and an ephemeral signing key.
///
/// # Panics
///
/// Panics when a persistent `db_path` is given and the persistent DAG cannot
/// be enabled there. Genesis-initialization and key-loading failures are only
/// logged; in the latter case `state.dag_signing_key` is left as `None`.
#[cfg(feature = "dag")]
pub async fn ensure_dag_ready(state: &mut crate::state::AppState, db_path: Option<&str>) {
    // Enable DAG on the GraphDB (persistent for Sled, in-memory otherwise).
    // The write guard lives only inside this block so it is released before
    // the signing-key file I/O below.
    {
        let mut graph = state.graph.write().await;
        match db_path {
            Some(p) if p != ":memory:" => {
                graph.enable_dag_persistent(p).unwrap_or_else(|e| {
                    panic!(
                        "Failed to enable persistent DAG at '{}': {e}. \
                         Refusing to start with volatile DAG — fix the storage path or permissions.",
                        p
                    );
                });
                tracing::info!("DAG persistence enabled (Sled)");
            }
            _ => {
                tracing::warn!("DAG using in-memory backend — data will NOT survive restarts");
                graph.enable_dag();
            }
        }

        let triple_count = graph.count();
        if let Some(dag_store) = graph.dag_store() {
            match dag_store.init_or_migrate(triple_count) {
                Ok(genesis_hash) => {
                    tracing::info!(
                        hash = %genesis_hash,
                        triples = triple_count,
                        "DAG initialized (genesis)"
                    );
                }
                Err(e) => {
                    tracing::error!("DAG initialization failed: {e}");
                }
            }
        }
    }

    // Set DAG author from cluster node ID (no-op when not in cluster mode).
    if let Some(node_id) = state.cluster_node_id {
        state.dag_author = Some(aingle_graph::NodeId::named(&format!("node:{}", node_id)));
    }

    // Initialize the Ed25519 signing key for DAG actions.
    let key = load_or_generate_dag_signing_key(db_path);
    match &key {
        Some(k) => {
            tracing::info!(
                public_key = %k.public_key_hex(),
                "DAG signing key loaded (Ed25519)"
            );
        }
        None => {
            // Previously this case was silent, which made the resulting
            // "missing Ed25519 signature" rejections hard to trace back.
            tracing::warn!("No DAG signing key available; DAG actions cannot be signed");
        }
    }
    state.dag_signing_key = key.map(std::sync::Arc::new);

    tracing::info!("Semantic DAG v0.6.0 enabled");
}

/// Load the Ed25519 DAG signing key for this node, generating one if needed.
///
/// * In-memory mode (`None` or `":memory:"`): returns a freshly generated,
///   ephemeral key.
/// * Persistent mode: the 32-byte seed is read from `node.key` in the parent
///   directory of `db_path`. If the file does not exist, a new key is
///   generated and its seed is written there (mode `0o600` on Unix).
///
/// Returns `None` only when `node.key` exists but cannot be read or is not
/// exactly 32 bytes; the cause is logged. A failure to *persist* a newly
/// generated key is logged but the key is still returned.
#[cfg(feature = "dag")]
fn load_or_generate_dag_signing_key(
    db_path: Option<&str>,
) -> Option<aingle_graph::dag::DagSigningKey> {
    let db = match db_path {
        Some(p) if p != ":memory:" => p,
        _ => {
            // In-memory mode: generate ephemeral key.
            return Some(aingle_graph::dag::DagSigningKey::generate());
        }
    };

    // Persistent nodes keep the seed in `node.key` next to the DB.
    let key_path = std::path::Path::new(db)
        .parent()
        .unwrap_or(std::path::Path::new("."))
        .join("node.key");

    if key_path.exists() {
        return match std::fs::read(&key_path) {
            Ok(seed) => match <[u8; 32]>::try_from(seed.as_slice()) {
                Ok(arr) => Some(aingle_graph::dag::DagSigningKey::from_seed(&arr)),
                Err(_) => {
                    tracing::error!(
                        "DAG key file {} has invalid length {} (expected 32 bytes)",
                        key_path.display(),
                        seed.len()
                    );
                    None
                }
            },
            Err(e) => {
                tracing::error!("Failed to read DAG key file {}: {e}", key_path.display());
                None
            }
        };
    }

    // No key on disk yet: generate one and persist its seed (best effort).
    let key = aingle_graph::dag::DagSigningKey::generate();
    let seed = key.seed();
    if let Some(parent) = key_path.parent() {
        if let Err(e) = std::fs::create_dir_all(parent) {
            tracing::warn!(
                "Failed to create directory {} for DAG key: {e}",
                parent.display()
            );
        }
    }
    #[cfg(unix)]
    {
        use std::io::Write;
        use std::os::unix::fs::OpenOptionsExt;
        // Owner-only permissions: the file holds private key material.
        match std::fs::OpenOptions::new()
            .create(true)
            .write(true)
            .truncate(true)
            .mode(0o600)
            .open(&key_path)
        {
            Ok(mut f) => {
                if let Err(e) = f.write_all(&seed).and_then(|_| f.sync_all()) {
                    tracing::error!("Failed to persist DAG signing key: {e}");
                }
            }
            Err(e) => {
                tracing::error!(
                    "Failed to open DAG key file {}: {e}",
                    key_path.display()
                );
            }
        }
    }
    #[cfg(not(unix))]
    {
        if let Err(e) = std::fs::write(&key_path, &seed) {
            tracing::error!("Failed to persist DAG signing key: {e}");
        }
    }
    Some(key)
}

/// Initialize the Raft cluster on a `CortexServer`.
///
/// This sets up the WAL, state machine, network factory, and Raft instance.
Expand Down Expand Up @@ -401,10 +531,7 @@ pub async fn init_cluster(
)
.await;

let rpc_sender = std::sync::Arc::new(HttpRaftRpcSender::new(
config.secret.clone(),
config.tls,
));
let rpc_sender = std::sync::Arc::new(HttpRaftRpcSender::new(config.secret.clone(), config.tls));
let network = aingle_raft::network::CortexNetworkFactory::new(resolver, rpc_sender);

let raft_config = openraft::Config {
Expand Down Expand Up @@ -525,8 +652,7 @@ pub async fn init_cluster(
break;
}
let base = std::time::Duration::from_secs(2u64.pow(attempt.min(5)));
let jitter =
std::time::Duration::from_millis(rand::random::<u64>() % 1000);
let jitter = std::time::Duration::from_millis(rand::random::<u64>() % 1000);
let backoff = base + jitter;
tracing::warn!(attempt, "Join failed, retrying in {:?}", backoff);
tokio::time::sleep(backoff).await;
Expand All @@ -536,18 +662,25 @@ pub async fn init_cluster(

// Set up TLS server config if cluster TLS is enabled
if config.tls {
let tls_config = build_tls_server_config(
config.tls_cert.as_deref(),
config.tls_key.as_deref(),
)?;
server.state_mut().tls_server_config =
Some(std::sync::Arc::new(tls_config));
let tls_config =
build_tls_server_config(config.tls_cert.as_deref(), config.tls_key.as_deref())?;
server.state_mut().tls_server_config = Some(std::sync::Arc::new(tls_config));
tracing::info!("Cluster TLS enabled for inter-node communication");
}

server.state_mut().raft = Some(raft);
server.state_mut().cluster_node_id = Some(node_id);
tracing::info!(node_id, "Raft consensus initialized");

// Enable + sign the DAG on this node. The binary's `main()` does this for
// standalone mode; cluster nodes (including library/test callers that reach
// here without going through `main()`) must do it too, otherwise DAG writes
// are unsigned and rejected by the Raft state machine.
#[cfg(feature = "dag")]
{
let db_path = server.config().db_path.clone();
ensure_dag_ready(server.state_mut(), db_path.as_deref()).await;
}

Ok(())
}
145 changes: 14 additions & 131 deletions crates/aingle_cortex/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,124 +151,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
);
}

// Initialize DAG if enabled: enable DAG on the graph, create genesis if needed
// Initialize the Semantic DAG (enable, genesis, author, signing key).
// In cluster mode this was already done inside `init_cluster`, so only the
// standalone path needs to do it here — avoiding a double initialization.
#[cfg(feature = "dag")]
{
let state = server.state_mut();

// Enable DAG on the GraphDB (persistent for Sled, in-memory otherwise)
{
let mut graph = state.graph.write().await;
match &db_path {
Some(p) if p != ":memory:" => {
graph.enable_dag_persistent(p).unwrap_or_else(|e| {
panic!(
"Failed to enable persistent DAG at '{}': {e}. \
Refusing to start with volatile DAG — fix the storage path or permissions.",
p
);
});
tracing::info!("DAG persistence enabled (Sled)");
}
_ => {
tracing::warn!("DAG using in-memory backend — data will NOT survive restarts");
graph.enable_dag();
}
}
let triple_count = graph.count();
if let Some(dag_store) = graph.dag_store() {
match dag_store.init_or_migrate(triple_count) {
Ok(genesis_hash) => {
tracing::info!(
hash = %genesis_hash,
triples = triple_count,
"DAG initialized (genesis)"
);
}
Err(e) => {
tracing::error!("DAG initialization failed: {e}");
}
}
}
}

// Set DAG author from cluster node ID
#[cfg(feature = "cluster")]
if let Some(node_id) = state.cluster_node_id {
state.dag_author = Some(aingle_graph::NodeId::named(&format!("node:{}", node_id)));
}

// Initialize Ed25519 signing key for DAG actions.
// Reuses the same node.key seed as P2P identity (deterministic).
{
let key = match &db_path {
Some(p) if p != ":memory:" => {
let key_path = std::path::Path::new(p)
.parent()
.unwrap_or(std::path::Path::new("."))
.join("node.key");
if key_path.exists() {
match std::fs::read(&key_path) {
Ok(seed) if seed.len() == 32 => {
let mut arr = [0u8; 32];
arr.copy_from_slice(&seed);
Some(aingle_graph::dag::DagSigningKey::from_seed(&arr))
}
_ => None,
}
} else {
// Generate new key and persist
let key = aingle_graph::dag::DagSigningKey::generate();
let seed = key.seed();
if let Some(parent) = key_path.parent() {
std::fs::create_dir_all(parent).ok();
}
#[cfg(unix)]
{
use std::io::Write;
use std::os::unix::fs::OpenOptionsExt;
match std::fs::OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.mode(0o600)
.open(&key_path)
{
Ok(mut f) => {
if let Err(e) = f.write_all(&seed).and_then(|_| f.sync_all()) {
tracing::error!("Failed to persist DAG signing key: {e}");
}
}
Err(e) => {
tracing::error!("Failed to open DAG key file {}: {e}", key_path.display());
}
}
}
#[cfg(not(unix))]
{
if let Err(e) = std::fs::write(&key_path, &seed) {
tracing::error!("Failed to persist DAG signing key: {e}");
}
}
Some(key)
}
}
_ => {
// In-memory mode: generate ephemeral key
Some(aingle_graph::dag::DagSigningKey::generate())
}
};

if let Some(ref k) = key {
tracing::info!(
public_key = %k.public_key_hex(),
"DAG signing key loaded (Ed25519)"
);
}
state.dag_signing_key = key.map(std::sync::Arc::new);
}

tracing::info!("Semantic DAG v0.6.0 enabled");
if !cluster_config.enabled {
aingle_cortex::cluster_init::ensure_dag_ready(server.state_mut(), db_path.as_deref()).await;
}

// Spawn periodic flush task if enabled
Expand All @@ -277,8 +165,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let flush_dir = snapshot_dir.clone();
let interval_secs = flush_interval_secs;
tokio::spawn(async move {
let mut interval =
tokio::time::interval(std::time::Duration::from_secs(interval_secs));
let mut interval = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
interval.tick().await; // skip immediate tick
loop {
interval.tick().await;
Expand All @@ -289,10 +176,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
}
});
tracing::info!(
interval_secs = interval_secs,
"Periodic auto-flush enabled"
);
tracing::info!(interval_secs = interval_secs, "Periodic auto-flush enabled");
}

// Keep a reference to the state for shutdown flush
Expand Down Expand Up @@ -347,12 +231,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
#[cfg(feature = "cluster")]
if let Some(ref raft) = state_for_shutdown.raft {
tracing::info!("Shutting down Raft...");
match tokio::time::timeout(
std::time::Duration::from_secs(10),
raft.shutdown(),
)
.await
{
match tokio::time::timeout(std::time::Duration::from_secs(10), raft.shutdown()).await {
Ok(Ok(())) => tracing::info!("Raft shut down gracefully"),
Ok(Err(e)) => tracing::error!("Raft shutdown error: {e}"),
Err(_) => tracing::error!("Raft shutdown timed out after 10s"),
Expand Down Expand Up @@ -385,7 +264,9 @@ fn print_help() {
println!(" -h, --host <HOST> Host to bind to (default: 127.0.0.1)");
println!(" -p, --port <PORT> Port to listen on (default: 19090)");
println!(" --public Bind to all interfaces (0.0.0.0)");
println!(" --db <PATH> Path to graph database (default: ~/.aingle/cortex/graph.sled)");
println!(
" --db <PATH> Path to graph database (default: ~/.aingle/cortex/graph.sled)"
);
println!(" --memory Use volatile in-memory storage (no persistence)");
println!(" --flush-interval <S> Periodic flush interval in seconds (default: 300, 0=off)");
println!(" -V, --version Print version and exit");
Expand All @@ -403,7 +284,9 @@ fn print_help() {
println!(" --cluster-node-id <ID> Unique node ID (u64, required)");
println!(" --cluster-peers <ADDRS> Comma-separated peer REST addresses");
println!(" --cluster-wal-dir <DIR> WAL directory (default: wal/)");
println!(" --cluster-secret <SECRET> Shared secret for internal RPC auth (min 16 bytes)");
println!(
" --cluster-secret <SECRET> Shared secret for internal RPC auth (min 16 bytes)"
);
println!(" --cluster-tls Enable TLS for inter-node communication");
println!(" --cluster-tls-cert <PATH> TLS certificate PEM file");
println!(" --cluster-tls-key <PATH> TLS private key PEM file");
Expand Down
Loading
Loading