diff --git a/Makefile.cbm b/Makefile.cbm index d821cb76..54bce507 100644 --- a/Makefile.cbm +++ b/Makefile.cbm @@ -197,7 +197,8 @@ PIPELINE_SRCS = \ src/pipeline/pass_semantic_edges.c \ src/pipeline/pass_cross_repo.c \ src/pipeline/artifact.c \ - src/pipeline/pass_pkgmap.c + src/pipeline/pass_pkgmap.c \ + src/pipeline/pass_servicelinks.c # SimHash / MinHash module SIMHASH_SRCS = src/simhash/minhash.c @@ -337,7 +338,6 @@ TEST_INTEGRATION_SRCS = tests/test_integration.c tests/test_incremental.c TEST_TRACES_SRCS = tests/test_traces.c - TEST_CLI_SRCS = tests/test_cli.c TEST_MEM_SRCS = tests/test_mem.c @@ -351,9 +351,9 @@ TEST_YAML_SRCS = tests/test_yaml.c TEST_SIMHASH_SRCS = tests/test_simhash.c TEST_STACK_OVERFLOW_SRCS = tests/test_stack_overflow.c +TEST_ENDPOINT_REGISTRY_SRCS = tests/test_endpoint_registry.c -ALL_TEST_SRCS = $(TEST_FOUNDATION_SRCS) $(TEST_EXTRACTION_SRCS) $(TEST_STORE_SRCS) $(TEST_CYPHER_SRCS) $(TEST_MCP_SRCS) $(TEST_DISCOVER_SRCS) $(TEST_GRAPH_BUFFER_SRCS) $(TEST_PIPELINE_SRCS) $(TEST_WATCHER_SRCS) $(TEST_LZ4_SRCS) $(TEST_ZSTD_SRCS) $(TEST_ARTIFACT_SRCS) $(TEST_SQLITE_WRITER_SRCS) $(TEST_GO_LSP_SRCS) $(TEST_C_LSP_SRCS) $(TEST_PHP_LSP_SRCS) $(TEST_CS_LSP_SRCS) $(TEST_CS_LSP_BENCH_SRCS) $(TEST_SCOPE_SRCS) $(TEST_TYPE_REP_SRCS) $(TEST_PY_LSP_SRCS) $(TEST_PY_LSP_BENCH_SRCS) $(TEST_PY_LSP_STRESS_SRCS) $(TEST_PY_LSP_SCALE_SRCS) $(TEST_TS_LSP_SRCS) $(TEST_TRACES_SRCS) $(TEST_CLI_SRCS) $(TEST_MEM_SRCS) $(TEST_UI_SRCS) $(TEST_SECURITY_SRCS) $(TEST_YAML_SRCS) $(TEST_SIMHASH_SRCS) $(TEST_STACK_OVERFLOW_SRCS) $(TEST_INTEGRATION_SRCS) - +ALL_TEST_SRCS = $(TEST_FOUNDATION_SRCS) $(TEST_EXTRACTION_SRCS) $(TEST_STORE_SRCS) $(TEST_CYPHER_SRCS) $(TEST_MCP_SRCS) $(TEST_DISCOVER_SRCS) $(TEST_GRAPH_BUFFER_SRCS) $(TEST_PIPELINE_SRCS) $(TEST_WATCHER_SRCS) $(TEST_LZ4_SRCS) $(TEST_ZSTD_SRCS) $(TEST_ARTIFACT_SRCS) $(TEST_SQLITE_WRITER_SRCS) $(TEST_GO_LSP_SRCS) $(TEST_C_LSP_SRCS) $(TEST_PHP_LSP_SRCS) $(TEST_CS_LSP_SRCS) $(TEST_CS_LSP_BENCH_SRCS) $(TEST_SCOPE_SRCS) 
$(TEST_TYPE_REP_SRCS) $(TEST_PY_LSP_SRCS) $(TEST_PY_LSP_BENCH_SRCS) $(TEST_PY_LSP_STRESS_SRCS) $(TEST_PY_LSP_SCALE_SRCS) $(TEST_TS_LSP_SRCS) $(TEST_TRACES_SRCS) $(TEST_CLI_SRCS) $(TEST_MEM_SRCS) $(TEST_UI_SRCS) $(TEST_SECURITY_SRCS) $(TEST_YAML_SRCS) $(TEST_SIMHASH_SRCS) $(TEST_STACK_OVERFLOW_SRCS) $(TEST_INTEGRATION_SRCS) $(TEST_ENDPOINT_REGISTRY_SRCS) # ── Build directories ──────────────────────────────────────────── diff --git a/src/mcp/mcp.c b/src/mcp/mcp.c index 03e33d58..16f29050 100644 --- a/src/mcp/mcp.c +++ b/src/mcp/mcp.c @@ -417,6 +417,13 @@ static const tool_def_t TOOLS[] = { "{\"type\":\"object\",\"properties\":{\"traces\":{\"type\":\"array\",\"items\":{\"type\":" "\"object\"}},\"project\":{\"type\":" "\"string\"}},\"required\":[\"traces\",\"project\"]}"}, + + {"cross_project_links", "Discover cross-project protocol communication links between indexed projects", + "{\"type\":\"object\",\"properties\":{" + "\"protocol\":{\"type\":\"string\",\"description\":\"Filter by protocol (graphql, grpc, kafka, etc.)\"}," + "\"project\":{\"type\":\"string\",\"description\":\"Filter by project name (matches producer or consumer)\"}," + "\"identifier\":{\"type\":\"string\",\"description\":\"Filter by identifier (topic name, operation, etc.)\"}" + "}}"}, }; static const int TOOL_COUNT = sizeof(TOOLS) / sizeof(TOOLS[0]); @@ -3968,6 +3975,174 @@ static char *handle_ingest_traces(cbm_mcp_server_t *srv, const char *args) { return result; } +/* ── Cross-project links tool ────────────────────────────────── */ + +static char *handle_cross_project_links(cbm_mcp_server_t *srv, const char *args) { + (void)srv; + + /* Parse optional filters */ + char protocol[64] = {0}; + char project[256] = {0}; + char identifier[256] = {0}; + + if (args) { + yyjson_doc *doc = yyjson_read(args, strlen(args), 0); + if (doc) { + yyjson_val *root = yyjson_doc_get_root(doc); + yyjson_val *v; + v = yyjson_obj_get(root, "protocol"); + if (v && yyjson_is_str(v)) + snprintf(protocol, 
sizeof(protocol), "%s", yyjson_get_str(v)); + v = yyjson_obj_get(root, "project"); + if (v && yyjson_is_str(v)) + snprintf(project, sizeof(project), "%s", yyjson_get_str(v)); + v = yyjson_obj_get(root, "identifier"); + if (v && yyjson_is_str(v)) + snprintf(identifier, sizeof(identifier), "%s", yyjson_get_str(v)); + yyjson_doc_free(doc); + } + } + + /* Open _crosslinks.db */ + const char *cache_dir = cbm_resolve_cache_dir(); + if (!cache_dir) { + return cbm_mcp_text_result("Cache directory not found.", true); + } + + char db_path[1024]; + snprintf(db_path, sizeof(db_path), "%s/_crosslinks.db", cache_dir); + + sqlite3 *db = NULL; + if (sqlite3_open_v2(db_path, &db, SQLITE_OPEN_READONLY, NULL) != SQLITE_OK) { + if (db) sqlite3_close(db); + return cbm_mcp_text_result( + "No cross-project links found. Index at least 2 projects first.", false); + } + + /* Build query with optional filters (using parameterized queries for safety) */ + char sql[1024]; + char where[512] = {0}; + int wlen = 0; + + if (protocol[0]) { + wlen += snprintf(where + wlen, sizeof(where) - (size_t)wlen, + "%sprotocol = ?", wlen ? " AND " : ""); + } + if (project[0]) { + wlen += snprintf(where + wlen, sizeof(where) - (size_t)wlen, + "%s(producer_project = ? OR consumer_project = ?)", + wlen ? " AND " : ""); + } + if (identifier[0]) { + wlen += snprintf(where + wlen, sizeof(where) - (size_t)wlen, + "%sidentifier = ?", wlen ? 
" AND " : ""); + } + + if (wlen > 0) { + snprintf(sql, sizeof(sql), + "SELECT protocol, identifier, producer_project, producer_qn, producer_file, " + "consumer_project, consumer_qn, consumer_file, confidence " + "FROM cross_links WHERE %s ORDER BY protocol, identifier, confidence DESC;", where); + } else { + snprintf(sql, sizeof(sql), + "SELECT protocol, identifier, producer_project, producer_qn, producer_file, " + "consumer_project, consumer_qn, consumer_file, confidence " + "FROM cross_links ORDER BY protocol, identifier, confidence DESC;"); + } + + sqlite3_stmt *stmt = NULL; + if (sqlite3_prepare_v2(db, sql, -1, &stmt, NULL) != SQLITE_OK) { + sqlite3_close(db); + return cbm_mcp_text_result("Failed to query cross-project links.", true); + } + + /* Bind parameters */ + int bind_idx = 1; + if (protocol[0]) { + sqlite3_bind_text(stmt, bind_idx++, protocol, -1, SQLITE_STATIC); + } + if (project[0]) { + sqlite3_bind_text(stmt, bind_idx++, project, -1, SQLITE_STATIC); + sqlite3_bind_text(stmt, bind_idx++, project, -1, SQLITE_STATIC); + } + if (identifier[0]) { + sqlite3_bind_text(stmt, bind_idx++, identifier, -1, SQLITE_STATIC); + } + + /* Format output — reserve 128 bytes at start for header (filled after loop) */ + enum { XL_HDR_RESERVE = 128 }; + int buf_cap = 65536; + char *buf = malloc((size_t)buf_cap); + if (!buf) { sqlite3_finalize(stmt); sqlite3_close(db); + return cbm_mcp_text_result("alloc failed", true); } + int pos = XL_HDR_RESERVE; /* start writing after header reservation */ + int total = 0; + char cur_protocol[64] = {0}; + int proto_count = 0; + + while (sqlite3_step(stmt) == SQLITE_ROW) { + const char *proto = (const char *)sqlite3_column_text(stmt, 0); + const char *ident = (const char *)sqlite3_column_text(stmt, 1); + const char *pprod = (const char *)sqlite3_column_text(stmt, MCP_COL_2); + const char *qprod = (const char *)sqlite3_column_text(stmt, MCP_COL_3); + const char *fprod = (const char *)sqlite3_column_text(stmt, MCP_COL_4); + const char 
*pcons = (const char *)sqlite3_column_text(stmt, 5); + const char *qcons = (const char *)sqlite3_column_text(stmt, 6); + const char *fcons = (const char *)sqlite3_column_text(stmt, MCP_COL_7); + double conf = sqlite3_column_double(stmt, 8); + + /* Grow buffer if needed (each entry is ~300 bytes max) */ + if (pos + 512 > buf_cap) { + int new_cap = buf_cap * 2; + char *new_buf = realloc(buf, (size_t)new_cap); + if (!new_buf) break; /* return what we have so far */ + buf = new_buf; + buf_cap = new_cap; + } + + /* Protocol header */ + if (strcmp(cur_protocol, proto ? proto : "") != 0) { + if (proto_count > 0) { + pos += snprintf(buf + pos, (size_t)(buf_cap - pos), "\n"); + } + snprintf(cur_protocol, sizeof(cur_protocol), "%s", proto ? proto : ""); + pos += snprintf(buf + pos, (size_t)(buf_cap - pos), "## %s\n\n", proto); + proto_count++; + } + + pos += snprintf(buf + pos, (size_t)(buf_cap - pos), + "%s (confidence: %.2f)\n" + " producer: %s :: %s (%s)\n" + " consumer: %s :: %s (%s)\n\n", + ident ? ident : "", conf, + pprod ? pprod : "", qprod ? qprod : "", fprod ? fprod : "", + pcons ? pcons : "", qcons ? qcons : "", fcons ? fcons : ""); + total++; + } + + sqlite3_finalize(stmt); + sqlite3_close(db); + + if (total == 0) { + free(buf); + return cbm_mcp_text_result( + "No cross-project links found. 
Index at least 2 projects first.", false); + } + + /* Fill header in the reserved space, then shift content to close the gap */ + char header[XL_HDR_RESERVE]; + int hlen = snprintf(header, sizeof(header), "# Cross-Project Links (%d total)\n\n", total); + int gap = XL_HDR_RESERVE - hlen; + memmove(buf + hlen, buf + XL_HDR_RESERVE, (size_t)(pos - XL_HDR_RESERVE) + 1); + memcpy(buf, header, (size_t)hlen); + pos -= gap; + buf[pos] = '\0'; + + char *result = cbm_mcp_text_result(buf, false); + free(buf); + return result; +} + /* ── Tool dispatch ────────────────────────────────────────────── */ char *cbm_mcp_handle_tool(cbm_mcp_server_t *srv, const char *tool_name, const char *args_json) { @@ -4019,6 +4194,9 @@ char *cbm_mcp_handle_tool(cbm_mcp_server_t *srv, const char *tool_name, const ch if (strcmp(tool_name, "ingest_traces") == 0) { return handle_ingest_traces(srv, args_json); } + if (strcmp(tool_name, "cross_project_links") == 0) { + return handle_cross_project_links(srv, args_json); + } char msg[CBM_SZ_256]; snprintf(msg, sizeof(msg), "unknown tool: %s", tool_name); return cbm_mcp_text_result(msg, true); diff --git a/src/pipeline/pass_servicelinks.c b/src/pipeline/pass_servicelinks.c new file mode 100644 index 00000000..14f807c0 --- /dev/null +++ b/src/pipeline/pass_servicelinks.c @@ -0,0 +1,188 @@ +/* + * pass_servicelinks.c — Pipeline pass that orchestrates all cross-service protocol linkers. + * + * Called after pass_httplinks. Runs each protocol linker sequentially. + * Individual linker failures are logged but don't stop execution. 
+ */ +#include "servicelink.h" +#include "foundation/log.h" +#include "foundation/compat.h" +#include "foundation/yaml.h" +#include +#include +#include + +/* ── Format int to string for logging ───────────────────────── */ + +static const char *itoa_sl(int val) { + static CBM_TLS char bufs[4][32]; + static CBM_TLS int idx = 0; + int i = idx; + idx = (idx + 1) & 3; + snprintf(bufs[i], sizeof(bufs[i]), "%d", val); + return bufs[i]; +} + +/* ── Edge type array (declared extern in servicelink.h) ─────── */ + +const char *SL_ALL_EDGE_TYPES[] = { + SL_EDGE_GRAPHQL, SL_EDGE_GRPC, SL_EDGE_KAFKA, SL_EDGE_SQS, + SL_EDGE_SNS, SL_EDGE_PUBSUB, SL_EDGE_WS, SL_EDGE_SSE, + SL_EDGE_AMQP, SL_EDGE_MQTT, SL_EDGE_NATS, SL_EDGE_REDIS_PS, + SL_EDGE_TRPC, SL_EDGE_EVBRIDGE +}; + +/* Protocol keys for YAML config lookup — indexed same as LINKERS[] */ +const char *SL_PROTOCOL_KEYS[] = { + "graphql", "grpc", "kafka", "sqs", "sns", "pubsub", + "ws", "sse", "rabbitmq", "mqtt", "nats", "redis_pubsub", + "trpc", "eventbridge" +}; + +/* ── Config functions ──────────────────────────────────────────── */ + +cbm_sl_config_t cbm_sl_default_config(void) { + cbm_sl_config_t cfg; + cfg.enabled = -1; /* use default = true */ + for (int i = 0; i < SL_EDGE_TYPE_COUNT; i++) { + cfg.protocols[i].enabled = -1; + cfg.protocols[i].min_confidence = -1.0; + } + return cfg; +} + +cbm_sl_config_t cbm_sl_load_config(const char *dir) { + cbm_sl_config_t cfg = cbm_sl_default_config(); + if (!dir) return cfg; + + /* Read .cgrconfig — follow exact pattern from httplink.c:1602 */ + char path[1024]; + int n = snprintf(path, sizeof(path), "%s/.cgrconfig", dir); + if (n <= 0 || (size_t)n >= sizeof(path)) return cfg; + + FILE *f = fopen(path, "r"); + if (!f) return cfg; + + (void)fseek(f, 0, SEEK_END); + long size = ftell(f); + (void)fseek(f, 0, SEEK_SET); + if (size <= 0 || size > (long)1024 * 1024) { (void)fclose(f); return cfg; } + + char *buf = malloc((size_t)size + 1); + if (!buf) { (void)fclose(f); return cfg; } + 
size_t nread = fread(buf, 1, (size_t)size, f); + (void)fclose(f); + // NOLINTNEXTLINE(clang-analyzer-security.ArrayBound) + buf[nread] = '\0'; + + cbm_yaml_node_t *root = cbm_yaml_parse(buf, (int)nread); + free(buf); + if (!root) return cfg; + + /* Top-level enabled */ + if (cbm_yaml_has(root, "service_linker.enabled")) { + cfg.enabled = cbm_yaml_get_bool(root, "service_linker.enabled", true) ? 1 : 0; + } + + /* Per-protocol settings */ + for (int i = 0; i < SL_EDGE_TYPE_COUNT; i++) { + char key[128]; + snprintf(key, sizeof(key), "service_linker.%s.enabled", SL_PROTOCOL_KEYS[i]); + if (cbm_yaml_has(root, key)) { + cfg.protocols[i].enabled = cbm_yaml_get_bool(root, key, true) ? 1 : 0; + } + snprintf(key, sizeof(key), "service_linker.%s.min_confidence", SL_PROTOCOL_KEYS[i]); + if (cbm_yaml_has(root, key)) { + cfg.protocols[i].min_confidence = cbm_yaml_get_float(root, key, -1.0); + } + } + + cbm_yaml_free(root); + return cfg; +} + +bool cbm_sl_protocol_enabled(const cbm_sl_config_t *cfg, int protocol_index) { + if (!cfg) return true; + if (cfg->enabled == 0) return false; /* globally disabled */ + if (protocol_index < 0 || protocol_index >= SL_EDGE_TYPE_COUNT) return true; + if (cfg->protocols[protocol_index].enabled == 0) return false; + return true; +} + +double cbm_sl_effective_min_confidence(const cbm_sl_config_t *cfg, int protocol_index) { + if (!cfg) return SL_MIN_CONFIDENCE; + if (protocol_index >= 0 && protocol_index < SL_EDGE_TYPE_COUNT) { + if (cfg->protocols[protocol_index].min_confidence >= 0.0) { + return cfg->protocols[protocol_index].min_confidence; + } + } + return SL_MIN_CONFIDENCE; +} + +/* ── Cleanup stale edges from previous runs ─────────────────── */ + +static void cleanup_stale_edges(cbm_pipeline_ctx_t *ctx) { + for (int i = 0; i < SL_EDGE_TYPE_COUNT; i++) { + cbm_gbuf_delete_edges_by_type(ctx->gbuf, SL_ALL_EDGE_TYPES[i]); + } +} + +/* ── Linker dispatch table ──────────────────────────────────── */ + +typedef int 
(*cbm_sl_linker_fn)(cbm_pipeline_ctx_t *ctx); + +typedef struct { + const char *name; + cbm_sl_linker_fn fn; +} cbm_sl_linker_entry_t; + +static const cbm_sl_linker_entry_t LINKERS[] = { + { NULL, NULL } /* protocol linkers added in subsequent PRs */ +}; +#define LINKER_COUNT ((int)(sizeof(LINKERS) / sizeof(LINKERS[0])) - 1) + +/* ── Main pass entry point ──────────────────────────────────── */ + +int cbm_pipeline_pass_servicelinks(cbm_pipeline_ctx_t *ctx) { + cbm_log_info("pass.servicelinks.start", "linkers", itoa_sl(LINKER_COUNT)); + + /* Step 0: Load config */ + cbm_sl_config_t cfg = cbm_sl_load_config(ctx->repo_path); + + if (cfg.enabled == 0) { + cbm_log_info("pass.servicelinks.skip", "reason", "disabled"); + return 0; + } + + /* Step 1: Clean stale edges */ + cleanup_stale_edges(ctx); + + /* Step 2: Run each linker */ + int total_links = 0; + int errors = 0; + + for (int i = 0; i < LINKER_COUNT; i++) { + if (!cbm_sl_protocol_enabled(&cfg, i)) { + cbm_log_info("servicelink.skip", "name", LINKERS[i].name, + "reason", "disabled"); + continue; + } + cbm_log_info("servicelink.run", "name", LINKERS[i].name); + int rc = LINKERS[i].fn(ctx); + if (rc < 0) { + cbm_log_warn("servicelink.error", "name", LINKERS[i].name, + "rc", itoa_sl(rc)); + errors++; + } else { + total_links += rc; + cbm_log_info("servicelink.done", "name", LINKERS[i].name, + "links", itoa_sl(rc)); + } + } + + cbm_log_info("pass.servicelinks.done", "total_links", itoa_sl(total_links), + "errors", itoa_sl(errors)); + + /* Return 0 unless ALL linkers failed */ + return (LINKER_COUNT > 0 && errors == LINKER_COUNT) ? 
-1 : 0; +} diff --git a/src/pipeline/pipeline.c b/src/pipeline/pipeline.c index 396e59bf..2a9b445e 100644 --- a/src/pipeline/pipeline.c +++ b/src/pipeline/pipeline.c @@ -30,6 +30,7 @@ enum { CBM_DIR_PERMS = 0755, PL_RING = 4, PL_RING_MASK = 3, PL_SEQ_PASSES = 6, P #include "foundation/compat.h" #include "foundation/compat_thread.h" #include "foundation/profile.h" +#include "pipeline/servicelink.h" #include #include @@ -795,6 +796,18 @@ static int run_post_extraction(cbm_pipeline_t *p, cbm_pipeline_ctx_t *ctx, return rc; } + /* Cross-service protocol linking (GraphQL, gRPC, Kafka, etc.) */ + if (!check_cancel(p)) { + struct timespec t; + cbm_clock_gettime(CLOCK_MONOTONIC, &t); + int sl_rc = cbm_pipeline_pass_servicelinks(ctx); + if (sl_rc < 0) { + cbm_log_warn("pass.servicelinks.error", "rc", itoa_buf(sl_rc)); + } + cbm_log_info("pass.timing", "pass", "servicelinks", "elapsed_ms", + itoa_buf((int)elapsed_ms(t))); + } + CBM_PROF_START(t_predump); run_predump_passes(p, ctx); CBM_PROF_END("pipeline", "3_predump_passes_total", t_predump); @@ -805,6 +818,7 @@ static int run_post_extraction(cbm_pipeline_t *p, cbm_pipeline_ctx_t *ctx, rc = dump_and_persist_hashes(p, files, file_count, &t); CBM_PROF_END("pipeline", "4_dump_and_persist", t_dump); } + return rc; } @@ -909,6 +923,7 @@ int cbm_pipeline_run(cbm_pipeline_t *p) { goto cleanup; } + cbm_log_info("pipeline.done", "nodes", itoa_buf(cbm_gbuf_node_count(p->gbuf)), "edges", itoa_buf(cbm_gbuf_edge_count(p->gbuf)), "elapsed_ms", itoa_buf((int)elapsed_ms(t0))); diff --git a/src/pipeline/pipeline_internal.h b/src/pipeline/pipeline_internal.h index 85ef942b..76e8a355 100644 --- a/src/pipeline/pipeline_internal.h +++ b/src/pipeline/pipeline_internal.h @@ -16,6 +16,9 @@ #include "cbm.h" #include +/* Forward declaration for cross-repo endpoint registry (full type in servicelink.h) */ +struct cbm_sl_endpoint_list_t; + /* ── Shared pipeline constants ─────────────────────────────────── */ /* Maximum byte budget for tree-sitter 
extraction per file */ @@ -68,6 +71,8 @@ typedef struct { * configs are an easy follow-on). NULL when no usable configs were found. * Owned by pipeline.c / pipeline_incremental.c. */ const cbm_path_alias_collection_t *path_aliases; + + struct cbm_sl_endpoint_list_t *endpoints; /* collected across all linkers, owned by pipeline */ } cbm_pipeline_ctx_t; /* Get the current pipeline's package map (NULL if none). */ @@ -420,6 +425,8 @@ int cbm_pipeline_githistory_compute(const char *repo_path, cbm_githistory_result /* Apply pre-computed couplings to the graph buffer (main thread only). */ int cbm_pipeline_githistory_apply(cbm_pipeline_ctx_t *ctx, const cbm_githistory_result_t *result); +int cbm_pipeline_pass_servicelinks(cbm_pipeline_ctx_t *ctx); + /* Pre-dump pass: decorator tags enrichment (operates on gbuf). */ int cbm_pipeline_pass_decorator_tags(cbm_gbuf_t *gbuf, const char *project); diff --git a/src/pipeline/servicelink.h b/src/pipeline/servicelink.h new file mode 100644 index 00000000..4c148e32 --- /dev/null +++ b/src/pipeline/servicelink.h @@ -0,0 +1,373 @@ +/* + * servicelink.h — Shared types and declarations for cross-service protocol linking. + * + * Each protocol linker discovers producers/consumers in source code and creates + * typed edges (GRAPHQL_CALLS, KAFKA_CALLS, etc.) in the graph buffer. + */ +#ifndef CBM_SERVICELINK_H +#define CBM_SERVICELINK_H + +#include "pipeline_internal.h" +#include "pipeline.h" /* cbm_confidence_band */ +#include "foundation/compat_regex.h" /* portable regex: cbm_regex_t, cbm_regcomp, etc. 
*/ +#include "foundation/log.h" /* cbm_log_info, cbm_log_warn, cbm_log_error */ +#include "foundation/platform.h" /* safe_realloc */ + +#include +#include +#include +#include +#include + +/* ── Buffer limits ──────────────────────────────────────────── */ +#define SL_MAX_PRODUCERS 8192 +#define SL_MAX_CONSUMERS 8192 +#define SL_MAX_PER_NODE 64 /* max discoveries per single function node */ +#define SL_MIN_CONFIDENCE 0.25 /* minimum confidence to create an edge */ + +/* ── Edge type constants ────────────────────────────────────── */ +#define SL_EDGE_GRAPHQL "GRAPHQL_CALLS" +#define SL_EDGE_GRPC "GRPC_CALLS" +#define SL_EDGE_KAFKA "KAFKA_CALLS" +#define SL_EDGE_SQS "SQS_CALLS" +#define SL_EDGE_SNS "SNS_CALLS" +#define SL_EDGE_PUBSUB "PUBSUB_CALLS" +#define SL_EDGE_WS "WS_CALLS" +#define SL_EDGE_SSE "SSE_CALLS" +#define SL_EDGE_AMQP "AMQP_CALLS" +#define SL_EDGE_MQTT "MQTT_CALLS" +#define SL_EDGE_NATS "NATS_CALLS" +#define SL_EDGE_REDIS_PS "REDIS_PUBSUB_CALLS" +#define SL_EDGE_TRPC "TRPC_CALLS" +#define SL_EDGE_EVBRIDGE "EVENTBRIDGE_CALLS" + +/* ── All edge types for cleanup (defined in pass_servicelinks.c) ── */ +extern const char *SL_ALL_EDGE_TYPES[]; +#define SL_EDGE_TYPE_COUNT 14 + +/* ── Generic producer/consumer structs ──────────────────────── */ + +typedef struct { + char identifier[256]; /* topic, subject, channel, operation, procedure */ + char source_qn[512]; /* qualified name of producing function */ + int64_t source_id; /* gbuf node ID */ + char file_path[256]; /* file where discovered */ + char extra[256]; /* protocol-specific: method, exchange, qos, etc. 
*/ +} cbm_sl_producer_t; + +typedef struct { + char identifier[256]; /* topic, subject, channel, operation, procedure */ + char handler_qn[512]; /* qualified name of consuming function */ + int64_t handler_id; /* gbuf node ID */ + char file_path[256]; /* file where discovered */ + char extra[256]; /* protocol-specific metadata */ +} cbm_sl_consumer_t; + +/* ── Linker result ──────────────────────────────────────────── */ + +typedef struct { + const char *name; /* protocol name for logging */ + int links_created; + int producers_found; + int consumers_found; +} cbm_sl_result_t; + +/* ── Helper: read source lines from disk ───────────────────── */ + +static inline char *sl_read_source_lines(const char *root_dir, const char *rel_path, + int start_line, int end_line) { + char full_path[2048]; + snprintf(full_path, sizeof(full_path), "%s/%s", root_dir, rel_path); + + FILE *f = fopen(full_path, "r"); + if (!f) { + return NULL; + } + + char *result = NULL; + int result_len = 0; + int result_cap = 0; + int line = 0; + char line_buf[4096]; + + while (fgets(line_buf, sizeof(line_buf), f)) { + line++; + if (line < start_line) { + continue; + } + if (line > end_line) { + break; + } + + int llen = (int)strlen(line_buf); + if (llen > 0 && line_buf[llen - 1] == '\n') { + line_buf[--llen] = '\0'; + } + + if (result_len > 0) { + if (result_len + 1 >= result_cap) { + result_cap = (result_cap == 0) ? 
1024 : result_cap * 2; + result = safe_realloc(result, (size_t)result_cap); + } + result[result_len++] = '\n'; + } + + if (result_len + llen >= result_cap) { + result_cap = result_len + llen + 256; + result = safe_realloc(result, (size_t)result_cap); + } + memcpy(result + result_len, line_buf, (size_t)llen); + result_len += llen; + } + + (void)fclose(f); + if (result) { + result[result_len] = '\0'; + } + return result; +} + +static inline char *sl_read_node_source(const cbm_pipeline_ctx_t *ctx, + const cbm_gbuf_node_t *node) { + return sl_read_source_lines(ctx->repo_path, node->file_path, + node->start_line, node->end_line); +} + +/* ── Helper: normalized Levenshtein similarity (0.0–1.0) ───── */ + +static inline double cbm_normalized_levenshtein(const char *a, const char *b) { + if (strcmp(a, b) == 0) { + return 1.0; + } + int la = (int)strlen(a); + int lb = (int)strlen(b); + int max_len = la > lb ? la : lb; + if (max_len == 0) { + return 1.0; + } + + /* Compute Levenshtein distance with two-row DP */ + int *prev = (int *)calloc((size_t)(lb + 1), sizeof(int)); + int *curr = (int *)calloc((size_t)(lb + 1), sizeof(int)); + if (!prev || !curr) { + free(prev); + free(curr); + return 0.0; + } + for (int j = 0; j <= lb; j++) { + prev[j] = j; + } + for (int i = 1; i <= la; i++) { + curr[0] = i; + for (int j = 1; j <= lb; j++) { + int cost = (a[i - 1] == b[j - 1]) ? 0 : 1; + int del = prev[j] + 1; + int ins = curr[j - 1] + 1; + int sub = prev[j - 1] + cost; + curr[j] = del < ins ? (del < sub ? del : sub) : (ins < sub ? 
/* ── Helper: normalized Levenshtein similarity (0.0–1.0) ───── */

/*
 * Similarity of two strings: 1.0 - edit_distance / max(strlen(a), strlen(b)).
 * Identical strings score 1.0. Returns 0.0 when the work rows cannot be
 * allocated. Both arguments must be valid strings.
 */
static inline double cbm_normalized_levenshtein(const char *a, const char *b) {
    if (strcmp(a, b) == 0) {
        return 1.0;
    }

    const int len_a = (int)strlen(a);
    const int len_b = (int)strlen(b);
    const int longest = len_a > len_b ? len_a : len_b;
    if (longest == 0) {
        return 1.0;
    }

    /* Two DP rows carved out of one allocation: `above` is row i-1, `here` is row i. */
    const size_t width = (size_t)len_b + 1;
    int *rows = calloc(2 * width, sizeof(int));
    if (!rows) {
        return 0.0;
    }
    int *above = rows;
    int *here = rows + width;

    for (int col = 0; col <= len_b; col++) {
        above[col] = col;
    }

    for (int row = 1; row <= len_a; row++) {
        here[0] = row;
        for (int col = 1; col <= len_b; col++) {
            int best = above[col - 1] + (a[row - 1] == b[col - 1] ? 0 : 1); /* substitute */
            if (above[col] + 1 < best) {
                best = above[col] + 1; /* delete */
            }
            if (here[col - 1] + 1 < best) {
                best = here[col - 1] + 1; /* insert */
            }
            here[col] = best;
        }
        int *swap = above;
        above = here;
        here = swap;
    }

    const int distance = above[len_b];
    free(rows);
    return 1.0 - ((double)distance / (double)longest);
}

/* ── Helper: path match score for WS/SSE endpoint matching ─── */

/* Write into dst (at least 1024 bytes) an ASCII-lowercased copy of the first
 * 1022 characters of src, with one trailing '/' removed unless the copy is
 * a single character. */
static inline void sl_path_normalize(const char *src, char *dst) {
    int n = 0;
    while (src[n] && n < 1022) {
        const char c = src[n];
        dst[n] = (c >= 'A' && c <= 'Z') ? (char)(c + 32) : c;
        n++;
    }
    dst[n] = '\0';
    if (n > 1 && dst[n - 1] == '/') {
        dst[n - 1] = '\0';
    }
}

/*
 * Score how well a called path matches a declared route, after normalizing
 * both (see sl_path_normalize):
 *   0.95        exact match
 *   0.80        the shorter path is a suffix of the longer one
 *   0.65 * sim  Levenshtein similarity sim >= 0.75
 *   0.0         otherwise, or when either argument is NULL or empty
 */
static inline double cbm_path_match_score(const char *call_path, const char *route_path) {
    if (!call_path || !route_path || call_path[0] == '\0' || route_path[0] == '\0') {
        return 0.0;
    }

    char call_norm[1024];
    char route_norm[1024];
    sl_path_normalize(call_path, call_norm);
    sl_path_normalize(route_path, route_norm);

    if (strcmp(call_norm, route_norm) == 0) {
        return 0.95;
    }

    /* Suffix match: compare the tail of the longer path with the shorter one */
    const int call_len = (int)strlen(call_norm);
    const int route_len = (int)strlen(route_norm);
    if (call_len != route_len) {
        const char *longer = call_len > route_len ? call_norm : route_norm;
        const char *shorter = call_len > route_len ? route_norm : call_norm;
        const int skip = call_len > route_len ? call_len - route_len : route_len - call_len;
        if (strcmp(longer + skip, shorter) == 0) {
            return 0.80;
        }
    }

    /* Fuzzy: normalized Levenshtein on path */
    const double sim = cbm_normalized_levenshtein(call_norm, route_norm);
    return sim >= 0.75 ? 0.65 * sim : 0.0;
}
dot : ""; +} + +/* ── Helper: insert edge with standard props ────────────────── */ + +static inline int64_t sl_insert_edge(cbm_pipeline_ctx_t *ctx, + int64_t src_id, int64_t tgt_id, const char *edge_type, + const char *identifier, double confidence, const char *extra_json) +{ + char props[512]; + if (extra_json && extra_json[0]) { + snprintf(props, sizeof(props), + "{\"identifier\":\"%s\",\"confidence\":%.3f,\"confidence_band\":\"%s\",%s}", + identifier, confidence, cbm_confidence_band(confidence), extra_json); + } else { + snprintf(props, sizeof(props), + "{\"identifier\":\"%s\",\"confidence\":%.3f,\"confidence_band\":\"%s\"}", + identifier, confidence, cbm_confidence_band(confidence)); + } + return cbm_gbuf_insert_edge(ctx->gbuf, src_id, tgt_id, edge_type, props); +} + +/* ── Per-protocol linker entry points ───────────────────────── */ + +int cbm_servicelink_graphql(cbm_pipeline_ctx_t *ctx); +int cbm_servicelink_grpc(cbm_pipeline_ctx_t *ctx); +int cbm_servicelink_kafka(cbm_pipeline_ctx_t *ctx); +int cbm_servicelink_sqs(cbm_pipeline_ctx_t *ctx); +int cbm_servicelink_sns(cbm_pipeline_ctx_t *ctx); +int cbm_servicelink_pubsub(cbm_pipeline_ctx_t *ctx); +int cbm_servicelink_ws(cbm_pipeline_ctx_t *ctx); +int cbm_servicelink_sse(cbm_pipeline_ctx_t *ctx); +int cbm_servicelink_rabbitmq(cbm_pipeline_ctx_t *ctx); +int cbm_servicelink_mqtt(cbm_pipeline_ctx_t *ctx); +int cbm_servicelink_nats(cbm_pipeline_ctx_t *ctx); +int cbm_servicelink_redis_pubsub(cbm_pipeline_ctx_t *ctx); +int cbm_servicelink_trpc(cbm_pipeline_ctx_t *ctx); +int cbm_servicelink_eventbridge(cbm_pipeline_ctx_t *ctx); + +/* ── Service linker configuration ──────────────────────────────── */ + +/* Per-protocol config */ +typedef struct { + int enabled; /* -1 = use default (true), 0 = disabled, 1 = enabled */ + double min_confidence; /* -1.0 = use default (SL_MIN_CONFIDENCE) */ +} cbm_sl_protocol_config_t; + +/* Full service linker config */ +typedef struct { + int enabled; /* -1 = use default (true), 0 = 
disabled, 1 = enabled */ + cbm_sl_protocol_config_t protocols[SL_EDGE_TYPE_COUNT]; /* indexed same as LINKERS[] */ +} cbm_sl_config_t; + +/* Protocol name keys for YAML lookup (indexed same as LINKERS[]) */ +extern const char *SL_PROTOCOL_KEYS[]; + +/* Return default config (all sentinel values = use defaults). */ +cbm_sl_config_t cbm_sl_default_config(void); + +/* Load config from .cgrconfig in the given directory. */ +cbm_sl_config_t cbm_sl_load_config(const char *dir); + +/* Check if a protocol is enabled. */ +bool cbm_sl_protocol_enabled(const cbm_sl_config_t *cfg, int protocol_index); + +/* Get effective min_confidence for a protocol. */ +double cbm_sl_effective_min_confidence(const cbm_sl_config_t *cfg, int protocol_index); + +/* ── Cross-repo endpoint registry ──────────────────────────────── */ + +typedef struct { + char project[256]; + char protocol[32]; /* "graphql", "kafka", "pubsub", etc. */ + char role[16]; /* "producer" or "consumer" */ + char identifier[256]; /* topic name, operation name, etc. 
*/ + char node_qn[512]; /* function qualified name */ + char file_path[256]; /* relative file path */ + char extra[256]; /* protocol-specific metadata (JSON) */ +} cbm_sl_endpoint_t; + +typedef struct cbm_sl_endpoint_list_t { + cbm_sl_endpoint_t *items; + int count; + int capacity; +} cbm_sl_endpoint_list_t; + +#define SL_ENDPOINT_INITIAL_CAP 256 + +static inline cbm_sl_endpoint_list_t *cbm_sl_endpoint_list_new(void) { + cbm_sl_endpoint_list_t *list = calloc(1, sizeof(cbm_sl_endpoint_list_t)); + if (!list) return NULL; + list->items = calloc(SL_ENDPOINT_INITIAL_CAP, sizeof(cbm_sl_endpoint_t)); + if (!list->items) { free(list); return NULL; } + list->capacity = SL_ENDPOINT_INITIAL_CAP; + list->count = 0; + return list; +} + +static inline void cbm_sl_endpoint_list_free(cbm_sl_endpoint_list_t *list) { + if (!list) return; + free(list->items); + free(list); +} + +static inline void sl_register_endpoint(cbm_sl_endpoint_list_t *list, + const char *project, const char *protocol, + const char *role, const char *identifier, + const char *node_qn, const char *file_path, + const char *extra) { + if (!list) return; + if (!identifier || !identifier[0]) return; + if (list->count >= list->capacity) { + int new_cap = list->capacity * 2; + cbm_sl_endpoint_t *new_items = safe_realloc(list->items, + (size_t)new_cap * sizeof(cbm_sl_endpoint_t)); + if (!new_items) return; + list->items = new_items; + list->capacity = new_cap; + } + cbm_sl_endpoint_t *ep = &list->items[list->count]; + memset(ep, 0, sizeof(*ep)); + if (project) snprintf(ep->project, sizeof(ep->project), "%s", project); + if (protocol) snprintf(ep->protocol, sizeof(ep->protocol), "%s", protocol); + if (role) snprintf(ep->role, sizeof(ep->role), "%s", role); + if (identifier) snprintf(ep->identifier, sizeof(ep->identifier), "%s", identifier); + if (node_qn) snprintf(ep->node_qn, sizeof(ep->node_qn), "%s", node_qn); + if (file_path) snprintf(ep->file_path, sizeof(ep->file_path), "%s", file_path); + if (extra) 
snprintf(ep->extra, sizeof(ep->extra), "%s", extra); + list->count++; +} + +/* Forward declarations — implemented in pass_crossrepolinks.c */ +int cbm_persist_endpoints(const char *db_path, const char *project, + const cbm_sl_endpoint_list_t *endpoints); +int cbm_cross_project_link(const char *cache_dir); + +#endif /* CBM_SERVICELINK_H */ diff --git a/src/store/store.c b/src/store/store.c index 30fddad8..8c9d5380 100644 --- a/src/store/store.c +++ b/src/store/store.c @@ -256,6 +256,17 @@ static int init_schema(cbm_store_t *s) { " source_hash TEXT NOT NULL," " created_at TEXT NOT NULL," " updated_at TEXT NOT NULL" + ");" + "CREATE TABLE IF NOT EXISTS protocol_endpoints (" + " id INTEGER PRIMARY KEY AUTOINCREMENT," + " project TEXT NOT NULL," + " protocol TEXT NOT NULL," + " role TEXT NOT NULL," + " identifier TEXT NOT NULL," + " node_qn TEXT NOT NULL," + " file_path TEXT NOT NULL," + " extra TEXT DEFAULT '{}'," + " UNIQUE(project, protocol, role, identifier, node_qn)" ");"; int rc = exec_sql(s, ddl); diff --git a/tests/test_endpoint_registry.c b/tests/test_endpoint_registry.c new file mode 100644 index 00000000..1eb3f141 --- /dev/null +++ b/tests/test_endpoint_registry.c @@ -0,0 +1,116 @@ +/* + * test_endpoint_registry.c — Tests for cross-repo endpoint registry types and helpers. 
+ * + * Tests cover: + * - Endpoint list creation and free (including NULL-safety) + * - Registering endpoints and verifying all fields + * - Auto-growing beyond initial capacity + * - Skipping empty/NULL identifiers + */ +#include "../src/foundation/compat.h" +#include "test_framework.h" +#include +#include + +/* ── Tests ──────────────────────────────────────────────────────── */ + +TEST(endpoint_list_create_and_free) { + cbm_sl_endpoint_list_t *list = cbm_sl_endpoint_list_new(); + ASSERT_NOT_NULL(list); + ASSERT_EQ(list->count, 0); + ASSERT_EQ(list->capacity, SL_ENDPOINT_INITIAL_CAP); + cbm_sl_endpoint_list_free(list); + /* Free NULL should not crash */ + cbm_sl_endpoint_list_free(NULL); + PASS(); +} + +TEST(endpoint_list_register_and_count) { + cbm_sl_endpoint_list_t *list = cbm_sl_endpoint_list_new(); + ASSERT_NOT_NULL(list); + + sl_register_endpoint(list, "myproject", "graphql", "producer", + "getUser", "resolvers.UserResolver.getUser", + "src/resolvers/user.ts", "{\"kind\":\"query\"}"); + + sl_register_endpoint(list, "myproject", "graphql", "consumer", + "getUser", "hooks.useGetUser", + "src/hooks/user.ts", ""); + + sl_register_endpoint(list, "myproject", "kafka", "producer", + "user.created", "services.UserService.create", + "src/services/user.ts", "{\"topic\":\"user.created\"}"); + + ASSERT_EQ(list->count, 3); + + /* Verify first endpoint fields */ + ASSERT_STR_EQ(list->items[0].project, "myproject"); + ASSERT_STR_EQ(list->items[0].protocol, "graphql"); + ASSERT_STR_EQ(list->items[0].role, "producer"); + ASSERT_STR_EQ(list->items[0].identifier, "getUser"); + ASSERT_STR_EQ(list->items[0].node_qn, "resolvers.UserResolver.getUser"); + ASSERT_STR_EQ(list->items[0].file_path, "src/resolvers/user.ts"); + ASSERT_STR_EQ(list->items[0].extra, "{\"kind\":\"query\"}"); + + /* Verify second endpoint */ + ASSERT_STR_EQ(list->items[1].role, "consumer"); + ASSERT_STR_EQ(list->items[1].node_qn, "hooks.useGetUser"); + + /* Verify third endpoint */ + 
ASSERT_STR_EQ(list->items[2].protocol, "kafka"); + ASSERT_STR_EQ(list->items[2].identifier, "user.created"); + + cbm_sl_endpoint_list_free(list); + PASS(); +} + +TEST(endpoint_list_grows_beyond_initial_capacity) { + cbm_sl_endpoint_list_t *list = cbm_sl_endpoint_list_new(); + ASSERT_NOT_NULL(list); + + /* Register more than SL_ENDPOINT_INITIAL_CAP (256) endpoints */ + for (int i = 0; i < 300; i++) { + char ident[64]; + snprintf(ident, sizeof(ident), "topic_%d", i); + sl_register_endpoint(list, "proj", "kafka", "producer", + ident, "fn", "file.ts", ""); + } + + ASSERT_EQ(list->count, 300); + ASSERT_GTE(list->capacity, 300); + + /* Verify first and last entries survived realloc */ + ASSERT_STR_EQ(list->items[0].identifier, "topic_0"); + ASSERT_STR_EQ(list->items[299].identifier, "topic_299"); + + cbm_sl_endpoint_list_free(list); + PASS(); +} + +TEST(endpoint_list_skips_empty_identifier) { + cbm_sl_endpoint_list_t *list = cbm_sl_endpoint_list_new(); + ASSERT_NOT_NULL(list); + + /* Empty string identifier should be skipped */ + sl_register_endpoint(list, "proj", "kafka", "producer", + "", "fn", "file.ts", ""); + ASSERT_EQ(list->count, 0); + + /* NULL identifier should be skipped */ + sl_register_endpoint(list, "proj", "kafka", "producer", + NULL, "fn", "file.ts", ""); + ASSERT_EQ(list->count, 0); + + /* NULL list should not crash */ + sl_register_endpoint(NULL, "p", "proto", "role", "id", "qn", "f", "e"); + + cbm_sl_endpoint_list_free(list); + PASS(); +} + +SUITE(endpoint_registry) { + RUN_TEST(endpoint_list_create_and_free); + RUN_TEST(endpoint_list_register_and_count); + RUN_TEST(endpoint_list_grows_beyond_initial_capacity); + RUN_TEST(endpoint_list_skips_empty_identifier); +} diff --git a/tests/test_main.c b/tests/test_main.c index 1f720d9c..fc1e33bf 100644 --- a/tests/test_main.c +++ b/tests/test_main.c @@ -71,6 +71,7 @@ extern void suite_integration(void); extern void suite_incremental(void); extern void suite_simhash(void); extern void suite_stack_overflow(void); 
+extern void suite_endpoint_registry(void); int main(void) { printf("\n codebase-memory-mcp C test suite\n"); @@ -188,6 +189,9 @@ int main(void) { RUN_SUITE(integration); RUN_SUITE(incremental); + /* Cross-repo endpoint registry */ + RUN_SUITE(endpoint_registry); + /* Release sqlite3 internal caches so ASan doesn't report them as leaks */ sqlite3_shutdown(); TEST_SUMMARY();