diff --git a/Cargo.lock b/Cargo.lock index c711557b..24c6f8e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -168,8 +168,10 @@ dependencies = [ "rcgen", "regex", "reqwest", + "rmcp", "rustls", "rustls-pemfile", + "schemars", "serde", "serde_json", "sled", @@ -5394,6 +5396,12 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" +[[package]] +name = "pastey" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ee67f1008b1ba2321834326597b8e186293b049a023cdef258527550b9935b4" + [[package]] name = "peel-off" version = "0.1.1" @@ -6287,6 +6295,42 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "rmcp" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0810a9f717d9828f475fe1f629f4c305c8464b7f496c3a854b58d29e65f4058e" +dependencies = [ + "async-trait", + "base64", + "chrono", + "futures", + "pastey", + "pin-project-lite", + "rmcp-macros", + "schemars", + "serde", + "serde_json", + "thiserror 2.0.18", + "tokio", + "tokio-stream", + "tokio-util", + "tracing", +] + +[[package]] +name = "rmcp-macros" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6aefac48c364756e97f04c0401ba3231e8607882c7c1d92da0437dc16307904d" +dependencies = [ + "darling 0.23.0", + "proc-macro2", + "quote", + "serde_json", + "syn 2.0.117", +] + [[package]] name = "rocksdb" version = "0.24.0" @@ -6584,12 +6628,26 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2b42f36aa1cd011945615b92222f6bf73c599a102a300334cd7f8dbeec726cc" dependencies = [ + "chrono", "dyn-clone", "ref-cast", + "schemars_derive", "serde", "serde_json", ] +[[package]] +name = "schemars_derive" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"7d115b50f4aaeea07e79c1912f645c7513d81715d0420f8bc77a18c6260b307f" +dependencies = [ + "proc-macro2", + "quote", + "serde_derive_internals", + "syn 2.0.117", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -6735,6 +6793,17 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "serde_derive_internals" +version = "0.29.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "serde_json" version = "1.0.149" diff --git a/README.md b/README.md index f635ccc1..87ca4ac5 100644 --- a/README.md +++ b/README.md @@ -395,6 +395,46 @@ aingle-minimal run --rest-port 19080 --- +## MCP Server + +The Cortex exposes the AIngle semantic graph to MCP clients like Claude Code and Claude Desktop over the Model Context Protocol (stdio), letting agents query, write, and verify graph data through tool calls. + +### Build + +```bash +cargo build -p aingle_cortex --features "mcp dag" --release +``` + +### Client configuration + +Add to `claude_desktop_config.json` (Claude Desktop) or `.mcp.json` (Claude Code): + +```json +{ + "mcpServers": { + "aingle": { + "command": "aingle-cortex", + "args": ["--mcp", "--db", "./data/graph.sled"] + } + } +} +``` + +Replace `--db <path>` with `--memory` for an ephemeral, in-memory graph. + +### Available tools + +- `aingle_ping` — liveness check +- `aingle_query_pattern` — query the semantic graph by triple pattern +- `aingle_graph_stats` — graph statistics +- `aingle_create_triple` — insert a triple (write) +- `aingle_verify_proof` — verify a zero-knowledge proof (returns `valid: false` for invalid proofs) +- `aingle_dag_history` — signed DAG provenance history of a subject (requires the `dag` feature) + +> stdout is reserved for the JSON-RPC stream; logs are written to stderr. + +--- + ## Contributing We welcome contributions from the community. 
diff --git a/crates/aingle_cortex/Cargo.toml b/crates/aingle_cortex/Cargo.toml index 8aceaaf7..f22e1d50 100644 --- a/crates/aingle_cortex/Cargo.toml +++ b/crates/aingle_cortex/Cargo.toml @@ -22,6 +22,7 @@ p2p = ["dep:quinn", "dep:rustls", "dep:rcgen", "dep:ed25519-dalek", "dep:hex"] p2p-mdns = ["p2p", "dep:mdns-sd", "dep:if-addrs"] cluster = ["p2p", "dep:aingle_wal", "dep:aingle_raft", "dep:openraft", "dep:tokio-rustls", "dep:rustls-pemfile"] dag = ["cluster", "aingle_graph/dag", "aingle_graph/dag-sign", "aingle_raft/dag"] +mcp = ["dep:rmcp", "dep:schemars"] full = ["rest", "graphql", "sparql", "auth", "dag"] [[bin]] @@ -47,6 +48,10 @@ async-graphql-axum = { version = "8.0.0-rc", optional = true } # SPARQL (optional) spargebra = { version = "0.4", optional = true } +# MCP server (optional) — Model Context Protocol over stdio +rmcp = { version = "1.7", features = ["server", "transport-io", "macros"], optional = true } +schemars = { version = "1.0", optional = true } + # Authentication (optional) jsonwebtoken = { version = "10", features = ["rust_crypto"], optional = true } argon2 = { version = "0.5", optional = true } @@ -111,3 +116,6 @@ if-addrs = { version = "0.13", optional = true } tempfile = "3.26" reqwest = { version = "0.12", features = ["json"] } tokio-test = "0.4" +# Enable the rmcp `client` feature for the in-process MCP integration test. +# This is dev-only; the production `mcp` feature uses the server side only. 
+rmcp = { version = "1.7", features = ["client", "transport-io"] } diff --git a/crates/aingle_cortex/src/lib.rs b/crates/aingle_cortex/src/lib.rs index dd2b99b3..4372feed 100644 --- a/crates/aingle_cortex/src/lib.rs +++ b/crates/aingle_cortex/src/lib.rs @@ -164,21 +164,24 @@ #[cfg(feature = "auth")] pub mod auth; pub mod client; +#[cfg(feature = "cluster")] +pub mod cluster_init; pub mod error; -pub mod wasm_types; #[cfg(feature = "graphql")] pub mod graphql; +#[cfg(feature = "mcp")] +pub mod mcp; pub mod middleware; +#[cfg(feature = "p2p")] +pub mod p2p; pub mod proofs; pub mod rest; pub mod server; +pub mod service; #[cfg(feature = "sparql")] pub mod sparql; pub mod state; -#[cfg(feature = "p2p")] -pub mod p2p; -#[cfg(feature = "cluster")] -pub mod cluster_init; +pub mod wasm_types; pub use client::{CortexClientConfig, CortexInternalClient}; pub use error::{Error, Result}; diff --git a/crates/aingle_cortex/src/main.rs b/crates/aingle_cortex/src/main.rs index b017594a..fb7b71d1 100644 --- a/crates/aingle_cortex/src/main.rs +++ b/crates/aingle_cortex/src/main.rs @@ -6,17 +6,27 @@ //! REST/GraphQL/SPARQL interface for AIngle semantic graphs. use aingle_cortex::{CortexConfig, CortexServer}; -use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, Layer}; #[tokio::main] async fn main() -> Result<(), Box> { + // In MCP mode, stdout is reserved for the JSON-RPC stream, so all logging + // must be redirected to stderr. Detect the flag before subscriber init. 
+ let mcp_mode = std::env::args().any(|a| a == "--mcp"); + // Initialize logging + let filter = tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| "aingle_cortex=info,tower_http=debug".into()); + let fmt_layer = if mcp_mode { + tracing_subscriber::fmt::layer() + .with_writer(std::io::stderr) + .boxed() + } else { + tracing_subscriber::fmt::layer().boxed() + }; tracing_subscriber::registry() - .with( - tracing_subscriber::EnvFilter::try_from_default_env() - .unwrap_or_else(|_| "aingle_cortex=info,tower_http=debug".into()), - ) - .with(tracing_subscriber::fmt::layer()) + .with(filter) + .with(fmt_layer) .init(); // Parse command line arguments @@ -61,6 +71,9 @@ async fn main() -> Result<(), Box> { "--memory" => { config.db_path = Some(":memory:".to_string()); } + "--mcp" => { + config.mcp_mode = true; + } "--flush-interval" => { if i + 1 < args.len() { config.flush_interval_secs = args[i + 1].parse().unwrap_or(300); @@ -76,6 +89,14 @@ async fn main() -> Result<(), Box> { i += 1; } + // If --mcp was requested but the binary was built without the `mcp` feature, + // fail loudly instead of silently falling through to the TCP REST server. + #[cfg(not(feature = "mcp"))] + if config.mcp_mode { + eprintln!("error: --mcp requires building with the `mcp` feature: cargo build -p aingle_cortex --features mcp"); + std::process::exit(2); + } + // Parse P2P flags (feature-gated at compile time). #[cfg(feature = "p2p")] let p2p_config = { @@ -159,6 +180,14 @@ async fn main() -> Result<(), Box> { aingle_cortex::cluster_init::ensure_dag_ready(server.state_mut(), db_path.as_deref()).await; } + // MCP mode: serve over stdio instead of binding a TCP listener. 
+ #[cfg(feature = "mcp")] + if server.config().mcp_mode { + let state = server.state().clone(); + aingle_cortex::mcp::serve_stdio(state).await?; + return Ok(()); + } + // Spawn periodic flush task if enabled if flush_interval_secs > 0 { let flush_state = server.state().clone(); @@ -269,6 +298,7 @@ fn print_help() { ); println!(" --memory Use volatile in-memory storage (no persistence)"); println!(" --flush-interval Periodic flush interval in seconds (default: 300, 0=off)"); + println!(" --mcp Serve MCP over stdio (requires --features mcp)"); println!(" -V, --version Print version and exit"); println!(" --help Print this help message"); println!(); diff --git a/crates/aingle_cortex/src/mcp/convert.rs b/crates/aingle_cortex/src/mcp/convert.rs new file mode 100644 index 00000000..dfe550ab --- /dev/null +++ b/crates/aingle_cortex/src/mcp/convert.rs @@ -0,0 +1,35 @@ +// Copyright 2019-2026 Apilium Technologies OÜ. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 OR Commercial + +//! Map cortex errors into MCP tool errors. + +use crate::error::Error; +use rmcp::model::ErrorData as McpError; + +/// Convert a cortex `Error` into an MCP error suitable for a failed tool result. +/// +/// `InvalidInput` maps to the JSON-RPC `invalid_params` code; every other +/// variant falls through to `internal_error` carrying the error's display text. 
+pub fn to_mcp_error(err: Error) -> McpError { + match err { + Error::InvalidInput(msg) => McpError::invalid_params(msg, None), + other => McpError::internal_error(other.to_string(), None), + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn invalid_input_maps_to_invalid_params() { + let e = to_mcp_error(Error::InvalidInput("bad".into())); + assert_eq!(e.message.as_ref(), "bad"); + } + + #[test] + fn other_maps_to_internal_error() { + let e = to_mcp_error(Error::Internal("boom".into())); + assert!(e.message.contains("boom")); + } +} diff --git a/crates/aingle_cortex/src/mcp/mod.rs b/crates/aingle_cortex/src/mcp/mod.rs new file mode 100644 index 00000000..48963423 --- /dev/null +++ b/crates/aingle_cortex/src/mcp/mod.rs @@ -0,0 +1,38 @@ +// Copyright 2019-2026 Apilium Technologies OÜ. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 OR Commercial + +//! Model Context Protocol (MCP) server for AIngle Córtex. +//! +//! Exposes the Córtex business-logic layer over MCP via a stdio transport, +//! so that MCP-capable clients (e.g. Claude Desktop, IDE agents) can interact +//! with AIngle semantic graphs as tools. +//! +//! stdout is reserved for the JSON-RPC stream; all logging must go to stderr. + +mod convert; +mod server; + +pub use server::AingleMcp; + +use crate::state::AppState; + +/// Serves the MCP server over stdio until the client disconnects. +/// +/// stdout carries the JSON-RPC message stream; logging is expected to be +/// redirected to stderr by the caller before this is invoked. 
+pub async fn serve_stdio(state: AppState) -> crate::error::Result<()> { + use rmcp::transport::stdio; + use rmcp::ServiceExt; + + let service = AingleMcp::new(state) + .serve(stdio()) + .await + .map_err(|e| crate::error::Error::Internal(format!("MCP serve error: {e}")))?; + + service + .waiting() + .await + .map_err(|e| crate::error::Error::Internal(format!("MCP wait error: {e}")))?; + + Ok(()) +} diff --git a/crates/aingle_cortex/src/mcp/server.rs b/crates/aingle_cortex/src/mcp/server.rs new file mode 100644 index 00000000..f72abdea --- /dev/null +++ b/crates/aingle_cortex/src/mcp/server.rs @@ -0,0 +1,551 @@ +// Copyright 2019-2026 Apilium Technologies OÜ. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 OR Commercial + +//! The `AingleMcp` MCP server handler and its tool router. + +use rmcp::handler::server::router::tool::ToolRouter; +use rmcp::handler::server::wrapper::Parameters; +use rmcp::model::{CallToolResult, Content, ErrorData, ServerCapabilities, ServerInfo}; +use rmcp::{tool, tool_handler, tool_router, ServerHandler}; + +use crate::state::AppState; + +/// Parameters for the `aingle_dag_history` tool. +#[cfg(feature = "dag")] +#[derive(serde::Deserialize, schemars::JsonSchema)] +pub struct DagHistoryParams { + /// Subject IRI whose mutation history to fetch. + pub subject: String, + /// Max actions to return. + #[serde(default = "default_hist_limit")] + pub limit: usize, +} + +#[cfg(feature = "dag")] +fn default_hist_limit() -> usize { + 50 +} + +/// Parameters for the `aingle_dag_action` tool. +#[cfg(feature = "dag")] +#[derive(serde::Deserialize, schemars::JsonSchema)] +pub struct DagActionParams { + /// Hex-encoded DAG action hash to fetch. + pub hash: String, +} + +/// Parameters for the `aingle_dag_chain` tool. +#[cfg(feature = "dag")] +#[derive(serde::Deserialize, schemars::JsonSchema)] +pub struct DagChainParams { + /// Author identity whose action chain to fetch. + pub author: String, + /// Max actions to return. 
+ #[serde(default = "default_hist_limit")] + pub limit: usize, +} + +/// MCP server exposing AIngle Córtex capabilities as tools. +/// +/// Wraps the shared [`AppState`] so tools can operate on the same graph, +/// proof store, and DAG as the REST/GraphQL surfaces. +#[derive(Clone)] +pub struct AingleMcp { + pub(crate) state: AppState, + #[allow(dead_code)] + tool_router: ToolRouter, +} + +#[tool_router] +impl AingleMcp { + /// Creates a new MCP handler bound to the given shared application state. + pub fn new(state: AppState) -> Self { + // Start from the core tool router. The dag-gated tools live in a + // separate `#[tool_router(router = dag_tool_router)]` block so that the + // macro never references them when the `dag` feature is off (keeping + // `mcp` compilable standalone). Merge them in only when `dag` is on. + #[allow(unused_mut)] + let mut router = Self::tool_router(); + #[cfg(feature = "dag")] + { + router += Self::dag_tool_router(); + } + // The sparql-gated tool likewise lives in its own + // `#[tool_router(router = sparql_tool_router)]` block so the macro on the + // core impl never references it when `sparql` is off. Merge it only when + // `sparql` is on (it is in `default`, but `mcp` must compile without it). + #[cfg(feature = "sparql")] + { + router += Self::sparql_tool_router(); + } + Self { + state, + tool_router: router, + } + } + + /// Liveness probe tool. + #[tool(description = "Liveness check; returns 'pong'.")] + async fn aingle_ping(&self) -> String { + "pong".to_string() + } + + /// Query the semantic graph by triple pattern (any field omitted = wildcard). + #[tool( + description = "Query the semantic graph by triple pattern. 
Omit a field to wildcard it.", + annotations(read_only_hint = true) + )] + async fn aingle_query_pattern( + &self, + params: Parameters, + ) -> Result { + let Parameters(req) = params; + let resp = crate::service::query::query_pattern(&self.state, req, None) + .await + .map_err(super::convert::to_mcp_error)?; + Ok(CallToolResult::success(vec![Content::json(resp)?])) + } + + /// List unique subjects in the graph, optionally filtered by predicate. + #[tool( + description = "List unique subjects in the semantic graph, optionally filtered by predicate.", + annotations(read_only_hint = true) + )] + async fn aingle_list_subjects( + &self, + params: Parameters, + ) -> Result { + let Parameters(req) = params; + let resp = crate::service::query::list_subjects(&self.state, req, None) + .await + .map_err(super::convert::to_mcp_error)?; + Ok(CallToolResult::success(vec![Content::json(resp)?])) + } + + /// List unique predicates in the graph, optionally filtered by subject. + #[tool( + description = "List unique predicates in the semantic graph, optionally filtered by subject.", + annotations(read_only_hint = true) + )] + async fn aingle_list_predicates( + &self, + params: Parameters, + ) -> Result { + let Parameters(req) = params; + let resp = crate::service::query::list_predicates(&self.state, req, None) + .await + .map_err(super::convert::to_mcp_error)?; + Ok(CallToolResult::success(vec![Content::json(resp)?])) + } + + /// Insert a triple (subject, predicate, object) into the graph. + /// + /// Mutation: not read-only. Non-destructive (it never removes or overwrites + /// existing data). NOT idempotent: the graph keys triples by content hash, + /// so inserting a triple that already exists (same content hash) returns an + /// error rather than silently succeeding — a retried call may therefore fail. + #[tool( + description = "Insert a triple into the semantic graph. 
Mutates the graph.", + annotations( + read_only_hint = false, + destructive_hint = false, + idempotent_hint = false + ) + )] + async fn aingle_create_triple( + &self, + params: Parameters, + ) -> Result { + let Parameters(req) = params; + let dto = crate::service::triples::create_triple(&self.state, req, None) + .await + .map_err(super::convert::to_mcp_error)?; + Ok(CallToolResult::success(vec![Content::json(dto)?])) + } + + /// Atomically bulk-insert triples into the graph. + /// + /// Mutation: not read-only. Non-destructive (only adds rows; never removes or + /// overwrites). Idempotent: batch insert silently skips triples whose content + /// hash already exists (see `GraphStore::insert_batch`), so retrying the same + /// batch converges to the same state without error. + #[tool( + description = "Atomically bulk-insert triples into the semantic graph. Duplicates are skipped silently.", + annotations( + read_only_hint = false, + destructive_hint = false, + idempotent_hint = true + ) + )] + async fn aingle_batch_insert( + &self, + params: Parameters, + ) -> Result { + let Parameters(req) = params; + let resp = crate::service::triples::batch_insert(&self.state, req, None) + .await + .map_err(super::convert::to_mcp_error)?; + Ok(CallToolResult::success(vec![Content::json(resp)?])) + } + + /// Fetch a single triple by its hex hash id. + #[tool( + description = "Fetch a single triple by its hex hash id.", + annotations(read_only_hint = true) + )] + async fn aingle_get_triple( + &self, + params: Parameters, + ) -> Result { + let Parameters(req) = params; + let dto = crate::service::triples::get_triple(&self.state, &req.id) + .await + .map_err(super::convert::to_mcp_error)?; + Ok(CallToolResult::success(vec![Content::json(dto)?])) + } + + /// Delete a triple by its hex hash id. + /// + /// Mutation: not read-only. Destructive (removes data). 
Idempotent: deleting + /// an absent id is reported as not-found, but the resulting state (the triple + /// no longer present) is the same on retry. + #[tool( + description = "Delete a triple from the semantic graph by its hex hash id.", + annotations( + read_only_hint = false, + destructive_hint = true, + idempotent_hint = true + ) + )] + async fn aingle_delete_triple( + &self, + params: Parameters, + ) -> Result { + let Parameters(req) = params; + crate::service::triples::delete_triple(&self.state, &req.id, None) + .await + .map_err(super::convert::to_mcp_error)?; + Ok(CallToolResult::success(vec![Content::json( + serde_json::json!({ "deleted": true, "id": req.id }), + )?])) + } + + /// List triples with optional subject/predicate filters and pagination. + #[tool( + description = "List triples with optional subject/predicate filters and pagination.", + annotations(read_only_hint = true) + )] + async fn aingle_list_triples( + &self, + params: Parameters, + ) -> Result { + let Parameters(req) = params; + let resp = crate::service::triples::list_triples(&self.state, req, None) + .await + .map_err(super::convert::to_mcp_error)?; + Ok(CallToolResult::success(vec![Content::json(resp)?])) + } + + /// Return graph statistics (triple count and related metrics). + #[tool( + description = "Return graph statistics: triple count and related metrics.", + annotations(read_only_hint = true) + )] + async fn aingle_graph_stats(&self) -> Result { + let resp = crate::service::stats::graph_stats(&self.state) + .await + .map_err(super::convert::to_mcp_error)?; + Ok(CallToolResult::success(vec![Content::json(resp)?])) + } + + /// Verify a stored proof by ID; returns {valid: bool, ...}. + /// + /// Read-only. Invalid/malformed proofs return `valid:false` (NOT an error); + /// only a missing proof yields an error. + #[tool( + description = "Verify a cryptographic/ZK proof by ID. 
Returns valid:false for invalid proofs (not an error).", + annotations(read_only_hint = true) + )] + async fn aingle_verify_proof( + &self, + params: Parameters, + ) -> Result { + let Parameters(req) = params; + let resp = crate::service::proof::verify_proof(&self.state, req) + .await + .map_err(super::convert::to_mcp_error)?; + Ok(CallToolResult::success(vec![Content::json(resp)?])) + } + + /// Fetch a stored proof by ID; returns its metadata. + /// + /// Read-only. A missing proof yields an error. + #[tool( + description = "Fetch a stored cryptographic/ZK proof by ID. Errors if the proof does not exist.", + annotations(read_only_hint = true) + )] + async fn aingle_get_proof( + &self, + params: Parameters, + ) -> Result { + let Parameters(req) = params; + let resp = crate::service::proof::get_proof(&self.state, req) + .await + .map_err(super::convert::to_mcp_error)?; + Ok(CallToolResult::success(vec![Content::json(resp)?])) + } + + /// Validate a semantic skill manifest against PoL rules. + /// + /// Read-only: validation never mutates state. Returns `{valid, errors}`; + /// a manifest with unsatisfiable proof requirements yields `valid:false` + /// with per-assertion error messages (not a tool error). + #[tool( + description = "Validate a semantic skill manifest against PoL rules. Returns {valid, errors}; does not mutate.", + annotations(read_only_hint = true) + )] + async fn aingle_validate_skill( + &self, + params: Parameters, + ) -> Result { + let Parameters(req) = params; + let resp = crate::service::skill::validate_manifest(&self.state, req).await; + Ok(CallToolResult::success(vec![Content::json(resp)?])) + } + + /// Create a temporary sandbox namespace for skill verification. + /// + /// Mutation: not read-only. Non-destructive (only registers new sandbox + /// state; never removes or overwrites). Each call mints a fresh sandbox id, + /// so it is not marked idempotent. + #[tool( + description = "Create a temporary sandbox namespace for skill testing. 
Returns {id, namespace}.", + annotations(read_only_hint = false, destructive_hint = false) + )] + async fn aingle_sandbox_create( + &self, + params: Parameters, + ) -> Result { + let Parameters(req) = params; + let resp = crate::service::skill::create_sandbox(&self.state, req).await; + Ok(CallToolResult::success(vec![Content::json(resp)?])) + } + + /// Delete a sandbox namespace by id, removing all triples under it. + /// + /// Mutation: not read-only. Destructive (removes the sandbox and its + /// triples). Idempotent: deleting an absent id reports `deleted:false`, but + /// the resulting state (sandbox gone) is the same on retry. + #[tool( + description = "Delete a sandbox namespace by id, removing all triples under it. Unknown id => deleted:false.", + annotations( + read_only_hint = false, + destructive_hint = true, + idempotent_hint = true + ) + )] + async fn aingle_sandbox_delete( + &self, + params: Parameters, + ) -> Result { + let Parameters(req) = params; + let resp = crate::service::skill::delete_sandbox(&self.state, &req.id).await; + Ok(CallToolResult::success(vec![Content::json(resp)?])) + } + + /// Compute an agent's assertion consistency score. + /// + /// Read-only: inspects the graph + logic engine; never mutates. An unknown + /// agent returns a well-formed default ({score:0.0, total:0, verified:0}), + /// not an error. + #[tool( + description = "Compute an agent's assertion consistency score (fraction of its assertions that pass PoL validation). Unknown agent => score 0.0.", + annotations(read_only_hint = true) + )] + async fn aingle_agent_consistency( + &self, + params: Parameters, + ) -> Result { + let Parameters(req) = params; + let resp = + crate::service::reputation::agent_consistency(&self.state, &req.agent_id, None).await; + Ok(CallToolResult::success(vec![Content::json(resp)?])) + } + + /// Batch-verify assertion proofs (subject+predicate references). + /// + /// Read-only: verification never mutates. 
Missing/unknown assertions report + /// `verified:false` per entry rather than erroring. + #[tool( + description = "Batch-verify assertion proofs by (subject, predicate). Returns a per-assertion verified flag; unknown assertions => verified:false (not an error).", + annotations(read_only_hint = true) + )] + async fn aingle_verify_assertions_batch( + &self, + params: Parameters, + ) -> Result { + let Parameters(req) = params; + let resp = + crate::service::reputation::batch_verify_assertions(&self.state, req, None).await; + Ok(CallToolResult::success(vec![Content::json(resp)?])) + } + + /// Validate triple(s) against the PoL logic engine. + /// + /// Read-only: validation never mutates the graph. Returns per-triple + /// validity + messages and an overall `valid` flag; an invalid triple yields + /// `valid:false` (not a tool error). + #[tool( + description = "Validate triple(s) against the PoL logic engine. Returns {valid, results, proof_hash}; invalid triples yield valid:false (not an error). Does not mutate.", + annotations(read_only_hint = true) + )] + async fn aingle_validate( + &self, + params: Parameters, + ) -> Result { + let Parameters(req) = params; + let resp = crate::service::validate::validate_triples(&self.state, req, None) + .await + .map_err(super::convert::to_mcp_error)?; + Ok(CallToolResult::success(vec![Content::json(resp)?])) + } +} + +/// Dag-gated tools, kept in a separate router so the `#[tool_router]` macro on +/// the core impl never references them when `dag` is off. The combined router +/// is assembled in [`AingleMcp::new`]. +#[cfg(feature = "dag")] +#[tool_router(router = dag_tool_router)] +impl AingleMcp { + /// Inspect the signed DAG provenance history of a subject (who changed what, newest first). 
+ #[tool( + description = "Return the signed DAG provenance history of a subject (newest first).", + annotations(read_only_hint = true) + )] + async fn aingle_dag_history( + &self, + params: Parameters, + ) -> Result { + let Parameters(p) = params; + let h = crate::service::dag::history_by_subject(&self.state, &p.subject, p.limit) + .await + .map_err(super::convert::to_mcp_error)?; + Ok(CallToolResult::success(vec![Content::json(h)?])) + } + + /// Return the current DAG tip hashes and their count. + #[tool( + description = "Return the current DAG tip hashes (frontier) and their count.", + annotations(read_only_hint = true) + )] + async fn aingle_dag_tips(&self) -> Result { + let resp = crate::service::dag::tips(&self.state) + .await + .map_err(super::convert::to_mcp_error)?; + Ok(CallToolResult::success(vec![Content::json(resp)?])) + } + + /// Fetch a single DAG action by its hex hash. + #[tool( + description = "Fetch a single DAG action by its hex hash.", + annotations(read_only_hint = true) + )] + async fn aingle_dag_action( + &self, + params: Parameters, + ) -> Result { + let Parameters(p) = params; + let resp = crate::service::dag::action(&self.state, &p.hash) + .await + .map_err(super::convert::to_mcp_error)?; + Ok(CallToolResult::success(vec![Content::json(resp)?])) + } + + /// Return an author's DAG action chain, newest first. + #[tool( + description = "Return an author's DAG action chain (newest first), up to limit.", + annotations(read_only_hint = true) + )] + async fn aingle_dag_chain( + &self, + params: Parameters, + ) -> Result { + let Parameters(p) = params; + let resp = crate::service::dag::chain(&self.state, &p.author, p.limit) + .await + .map_err(super::convert::to_mcp_error)?; + Ok(CallToolResult::success(vec![Content::json(resp)?])) + } + + /// Return DAG statistics: action count and tip count. 
+ #[tool( + description = "Return DAG statistics: action count and tip count.", + annotations(read_only_hint = true) + )] + async fn aingle_dag_stats(&self) -> Result { + let resp = crate::service::dag::stats(&self.state) + .await + .map_err(super::convert::to_mcp_error)?; + Ok(CallToolResult::success(vec![Content::json(resp)?])) + } + + /// Prune the DAG according to a retention policy. + /// + /// Mutation: not read-only. Destructive (removes actions). NOT idempotent: + /// a second call against an already-pruned DAG yields a different result. + #[tool( + description = "Prune the DAG per a retention policy (keep_all/keep_since/keep_last/keep_depth). Destructive.", + annotations( + read_only_hint = false, + destructive_hint = true, + idempotent_hint = false + ) + )] + async fn aingle_dag_prune( + &self, + params: Parameters, + ) -> Result { + let Parameters(req) = params; + let resp = crate::service::dag::prune(&self.state, req) + .await + .map_err(super::convert::to_mcp_error)?; + Ok(CallToolResult::success(vec![Content::json(resp)?])) + } +} + +/// Sparql-gated tools, kept in a separate router so the `#[tool_router]` macro +/// on the core impl never references them when `sparql` is off. The combined +/// router is assembled in [`AingleMcp::new`]. +#[cfg(feature = "sparql")] +#[tool_router(router = sparql_tool_router)] +impl AingleMcp { + /// Run a SPARQL query against the semantic graph. 
+ #[tool( + description = "Execute a SPARQL query (SELECT/CONSTRUCT/ASK) against the semantic graph.", + annotations(read_only_hint = true) + )] + async fn aingle_sparql( + &self, + params: Parameters, + ) -> Result { + let Parameters(req) = params; + let resp = crate::service::sparql::execute(&self.state, req) + .await + .map_err(super::convert::to_mcp_error)?; + Ok(CallToolResult::success(vec![Content::json(resp)?])) + } +} + +#[tool_handler(router = self.tool_router)] +impl ServerHandler for AingleMcp { + fn get_info(&self) -> ServerInfo { + let mut info = ServerInfo::default(); + info.capabilities = ServerCapabilities::builder().enable_tools().build(); + info.instructions = Some( + "AIngle Córtex MCP server: tools for querying and mutating \ + AIngle semantic graphs." + .to_string(), + ); + info + } +} diff --git a/crates/aingle_cortex/src/rest/dag.rs b/crates/aingle_cortex/src/rest/dag.rs index 8d7ec161..eded0287 100644 --- a/crates/aingle_cortex/src/rest/dag.rs +++ b/crates/aingle_cortex/src/rest/dag.rs @@ -32,6 +32,7 @@ pub struct DagTipsResponse { } #[derive(Debug, Serialize)] +#[cfg_attr(feature = "mcp", derive(schemars::JsonSchema))] pub struct DagActionDto { pub hash: String, pub parents: Vec, @@ -65,6 +66,7 @@ pub struct ChainQuery { } #[derive(Debug, Deserialize)] +#[cfg_attr(feature = "mcp", derive(schemars::JsonSchema))] pub struct PruneRequest { /// "keep_all", "keep_since", "keep_last", or "keep_depth" pub policy: String, @@ -180,19 +182,7 @@ fn default_limit() -> usize { /// GET /api/v1/dag/tips pub async fn get_dag_tips(State(state): State) -> Result> { - let graph = state.graph.read().await; - let dag_store = graph - .dag_store() - .ok_or_else(|| Error::Internal("DAG not enabled".into()))?; - - let tips = dag_store.tips().map_err(|e| Error::Internal(e.to_string()))?; - let tip_strings: Vec = tips.iter().map(|h| h.to_hex()).collect(); - let count = tip_strings.len(); - - Ok(Json(DagTipsResponse { - tips: tip_strings, - count, - })) + 
Ok(Json(crate::service::dag::tips(&state).await?)) } /// GET /api/v1/dag/action/:hash @@ -200,20 +190,7 @@ pub async fn get_dag_action( State(state): State, Path(hash): Path, ) -> Result> { - let action_hash = aingle_graph::dag::DagActionHash::from_hex(&hash) - .ok_or_else(|| Error::InvalidInput(format!("Invalid DAG action hash: {}", hash)))?; - - let graph = state.graph.read().await; - let dag_store = graph - .dag_store() - .ok_or_else(|| Error::Internal("DAG not enabled".into()))?; - - let action = dag_store - .get(&action_hash) - .map_err(|e| Error::Internal(e.to_string()))? - .ok_or_else(|| Error::NotFound(format!("DAG action {} not found", hash)))?; - - Ok(Json(action_to_dto(&action))) + Ok(Json(crate::service::dag::action(&state, &hash).await?)) } /// GET /api/v1/dag/history?subject=X&triple_id=X&limit=N @@ -221,16 +198,14 @@ pub async fn get_dag_history( State(state): State, Query(query): Query, ) -> Result>> { - let graph = state.graph.read().await; - - // Subject-based lookup uses the dedicated subject index + // Subject-based lookup uses the dedicated subject index (shared service logic) if let Some(ref subject) = query.subject { - let actions = graph - .dag_history_by_subject(subject, query.limit) - .map_err(|e| Error::Internal(e.to_string()))?; - return Ok(Json(actions.iter().map(action_to_dto).collect())); + let actions = crate::service::dag::history_by_subject(&state, subject, query.limit).await?; + return Ok(Json(actions)); } + let graph = state.graph.read().await; + // Triple-ID-based lookup uses the affected index if let Some(ref tid_hex) = query.triple_id { let mut bytes = [0u8; 32]; @@ -258,34 +233,14 @@ pub async fn get_dag_chain( State(state): State, Query(query): Query, ) -> Result>> { - let author = aingle_graph::NodeId::named(&query.author); - - let graph = state.graph.read().await; - let dag_store = graph - .dag_store() - .ok_or_else(|| Error::Internal("DAG not enabled".into()))?; - - let actions = dag_store - .chain(&author, query.limit) - 
.map_err(|e| Error::Internal(e.to_string()))?; - - Ok(Json(actions.iter().map(action_to_dto).collect())) + Ok(Json( + crate::service::dag::chain(&state, &query.author, query.limit).await?, + )) } /// GET /api/v1/dag/stats pub async fn get_dag_stats(State(state): State) -> Result> { - let graph = state.graph.read().await; - let dag_store = graph - .dag_store() - .ok_or_else(|| Error::Internal("DAG not enabled".into()))?; - - let action_count = dag_store.action_count(); - let tip_count = dag_store.tip_count().map_err(|e| Error::Internal(e.to_string()))?; - - Ok(Json(DagStatsResponse { - action_count, - tip_count, - })) + Ok(Json(crate::service::dag::stats(&state).await?)) } /// POST /api/v1/dag/prune @@ -293,24 +248,7 @@ pub async fn post_dag_prune( State(state): State, Json(req): Json, ) -> Result> { - let policy = match req.policy.as_str() { - "keep_all" => aingle_graph::dag::RetentionPolicy::KeepAll, - "keep_since" => aingle_graph::dag::RetentionPolicy::KeepSince { seconds: req.value }, - "keep_last" => aingle_graph::dag::RetentionPolicy::KeepLast(req.value as usize), - "keep_depth" => aingle_graph::dag::RetentionPolicy::KeepDepth(req.value as usize), - other => return Err(Error::InvalidInput(format!("Unknown policy: {}", other))), - }; - - let graph = state.graph.read().await; - let result = graph - .dag_prune(&policy, req.create_checkpoint) - .map_err(|e| Error::Internal(e.to_string()))?; - - Ok(Json(PruneResponse { - pruned_count: result.pruned_count, - retained_count: result.retained_count, - checkpoint_hash: result.checkpoint_hash.map(|h| h.to_hex()), - })) + Ok(Json(crate::service::dag::prune(&state, req).await?)) } /// GET /api/v1/dag/export?format=dot|mermaid|json @@ -357,7 +295,9 @@ pub async fn get_dag_verify( let mut pk_bytes = [0u8; 32]; if query.public_key.len() != 64 { - return Err(Error::InvalidInput("public_key must be 64 hex chars".into())); + return Err(Error::InvalidInput( + "public_key must be 64 hex chars".into(), + )); } for i in 0..32 { 
pk_bytes[i] = u8::from_str_radix(&query.public_key[i * 2..i * 2 + 2], 16) @@ -468,7 +408,10 @@ pub async fn post_dag_pull( .dag_store() .ok_or_else(|| Error::Internal("DAG not enabled".into()))?; - if dag_store.contains(&hash).map_err(|e| Error::Internal(e.to_string()))? { + if dag_store + .contains(&hash) + .map_err(|e| Error::Internal(e.to_string()))? + { already_had += 1; } else { graph @@ -571,7 +514,9 @@ pub async fn post_create_dag_action( .dag_store() .ok_or_else(|| Error::Internal("DAG not enabled".into()))?; - let parents = dag_store.tips().map_err(|e| Error::Internal(e.to_string()))?; + let parents = dag_store + .tips() + .map_err(|e| Error::Internal(e.to_string()))?; let timestamp = chrono::Utc::now(); let mut action = aingle_graph::dag::DagAction { @@ -641,7 +586,7 @@ pub fn dag_router() -> Router { // Helpers // ============================================================================ -fn action_to_dto(action: &aingle_graph::dag::DagAction) -> DagActionDto { +pub(crate) fn action_to_dto(action: &aingle_graph::dag::DagAction) -> DagActionDto { let hash = action.compute_hash().to_hex(); let parents: Vec = action.parents.iter().map(|h| h.to_hex()).collect(); @@ -655,7 +600,10 @@ fn action_to_dto(action: &aingle_graph::dag::DagAction) -> DagActionDto { }; ("triple:create".to_string(), summary) } - aingle_graph::dag::DagPayload::TripleDelete { triple_ids, subjects } => { + aingle_graph::dag::DagPayload::TripleDelete { + triple_ids, + subjects, + } => { let summary = if !subjects.is_empty() { format!("{} triple(s) [{}]", triple_ids.len(), subjects.join(", ")) } else { @@ -675,10 +623,9 @@ fn action_to_dto(action: &aingle_graph::dag::DagAction) -> DagActionDto { }; ("memory:op".to_string(), summary) } - aingle_graph::dag::DagPayload::Batch { ops } => ( - "batch".to_string(), - format!("{} ops", ops.len()), - ), + aingle_graph::dag::DagPayload::Batch { ops } => { + ("batch".to_string(), format!("{} ops", ops.len())) + } 
aingle_graph::dag::DagPayload::Genesis { triple_count, description, @@ -692,7 +639,10 @@ fn action_to_dto(action: &aingle_graph::dag::DagAction) -> DagActionDto { ref policy, } => ( "compact".to_string(), - format!("pruned {} / retained {} ({})", pruned_count, retained_count, policy), + format!( + "pruned {} / retained {} ({})", + pruned_count, retained_count, policy + ), ), aingle_graph::dag::DagPayload::Noop => ("noop".to_string(), String::new()), aingle_graph::dag::DagPayload::Custom { diff --git a/crates/aingle_cortex/src/rest/mod.rs b/crates/aingle_cortex/src/rest/mod.rs index 36ab0339..b7a35229 100644 --- a/crates/aingle_cortex/src/rest/mod.rs +++ b/crates/aingle_cortex/src/rest/mod.rs @@ -35,8 +35,6 @@ pub mod audit; pub mod cluster; #[cfg(feature = "cluster")] pub(crate) mod cluster_utils; -#[cfg(feature = "cluster")] -pub mod raft_rpc; #[cfg(feature = "dag")] pub mod dag; mod memory; @@ -46,22 +44,32 @@ mod p2p; mod proof; mod proof_api; mod query; +#[cfg(feature = "cluster")] +pub mod raft_rpc; mod reputation; -mod skill_verification; +pub mod skill_verification; mod stats; mod triples; // Re-export from proof (legacy validation endpoints) pub use proof::{ - ProofDto, ProofStepDto, StatementInput, ValidateRequest, ValidateResponse, ValidateTripleInput, - ValidationMessage, VerificationDetails, VerifyProofRequest, + ProofDto, ProofStepDto, StatementInput, TripleValidationResult, ValidateRequest, + ValidateResponse, ValidateTripleInput, ValidationMessage, VerificationDetails, + VerifyProofRequest, +}; + +// Re-export from reputation (agent consistency + batch assertion verification). +// Shared with the service layer and MCP tools. 
+pub use reputation::{ + AgentConsistencyRequest, AssertionRef, AssertionVerifyResult, BatchVerifyAssertionsRequest, + BatchVerifyAssertionsResponse, ConsistencyResponse, }; // Re-export from proof_api (ZK proof storage endpoints) pub use proof_api::{ BatchSubmitRequest, BatchSubmitResponse, BatchVerifyRequest, BatchVerifyResponse, - DeleteProofResponse, ListProofsQuery, ListProofsResponse, ProofResponse, ProofStatsResponse, - SubmitProofResponse, + DeleteProofResponse, GetProofRequest, ListProofsQuery, ListProofsResponse, ProofResponse, + ProofStatsResponse, SubmitProofResponse, VerifyProofByIdRequest, VerifyProofResponse, }; // Re-export from other modules @@ -69,6 +77,13 @@ pub use query::*; pub use stats::*; pub use triples::*; +// Re-export skill verification request/response types (shared with the service +// layer and MCP tools). +pub use skill_verification::{ + AssertionDecl, CreateSandboxRequest, CreateSandboxResponse, DeleteSandboxRequest, + DeleteSandboxResponse, ValidateManifestRequest, ValidateManifestResponse, +}; + use crate::state::AppState; use axum::{ routing::{delete, get, post}, @@ -81,10 +96,7 @@ pub fn router() -> Router { // Triple CRUD .route("/api/v1/triples", post(triples::create_triple)) .route("/api/v1/triples", get(triples::list_triples)) - .route( - "/api/v1/triples/batch", - post(triples::batch_insert_triples), - ) + .route("/api/v1/triples/batch", post(triples::batch_insert_triples)) .route("/api/v1/triples/{id}", get(triples::get_triple)) .route("/api/v1/triples/{id}", delete(triples::delete_triple)) // Query endpoints diff --git a/crates/aingle_cortex/src/rest/proof.rs b/crates/aingle_cortex/src/rest/proof.rs index 3dbfb906..17b06a20 100644 --- a/crates/aingle_cortex/src/rest/proof.rs +++ b/crates/aingle_cortex/src/rest/proof.rs @@ -16,6 +16,7 @@ use crate::state::{AppState, Event}; use aingle_graph::{NodeId, Predicate, Triple, Value}; /// Request to validate triples +#[cfg_attr(feature = "mcp", derive(schemars::JsonSchema))] 
#[derive(Debug, Deserialize)] pub struct ValidateRequest { /// Triples to validate @@ -25,6 +26,7 @@ pub struct ValidateRequest { } /// Triple input for validation +#[cfg_attr(feature = "mcp", derive(schemars::JsonSchema))] #[derive(Debug, Deserialize)] pub struct ValidateTripleInput { pub subject: String, diff --git a/crates/aingle_cortex/src/rest/proof_api.rs b/crates/aingle_cortex/src/rest/proof_api.rs index 73cf56b8..3b19f402 100644 --- a/crates/aingle_cortex/src/rest/proof_api.rs +++ b/crates/aingle_cortex/src/rest/proof_api.rs @@ -87,49 +87,31 @@ pub async fn submit_proofs_batch( /// Get a proof by ID /// /// GET /api/v1/proofs/:id +/// +/// Delegates to [`crate::service::proof::get_proof`]; the not-found behavior +/// (`Err(Error::NotFound)` for a missing proof) lives in the service layer so it +/// can be shared with the MCP `aingle_get_proof` tool. pub async fn get_proof( State(state): State, Path(proof_id): Path, ) -> Result> { - let proof = state - .proof_store - .get(&proof_id) - .await - .ok_or_else(|| Error::NotFound(format!("Proof {} not found", proof_id)))?; - - Ok(Json(ProofResponse::from(proof))) + let resp = crate::service::proof::get_proof(&state, GetProofRequest { proof_id }).await?; + Ok(Json(resp)) } /// Verify a proof /// /// GET /api/v1/proofs/:id/verify +/// +/// Delegates to [`crate::service::proof::verify_proof`]; the invalid-proof -> +/// 200 + `valid:false` contract (commit 53cca2c) lives in the service layer. 
pub async fn verify_proof_by_id( State(state): State, Path(proof_id): Path, ) -> Result> { - match state.proof_store.verify(&proof_id).await { - Ok(result) => Ok(Json(VerifyProofResponse { - proof_id: proof_id.clone(), - valid: result.valid, - verified_at: result.verified_at, - details: result.details, - verification_time_us: result.verification_time_us, - })), - Err(crate::proofs::VerificationError::ProofNotFound(_)) => { - Err(Error::NotFound(format!("Proof {} not found", proof_id))) - } - Err(e) => { - // Verification infrastructure error (bad proof data format, ZK error, etc.) - // Return 200 with valid=false + error details instead of 422 - Ok(Json(VerifyProofResponse { - proof_id: proof_id.clone(), - valid: false, - verified_at: chrono::Utc::now(), - details: vec![format!("Verification error: {}", e)], - verification_time_us: 0, - })) - } - } + let resp = + crate::service::proof::verify_proof(&state, VerifyProofByIdRequest { proof_id }).await?; + Ok(Json(resp)) } /// Batch verify multiple proofs @@ -328,6 +310,29 @@ impl From for ProofResponse { } } +/// Request to verify a stored proof by its ID. +/// +/// Tool/handler INPUT: the path parameter of `GET /api/v1/proofs/:id/verify` +/// modeled as a struct so it can be shared with the MCP `aingle_verify_proof` +/// tool. +#[derive(Debug, Deserialize)] +#[cfg_attr(feature = "mcp", derive(schemars::JsonSchema))] +pub struct VerifyProofByIdRequest { + /// Identifier of the stored proof to verify. + pub proof_id: ProofId, +} + +/// Request to fetch a stored proof by its ID. +/// +/// Tool/handler INPUT: the path parameter of `GET /api/v1/proofs/:id` modeled as +/// a struct so it can be shared with the MCP `aingle_get_proof` tool. +#[derive(Debug, Deserialize)] +#[cfg_attr(feature = "mcp", derive(schemars::JsonSchema))] +pub struct GetProofRequest { + /// Identifier of the stored proof to fetch. 
+ pub proof_id: ProofId, +} + #[derive(Debug, Serialize)] pub struct VerifyProofResponse { pub proof_id: ProofId, @@ -436,7 +441,9 @@ mod tests { limit: Some(10), }; - let response = list_proofs(AxumState(state), None, Query(query)).await.unwrap(); + let response = list_proofs(AxumState(state), None, Query(query)) + .await + .unwrap(); assert_eq!(response.0.count, 3); } diff --git a/crates/aingle_cortex/src/rest/query.rs b/crates/aingle_cortex/src/rest/query.rs index 09504b94..5bbcb8a5 100644 --- a/crates/aingle_cortex/src/rest/query.rs +++ b/crates/aingle_cortex/src/rest/query.rs @@ -10,12 +10,12 @@ use axum::{ use serde::{Deserialize, Serialize}; use crate::error::Result; -use crate::middleware::{is_in_namespace, RequestNamespace}; +use crate::middleware::RequestNamespace; use crate::rest::triples::{TripleDto, ValueDto}; use crate::state::AppState; -use aingle_graph::{NodeId, Predicate, Triple, TriplePattern, Value}; /// Pattern query request +#[cfg_attr(feature = "mcp", derive(schemars::JsonSchema))] #[derive(Debug, Deserialize)] pub struct PatternQueryRequest { /// Subject pattern (None = wildcard) @@ -33,10 +33,8 @@ fn default_limit() -> usize { 100 } -/// Hard maximum for any query to prevent OOM on large graphs -const MAX_QUERY_LIMIT: usize = 10_000; - /// Pattern query response +#[cfg_attr(feature = "mcp", derive(schemars::JsonSchema))] #[derive(Debug, Serialize)] pub struct PatternQueryResponse { /// Matching triples @@ -48,6 +46,7 @@ pub struct PatternQueryResponse { } /// Description of the query pattern +#[cfg_attr(feature = "mcp", derive(schemars::JsonSchema))] #[derive(Debug, Serialize)] pub struct PatternDescription { pub subject: Option, @@ -63,58 +62,14 @@ pub async fn query_pattern( ns_ext: Option>, Json(req): Json, ) -> Result> { - let graph = state.graph.read().await; - - // Build pattern from request - let mut pattern = TriplePattern::any(); - - if let Some(ref subject) = req.subject { - pattern = pattern.with_subject(NodeId::named(subject)); - 
} - if let Some(ref predicate) = req.predicate { - pattern = pattern.with_predicate(Predicate::named(predicate)); - } - if let Some(ref object) = req.object { - let obj: Value = object.clone().into(); - pattern = pattern.with_object(obj); - } - - let triples = graph.find(pattern)?; - - // Enforce hard query limit to prevent OOM - let effective_limit = req.limit.min(MAX_QUERY_LIMIT); - - // Filter by namespace if present - let ns_filter = ns_ext.and_then(|axum::Extension(RequestNamespace(ns))| ns); - let triples: Vec = if let Some(ref ns) = ns_filter { - triples.into_iter().filter(|t| is_in_namespace(&t.subject.to_string(), ns)).collect() - } else { - triples - }; - - let total = triples.len(); - let matches: Vec = triples - .into_iter() - .take(effective_limit) - .map(|t| t.into()) - .collect(); - - let pattern_desc = PatternDescription { - subject: req.subject, - predicate: req.predicate, - object: req - .object - .map(|o| serde_json::to_value(o).unwrap_or_default()), - }; - - Ok(Json(PatternQueryResponse { - matches, - total, - pattern: pattern_desc, - })) + let namespace = ns_ext.and_then(|axum::Extension(RequestNamespace(ns))| ns); + Ok(Json( + crate::service::query::query_pattern(&state, req, namespace).await?, + )) } /// Query parameters for listing subjects +#[cfg_attr(feature = "mcp", derive(schemars::JsonSchema))] #[derive(Debug, Deserialize)] pub struct ListSubjectsQuery { /// Filter by predicate @@ -132,31 +87,14 @@ pub async fn list_subjects( ns_ext: Option>, Query(query): Query, ) -> Result> { - let graph = state.graph.read().await; - - let pattern = if let Some(ref predicate) = query.predicate { - TriplePattern::predicate(Predicate::named(predicate)) - } else { - TriplePattern::any() - }; - - let triples = graph.find(pattern)?; - let ns_filter = ns_ext.and_then(|axum::Extension(RequestNamespace(ns))| ns); - let mut subjects: Vec = triples - .into_iter() - .map(|t| t.subject.to_string()) - .filter(|s| ns_filter.as_ref().map_or(true, |ns| 
is_in_namespace(s, ns))) - .collect(); - subjects.sort(); - subjects.dedup(); - - let total = subjects.len(); - let subjects: Vec = subjects.into_iter().take(query.limit).collect(); - - Ok(Json(ListSubjectsResponse { subjects, total })) + let namespace = ns_ext.and_then(|axum::Extension(RequestNamespace(ns))| ns); + Ok(Json( + crate::service::query::list_subjects(&state, query, namespace).await?, + )) } /// Response for listing subjects +#[cfg_attr(feature = "mcp", derive(schemars::JsonSchema))] #[derive(Debug, Serialize)] pub struct ListSubjectsResponse { pub subjects: Vec, @@ -164,6 +102,7 @@ pub struct ListSubjectsResponse { } /// Query parameters for listing predicates +#[cfg_attr(feature = "mcp", derive(schemars::JsonSchema))] #[derive(Debug, Deserialize)] pub struct ListPredicatesQuery { /// Filter by subject @@ -181,31 +120,14 @@ pub async fn list_predicates( ns_ext: Option>, Query(query): Query, ) -> Result> { - let graph = state.graph.read().await; - - let pattern = if let Some(ref subject) = query.subject { - TriplePattern::subject(NodeId::named(subject)) - } else { - TriplePattern::any() - }; - - let triples = graph.find(pattern)?; - let ns_filter = ns_ext.and_then(|axum::Extension(RequestNamespace(ns))| ns); - let mut predicates: Vec = triples - .into_iter() - .filter(|t| ns_filter.as_ref().map_or(true, |ns| is_in_namespace(&t.subject.to_string(), ns))) - .map(|t| t.predicate.to_string()) - .collect(); - predicates.sort(); - predicates.dedup(); - - let total = predicates.len(); - let predicates: Vec = predicates.into_iter().take(query.limit).collect(); - - Ok(Json(ListPredicatesResponse { predicates, total })) + let namespace = ns_ext.and_then(|axum::Extension(RequestNamespace(ns))| ns); + Ok(Json( + crate::service::query::list_predicates(&state, query, namespace).await?, + )) } /// Response for listing predicates +#[cfg_attr(feature = "mcp", derive(schemars::JsonSchema))] #[derive(Debug, Serialize)] pub struct ListPredicatesResponse { pub predicates: 
Vec, diff --git a/crates/aingle_cortex/src/rest/reputation.rs b/crates/aingle_cortex/src/rest/reputation.rs index badc43d4..0f708d7c 100644 --- a/crates/aingle_cortex/src/rest/reputation.rs +++ b/crates/aingle_cortex/src/rest/reputation.rs @@ -6,9 +6,8 @@ //! Provides agent consistency scoring and batch assertion verification //! for the skill reputation system. -use crate::middleware::{is_in_namespace, RequestNamespace}; +use crate::middleware::RequestNamespace; use crate::state::AppState; -use aingle_graph::{NodeId, Value}; use axum::{ extract::{Path, State}, response::IntoResponse, @@ -20,6 +19,17 @@ use serde::{Deserialize, Serialize}; // DTOs // --------------------------------------------------------------------------- +/// Request identifying an agent whose consistency score to compute. +/// +/// Used as the MCP input for the agent-consistency tool. (REST extracts the +/// agent id from the path, so this struct is MCP-only.) +#[cfg_attr(feature = "mcp", derive(schemars::JsonSchema))] +#[derive(Deserialize, Debug)] +pub struct AgentConsistencyRequest { + /// The agent id whose assertion consistency to score. + pub agent_id: String, +} + /// Agent consistency score response. #[derive(Serialize, Debug)] pub struct ConsistencyResponse { @@ -32,6 +42,7 @@ pub struct ConsistencyResponse { } /// Request to batch-verify assertions. +#[cfg_attr(feature = "mcp", derive(schemars::JsonSchema))] #[derive(Deserialize, Debug)] pub struct BatchVerifyAssertionsRequest { /// Assertions to verify. @@ -39,6 +50,7 @@ pub struct BatchVerifyAssertionsRequest { } /// Reference to an assertion to verify. +#[cfg_attr(feature = "mcp", derive(schemars::JsonSchema))] #[derive(Deserialize, Debug)] pub struct AssertionRef { /// Subject of the assertion. 
@@ -78,88 +90,12 @@ pub async fn get_agent_consistency( ns_ext: Option>, Path(agent_id): Path, ) -> impl IntoResponse { - // Determine namespace prefix for agent node - let ns_prefix = ns_ext - .as_ref() - .and_then(|axum::Extension(RequestNamespace(ns))| ns.clone()) - .unwrap_or_else(|| "mayros".to_string()); - - // Phase 1: collect all triples we need from the graph, then drop the lock. - let (owned_subject_triples, prefixed_triples) = { - let graph = state.graph.read().await; - - let agent_node = Value::node(NodeId::named(format!("{}:agent:{}", ns_prefix, agent_id))); - - // Collect owned triples (assertedBy / ownedBy) and their subject triples. - let mut owned = Vec::new(); - if let Ok(triples) = graph.get_object(&agent_node) { - for triple in &triples { - let pred_str = triple.predicate.as_str(); - if pred_str.ends_with(":assertedBy") || pred_str.ends_with(":ownedBy") { - let subject_triples = graph.get_subject(&triple.subject).unwrap_or_default(); - owned.push(subject_triples); - } - } - } - - // Collect agent-prefixed assertion triples. - let agent_prefix = format!("{}:agent:{}:", ns_prefix, agent_id); - let mut prefixed = Vec::new(); - if let Ok(prefixed_subjects) = graph.subjects_with_prefix(&agent_prefix) { - for subj in &prefixed_subjects { - if let Ok(subj_triples) = graph.get_subject(subj) { - let filtered: Vec<_> = subj_triples - .into_iter() - .filter(|t| { - let p = t.predicate.as_str(); - !p.ends_with(":assertedBy") && !p.ends_with(":ownedBy") - }) - .collect(); - prefixed.push(filtered); - } - } - } - - (owned, prefixed) - // graph lock dropped here - }; - - // Phase 2: validate with the logic engine (separate lock). - let logic = state.logic.read().await; + // Determine namespace prefix for agent node. 
+ let namespace = ns_ext.and_then(|axum::Extension(RequestNamespace(ns))| ns); - let mut total: usize = 0; - let mut verified: usize = 0; - - for subject_triples in &owned_subject_triples { - total += 1; - let any_valid = subject_triples.iter().any(|t| logic.validate(t).is_valid); - if any_valid { - verified += 1; - } - } - - for triples in &prefixed_triples { - for t in triples { - total += 1; - if logic.validate(t).is_valid { - verified += 1; - } - } - } - - drop(logic); - - let score = if total > 0 { - verified as f64 / total as f64 - } else { - 0.0 - }; - - Json(ConsistencyResponse { - score, - total, - verified, - }) + // Delegate the shared scoring logic (graph + logic engine read-only). + let resp = crate::service::reputation::agent_consistency(&state, &agent_id, namespace).await; + Json(resp) } /// POST /api/v1/assertions/verify-batch — Batch verify assertion proofs. @@ -171,54 +107,12 @@ pub async fn batch_verify_assertions( ns_ext: Option>, Json(req): Json, ) -> impl IntoResponse { - // Extract namespace for filtering - let ns_filter = ns_ext.and_then(|axum::Extension(RequestNamespace(ns))| ns); - - // Phase 1: collect matching triples from the graph, then drop the lock. - let assertion_triples: Vec<_> = { - let graph = state.graph.read().await; - - req.assertions - .iter() - .map(|assertion| { - if let Some(ref ns) = ns_filter { - if !is_in_namespace(&assertion.subject, ns) { - return None; - } - } - let subj = NodeId::named(&assertion.subject); - let triples = graph.get_subject(&subj).unwrap_or_default(); - triples - .into_iter() - .find(|t| t.predicate.as_str() == assertion.predicate) - }) - .collect() - // graph lock dropped here - }; - - // Phase 2: validate with the logic engine (separate lock). 
- let logic = state.logic.read().await; - - let results: Vec = req - .assertions - .iter() - .zip(assertion_triples.iter()) - .map(|(assertion, maybe_triple)| { - let verified = maybe_triple - .as_ref() - .map(|t| logic.validate(t).is_valid) - .unwrap_or(false); - AssertionVerifyResult { - subject: assertion.subject.clone(), - predicate: assertion.predicate.clone(), - verified, - } - }) - .collect(); - - drop(logic); + // Extract namespace for filtering. + let namespace = ns_ext.and_then(|axum::Extension(RequestNamespace(ns))| ns); - Json(BatchVerifyAssertionsResponse { results }) + // Delegate the shared verification logic (graph + logic engine read-only). + let resp = crate::service::reputation::batch_verify_assertions(&state, req, namespace).await; + Json(resp) } /// Create the reputation sub-router. diff --git a/crates/aingle_cortex/src/rest/skill_verification.rs b/crates/aingle_cortex/src/rest/skill_verification.rs index db824bc0..01e2b7d3 100644 --- a/crates/aingle_cortex/src/rest/skill_verification.rs +++ b/crates/aingle_cortex/src/rest/skill_verification.rs @@ -7,7 +7,6 @@ //! and cleanup for the Apilium Hub verification pipeline. use crate::state::AppState; -use aingle_graph::{NodeId, Predicate, Triple, Value}; use axum::{ extract::{Path, State}, http::StatusCode, @@ -21,6 +20,7 @@ use serde::{Deserialize, Serialize}; // --------------------------------------------------------------------------- /// Request to validate a semantic skill manifest against PoL rules. +#[cfg_attr(feature = "mcp", derive(schemars::JsonSchema))] #[derive(Deserialize, Debug)] pub struct ValidateManifestRequest { /// Assertions declared in the skill manifest. @@ -30,6 +30,7 @@ pub struct ValidateManifestRequest { } /// A declared assertion in the skill manifest. +#[cfg_attr(feature = "mcp", derive(schemars::JsonSchema))] #[derive(Deserialize, Debug)] pub struct AssertionDecl { /// The predicate this assertion targets. 
@@ -40,6 +41,7 @@ pub struct AssertionDecl { } /// Response from manifest validation. +#[cfg_attr(feature = "mcp", derive(schemars::JsonSchema))] #[derive(Serialize, Debug)] pub struct ValidateManifestResponse { /// Whether all assertions are valid. @@ -49,6 +51,7 @@ pub struct ValidateManifestResponse { } /// Request to create a sandbox namespace. +#[cfg_attr(feature = "mcp", derive(schemars::JsonSchema))] #[derive(Deserialize, Debug)] pub struct CreateSandboxRequest { /// Desired namespace for the sandbox. @@ -63,6 +66,7 @@ fn default_ttl() -> u64 { } /// Response from sandbox creation. +#[cfg_attr(feature = "mcp", derive(schemars::JsonSchema))] #[derive(Serialize, Debug)] pub struct CreateSandboxResponse { /// Sandbox identifier. @@ -71,6 +75,34 @@ pub struct CreateSandboxResponse { pub namespace: String, } +/// Request identifying a sandbox by id. +/// +/// Used as the MCP input for the sandbox-delete tool. (REST extracts the id +/// from the path, so this struct is MCP-only.) +#[cfg_attr(feature = "mcp", derive(schemars::JsonSchema))] +#[derive(Deserialize, Debug)] +pub struct DeleteSandboxRequest { + /// The sandbox identifier to delete. + pub id: String, +} + +/// Response from sandbox deletion. +#[cfg_attr(feature = "mcp", derive(schemars::JsonSchema))] +#[derive(Serialize, Debug)] +pub struct DeleteSandboxResponse { + /// Whether the sandbox was found and removed. + pub deleted: bool, + /// The namespace that was cleaned up (present only when deleted). + #[serde(skip_serializing_if = "Option::is_none")] + pub namespace: Option, + /// Number of triples removed (present only when deleted). + #[serde(skip_serializing_if = "Option::is_none")] + pub triples_removed: Option, + /// Error message (present only when not deleted). 
+ #[serde(skip_serializing_if = "Option::is_none")] + pub error: Option, +} + // --------------------------------------------------------------------------- // Handlers // --------------------------------------------------------------------------- @@ -83,39 +115,7 @@ pub async fn validate_manifest( State(state): State, Json(req): Json, ) -> impl IntoResponse { - let logic = state.logic.read().await; - let mut errors: Vec = Vec::new(); - - for assertion in &req.assertions { - let ns_pred = if assertion.predicate.contains(':') { - assertion.predicate.clone() - } else { - format!("{}:{}", req.namespace, assertion.predicate) - }; - - // If require_proof is true, validate the assertion against the - // logic engine by constructing a test triple and checking for - // rejections. This ensures PoL rules exist for this predicate. - if assertion.require_proof { - let test_triple = Triple::new( - NodeId::named(format!("{}:_test", req.namespace)), - Predicate::named(&ns_pred), - Value::literal("_test_value"), - ); - let result = logic.validate(&test_triple); - // If the engine has no matching rules at all, the result - // will have zero matches — warn the author. - if result.matches.is_empty() { - errors.push(format!( - "Assertion predicate '{}' requires proof but no PoL rules found", - ns_pred - )); - } - } - } - - let valid = errors.is_empty(); - Json(ValidateManifestResponse { valid, errors }) + Json(crate::service::skill::validate_manifest(&state, req).await) } /// POST /api/v1/skills/sandbox — Create a temporary sandbox namespace. 
@@ -126,22 +126,8 @@ pub async fn create_sandbox( State(state): State, Json(req): Json, ) -> impl IntoResponse { - let sandbox_id = format!("sandbox-{}", uuid::Uuid::new_v4()); - let sandbox_ns = format!("{}:{}", req.namespace, sandbox_id); - - // Register the sandbox in the manager - state - .sandbox_manager - .create(sandbox_id.clone(), sandbox_ns.clone(), req.ttl_seconds) - .await; - - ( - StatusCode::CREATED, - Json(CreateSandboxResponse { - id: sandbox_id, - namespace: sandbox_ns, - }), - ) + let resp = crate::service::skill::create_sandbox(&state, req).await; + (StatusCode::CREATED, Json(resp)) } /// DELETE /api/v1/skills/sandbox/:id — Clean up a sandbox namespace. @@ -151,24 +137,7 @@ pub async fn delete_sandbox( State(state): State, Path(sandbox_id): Path, ) -> impl IntoResponse { - let removed = state.sandbox_manager.remove(&sandbox_id).await; - - if let Some(namespace) = removed { - // Clean up all triples whose subject starts with the sandbox namespace. - let graph = state.graph.write().await; - let deleted = graph.delete_by_subject_prefix(&namespace).unwrap_or(0); - - Json(serde_json::json!({ - "deleted": true, - "namespace": namespace, - "triples_removed": deleted - })) - } else { - Json(serde_json::json!({ - "deleted": false, - "error": "sandbox not found" - })) - } + Json(crate::service::skill::delete_sandbox(&state, &sandbox_id).await) } /// Create the skill verification sub-router. 
diff --git a/crates/aingle_cortex/src/rest/stats.rs b/crates/aingle_cortex/src/rest/stats.rs index 08074dd3..681d038c 100644 --- a/crates/aingle_cortex/src/rest/stats.rs +++ b/crates/aingle_cortex/src/rest/stats.rs @@ -10,6 +10,7 @@ use crate::error::Result; use crate::state::AppState; /// Graph statistics response +#[cfg_attr(feature = "mcp", derive(schemars::JsonSchema))] #[derive(Debug, Serialize)] pub struct StatsResponse { /// Graph statistics @@ -19,6 +20,7 @@ pub struct StatsResponse { } /// Graph statistics DTO +#[cfg_attr(feature = "mcp", derive(schemars::JsonSchema))] #[derive(Debug, Serialize)] pub struct GraphStatsDto { /// Total number of triples @@ -32,6 +34,7 @@ pub struct GraphStatsDto { } /// Server statistics DTO +#[cfg_attr(feature = "mcp", derive(schemars::JsonSchema))] #[derive(Debug, Serialize)] pub struct ServerStatsDto { /// Number of connected WebSocket clients @@ -46,21 +49,7 @@ pub struct ServerStatsDto { /// /// GET /api/v1/stats pub async fn get_stats(State(state): State) -> Result> { - let stats = state.stats().await; - - Ok(Json(StatsResponse { - graph: GraphStatsDto { - triple_count: stats.triple_count, - subject_count: stats.subject_count, - predicate_count: stats.predicate_count, - object_count: stats.object_count, - }, - server: ServerStatsDto { - connected_clients: stats.connected_clients, - uptime_seconds: 0, // TODO: track actual uptime - version: env!("CARGO_PKG_VERSION").to_string(), - }, - })) + Ok(Json(crate::service::stats::graph_stats(&state).await?)) } /// Health check response diff --git a/crates/aingle_cortex/src/rest/triples.rs b/crates/aingle_cortex/src/rest/triples.rs index 99d73e9d..bb2f0509 100644 --- a/crates/aingle_cortex/src/rest/triples.rs +++ b/crates/aingle_cortex/src/rest/triples.rs @@ -12,14 +12,23 @@ use serde::{Deserialize, Serialize}; use crate::error::{Error, Result}; use crate::middleware::{is_in_namespace, RequestNamespace}; +use crate::state::AppState; +use aingle_graph::{NodeId, Triple, TripleId, 
Value}; + +// `AuditEntry` and `Event` are only referenced from the DAG/cluster write paths +// below; the non-cluster direct-write path delegates those side-effects to the +// service layer. Gate the imports so the `rest`-only (no dag/cluster) build is +// warning-free. +#[cfg(any(feature = "dag", feature = "cluster"))] use crate::rest::audit::AuditEntry; -use crate::state::{AppState, Event}; -use aingle_graph::{NodeId, Predicate, Triple, TripleId, TriplePattern, Value}; +#[cfg(any(feature = "dag", feature = "cluster"))] +use crate::state::Event; #[cfg(feature = "cluster")] use axum::http::HeaderMap; /// Triple data transfer object +#[cfg_attr(feature = "mcp", derive(schemars::JsonSchema))] #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TripleDto { /// Triple hash (read-only) @@ -37,6 +46,7 @@ pub struct TripleDto { } /// Value data transfer object +#[cfg_attr(feature = "mcp", derive(schemars::JsonSchema))] #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(untagged)] pub enum ValueDto { @@ -97,6 +107,7 @@ impl From for TripleDto { } /// Request to create a triple +#[cfg_attr(feature = "mcp", derive(schemars::JsonSchema))] #[derive(Debug, Deserialize)] pub struct CreateTripleRequest { pub subject: String, @@ -104,7 +115,19 @@ pub struct CreateTripleRequest { pub object: ValueDto, } +/// Request identifying a single triple by its hex hash id. +/// +/// Used as the MCP input for the get/delete triple tools. (REST extracts the id +/// from the path, so this struct is MCP-only.) +#[cfg_attr(feature = "mcp", derive(schemars::JsonSchema))] +#[derive(Debug, Deserialize)] +pub struct TripleIdRequest { + /// The triple's hex hash id. 
+ pub id: String, +} + /// Query parameters for listing triples +#[cfg_attr(feature = "mcp", derive(schemars::JsonSchema))] #[derive(Debug, Deserialize)] pub struct ListTriplesQuery { /// Filter by subject @@ -151,18 +174,12 @@ pub async fn create_triple( } } - let object: Value = req.object.clone().into(); - // DAG + Cluster mode: create DagAction and route through Raft #[cfg(feature = "dag")] if let Some(ref raft) = state.raft { - let dag_author = state - .dag_author - .clone() - .unwrap_or_else(|| aingle_graph::NodeId::named(&format!( - "node:{}", - state.cluster_node_id.unwrap_or(0) - ))); + let dag_author = state.dag_author.clone().unwrap_or_else(|| { + aingle_graph::NodeId::named(&format!("node:{}", state.cluster_node_id.unwrap_or(0))) + }); let dag_seq = state .dag_seq_counter .fetch_add(1, std::sync::atomic::Ordering::SeqCst); @@ -170,9 +187,7 @@ pub async fn create_triple( // Get current tips let parents = { let graph = state.graph.read().await; - graph - .dag_tips() - .unwrap_or_default() + graph.dag_tips().unwrap_or_default() }; let mut action = aingle_graph::dag::DagAction { @@ -323,100 +338,51 @@ pub async fn create_triple( // Reaching here means Raft was skipped — prevent split-brain (#2). 
#[cfg(feature = "cluster")] if state.raft.is_some() { - return Err(Error::Internal("Raft initialized but write not routed through Raft".into())); + return Err(Error::Internal( + "Raft initialized but write not routed through Raft".into(), + )); } - // Non-cluster mode: direct write - // Create the triple - let triple = Triple::new( - NodeId::named(&req.subject), - Predicate::named(&req.predicate), - object, - ); - - // Add triple to graph (and record DAG action if enabled) - let triple_id = { - let graph = state.graph.read().await; - let id = graph.insert(triple.clone())?; - - // Record in DAG if enabled - #[cfg(feature = "dag")] - if let Some(dag_store) = graph.dag_store() { - let dag_author = state - .dag_author - .clone() - .unwrap_or_else(|| aingle_graph::NodeId::named("node:local")); - let dag_seq = state - .dag_seq_counter - .fetch_add(1, std::sync::atomic::Ordering::SeqCst); - let parents = dag_store.tips().unwrap_or_default(); - - let mut action = aingle_graph::dag::DagAction { - parents, - author: dag_author, - seq: dag_seq, - timestamp: chrono::Utc::now(), - payload: aingle_graph::dag::DagPayload::TripleInsert { - triples: vec![aingle_graph::dag::TripleInsertPayload { - subject: req.subject.clone(), - predicate: req.predicate.clone(), - object: serde_json::to_value(&req.object).unwrap_or_default(), - }], - }, - signature: None, - }; - - if let Some(ref key) = state.dag_signing_key { - key.sign(&mut action); - } + // Non-cluster mode: direct write. + // Delegate the shared insert + audit + event side-effects to the service + // layer; the cluster-only WAL replication below remains a transport concern. + let namespace = ns_ext + .as_ref() + .and_then(|axum::Extension(RequestNamespace(ns))| ns.clone()); - dag_store.put(&action).map_err(|e| { - Error::Internal(format!( - "DAG action failed for triple insert — data integrity at risk: {e}" - )) - })?; - } + // Capture data needed for the legacy WAL append before the request is moved. 
+ #[cfg(feature = "cluster")] + let wal_payload = ( + req.subject.clone(), + req.predicate.clone(), + serde_json::to_value(&req.object).unwrap_or_default(), + ); - id - }; + let dto = crate::service::triples::create_triple(&state, req, namespace).await?; - // Append to WAL (cluster mode without Raft — legacy path) + // Append to WAL (cluster mode without Raft — legacy path). + // NOTE: ordering — the service call above has already performed the graph + // insert, recorded the audit entry, and broadcast the `TripleAdded` event. + // A WAL-append failure here therefore happens *after* those side-effects and + // cannot roll them back; the event was already observed by subscribers. #[cfg(feature = "cluster")] if let Some(ref wal) = state.wal { + let triple_id = dto + .id + .as_deref() + .and_then(TripleId::from_hex) + .ok_or_else(|| Error::Internal("Created triple is missing its ID".into()))?; + let (subject, predicate, object) = wal_payload; wal.append(aingle_wal::WalEntryKind::TripleInsert { - subject: req.subject.clone(), - predicate: req.predicate.clone(), - object: serde_json::to_value(&req.object).unwrap_or_default(), + subject, + predicate, + object, triple_id: *triple_id.as_bytes(), - }).map_err(|e| Error::Internal(format!("WAL append failed: {e}")))?; - } - - // Record audit entry - { - let namespace = ns_ext - .as_ref() - .and_then(|axum::Extension(RequestNamespace(ns))| ns.clone()); - let mut audit = state.audit_log.write().await; - audit.record(AuditEntry { - timestamp: chrono::Utc::now().to_rfc3339(), - user_id: namespace.clone().unwrap_or_else(|| "anonymous".to_string()), - namespace, - action: "create".to_string(), - resource: format!("/api/v1/triples/{}", triple_id.to_hex()), - details: Some(format!("subject={}", req.subject)), - request_id: None, - }); + }) + .map_err(|e| Error::Internal(format!("WAL append failed: {e}")))?; } - // Broadcast event - state.broadcaster.broadcast(Event::TripleAdded { - hash: triple_id.to_hex(), - subject: req.subject, - 
predicate: req.predicate, - object: serde_json::to_value(&req.object).unwrap_or_default(), - }); - - Ok((StatusCode::CREATED, Json(triple.into()))) + Ok((StatusCode::CREATED, Json(dto))) } /// Parse X-Consistency header into a ConsistencyLevel. @@ -458,15 +424,8 @@ pub async fn get_triple( } } - let triple_id = TripleId::from_hex(&id) - .ok_or_else(|| Error::InvalidInput(format!("Invalid triple ID: {}", id)))?; - - let graph = state.graph.read().await; - let triple = graph - .get(&triple_id)? - .ok_or_else(|| Error::NotFound(format!("Triple {} not found", id)))?; - - Ok(Json(triple.into())) + let dto = crate::service::triples::get_triple(&state, &id).await?; + Ok(Json(dto)) } /// Delete a triple @@ -496,13 +455,9 @@ pub async fn delete_triple( // DAG + Cluster mode: create DagAction for delete #[cfg(feature = "dag")] if let Some(ref raft) = state.raft { - let dag_author = state - .dag_author - .clone() - .unwrap_or_else(|| aingle_graph::NodeId::named(&format!( - "node:{}", - state.cluster_node_id.unwrap_or(0) - ))); + let dag_author = state.dag_author.clone().unwrap_or_else(|| { + aingle_graph::NodeId::named(&format!("node:{}", state.cluster_node_id.unwrap_or(0))) + }); let dag_seq = state .dag_seq_counter .fetch_add(1, std::sync::atomic::Ordering::SeqCst); @@ -591,96 +546,33 @@ pub async fn delete_triple( // Guard: if Raft is initialized, all writes MUST go through Raft (#2). 
#[cfg(feature = "cluster")] if state.raft.is_some() { - return Err(Error::Internal("Raft initialized but write not routed through Raft".into())); + return Err(Error::Internal( + "Raft initialized but write not routed through Raft".into(), + )); } - // Non-cluster mode: direct delete - let deleted = { - let graph = state.graph.read().await; - - // Look up subject before deleting (for DAG indexing) - #[cfg(feature = "dag")] - let subject_for_dag = graph - .get(&triple_id) - .ok() - .flatten() - .map(|t| t.subject.to_string()); - - let deleted = graph.delete(&triple_id)?; - - // Record in DAG if enabled and deletion succeeded - #[cfg(feature = "dag")] - if deleted { - if let Some(dag_store) = graph.dag_store() { - let dag_author = state - .dag_author - .clone() - .unwrap_or_else(|| aingle_graph::NodeId::named("node:local")); - let dag_seq = state - .dag_seq_counter - .fetch_add(1, std::sync::atomic::Ordering::SeqCst); - let parents = dag_store.tips().unwrap_or_default(); - - let mut action = aingle_graph::dag::DagAction { - parents, - author: dag_author, - seq: dag_seq, - timestamp: chrono::Utc::now(), - payload: aingle_graph::dag::DagPayload::TripleDelete { - triple_ids: vec![*triple_id.as_bytes()], - subjects: subject_for_dag.into_iter().collect(), - }, - signature: None, - }; - - if let Some(ref key) = state.dag_signing_key { - key.sign(&mut action); - } - - dag_store.put(&action).map_err(|e| { - Error::Internal(format!( - "DAG action failed for triple delete — data integrity at risk: {e}" - )) - })?; - } - } - - deleted - }; + // Non-cluster mode: direct delete. + // Delegate the shared delete + DAG action + audit + event side-effects to the + // service layer; the cluster-only WAL replication below remains a transport + // concern. 
+ let namespace = ns_ext + .as_ref() + .and_then(|axum::Extension(RequestNamespace(ns))| ns.clone()); - if deleted { - // Append to WAL (legacy cluster path) - #[cfg(feature = "cluster")] - if let Some(ref wal) = state.wal { - wal.append(aingle_wal::WalEntryKind::TripleDelete { - triple_id: *triple_id.as_bytes(), - }).map_err(|e| Error::Internal(format!("WAL append failed: {e}")))?; - } + crate::service::triples::delete_triple(&state, &id, namespace).await?; - // Record audit entry - { - let namespace = ns_ext - .as_ref() - .and_then(|axum::Extension(RequestNamespace(ns))| ns.clone()); - let mut audit = state.audit_log.write().await; - audit.record(AuditEntry { - timestamp: chrono::Utc::now().to_rfc3339(), - user_id: namespace.clone().unwrap_or_else(|| "anonymous".to_string()), - namespace, - action: "delete".to_string(), - resource: format!("/api/v1/triples/{}", id), - details: None, - request_id: None, - }); - } - - state - .broadcaster - .broadcast(Event::TripleDeleted { hash: id }); - Ok(StatusCode::NO_CONTENT) - } else { - Err(Error::NotFound(format!("Triple {} not found", id))) + // Append to WAL (legacy cluster path). The service call above already + // performed the graph delete and side-effects; a WAL failure here happens + // after those and cannot roll them back. 
+ #[cfg(feature = "cluster")] + if let Some(ref wal) = state.wal { + wal.append(aingle_wal::WalEntryKind::TripleDelete { + triple_id: *triple_id.as_bytes(), + }) + .map_err(|e| Error::Internal(format!("WAL append failed: {e}")))?; } + + Ok(StatusCode::NO_CONTENT) } /// List triples with filters @@ -711,43 +603,9 @@ pub async fn list_triples( } } - let graph = state.graph.read().await; - - // Build pattern based on provided filters - let mut pattern = TriplePattern::any(); - - if let Some(ref subject) = query.subject { - pattern = pattern.with_subject(NodeId::named(subject)); - } - if let Some(ref predicate) = query.predicate { - pattern = pattern.with_predicate(Predicate::named(predicate)); - } - - let triples = graph.find(pattern)?; - - // Filter by namespace if present - let ns_filter = ns_ext.and_then(|axum::Extension(RequestNamespace(ns))| ns); - let triples: Vec = if let Some(ref ns) = ns_filter { - triples.into_iter().filter(|t| is_in_namespace(&t.subject.to_string(), ns)).collect() - } else { - triples - }; - - // Apply pagination - let total = triples.len(); - let triples: Vec = triples - .into_iter() - .skip(query.offset) - .take(query.limit) - .map(|t| t.into()) - .collect(); - - Ok(Json(ListTriplesResponse { - triples, - total, - limit: query.limit, - offset: query.offset, - })) + let namespace = ns_ext.and_then(|axum::Extension(RequestNamespace(ns))| ns); + let resp = crate::service::triples::list_triples(&state, query, namespace).await?; + Ok(Json(resp)) } /// Response for listing triples @@ -760,6 +618,7 @@ pub struct ListTriplesResponse { } /// Request to batch-insert multiple triples +#[cfg_attr(feature = "mcp", derive(schemars::JsonSchema))] #[derive(Debug, Deserialize)] pub struct BatchInsertRequest { pub triples: Vec, @@ -781,34 +640,12 @@ pub async fn batch_insert_triples( ns_ext: Option>, Json(req): Json, ) -> Result<(StatusCode, Json)> { - if req.triples.is_empty() { - return Ok(( - StatusCode::OK, - Json(BatchInsertResponse { - inserted: 
vec![], - total: 0, - duplicates: 0, - }), - )); - } + let empty = req.triples.is_empty(); - // Validate all inputs first - for (i, t) in req.triples.iter().enumerate() { - if t.subject.is_empty() { - return Err(Error::InvalidInput(format!( - "Triple [{}]: subject cannot be empty", - i - ))); - } - if t.predicate.is_empty() { - return Err(Error::InvalidInput(format!( - "Triple [{}]: predicate cannot be empty", - i - ))); - } - // Enforce namespace scoping - if let Some(axum::Extension(RequestNamespace(Some(ref ns)))) = ns_ext { - if !is_in_namespace(&t.subject, ns) { + // Enforce namespace scoping (transport concern — stays in REST). + if let Some(axum::Extension(RequestNamespace(Some(ref ns)))) = ns_ext { + for (i, t) in req.triples.iter().enumerate() { + if !t.subject.is_empty() && !is_in_namespace(&t.subject, ns) { return Err(Error::Forbidden(format!( "Triple [{}]: subject \"{}\" is not in namespace \"{}\"", i, t.subject, ns @@ -817,90 +654,20 @@ pub async fn batch_insert_triples( } } - // Build Triple objects - let triples: Vec = req - .triples - .iter() - .map(|t| { - let object: Value = t.object.clone().into(); - Triple::new( - NodeId::named(&t.subject), - Predicate::named(&t.predicate), - object, - ) - }) - .collect(); - - let count_before = { - let graph = state.graph.read().await; - graph.count() - }; + let namespace = ns_ext + .as_ref() + .and_then(|axum::Extension(RequestNamespace(ns))| ns.clone()); - // Atomic batch insert - let ids = { - let graph = state.graph.read().await; - graph.insert_batch(triples)? - }; + // Delegate the shared validate + atomic insert + audit + event side-effects. + let resp = crate::service::triples::batch_insert(&state, req, namespace).await?; - let count_after = { - let graph = state.graph.read().await; - graph.count() + // An empty batch is a no-op success (parity with the prior handler). 
+ let status = if empty { + StatusCode::OK + } else { + StatusCode::CREATED }; - - let actually_inserted = count_after - count_before; - let duplicates = ids.len() - actually_inserted; - - // Build response DTOs - let inserted: Vec = ids - .iter() - .zip(req.triples.iter()) - .map(|(id, t)| TripleDto { - id: Some(id.to_hex()), - subject: format!("<{}>", t.subject), - predicate: format!("<{}>", t.predicate), - object: t.object.clone(), - created_at: Some(chrono::Utc::now().to_rfc3339()), - }) - .collect(); - - // Record audit entry - { - let namespace = ns_ext - .as_ref() - .and_then(|axum::Extension(RequestNamespace(ns))| ns.clone()); - let mut audit = state.audit_log.write().await; - audit.record(AuditEntry { - timestamp: chrono::Utc::now().to_rfc3339(), - user_id: namespace.clone().unwrap_or_else(|| "anonymous".to_string()), - namespace, - action: "batch_create".to_string(), - resource: "/api/v1/triples/batch".to_string(), - details: Some(format!( - "inserted={}, duplicates={}", - actually_inserted, duplicates - )), - request_id: None, - }); - } - - // Broadcast events for new triples - for (id, t) in ids.iter().zip(req.triples.iter()) { - state.broadcaster.broadcast(Event::TripleAdded { - hash: id.to_hex(), - subject: t.subject.clone(), - predicate: t.predicate.clone(), - object: serde_json::to_value(&t.object).unwrap_or_default(), - }); - } - - Ok(( - StatusCode::CREATED, - Json(BatchInsertResponse { - total: inserted.len(), - duplicates, - inserted, - }), - )) + Ok((status, Json(resp))) } /// Re-export shared Raft write error handler for this module. diff --git a/crates/aingle_cortex/src/server.rs b/crates/aingle_cortex/src/server.rs index 588682ab..8720662e 100644 --- a/crates/aingle_cortex/src/server.rs +++ b/crates/aingle_cortex/src/server.rs @@ -45,6 +45,8 @@ pub struct CortexConfig { /// - `Some(path)` — persist to the given directory. /// - `None` — persist to the default `~/.aingle/cortex/graph.sled`. 
pub db_path: Option, + /// If `true`, serve MCP over stdio instead of binding a TCP listener. + pub mcp_mode: bool, } impl Default for CortexConfig { @@ -62,6 +64,7 @@ impl Default for CortexConfig { max_body_size: 1024 * 1024, // 1MB flush_interval_secs: 300, db_path: None, + mcp_mode: false, } } } diff --git a/crates/aingle_cortex/src/service/dag.rs b/crates/aingle_cortex/src/service/dag.rs new file mode 100644 index 00000000..e43997bf --- /dev/null +++ b/crates/aingle_cortex/src/service/dag.rs @@ -0,0 +1,208 @@ +// Copyright 2019-2026 Apilium Technologies OÜ. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 OR Commercial + +//! DAG provenance business logic shared by REST and MCP. + +use crate::error::{Error, Result}; +use crate::rest::dag::{ + action_to_dto, DagActionDto, DagStatsResponse, DagTipsResponse, PruneRequest, PruneResponse, +}; +use crate::state::AppState; + +/// Return DAG actions affecting a subject, newest first, up to `limit`. +pub async fn history_by_subject( + state: &AppState, + subject: &str, + limit: usize, +) -> Result> { + let graph = state.graph.read().await; + let actions = graph + .dag_history_by_subject(subject, limit) + .map_err(|e| Error::Internal(e.to_string()))?; + Ok(actions.iter().map(action_to_dto).collect()) +} + +/// Return the current DAG tip hashes and their count. +pub async fn tips(state: &AppState) -> Result { + let graph = state.graph.read().await; + let dag_store = graph + .dag_store() + .ok_or_else(|| Error::Internal("DAG not enabled".into()))?; + + let tips = dag_store + .tips() + .map_err(|e| Error::Internal(e.to_string()))?; + let tip_strings: Vec = tips.iter().map(|h| h.to_hex()).collect(); + let count = tip_strings.len(); + + Ok(DagTipsResponse { + tips: tip_strings, + count, + }) +} + +/// Fetch a single DAG action by its hex hash. `NotFound` if absent. 
+pub async fn action(state: &AppState, hash: &str) -> Result { + let action_hash = aingle_graph::dag::DagActionHash::from_hex(hash) + .ok_or_else(|| Error::InvalidInput(format!("Invalid DAG action hash: {}", hash)))?; + + let graph = state.graph.read().await; + let dag_store = graph + .dag_store() + .ok_or_else(|| Error::Internal("DAG not enabled".into()))?; + + let action = dag_store + .get(&action_hash) + .map_err(|e| Error::Internal(e.to_string()))? + .ok_or_else(|| Error::NotFound(format!("DAG action {} not found", hash)))?; + + Ok(action_to_dto(&action)) +} + +/// Return an author's action chain, newest first, up to `limit`. +pub async fn chain(state: &AppState, author: &str, limit: usize) -> Result> { + let author = aingle_graph::NodeId::named(author); + + let graph = state.graph.read().await; + let dag_store = graph + .dag_store() + .ok_or_else(|| Error::Internal("DAG not enabled".into()))?; + + let actions = dag_store + .chain(&author, limit) + .map_err(|e| Error::Internal(e.to_string()))?; + + Ok(actions.iter().map(action_to_dto).collect()) +} + +/// Return DAG statistics: action count and tip count. +pub async fn stats(state: &AppState) -> Result { + let graph = state.graph.read().await; + let dag_store = graph + .dag_store() + .ok_or_else(|| Error::Internal("DAG not enabled".into()))?; + + let action_count = dag_store.action_count(); + let tip_count = dag_store + .tip_count() + .map_err(|e| Error::Internal(e.to_string()))?; + + Ok(DagStatsResponse { + action_count, + tip_count, + }) +} + +/// Prune the DAG according to a retention policy, optionally checkpointing. 
+pub async fn prune(state: &AppState, req: PruneRequest) -> Result { + let policy = match req.policy.as_str() { + "keep_all" => aingle_graph::dag::RetentionPolicy::KeepAll, + "keep_since" => aingle_graph::dag::RetentionPolicy::KeepSince { seconds: req.value }, + "keep_last" => aingle_graph::dag::RetentionPolicy::KeepLast(req.value as usize), + "keep_depth" => aingle_graph::dag::RetentionPolicy::KeepDepth(req.value as usize), + other => return Err(Error::InvalidInput(format!("Unknown policy: {}", other))), + }; + + let graph = state.graph.read().await; + let result = graph + .dag_prune(&policy, req.create_checkpoint) + .map_err(|e| Error::Internal(e.to_string()))?; + + Ok(PruneResponse { + pruned_count: result.pruned_count, + retained_count: result.retained_count, + checkpoint_hash: result.checkpoint_hash.map(|h| h.to_hex()), + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn history_of_unknown_subject_is_empty() { + let state = AppState::with_db_path(":memory:", None).unwrap(); + + // A fresh in-memory graph has no DAG store; `dag_history_by_subject` + // returns a "DAG not enabled" error until the DAG is enabled. + // Enable it the way the node does at startup, then query. + { + let mut graph = state.graph.write().await; + graph.enable_dag(); + } + + let h = history_by_subject(&state, "ex:nobody", 10).await.unwrap(); + assert!(h.is_empty()); + } + + /// Enable the DAG on a fresh in-memory state, mirroring node startup. + /// Without this, DAG service fns fail with `Error::Internal` (e.g. "DAG not enabled"). 
+ async fn enabled_state() -> AppState { + let state = AppState::with_db_path(":memory:", None).unwrap(); + { + let mut graph = state.graph.write().await; + graph.enable_dag(); + } + state + } + + #[tokio::test] + async fn tips_of_empty_dag() { + let state = enabled_state().await; + let resp = tips(&state).await.unwrap(); + assert_eq!(resp.count, resp.tips.len()); + } + + #[tokio::test] + async fn action_with_invalid_hash_is_invalid_input() { + let state = enabled_state().await; + let err = action(&state, "not-a-hash").await.unwrap_err(); + assert!(matches!(err, Error::InvalidInput(_))); + } + + #[tokio::test] + async fn chain_of_unknown_author_is_empty() { + let state = enabled_state().await; + let c = chain(&state, "node:nobody", 10).await.unwrap(); + assert!(c.is_empty()); + } + + #[tokio::test] + async fn stats_of_empty_dag() { + let state = enabled_state().await; + let s = stats(&state).await.unwrap(); + assert_eq!(s.action_count, 0); + } + + #[tokio::test] + async fn prune_keep_all_prunes_nothing() { + let state = enabled_state().await; + let resp = prune( + &state, + PruneRequest { + policy: "keep_all".into(), + value: 0, + create_checkpoint: false, + }, + ) + .await + .unwrap(); + assert_eq!(resp.pruned_count, 0); + } + + #[tokio::test] + async fn prune_unknown_policy_is_invalid_input() { + let state = enabled_state().await; + let err = prune( + &state, + PruneRequest { + policy: "bogus".into(), + value: 0, + create_checkpoint: false, + }, + ) + .await + .unwrap_err(); + assert!(matches!(err, Error::InvalidInput(_))); + } +} diff --git a/crates/aingle_cortex/src/service/mod.rs b/crates/aingle_cortex/src/service/mod.rs new file mode 100644 index 00000000..bcc9f1ca --- /dev/null +++ b/crates/aingle_cortex/src/service/mod.rs @@ -0,0 +1,16 @@ +// Copyright 2019-2026 Apilium Technologies OÜ. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 OR Commercial + +//! Business-logic layer shared by REST handlers and the MCP server. 
+ +#[cfg(feature = "dag")] +pub mod dag; +pub mod proof; +pub mod query; +pub mod reputation; +pub mod skill; +#[cfg(feature = "sparql")] +pub mod sparql; +pub mod stats; +pub mod triples; +pub mod validate; diff --git a/crates/aingle_cortex/src/service/proof.rs b/crates/aingle_cortex/src/service/proof.rs new file mode 100644 index 00000000..8a227b1f --- /dev/null +++ b/crates/aingle_cortex/src/service/proof.rs @@ -0,0 +1,146 @@ +// Copyright 2019-2026 Apilium Technologies OÜ. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 OR Commercial + +//! Proof verification business logic shared by REST and MCP. + +use crate::error::{Error, Result}; +use crate::rest::{GetProofRequest, ProofResponse, VerifyProofByIdRequest, VerifyProofResponse}; +use crate::state::AppState; + +/// Fetch a stored proof by its ID. +/// +/// Semantics (preserved from the REST `GET /api/v1/proofs/:id` handler): +/// - Proof exists -> `Ok(ProofResponse)`. +/// - Proof does not exist -> `Err(Error::NotFound(..))`. +pub async fn get_proof(state: &AppState, req: GetProofRequest) -> Result { + let proof_id = req.proof_id; + + let proof = state + .proof_store + .get(&proof_id) + .await + .ok_or_else(|| Error::NotFound(format!("Proof {} not found", proof_id)))?; + + Ok(ProofResponse::from(proof)) +} + +/// Verify a stored proof by its ID. +/// +/// Semantics (preserved from commit 53cca2c, "proof verify endpoint returns +/// 200+valid:false instead of 422"): +/// - Proof exists and verifies cleanly -> `Ok(VerifyProofResponse { valid, .. })`. +/// - Proof exists but its data is malformed / fails verification at the ZK +/// layer -> `Ok(VerifyProofResponse { valid: false, .. })` with the error in +/// `details`. This is NOT an `Err`: verification answering "this proof is not +/// valid" is a successful answer, not a server error. +/// - Proof does not exist -> `Err(Error::NotFound(..))`. 
+pub async fn verify_proof( + state: &AppState, + req: VerifyProofByIdRequest, +) -> Result { + let proof_id = req.proof_id; + + match state.proof_store.verify(&proof_id).await { + Ok(result) => Ok(VerifyProofResponse { + proof_id: proof_id.clone(), + valid: result.valid, + verified_at: result.verified_at, + details: result.details, + verification_time_us: result.verification_time_us, + }), + Err(crate::proofs::VerificationError::ProofNotFound(_)) => { + Err(Error::NotFound(format!("Proof {} not found", proof_id))) + } + Err(e) => { + // Verification infrastructure error (bad proof data format, ZK error, + // etc.) -> 200 with valid=false + error details instead of 422. + Ok(VerifyProofResponse { + proof_id: proof_id.clone(), + valid: false, + verified_at: chrono::Utc::now(), + details: vec![format!("Verification error: {}", e)], + verification_time_us: 0, + }) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::proofs::{ProofType, SubmitProofRequest}; + + #[tokio::test] + async fn verifying_invalid_proof_returns_valid_false() { + let state = AppState::with_db_path(":memory:", None).unwrap(); + + // Submit a proof whose `proof_data` is structurally-valid JSON but is + // NOT a parseable `aingle_zk::ZkProof` envelope. The proof therefore + // EXISTS in the store (so we don't hit the ProofNotFound path), but the + // verifier fails to deserialize it -> the service must return + // Ok(valid: false), NOT Err. 
+ let proof_id = state + .proof_store + .submit(SubmitProofRequest { + proof_type: ProofType::Schnorr, + proof_data: serde_json::json!({ "garbage": "not-a-zk-proof" }), + metadata: None, + }) + .await + .expect("submit should succeed; only verification is expected to fail"); + + let req = VerifyProofByIdRequest { + proof_id: proof_id.clone(), + }; + + let resp = verify_proof(&state, req) + .await + .expect("invalid proof must return Ok (200), not Err"); + assert!(!resp.valid, "bogus proof data must yield valid:false"); + assert_eq!(resp.proof_id, proof_id); + } + + #[tokio::test] + async fn getting_missing_proof_returns_not_found() { + let state = AppState::with_db_path(":memory:", None).unwrap(); + + let req = GetProofRequest { + proof_id: "does-not-exist".to_string(), + }; + + let err = get_proof(&state, req) + .await + .expect_err("missing proof must return Err(NotFound)"); + assert!( + matches!(err, Error::NotFound(_)), + "expected NotFound, got {err:?}" + ); + } + + #[tokio::test] + async fn getting_existing_proof_round_trips() { + let state = AppState::with_db_path(":memory:", None).unwrap(); + + let proof_id = state + .proof_store + .submit(SubmitProofRequest { + proof_type: ProofType::Schnorr, + proof_data: serde_json::json!({ "some": "data" }), + metadata: None, + }) + .await + .expect("submit should succeed"); + + let resp = get_proof( + &state, + GetProofRequest { + proof_id: proof_id.clone(), + }, + ) + .await + .expect("stored proof must be fetchable"); + + assert_eq!(resp.id, proof_id); + assert_eq!(resp.proof_type, ProofType::Schnorr); + } +} diff --git a/crates/aingle_cortex/src/service/query.rs b/crates/aingle_cortex/src/service/query.rs new file mode 100644 index 00000000..11605bf2 --- /dev/null +++ b/crates/aingle_cortex/src/service/query.rs @@ -0,0 +1,319 @@ +// Copyright 2019-2026 Apilium Technologies OÜ. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 OR Commercial + +//! Query business logic shared by REST and MCP. 
+ +use crate::error::Result; +use crate::middleware::is_in_namespace; +use crate::rest::{ + ListPredicatesQuery, ListPredicatesResponse, ListSubjectsQuery, ListSubjectsResponse, + PatternDescription, PatternQueryRequest, PatternQueryResponse, TripleDto, +}; +use crate::state::AppState; +use aingle_graph::{NodeId, Predicate, TriplePattern, Value}; + +/// Hard maximum for any query to prevent OOM on large graphs. +const MAX_QUERY_LIMIT: usize = 10_000; + +/// Execute a pattern-matching query. `namespace` filters subjects when `Some` +/// (REST passes the request namespace; MCP passes `None`). +pub async fn query_pattern( + state: &AppState, + req: PatternQueryRequest, + namespace: Option, +) -> Result { + let graph = state.graph.read().await; + + let mut pattern = TriplePattern::any(); + if let Some(ref subject) = req.subject { + pattern = pattern.with_subject(NodeId::named(subject)); + } + if let Some(ref predicate) = req.predicate { + pattern = pattern.with_predicate(Predicate::named(predicate)); + } + if let Some(ref object) = req.object { + let obj: Value = object.clone().into(); + pattern = pattern.with_object(obj); + } + + let triples = graph.find(pattern)?; + + let effective_limit = req.limit.min(MAX_QUERY_LIMIT); + + let triples: Vec<_> = if let Some(ref ns) = namespace { + triples + .into_iter() + .filter(|t| is_in_namespace(&t.subject.to_string(), ns)) + .collect() + } else { + triples + }; + + let total = triples.len(); + let matches: Vec = triples + .into_iter() + .take(effective_limit) + .map(|t| t.into()) + .collect(); + + Ok(PatternQueryResponse { + matches, + total, + pattern: PatternDescription { + subject: req.subject, + predicate: req.predicate, + object: req + .object + .map(|o| serde_json::to_value(o).unwrap_or_default()), + }, + }) +} + +/// List unique subjects, optionally filtered by predicate. `namespace` filters +/// subjects when `Some` (REST passes the request namespace; MCP passes `None`). 
+pub async fn list_subjects( + state: &AppState, + query: ListSubjectsQuery, + namespace: Option, +) -> Result { + let graph = state.graph.read().await; + + let pattern = if let Some(ref predicate) = query.predicate { + TriplePattern::predicate(Predicate::named(predicate)) + } else { + TriplePattern::any() + }; + + let triples = graph.find(pattern)?; + let mut subjects: Vec = triples + .into_iter() + .map(|t| t.subject.to_string()) + .filter(|s| namespace.as_ref().is_none_or(|ns| is_in_namespace(s, ns))) + .collect(); + subjects.sort(); + subjects.dedup(); + + let total = subjects.len(); + let subjects: Vec = subjects.into_iter().take(query.limit).collect(); + + Ok(ListSubjectsResponse { subjects, total }) +} + +/// List unique predicates, optionally filtered by subject. `namespace` filters +/// by subject namespace when `Some` (REST passes the request namespace; MCP +/// passes `None`). +pub async fn list_predicates( + state: &AppState, + query: ListPredicatesQuery, + namespace: Option, +) -> Result { + let graph = state.graph.read().await; + + let pattern = if let Some(ref subject) = query.subject { + TriplePattern::subject(NodeId::named(subject)) + } else { + TriplePattern::any() + }; + + let triples = graph.find(pattern)?; + let mut predicates: Vec = triples + .into_iter() + .filter(|t| { + namespace + .as_ref() + .is_none_or(|ns| is_in_namespace(&t.subject.to_string(), ns)) + }) + .map(|t| t.predicate.to_string()) + .collect(); + predicates.sort(); + predicates.dedup(); + + let total = predicates.len(); + let predicates: Vec = predicates.into_iter().take(query.limit).collect(); + + Ok(ListPredicatesResponse { predicates, total }) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn query_empty_graph_returns_no_matches() { + let state = AppState::with_db_path(":memory:", None).unwrap(); + let req = PatternQueryRequest { + subject: None, + predicate: None, + object: None, + limit: 100, + }; + let resp = query_pattern(&state, req, 
None).await.unwrap(); + assert_eq!(resp.total, 0); + assert!(resp.matches.is_empty()); + } + + #[tokio::test] + async fn query_with_data_round_trips() { + use aingle_graph::Triple; + + let state = AppState::with_db_path(":memory:", None).unwrap(); + + // Insert a few triples sharing a predicate so a bound query matches. + { + let graph = state.graph.read().await; + graph + .insert(Triple::new( + NodeId::named("ex:alice"), + Predicate::named("ex:knows"), + Value::Node(NodeId::named("ex:bob")), + )) + .unwrap(); + graph + .insert(Triple::new( + NodeId::named("ex:alice"), + Predicate::named("ex:knows"), + Value::Node(NodeId::named("ex:carol")), + )) + .unwrap(); + graph + .insert(Triple::new( + NodeId::named("ex:alice"), + Predicate::named("ex:name"), + Value::String("Alice".into()), + )) + .unwrap(); + } + + // Bound predicate => the two `ex:knows` triples. + let req = PatternQueryRequest { + subject: None, + predicate: Some("ex:knows".to_string()), + object: None, + limit: 100, + }; + let resp = query_pattern(&state, req, None).await.unwrap(); + assert_eq!(resp.total, 2); + assert_eq!(resp.matches.len(), 2); + + // Non-matching predicate => no results. 
+ let req = PatternQueryRequest { + subject: None, + predicate: Some("ex:nonexistent".to_string()), + object: None, + limit: 100, + }; + let resp = query_pattern(&state, req, None).await.unwrap(); + assert_eq!(resp.total, 0); + assert!(resp.matches.is_empty()); + } + + #[tokio::test] + async fn list_subjects_returns_unique_sorted() { + use aingle_graph::Triple; + + let state = AppState::with_db_path(":memory:", None).unwrap(); + { + let graph = state.graph.read().await; + graph + .insert(Triple::new( + NodeId::named("ex:alice"), + Predicate::named("ex:knows"), + Value::Node(NodeId::named("ex:bob")), + )) + .unwrap(); + graph + .insert(Triple::new( + NodeId::named("ex:alice"), + Predicate::named("ex:name"), + Value::String("Alice".into()), + )) + .unwrap(); + graph + .insert(Triple::new( + NodeId::named("ex:bob"), + Predicate::named("ex:name"), + Value::String("Bob".into()), + )) + .unwrap(); + } + + // All subjects, deduped: alice + bob. + let query = ListSubjectsQuery { + predicate: None, + limit: 100, + }; + let resp = list_subjects(&state, query, None).await.unwrap(); + assert_eq!(resp.total, 2); + assert_eq!(resp.subjects, vec!["", ""]); + + // Filter by predicate => only subjects with `ex:knows` (alice). 
+ let query = ListSubjectsQuery { + predicate: Some("ex:knows".to_string()), + limit: 100, + }; + let resp = list_subjects(&state, query, None).await.unwrap(); + assert_eq!(resp.total, 1); + assert_eq!(resp.subjects, vec![""]); + } + + #[tokio::test] + async fn list_predicates_returns_unique_sorted() { + use aingle_graph::Triple; + + let state = AppState::with_db_path(":memory:", None).unwrap(); + { + let graph = state.graph.read().await; + graph + .insert(Triple::new( + NodeId::named("ex:alice"), + Predicate::named("ex:knows"), + Value::Node(NodeId::named("ex:bob")), + )) + .unwrap(); + graph + .insert(Triple::new( + NodeId::named("ex:alice"), + Predicate::named("ex:name"), + Value::String("Alice".into()), + )) + .unwrap(); + graph + .insert(Triple::new( + NodeId::named("ex:bob"), + Predicate::named("ex:name"), + Value::String("Bob".into()), + )) + .unwrap(); + } + + // All predicates, deduped: knows + name. + let query = ListPredicatesQuery { + subject: None, + limit: 100, + }; + let resp = list_predicates(&state, query, None).await.unwrap(); + assert_eq!(resp.total, 2); + assert_eq!(resp.predicates, vec!["", ""]); + + // Filter by subject => only predicates used by bob (name). 
+ let query = ListPredicatesQuery { + subject: Some("ex:bob".to_string()), + limit: 100, + }; + let resp = list_predicates(&state, query, None).await.unwrap(); + assert_eq!(resp.total, 1); + assert_eq!(resp.predicates, vec![""]); + } + + #[tokio::test] + async fn list_subjects_empty_graph() { + let state = AppState::with_db_path(":memory:", None).unwrap(); + let query = ListSubjectsQuery { + predicate: None, + limit: 100, + }; + let resp = list_subjects(&state, query, None).await.unwrap(); + assert_eq!(resp.total, 0); + assert!(resp.subjects.is_empty()); + } +} diff --git a/crates/aingle_cortex/src/service/reputation.rs b/crates/aingle_cortex/src/service/reputation.rs new file mode 100644 index 00000000..568c7454 --- /dev/null +++ b/crates/aingle_cortex/src/service/reputation.rs @@ -0,0 +1,218 @@ +// Copyright 2019-2026 Apilium Technologies OÜ. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 OR Commercial + +//! Reputation business logic shared by REST and MCP. +//! +//! Agent assertion consistency scoring and batch assertion verification. Both +//! operations are read-only: they inspect the graph + logic engine and never +//! mutate state. Like the REST handlers, neither returns a hard error for empty +//! or unknown input — an unknown agent yields a well-formed default response +//! (score 0.0), and a batch of non-existent assertions yields `verified:false` +//! per entry. + +use crate::middleware::is_in_namespace; +use crate::rest::{ + AssertionVerifyResult, BatchVerifyAssertionsRequest, BatchVerifyAssertionsResponse, + ConsistencyResponse, +}; +use crate::state::AppState; +use aingle_graph::{NodeId, Value}; + +/// Compute an agent's assertion consistency score. +/// +/// Semantics preserved from the REST `GET /api/v1/agents/:id/consistency` +/// handler: collects every assertion owned by (or prefixed with) the agent node +/// and reports the fraction that pass PoL validation. 
`namespace` selects the +/// agent-node namespace prefix; REST passes the request namespace, MCP passes +/// `None` (defaulting to the `mayros` namespace, matching the handler default). +pub async fn agent_consistency( + state: &AppState, + agent_id: &str, + namespace: Option, +) -> ConsistencyResponse { + let ns_prefix = namespace.unwrap_or_else(|| "mayros".to_string()); + + // Phase 1: collect all triples we need from the graph, then drop the lock. + let (owned_subject_triples, prefixed_triples) = { + let graph = state.graph.read().await; + + let agent_node = Value::node(NodeId::named(format!("{}:agent:{}", ns_prefix, agent_id))); + + // Collect owned triples (assertedBy / ownedBy) and their subject triples. + let mut owned = Vec::new(); + if let Ok(triples) = graph.get_object(&agent_node) { + for triple in &triples { + let pred_str = triple.predicate.as_str(); + if pred_str.ends_with(":assertedBy") || pred_str.ends_with(":ownedBy") { + let subject_triples = graph.get_subject(&triple.subject).unwrap_or_default(); + owned.push(subject_triples); + } + } + } + + // Collect agent-prefixed assertion triples. + let agent_prefix = format!("{}:agent:{}:", ns_prefix, agent_id); + let mut prefixed = Vec::new(); + if let Ok(prefixed_subjects) = graph.subjects_with_prefix(&agent_prefix) { + for subj in &prefixed_subjects { + if let Ok(subj_triples) = graph.get_subject(subj) { + let filtered: Vec<_> = subj_triples + .into_iter() + .filter(|t| { + let p = t.predicate.as_str(); + !p.ends_with(":assertedBy") && !p.ends_with(":ownedBy") + }) + .collect(); + prefixed.push(filtered); + } + } + } + + (owned, prefixed) + // graph lock dropped here + }; + + // Phase 2: validate with the logic engine (separate lock). 
+ let logic = state.logic.read().await; + + let mut total: usize = 0; + let mut verified: usize = 0; + + for subject_triples in &owned_subject_triples { + total += 1; + let any_valid = subject_triples.iter().any(|t| logic.validate(t).is_valid); + if any_valid { + verified += 1; + } + } + + for triples in &prefixed_triples { + for t in triples { + total += 1; + if logic.validate(t).is_valid { + verified += 1; + } + } + } + + drop(logic); + + let score = if total > 0 { + verified as f64 / total as f64 + } else { + 0.0 + }; + + ConsistencyResponse { + score, + total, + verified, + } +} + +/// Batch-verify assertion proofs. +/// +/// Semantics preserved from the REST `POST /api/v1/assertions/verify-batch` +/// handler: for each `(subject, predicate)` reference, locates the matching +/// triple and reports whether it passes PoL validation. Missing triples (and, +/// when `namespace` is `Some`, out-of-namespace subjects) report +/// `verified:false` rather than erroring. `namespace` is the request namespace +/// for REST and `None` for the MCP path. +pub async fn batch_verify_assertions( + state: &AppState, + req: BatchVerifyAssertionsRequest, + namespace: Option, +) -> BatchVerifyAssertionsResponse { + let ns_filter = namespace; + + // Phase 1: collect matching triples from the graph, then drop the lock. + let assertion_triples: Vec<_> = { + let graph = state.graph.read().await; + + req.assertions + .iter() + .map(|assertion| { + if let Some(ref ns) = ns_filter { + if !is_in_namespace(&assertion.subject, ns) { + return None; + } + } + let subj = NodeId::named(&assertion.subject); + let triples = graph.get_subject(&subj).unwrap_or_default(); + triples + .into_iter() + .find(|t| t.predicate.as_str() == assertion.predicate) + }) + .collect() + // graph lock dropped here + }; + + // Phase 2: validate with the logic engine (separate lock). 
+ let logic = state.logic.read().await; + + let results: Vec = req + .assertions + .iter() + .zip(assertion_triples.iter()) + .map(|(assertion, maybe_triple)| { + let verified = maybe_triple + .as_ref() + .map(|t| logic.validate(t).is_valid) + .unwrap_or(false); + AssertionVerifyResult { + subject: assertion.subject.clone(), + predicate: assertion.predicate.clone(), + verified, + } + }) + .collect(); + + drop(logic); + + BatchVerifyAssertionsResponse { results } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn consistency_of_unknown_agent_is_zero() { + let state = AppState::with_db_path(":memory:", None).unwrap(); + + let resp = agent_consistency(&state, "nobody", None).await; + assert_eq!(resp.total, 0); + assert_eq!(resp.verified, 0); + assert_eq!(resp.score, 0.0); + } + + #[tokio::test] + async fn batch_verify_empty_returns_empty_results() { + let state = AppState::with_db_path(":memory:", None).unwrap(); + + let req = BatchVerifyAssertionsRequest { assertions: vec![] }; + let resp = batch_verify_assertions(&state, req, None).await; + assert!(resp.results.is_empty()); + } + + #[tokio::test] + async fn batch_verify_unknown_assertion_is_unverified() { + use crate::rest::AssertionRef; + + let state = AppState::with_db_path(":memory:", None).unwrap(); + + // A reference to a triple that does not exist must come back as a + // well-formed result with verified:false (not a hard error). 
+ let req = BatchVerifyAssertionsRequest { + assertions: vec![AssertionRef { + subject: "ex:thing".to_string(), + predicate: "ex:claims".to_string(), + }], + }; + let resp = batch_verify_assertions(&state, req, None).await; + assert_eq!(resp.results.len(), 1); + assert_eq!(resp.results[0].subject, "ex:thing"); + assert_eq!(resp.results[0].predicate, "ex:claims"); + assert!(!resp.results[0].verified); + } +} diff --git a/crates/aingle_cortex/src/service/skill.rs b/crates/aingle_cortex/src/service/skill.rs new file mode 100644 index 00000000..45b51acb --- /dev/null +++ b/crates/aingle_cortex/src/service/skill.rs @@ -0,0 +1,190 @@ +// Copyright 2019-2026 Apilium Technologies OÜ. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 OR Commercial + +//! Skill verification business logic shared by REST and MCP. +//! +//! Covers semantic skill manifest validation, temporary sandbox namespace +//! creation, and sandbox cleanup. The REST handlers in +//! [`crate::rest::skill_verification`] delegate to these functions so the MCP +//! tools and HTTP surface share a single implementation. + +use crate::rest::{ + CreateSandboxRequest, CreateSandboxResponse, DeleteSandboxResponse, ValidateManifestRequest, + ValidateManifestResponse, +}; +use crate::state::AppState; +use aingle_graph::{NodeId, Predicate, Triple, Value}; + +/// Validate a semantic skill manifest against the PoL logic engine. +/// +/// For every declared assertion that requires a proof, a probe triple is run +/// through the logic engine; if no PoL rules match the predicate, a validation +/// error is recorded. Validation never mutates state. Returns a response whose +/// `valid` flag is `true` iff no errors were collected (mirrors the REST +/// handler exactly). 
+pub async fn validate_manifest( + state: &AppState, + req: ValidateManifestRequest, +) -> ValidateManifestResponse { + let logic = state.logic.read().await; + let mut errors: Vec = Vec::new(); + + for assertion in &req.assertions { + let ns_pred = if assertion.predicate.contains(':') { + assertion.predicate.clone() + } else { + format!("{}:{}", req.namespace, assertion.predicate) + }; + + if assertion.require_proof { + let test_triple = Triple::new( + NodeId::named(format!("{}:_test", req.namespace)), + Predicate::named(&ns_pred), + Value::literal("_test_value"), + ); + let result = logic.validate(&test_triple); + if result.matches.is_empty() { + errors.push(format!( + "Assertion predicate '{}' requires proof but no PoL rules found", + ns_pred + )); + } + } + } + + let valid = errors.is_empty(); + ValidateManifestResponse { valid, errors } +} + +/// Create a temporary sandbox namespace and register it in the sandbox manager. +/// +/// Generates a unique sandbox id and derived namespace, registers it with the +/// requested TTL, and returns the id/namespace. Mutates sandbox state (mirrors +/// the REST handler). +pub async fn create_sandbox(state: &AppState, req: CreateSandboxRequest) -> CreateSandboxResponse { + let sandbox_id = format!("sandbox-{}", uuid::Uuid::new_v4()); + let sandbox_ns = format!("{}:{}", req.namespace, sandbox_id); + + state + .sandbox_manager + .create(sandbox_id.clone(), sandbox_ns.clone(), req.ttl_seconds) + .await; + + CreateSandboxResponse { + id: sandbox_id, + namespace: sandbox_ns, + } +} + +/// Delete a sandbox namespace by id, removing all triples under it. +/// +/// Deregisters the sandbox from the manager and, if it existed, deletes every +/// triple whose subject begins with the sandbox namespace. Returns a response +/// describing whether anything was deleted. Deleting an unknown id yields +/// `{ deleted: false, error: "sandbox not found" }` (mirrors the REST handler). 
+pub async fn delete_sandbox(state: &AppState, sandbox_id: &str) -> DeleteSandboxResponse { + let removed = state.sandbox_manager.remove(sandbox_id).await; + + if let Some(namespace) = removed { + let graph = state.graph.write().await; + let deleted = graph.delete_by_subject_prefix(&namespace).unwrap_or(0); + + DeleteSandboxResponse { + deleted: true, + namespace: Some(namespace), + triples_removed: Some(deleted), + error: None, + } + } else { + DeleteSandboxResponse { + deleted: false, + namespace: None, + triples_removed: None, + error: Some("sandbox not found".to_string()), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::rest::AssertionDecl; + + #[tokio::test] + async fn validate_manifest_no_proof_required_is_valid() { + let state = AppState::with_db_path(":memory:", None).unwrap(); + // A minimal manifest: one assertion that does not require proof, so the + // logic engine is never consulted and validation passes. + let req = ValidateManifestRequest { + namespace: "skill".into(), + assertions: vec![AssertionDecl { + predicate: "hasCapability".into(), + require_proof: false, + }], + }; + let resp = validate_manifest(&state, req).await; + assert!(resp.valid); + assert!(resp.errors.is_empty()); + } + + #[tokio::test] + async fn validate_manifest_proof_required_without_rules_is_invalid() { + let state = AppState::with_db_path(":memory:", None).unwrap(); + // require_proof=true with an empty logic engine => no PoL rules match, + // so the assertion is flagged as invalid. 
+ let req = ValidateManifestRequest { + namespace: "skill".into(), + assertions: vec![AssertionDecl { + predicate: "provesIdentity".into(), + require_proof: true, + }], + }; + let resp = validate_manifest(&state, req).await; + assert!(!resp.valid); + assert_eq!(resp.errors.len(), 1); + assert!(resp.errors[0].contains("provesIdentity")); + } + + #[tokio::test] + async fn create_sandbox_returns_id_and_namespace() { + let state = AppState::with_db_path(":memory:", None).unwrap(); + let req = CreateSandboxRequest { + namespace: "skill".into(), + ttl_seconds: 300, + }; + let resp = create_sandbox(&state, req).await; + assert!(resp.id.starts_with("sandbox-")); + assert!(resp.namespace.starts_with("skill:sandbox-")); + // The sandbox is registered: removing it returns its namespace. + let removed = state.sandbox_manager.remove(&resp.id).await; + assert_eq!(removed.as_deref(), Some(resp.namespace.as_str())); + } + + #[tokio::test] + async fn create_then_delete_sandbox_succeeds() { + let state = AppState::with_db_path(":memory:", None).unwrap(); + let created = create_sandbox( + &state, + CreateSandboxRequest { + namespace: "skill".into(), + ttl_seconds: 300, + }, + ) + .await; + + let resp = delete_sandbox(&state, &created.id).await; + assert!(resp.deleted); + assert_eq!(resp.namespace.as_deref(), Some(created.namespace.as_str())); + assert_eq!(resp.triples_removed, Some(0)); + assert!(resp.error.is_none()); + } + + #[tokio::test] + async fn delete_unknown_sandbox_reports_not_found() { + let state = AppState::with_db_path(":memory:", None).unwrap(); + let resp = delete_sandbox(&state, "sandbox-does-not-exist").await; + assert!(!resp.deleted); + assert!(resp.namespace.is_none()); + assert_eq!(resp.error.as_deref(), Some("sandbox not found")); + } +} diff --git a/crates/aingle_cortex/src/service/sparql.rs b/crates/aingle_cortex/src/service/sparql.rs new file mode 100644 index 00000000..fb784920 --- /dev/null +++ b/crates/aingle_cortex/src/service/sparql.rs @@ -0,0 +1,88 @@ 
+// Copyright 2019-2026 Apilium Technologies OÜ. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 OR Commercial + +//! SPARQL execution business logic shared by REST and MCP. + +use crate::error::Result; +use crate::sparql::{execute_query, parse_sparql, SparqlRequest, SparqlResponse}; +use crate::state::AppState; + +/// Parse and execute a SPARQL query against the shared graph. +/// +/// This is the core of the REST `execute_sparql` handler, lifted into the +/// service layer so the MCP `aingle_sparql` tool can reuse it. +pub async fn execute(state: &AppState, req: SparqlRequest) -> Result { + let start = std::time::Instant::now(); + + // Parse the query. + let parsed = parse_sparql(&req.query)?; + + // Execute against the graph. + let graph = state.graph.read().await; + let result = execute_query(&graph, &parsed)?; + + let execution_time_ms = start.elapsed().as_millis() as u64; + + Ok(SparqlResponse { + result_type: result.result_type, + variables: result.variables, + bindings: result.bindings, + boolean: result.boolean, + triple_count: result.triple_count, + execution_time_ms, + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use aingle_graph::{NodeId, Predicate, Triple, Value}; + + #[tokio::test] + async fn select_returns_inserted_data() { + let state = AppState::with_db_path(":memory:", None).unwrap(); + + { + let graph = state.graph.read().await; + graph + .insert(Triple::new( + NodeId::named("alice"), + Predicate::named("knows"), + Value::Node(NodeId::named("bob")), + )) + .unwrap(); + graph + .insert(Triple::new( + NodeId::named("alice"), + Predicate::named("name"), + Value::String("Alice".into()), + )) + .unwrap(); + } + + let req = SparqlRequest { + query: "SELECT ?s ?p ?o WHERE { ?s ?p ?o }".to_string(), + default_graph: None, + named_graphs: None, + }; + let resp = execute(&state, req).await.unwrap(); + + assert_eq!(resp.result_type, "bindings"); + let bindings = resp.bindings.expect("SELECT should yield bindings"); + // Two triples were 
inserted; the wildcard SELECT must reflect both. + assert_eq!(bindings.len(), 2); + } + + #[tokio::test] + async fn select_on_empty_graph_succeeds() { + let state = AppState::with_db_path(":memory:", None).unwrap(); + let req = SparqlRequest { + query: "SELECT ?s ?p ?o WHERE { ?s ?p ?o }".to_string(), + default_graph: None, + named_graphs: None, + }; + let resp = execute(&state, req).await.unwrap(); + assert_eq!(resp.result_type, "bindings"); + assert!(resp.bindings.unwrap().is_empty()); + } +} diff --git a/crates/aingle_cortex/src/service/stats.rs b/crates/aingle_cortex/src/service/stats.rs new file mode 100644 index 00000000..2c203bcb --- /dev/null +++ b/crates/aingle_cortex/src/service/stats.rs @@ -0,0 +1,42 @@ +// Copyright 2019-2026 Apilium Technologies OÜ. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 OR Commercial + +//! Graph statistics business logic shared by REST and MCP. + +use crate::error::Result; +use crate::rest::{GraphStatsDto, ServerStatsDto, StatsResponse}; +use crate::state::AppState; + +/// Compute graph and server statistics (triple count and related metrics). 
+pub async fn graph_stats(state: &AppState) -> Result { + let stats = state.stats().await; + + Ok(StatsResponse { + graph: GraphStatsDto { + triple_count: stats.triple_count, + subject_count: stats.subject_count, + predicate_count: stats.predicate_count, + object_count: stats.object_count, + }, + server: ServerStatsDto { + connected_clients: stats.connected_clients, + uptime_seconds: 0, // TODO: track actual uptime + version: env!("CARGO_PKG_VERSION").to_string(), + }, + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn stats_on_empty_graph() { + let state = AppState::with_db_path(":memory:", None).unwrap(); + let stats = graph_stats(&state).await.unwrap(); + assert_eq!(stats.graph.triple_count, 0); + assert_eq!(stats.graph.subject_count, 0); + assert_eq!(stats.graph.predicate_count, 0); + assert_eq!(stats.graph.object_count, 0); + } +} diff --git a/crates/aingle_cortex/src/service/triples.rs b/crates/aingle_cortex/src/service/triples.rs new file mode 100644 index 00000000..28ae7923 --- /dev/null +++ b/crates/aingle_cortex/src/service/triples.rs @@ -0,0 +1,531 @@ +// Copyright 2019-2026 Apilium Technologies OÜ. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 OR Commercial + +//! Triple write/read business logic shared by REST and MCP. + +use crate::error::{Error, Result}; +use crate::rest::audit::AuditEntry; +use crate::rest::{ + BatchInsertRequest, BatchInsertResponse, CreateTripleRequest, ListTriplesQuery, + ListTriplesResponse, TripleDto, +}; +use crate::state::{AppState, Event}; +use aingle_graph::{NodeId, Predicate, Triple, TripleId, TriplePattern, Value}; + +/// Create (insert) a single triple, returning its stored form (with hash id). +/// +/// Performs the same side-effects as the REST handler's direct-write path: +/// validates input, inserts into the graph (recording a DAG action when the +/// `dag` feature is enabled), records an audit entry, and broadcasts a +/// `TripleAdded` event. 
`namespace` scopes the audit entry's user id and is the +/// request namespace for REST (`None` for the MCP path). +/// +/// NOTE: cluster/Raft routing and `HeaderMap`-based replication are transport +/// concerns and remain in the REST handler; this function is the non-cluster +/// direct-write path that both surfaces share for local writes. +pub async fn create_triple( + state: &AppState, + req: CreateTripleRequest, + namespace: Option, +) -> Result { + // Validate input + if req.subject.is_empty() { + return Err(Error::InvalidInput("Subject cannot be empty".to_string())); + } + if req.predicate.is_empty() { + return Err(Error::InvalidInput("Predicate cannot be empty".to_string())); + } + + let object: Value = req.object.clone().into(); + + // Create the triple + let triple = Triple::new( + NodeId::named(&req.subject), + Predicate::named(&req.predicate), + object, + ); + + // Add triple to graph (and record DAG action if enabled) + let triple_id = { + let graph = state.graph.read().await; + let id = graph.insert(triple.clone())?; + + // Record in DAG if enabled + #[cfg(feature = "dag")] + if let Some(dag_store) = graph.dag_store() { + let dag_author = state + .dag_author + .clone() + .unwrap_or_else(|| aingle_graph::NodeId::named("node:local")); + let dag_seq = state + .dag_seq_counter + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + let parents = dag_store.tips().unwrap_or_default(); + + let mut action = aingle_graph::dag::DagAction { + parents, + author: dag_author, + seq: dag_seq, + timestamp: chrono::Utc::now(), + payload: aingle_graph::dag::DagPayload::TripleInsert { + triples: vec![aingle_graph::dag::TripleInsertPayload { + subject: req.subject.clone(), + predicate: req.predicate.clone(), + object: serde_json::to_value(&req.object).unwrap_or_default(), + }], + }, + signature: None, + }; + + if let Some(ref key) = state.dag_signing_key { + key.sign(&mut action); + } + + dag_store.put(&action).map_err(|e| { + Error::Internal(format!( + "DAG action 
failed for triple insert — data integrity at risk: {e}" + )) + })?; + } + + id + }; + + // Record audit entry + { + let mut audit = state.audit_log.write().await; + audit.record(AuditEntry { + timestamp: chrono::Utc::now().to_rfc3339(), + user_id: namespace.clone().unwrap_or_else(|| "anonymous".to_string()), + namespace, + action: "create".to_string(), + resource: format!("/api/v1/triples/{}", triple_id.to_hex()), + details: Some(format!("subject={}", req.subject)), + request_id: None, + }); + } + + // Broadcast event + state.broadcaster.broadcast(Event::TripleAdded { + hash: triple_id.to_hex(), + subject: req.subject, + predicate: req.predicate, + object: serde_json::to_value(&req.object).unwrap_or_default(), + }); + + Ok(triple.into()) +} + +/// Atomic bulk insert of triples, returning the per-row stored forms plus +/// insert/duplicate counts. +/// +/// Mirrors the REST batch handler's non-cluster direct-write path: validates +/// every row, performs an atomic `insert_batch` (which silently skips +/// duplicates), records a single `batch_create` audit entry, and broadcasts a +/// `TripleAdded` event per row. `namespace` scopes the audit entry. +/// +/// NOTE: cluster/Raft routing and namespace ENFORCEMENT are transport concerns +/// and remain in the REST handler. 
+pub async fn batch_insert( + state: &AppState, + req: BatchInsertRequest, + namespace: Option, +) -> Result { + if req.triples.is_empty() { + return Ok(BatchInsertResponse { + inserted: vec![], + total: 0, + duplicates: 0, + }); + } + + // Validate all inputs first + for (i, t) in req.triples.iter().enumerate() { + if t.subject.is_empty() { + return Err(Error::InvalidInput(format!( + "Triple [{}]: subject cannot be empty", + i + ))); + } + if t.predicate.is_empty() { + return Err(Error::InvalidInput(format!( + "Triple [{}]: predicate cannot be empty", + i + ))); + } + } + + // Build Triple objects + let triples: Vec = req + .triples + .iter() + .map(|t| { + let object: Value = t.object.clone().into(); + Triple::new( + NodeId::named(&t.subject), + Predicate::named(&t.predicate), + object, + ) + }) + .collect(); + + let count_before = { + let graph = state.graph.read().await; + graph.count() + }; + + // Atomic batch insert + let ids = { + let graph = state.graph.read().await; + graph.insert_batch(triples)? 
+ }; + + let count_after = { + let graph = state.graph.read().await; + graph.count() + }; + + let actually_inserted = count_after - count_before; + let duplicates = ids.len() - actually_inserted; + + // Build response DTOs + let inserted: Vec = ids + .iter() + .zip(req.triples.iter()) + .map(|(id, t)| TripleDto { + id: Some(id.to_hex()), + subject: format!("<{}>", t.subject), + predicate: format!("<{}>", t.predicate), + object: t.object.clone(), + created_at: Some(chrono::Utc::now().to_rfc3339()), + }) + .collect(); + + // Record audit entry + { + let mut audit = state.audit_log.write().await; + audit.record(AuditEntry { + timestamp: chrono::Utc::now().to_rfc3339(), + user_id: namespace.clone().unwrap_or_else(|| "anonymous".to_string()), + namespace, + action: "batch_create".to_string(), + resource: "/api/v1/triples/batch".to_string(), + details: Some(format!( + "inserted={}, duplicates={}", + actually_inserted, duplicates + )), + request_id: None, + }); + } + + // Broadcast events for new triples + for (id, t) in ids.iter().zip(req.triples.iter()) { + state.broadcaster.broadcast(Event::TripleAdded { + hash: id.to_hex(), + subject: t.subject.clone(), + predicate: t.predicate.clone(), + object: serde_json::to_value(&t.object).unwrap_or_default(), + }); + } + + Ok(BatchInsertResponse { + total: inserted.len(), + duplicates, + inserted, + }) +} + +/// Fetch a single triple by its hex hash id. +/// +/// Returns `Error::InvalidInput` for a malformed id and `Error::NotFound` when +/// no triple with that id exists — matching the REST handler's behavior. +pub async fn get_triple(state: &AppState, id: &str) -> Result { + let triple_id = TripleId::from_hex(id) + .ok_or_else(|| Error::InvalidInput(format!("Invalid triple ID: {}", id)))?; + + let graph = state.graph.read().await; + let triple = graph + .get(&triple_id)? + .ok_or_else(|| Error::NotFound(format!("Triple {} not found", id)))?; + + Ok(triple.into()) +} + +/// Delete a triple by its hex hash id. 
+/// +/// Mirrors the REST delete handler's non-cluster direct-write path: deletes from +/// the graph (recording a DAG action when the `dag` feature is enabled), records +/// an audit entry, and broadcasts a `TripleDeleted` event. Returns +/// `Error::NotFound` when no triple with that id exists. +/// +/// NOTE: cluster/Raft routing and namespace ENFORCEMENT remain in the REST +/// handler. +pub async fn delete_triple(state: &AppState, id: &str, namespace: Option) -> Result<()> { + let triple_id = TripleId::from_hex(id) + .ok_or_else(|| Error::InvalidInput(format!("Invalid triple ID: {}", id)))?; + + let deleted = { + let graph = state.graph.read().await; + + // Look up subject before deleting (for DAG indexing) + #[cfg(feature = "dag")] + let subject_for_dag = graph + .get(&triple_id) + .ok() + .flatten() + .map(|t| t.subject.to_string()); + + let deleted = graph.delete(&triple_id)?; + + // Record in DAG if enabled and deletion succeeded + #[cfg(feature = "dag")] + if deleted { + if let Some(dag_store) = graph.dag_store() { + let dag_author = state + .dag_author + .clone() + .unwrap_or_else(|| aingle_graph::NodeId::named("node:local")); + let dag_seq = state + .dag_seq_counter + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + let parents = dag_store.tips().unwrap_or_default(); + + let mut action = aingle_graph::dag::DagAction { + parents, + author: dag_author, + seq: dag_seq, + timestamp: chrono::Utc::now(), + payload: aingle_graph::dag::DagPayload::TripleDelete { + triple_ids: vec![*triple_id.as_bytes()], + subjects: subject_for_dag.into_iter().collect(), + }, + signature: None, + }; + + if let Some(ref key) = state.dag_signing_key { + key.sign(&mut action); + } + + dag_store.put(&action).map_err(|e| { + Error::Internal(format!( + "DAG action failed for triple delete — data integrity at risk: {e}" + )) + })?; + } + } + + deleted + }; + + if deleted { + // Record audit entry + { + let mut audit = state.audit_log.write().await; + audit.record(AuditEntry { + 
timestamp: chrono::Utc::now().to_rfc3339(), + user_id: namespace.clone().unwrap_or_else(|| "anonymous".to_string()), + namespace, + action: "delete".to_string(), + resource: format!("/api/v1/triples/{}", id), + details: None, + request_id: None, + }); + } + + state.broadcaster.broadcast(Event::TripleDeleted { + hash: id.to_string(), + }); + Ok(()) + } else { + Err(Error::NotFound(format!("Triple {} not found", id))) + } +} + +/// List triples matching optional subject/predicate filters with pagination. +/// +/// `namespace` filters subjects when `Some` (REST passes the request namespace; +/// MCP passes `None`). +pub async fn list_triples( + state: &AppState, + query: ListTriplesQuery, + namespace: Option, +) -> Result { + let graph = state.graph.read().await; + + // Build pattern based on provided filters + let mut pattern = TriplePattern::any(); + + if let Some(ref subject) = query.subject { + pattern = pattern.with_subject(NodeId::named(subject)); + } + if let Some(ref predicate) = query.predicate { + pattern = pattern.with_predicate(Predicate::named(predicate)); + } + + let triples = graph.find(pattern)?; + + // Filter by namespace if present + let triples: Vec = if let Some(ref ns) = namespace { + triples + .into_iter() + .filter(|t| crate::middleware::is_in_namespace(&t.subject.to_string(), ns)) + .collect() + } else { + triples + }; + + // Apply pagination + let total = triples.len(); + let triples: Vec = triples + .into_iter() + .skip(query.offset) + .take(query.limit) + .map(|t| t.into()) + .collect(); + + Ok(ListTriplesResponse { + triples, + total, + limit: query.limit, + offset: query.offset, + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::rest::ValueDto; + + #[tokio::test] + async fn create_then_count_is_one() { + let state = AppState::with_db_path(":memory:", None).unwrap(); + let req = CreateTripleRequest { + subject: "ex:alice".into(), + predicate: "ex:knows".into(), + object: ValueDto::Node { + node: "ex:bob".into(), + }, + }; + 
let dto = create_triple(&state, req, None).await.unwrap(); + assert!(dto.id.is_some()); + let count = state.graph.read().await.count(); + assert_eq!(count, 1); + } + + fn req(subject: &str, predicate: &str, object_node: &str) -> CreateTripleRequest { + CreateTripleRequest { + subject: subject.into(), + predicate: predicate.into(), + object: ValueDto::Node { + node: object_node.into(), + }, + } + } + + #[tokio::test] + async fn batch_insert_two_triples_count_is_two() { + let state = AppState::with_db_path(":memory:", None).unwrap(); + let batch = BatchInsertRequest { + triples: vec![ + req("ex:alice", "ex:knows", "ex:bob"), + req("ex:alice", "ex:knows", "ex:carol"), + ], + }; + let resp = batch_insert(&state, batch, None).await.unwrap(); + assert_eq!(resp.total, 2); + assert_eq!(resp.duplicates, 0); + let count = state.graph.read().await.count(); + assert_eq!(count, 2); + } + + #[tokio::test] + async fn get_triple_round_trips_and_missing_is_not_found() { + let state = AppState::with_db_path(":memory:", None).unwrap(); + let id = { + let graph = state.graph.read().await; + graph + .insert(Triple::new( + NodeId::named("ex:alice"), + Predicate::named("ex:knows"), + Value::Node(NodeId::named("ex:bob")), + )) + .unwrap() + }; + + let dto = get_triple(&state, &id.to_hex()).await.unwrap(); + assert_eq!(dto.id.as_deref(), Some(id.to_hex().as_str())); + assert_eq!(dto.subject, ""); + + // A well-formed but absent id => NotFound (same as the REST handler). 
+ let bogus = "0".repeat(64); + let err = get_triple(&state, &bogus).await.unwrap_err(); + assert!(matches!(err, Error::NotFound(_))); + } + + #[tokio::test] + async fn delete_triple_removes_it_count_is_zero() { + let state = AppState::with_db_path(":memory:", None).unwrap(); + let id = { + let graph = state.graph.read().await; + graph + .insert(Triple::new( + NodeId::named("ex:alice"), + Predicate::named("ex:knows"), + Value::Node(NodeId::named("ex:bob")), + )) + .unwrap() + }; + assert_eq!(state.graph.read().await.count(), 1); + + delete_triple(&state, &id.to_hex(), None).await.unwrap(); + assert_eq!(state.graph.read().await.count(), 0); + + // Deleting again => NotFound. + let err = delete_triple(&state, &id.to_hex(), None).await.unwrap_err(); + assert!(matches!(err, Error::NotFound(_))); + } + + #[tokio::test] + async fn list_triples_returns_inserted() { + let state = AppState::with_db_path(":memory:", None).unwrap(); + { + let graph = state.graph.read().await; + graph + .insert(Triple::new( + NodeId::named("ex:alice"), + Predicate::named("ex:knows"), + Value::Node(NodeId::named("ex:bob")), + )) + .unwrap(); + graph + .insert(Triple::new( + NodeId::named("ex:alice"), + Predicate::named("ex:name"), + Value::String("Alice".into()), + )) + .unwrap(); + } + + let query = ListTriplesQuery { + subject: None, + predicate: None, + object: None, + limit: 100, + offset: 0, + }; + let resp = list_triples(&state, query, None).await.unwrap(); + assert_eq!(resp.total, 2); + assert_eq!(resp.triples.len(), 2); + + // Filter by predicate => only the matching triple. 
+ let query = ListTriplesQuery { + subject: None, + predicate: Some("ex:knows".into()), + object: None, + limit: 100, + offset: 0, + }; + let resp = list_triples(&state, query, None).await.unwrap(); + assert_eq!(resp.total, 1); + } +} diff --git a/crates/aingle_cortex/src/service/validate.rs b/crates/aingle_cortex/src/service/validate.rs new file mode 100644 index 00000000..ffcffdac --- /dev/null +++ b/crates/aingle_cortex/src/service/validate.rs @@ -0,0 +1,180 @@ +// Copyright 2019-2026 Apilium Technologies OÜ. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 OR Commercial + +//! Triple validation business logic shared by REST and MCP. + +use crate::error::{Error, Result}; +use crate::middleware::is_in_namespace; +use crate::rest::{ + TripleDto, TripleValidationResult, ValidateRequest, ValidateResponse, ValidationMessage, +}; +use crate::state::{AppState, Event}; +use aingle_graph::{NodeId, Predicate, Triple, Value}; + +/// Validate triple(s) against the logic engine. +/// +/// Semantics preserved from the REST `POST /api/v1/validate` handler: each input +/// triple is run through the PoL logic engine and reported with per-triple +/// validity + messages. A `proof_hash` is generated only when every triple is +/// valid, and a `ValidationCompleted` event is broadcast in that case (matching +/// the handler's side-effect). Validation answering "this triple is invalid" is +/// a successful response (`valid:false`), NOT an error. +/// +/// `namespace` enforces that input subjects fall within the request namespace; +/// REST passes the request namespace, MCP passes `None` (no namespace +/// enforcement). An out-of-namespace subject yields `Err(Error::Forbidden(..))`, +/// exactly as the REST handler does. 
+pub async fn validate_triples( + state: &AppState, + req: ValidateRequest, + namespace: Option, +) -> Result { + let logic = state.logic.read().await; + + let ns_filter = namespace; + + let mut results = Vec::new(); + let mut all_valid = true; + + for input in req.triples { + // Enforce namespace on input subjects. + if let Some(ref ns) = ns_filter { + if !is_in_namespace(&input.subject, ns) { + return Err(Error::Forbidden(format!( + "Subject \"{}\" is not in namespace \"{}\"", + input.subject, ns + ))); + } + } + let object: Value = input.object.clone().into(); + + // Create a triple for validation. + let triple = Triple::new( + NodeId::named(&input.subject), + Predicate::named(&input.predicate), + object, + ); + + // Validate using logic engine. + let validation = logic.validate(&triple); + + let valid = validation.is_valid(); + if !valid { + all_valid = false; + } + + // Convert messages. + let mut messages = Vec::new(); + for rejection in &validation.rejections { + messages.push(ValidationMessage { + level: "error".to_string(), + message: rejection.reason.clone(), + rule: Some(rejection.rule_id.clone()), + }); + } + for warning in &validation.warnings { + messages.push(ValidationMessage { + level: "warning".to_string(), + message: warning.message.clone(), + rule: Some(warning.rule_id.clone()), + }); + } + + let triple_dto = TripleDto { + id: Some(triple.id().to_hex()), + subject: input.subject.clone(), + predicate: input.predicate.clone(), + object: input.object, + created_at: None, + }; + + results.push(TripleValidationResult { + triple: triple_dto, + valid, + messages, + }); + } + + drop(logic); + + // Generate a simple proof hash if all valid. 
+ let proof_hash = if all_valid { + let mut hasher = blake3::Hasher::new(); + for result in &results { + if let Some(ref id) = result.triple.id { + hasher.update(id.as_bytes()); + } + } + Some(hasher.finalize().to_hex().to_string()) + } else { + None + }; + + // Broadcast validation event (same side-effect as the REST handler). + if let Some(ref hash) = proof_hash { + state.broadcaster.broadcast(Event::ValidationCompleted { + hash: hash.clone(), + valid: all_valid, + proof_hash: proof_hash.clone(), + }); + } + + Ok(ValidateResponse { + valid: all_valid, + results, + proof_hash, + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::rest::{ValidateTripleInput, ValueDto}; + + #[tokio::test] + async fn validate_minimal_triple_returns_per_triple_result() { + let state = AppState::with_db_path(":memory:", None).unwrap(); + + let req = ValidateRequest { + triples: vec![ValidateTripleInput { + subject: "ex:alice".to_string(), + predicate: "ex:knows".to_string(), + object: ValueDto::Node { + node: "ex:bob".to_string(), + }, + }], + rule_set: None, + }; + + let resp = validate_triples(&state, req, None) + .await + .expect("validation must return Ok for a well-formed triple"); + + // One input => one per-triple result. + assert_eq!(resp.results.len(), 1); + assert_eq!(resp.results[0].triple.subject, "ex:alice"); + assert_eq!(resp.results[0].triple.predicate, "ex:knows"); + // With no rules loaded, a plain triple validates and a proof hash is + // produced (all_valid => Some). + assert_eq!(resp.valid, resp.proof_hash.is_some()); + } + + #[tokio::test] + async fn validate_empty_request_is_valid_with_proof_hash() { + let state = AppState::with_db_path(":memory:", None).unwrap(); + + let req = ValidateRequest { + triples: vec![], + rule_set: None, + }; + + let resp = validate_triples(&state, req, None) + .await + .expect("empty validation must return Ok"); + // Vacuously valid: no triples failed, so all_valid stays true and a + // (degenerate) proof hash is generated. 
+ assert!(resp.valid); + assert!(resp.results.is_empty()); + assert!(resp.proof_hash.is_some()); + } +} diff --git a/crates/aingle_cortex/src/sparql/mod.rs b/crates/aingle_cortex/src/sparql/mod.rs index 89551f13..ac2e715d 100644 --- a/crates/aingle_cortex/src/sparql/mod.rs +++ b/crates/aingle_cortex/src/sparql/mod.rs @@ -26,6 +26,7 @@ pub fn router() -> Router { /// SPARQL query request #[derive(Debug, Deserialize)] +#[cfg_attr(feature = "mcp", derive(schemars::JsonSchema))] pub struct SparqlRequest { /// SPARQL query string pub query: String, @@ -63,25 +64,8 @@ pub async fn execute_sparql( State(state): State, Json(req): Json, ) -> Result> { - let start = std::time::Instant::now(); - - // Parse the query - let parsed = parse_sparql(&req.query)?; - - // Execute against the graph - let graph = state.graph.read().await; - let result = execute_query(&graph, &parsed)?; - - let execution_time_ms = start.elapsed().as_millis() as u64; - - Ok(Json(SparqlResponse { - result_type: result.result_type, - variables: result.variables, - bindings: result.bindings, - boolean: result.boolean, - triple_count: result.triple_count, - execution_time_ms, - })) + let resp = crate::service::sparql::execute(&state, req).await?; + Ok(Json(resp)) } /// SPARQL result diff --git a/crates/aingle_cortex/tests/mcp_integration.rs b/crates/aingle_cortex/tests/mcp_integration.rs new file mode 100644 index 00000000..73096e03 --- /dev/null +++ b/crates/aingle_cortex/tests/mcp_integration.rs @@ -0,0 +1,156 @@ +// Copyright 2019-2026 Apilium Technologies OÜ. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 OR Commercial + +//! End-to-end MCP test: drive AingleMcp with an in-memory client over a duplex stream. +#![cfg(feature = "mcp")] + +use std::time::Duration; + +use aingle_cortex::mcp::AingleMcp; +use aingle_cortex::state::AppState; +use rmcp::model::CallToolRequestParams; +use rmcp::{RoleClient, ServiceExt}; + +/// Test A — in-process duplex client/server. 
+/// +/// Spawns `AingleMcp` on one end of a `tokio::io::duplex` pair, connects a bare +/// (`()` handler) rmcp client on the other end, lists tools, and exercises a +/// read tool plus a create→query round-trip. +#[tokio::test] +async fn mcp_in_process_client_server() { + // 1. In-memory application state. + let state = AppState::with_db_path(":memory:", None).expect("build in-memory AppState"); + + // 2. Duplex transport: server on one half, client on the other. + let (server_io, client_io) = tokio::io::duplex(8 * 1024); + + // 3. Spawn the MCP server. + let server_task = tokio::spawn(async move { + let running = AingleMcp::new(state) + .serve(server_io) + .await + .expect("server serve handshake"); + // Block until the client disconnects (or the task is aborted). + let _ = running.waiting().await; + }); + + // 4. Connect a bare client. The unit type `()` implements `ClientHandler` + // (hence `Service`) in rmcp 1.7. The duplex half is symmetric, + // so the client role must be named explicitly to disambiguate `serve`. + let client = ServiceExt::::serve((), client_io) + .await + .expect("client serve handshake"); + + // 5. The advertised tool set must include the core read/query tools and, + // since this is built with `dag`, the provenance history tool. + let tools = client.list_all_tools().await.expect("list_all_tools"); + let names: Vec<&str> = tools.iter().map(|t| t.name.as_ref()).collect(); + assert!( + names.contains(&"aingle_query_pattern"), + "tools missing aingle_query_pattern; got {names:?}" + ); + assert!( + names.contains(&"aingle_graph_stats"), + "tools missing aingle_graph_stats; got {names:?}" + ); + assert!( + names.contains(&"aingle_dag_history"), + "tools missing aingle_dag_history (dag feature); got {names:?}" + ); + + // 6. Call aingle_graph_stats with no arguments; must not be an error. 
+ let stats = client + .call_tool(CallToolRequestParams::new("aingle_graph_stats")) + .await + .expect("call aingle_graph_stats"); + assert_ne!( + stats.is_error, + Some(true), + "aingle_graph_stats returned an error result: {stats:?}" + ); + + // 7. Round-trip: create a triple, then query it back by subject. + let create_args = serde_json::json!({ + "subject": "http://example.org/alice", + "predicate": "http://example.org/knows", + "object": "bob", + }) + .as_object() + .cloned() + .unwrap(); + let create = client + .call_tool(CallToolRequestParams::new("aingle_create_triple").with_arguments(create_args)) + .await + .expect("call aingle_create_triple"); + assert_ne!( + create.is_error, + Some(true), + "aingle_create_triple returned an error result: {create:?}" + ); + + let query_args = serde_json::json!({ + "subject": "http://example.org/alice", + }) + .as_object() + .cloned() + .unwrap(); + let query = client + .call_tool(CallToolRequestParams::new("aingle_query_pattern").with_arguments(query_args)) + .await + .expect("call aingle_query_pattern"); + assert_ne!( + query.is_error, + Some(true), + "aingle_query_pattern returned an error result: {query:?}" + ); + + // 8. Clean shutdown: cancel the client, abort the server task. + let _ = client.cancel().await; + server_task.abort(); +} + +/// Test B — stdout hygiene (subprocess). +/// +/// Spawns the real binary in MCP mode, feeds a single `initialize` request, and +/// asserts every non-empty stdout line is JSON. This guards the invariant that +/// stdout carries only the JSON-RPC stream while logs go to stderr. 
+#[tokio::test] +async fn stdout_is_clean_jsonrpc_only() { + use tokio::io::AsyncWriteExt; + + let mut child = tokio::process::Command::new(env!("CARGO_BIN_EXE_aingle-cortex")) + .args(["--mcp", "--memory"]) + .stdin(std::process::Stdio::piped()) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .spawn() + .expect("spawn aingle-cortex --mcp --memory"); + + let mut stdin = child.stdin.take().expect("child stdin"); + stdin + .write_all(b"{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"initialize\",\"params\":{\"protocolVersion\":\"2024-11-05\",\"capabilities\":{},\"clientInfo\":{\"name\":\"t\",\"version\":\"0\"}}}\n") + .await + .expect("write initialize"); + // EOF on stdin signals the stdio transport to shut down. + drop(stdin); + + // Robust against a server that does not exit promptly: bound the wait and, + // on timeout, kill the child and inspect whatever stdout was captured. + let out = match tokio::time::timeout(Duration::from_secs(30), child.wait_with_output()).await { + Ok(res) => res.expect("collect child output"), + Err(_) => { + // `wait_with_output` consumed `child`; we can't kill it here, but the + // EOF on stdin should already have it shutting down. Fail loudly so a + // genuine hang is visible rather than silently passing. + panic!("aingle-cortex did not exit within 30s after stdin EOF"); + } + }; + + let stdout = String::from_utf8_lossy(&out.stdout); + for line in stdout.lines().filter(|l| !l.trim().is_empty()) { + assert!( + line.trim_start().starts_with('{'), + "non-JSON on stdout: {line}" + ); + } +}