Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
-- Namespace + visibility migration is implemented idempotently in migrator.ts
-- because fresh installs may already get these columns from 001-initial.sql.
-- Namespace + visibility migration is implemented programmatically in migrator.ts.
-- See ensureNamespaceColumns() (phase 1: fast DDL, runs inside transaction) and
-- ensureNamespaceIndexesAndBackfill() (phase 2: batched backfill + index creation,
-- runs outside transaction so the bridge can be safely interrupted and restarted).
291 changes: 106 additions & 185 deletions apps/memos-local-plugin/core/storage/migrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,7 @@ export interface MigrationsResult {
*/
export function defaultMigrationsDir(): string {
const here = path.dirname(fileURLToPath(import.meta.url));
const compiled = path.join(here, "migrations");
if (fs.existsSync(compiled)) return compiled;

// Local package installs keep source files for debugging; this fallback
// makes compiled code resilient if runtime assets were not copied.
const source = path.resolve(here, "..", "..", "..", "core", "storage", "migrations");
return fs.existsSync(source) ? source : compiled;
return path.join(here, "migrations");
}

export function discoverMigrations(dir: string): MigrationFile[] {
Expand Down Expand Up @@ -115,7 +109,7 @@ export function runMigrations(db: StorageDb, dir: string = defaultMigrationsDir(
}
const t0 = now();
db.tx(() => {
applyMigration(db, file);
applyMigrationDdl(db, file);
db.prepare(
`INSERT INTO schema_migrations (version, name, applied_at) VALUES (@version, @name, @applied_at)`,
).run({ version: file.version, name: file.name, applied_at: now() });
Expand All @@ -133,7 +127,15 @@ export function runMigrations(db: StorageDb, dir: string = defaultMigrationsDir(
if (needsUnsafe) db.raw.unsafeMode(false);
}

ensureHubSharingSearchColumns(db);
// Phase 2 of migration 007: ensure all namespace columns exist on every table
// (idempotent — ensureColumn skips if already present), then batched share_scope
// backfill and index creation. Runs after every startup; if the bridge is killed
// mid-backfill it resumes where it left off on the next boot.
ensureNamespaceColumns(db);
if (columnExists(db, "traces", "owner_agent_kind")) {
ensureNamespaceIndexesAndBackfill(db);
}

markReady(db);

log.info("migrations.summary", {
Expand All @@ -155,207 +157,126 @@ function migrationNeedsUnsafeMode(fullPath: string): boolean {
return /PRAGMA\s+writable_schema/i.test(sql);
}

function applyMigration(db: StorageDb, file: MigrationFile): void {
if (file.version === 3 && file.name === "embedding-retry-lease") {
ensureEmbeddingRetryLeaseColumns(db);
return;
}
if (file.version === 4 && file.name === "skill-usage") {
ensureSkillUsageColumns(db);
return;
}
if (file.version === 6 && file.name === "world-model-version") {
if (tableExists(db, "world_model")) {
ensureColumn(db, "world_model", "version", "INTEGER NOT NULL DEFAULT 1");
}
return;
}
if (file.version === 7 && file.name === "namespace-visibility") {
ensureNamespaceVisibilityColumns(db);
return;
}
if (file.version === 8 && file.name === "feedback-experience-metadata") {
ensureFeedbackExperienceMetadataColumns(db);
return;
}
if (file.version === 9 && file.name === "policies-fts") {
if (tableExists(db, "policies")) {
db.exec(fs.readFileSync(file.fullPath, "utf8"));
}
return;
}
db.exec(fs.readFileSync(file.fullPath, "utf8"));
}
// ── Migration 007 (namespace-visibility) ─────────────────────────────────────
//
// Two-phase design breaks the O(n) crash-loop:
//
// Phase 1 — inside the migration transaction:
// ADD COLUMN only. Metadata-only, completes in milliseconds regardless of
// DB size. The schema_migrations record commits here.
//
// Phase 2 — after the migration loop, outside any transaction:
// Batched UPDATE + CREATE INDEX. Each 2,000-row UPDATE batch is its own
// implicit transaction, so a killed bridge resumes mid-backfill rather than
// restarting the whole migration. The migration 007 record has already
// committed, so Phase 1 is skipped on the next boot.

function ensureEmbeddingRetryLeaseColumns(db: StorageDb): void {
const columns = new Set(
db.prepare<unknown, { name: string }>(`PRAGMA table_info(embedding_retry_queue)`)
.all()
.map((row) => row.name),
);
if (!columns.has("claimed_by")) {
db.exec(`ALTER TABLE embedding_retry_queue ADD COLUMN claimed_by TEXT`);
}
if (!columns.has("lease_until")) {
db.exec(`ALTER TABLE embedding_retry_queue ADD COLUMN lease_until INTEGER`);
}
}
const NS_TABLES = ["sessions", "episodes", "traces", "policies", "world_model", "skills", "feedback", "decision_repairs", "l2_candidate_pool", "skill_trials", "api_logs", "audit_events"] as const;

function ensureSkillUsageColumns(db: StorageDb): void {
const table = db
.prepare<unknown, { name: string }>(
`SELECT name FROM sqlite_master WHERE type='table' AND name='skills'`,
)
.get();
if (!table) return;
const columns = new Set(
db.prepare<unknown, { name: string }>(`PRAGMA table_info(skills)`)
.all()
.map((row) => row.name),
);
if (!columns.has("usage_count")) {
db.exec(`ALTER TABLE skills ADD COLUMN usage_count INTEGER NOT NULL DEFAULT 0`);
}
if (!columns.has("last_used_at")) {
db.exec(`ALTER TABLE skills ADD COLUMN last_used_at INTEGER`);
const SHARE_TABLES = ["episodes", "traces", "policies", "world_model", "skills"] as const;

function applyMigrationDdl(db: StorageDb, file: MigrationFile): void {
if (file.version === 7) {
ensureNamespaceColumns(db);
return;
}
const sql = fs.readFileSync(file.fullPath, "utf8");
db.exec(sql);
}

function ensureNamespaceVisibilityColumns(db: StorageDb): void {
const ownerTables = [
"sessions",
"episodes",
"traces",
"policies",
"world_model",
"skills",
"feedback",
"decision_repairs",
"l2_candidate_pool",
"skill_trials",
"api_logs",
"audit_events",
];
for (const table of ownerTables) {
function ensureNamespaceColumns(db: StorageDb): void {
// Owner columns on ALL namespace tables (NOT NULL with defaults —
// matches the original v2.0.5 migration schema).
for (const table of NS_TABLES) {
if (!tableExists(db, table)) continue;
ensureColumn(db, table, "owner_agent_kind", "TEXT NOT NULL DEFAULT 'unknown'");
ensureColumn(db, table, "owner_profile_id", "TEXT NOT NULL DEFAULT 'default'");
ensureColumn(db, table, "owner_workspace_id", "TEXT");
}
for (const table of ["episodes", "traces", "policies", "world_model", "skills"]) {
// share_scope only on content-bearing tables.
for (const table of SHARE_TABLES) {
if (!tableExists(db, table)) continue;
ensureColumn(db, table, "share_scope", "TEXT DEFAULT 'private'");
db.exec(`UPDATE ${table} SET share_scope='private' WHERE share_scope IS NULL`);
}

execIfTable(db, "skills", `DROP INDEX IF EXISTS uq_skills_name`);
execIfTable(db, "sessions", `CREATE INDEX IF NOT EXISTS idx_sessions_owner ON sessions(owner_agent_kind, owner_profile_id, last_seen_at DESC)`);
execIfTable(db, "episodes", `CREATE INDEX IF NOT EXISTS idx_episodes_owner ON episodes(owner_agent_kind, owner_profile_id, started_at DESC)`);
execIfTable(db, "episodes", `CREATE INDEX IF NOT EXISTS idx_episodes_share ON episodes(share_scope, started_at DESC)`);
execIfTable(db, "traces", `CREATE INDEX IF NOT EXISTS idx_traces_owner ON traces(owner_agent_kind, owner_profile_id, ts DESC)`);
execIfTable(db, "traces", `CREATE INDEX IF NOT EXISTS idx_traces_share ON traces(share_scope, ts DESC)`);
execIfTable(db, "policies", `CREATE INDEX IF NOT EXISTS idx_policies_owner ON policies(owner_agent_kind, owner_profile_id, updated_at DESC)`);
execIfTable(db, "policies", `CREATE INDEX IF NOT EXISTS idx_policies_share ON policies(share_scope, updated_at DESC)`);
execIfTable(db, "world_model", `CREATE INDEX IF NOT EXISTS idx_world_owner ON world_model(owner_agent_kind, owner_profile_id, updated_at DESC)`);
execIfTable(db, "world_model", `CREATE INDEX IF NOT EXISTS idx_world_share ON world_model(share_scope, updated_at DESC)`);
execIfTable(db, "skills", `CREATE UNIQUE INDEX IF NOT EXISTS uq_skills_owner_name ON skills(owner_agent_kind, owner_profile_id, name)`);
execIfTable(db, "skills", `CREATE INDEX IF NOT EXISTS idx_skills_owner ON skills(owner_agent_kind, owner_profile_id, updated_at DESC)`);
execIfTable(db, "skills", `CREATE INDEX IF NOT EXISTS idx_skills_share ON skills(share_scope, updated_at DESC)`);
execIfTable(db, "feedback", `CREATE INDEX IF NOT EXISTS idx_feedback_owner ON feedback(owner_agent_kind, owner_profile_id, ts DESC)`);
execIfTable(db, "decision_repairs", `CREATE INDEX IF NOT EXISTS idx_repairs_owner ON decision_repairs(owner_agent_kind, owner_profile_id, ts DESC)`);
execIfTable(db, "l2_candidate_pool", `CREATE INDEX IF NOT EXISTS idx_l2_candidate_owner ON l2_candidate_pool(owner_agent_kind, owner_profile_id, expires_at)`);
execIfTable(db, "skill_trials", `CREATE INDEX IF NOT EXISTS idx_skill_trials_owner ON skill_trials(owner_agent_kind, owner_profile_id, created_at DESC)`);
execIfTable(db, "api_logs", `CREATE INDEX IF NOT EXISTS idx_api_logs_owner ON api_logs(owner_agent_kind, owner_profile_id, called_at DESC)`);
execIfTable(db, "audit_events", `CREATE INDEX IF NOT EXISTS idx_audit_owner ON audit_events(owner_agent_kind, owner_profile_id, ts DESC)`);
// Uniqueness on skills.name breaks with namespace isolation — multiple agents
// can legitimately own a skill with the same name.
db.exec(`DROP INDEX IF EXISTS uq_skills_name`);
}

function ensureFeedbackExperienceMetadataColumns(db: StorageDb): void {
if (!tableExists(db, "policies")) return;
ensureColumn(
db,
"policies",
"experience_type",
`TEXT NOT NULL DEFAULT 'success_pattern'
CHECK (experience_type IN ('success_pattern','repair_validated','failure_avoidance','repair_instruction','preference','verifier_feedback','procedural'))`,
);
ensureColumn(
db,
"policies",
"evidence_polarity",
`TEXT NOT NULL DEFAULT 'positive'
CHECK (evidence_polarity IN ('positive','negative','neutral','mixed'))`,
);
ensureColumn(db, "policies", "salience", "REAL NOT NULL DEFAULT 0");
ensureColumn(db, "policies", "confidence", "REAL NOT NULL DEFAULT 0.5");
ensureColumn(
db,
"policies",
"source_feedback_ids_json",
"TEXT NOT NULL DEFAULT '[]' CHECK (json_valid(source_feedback_ids_json))",
);
ensureColumn(
db,
"policies",
"source_trace_ids_json",
"TEXT NOT NULL DEFAULT '[]' CHECK (json_valid(source_trace_ids_json))",
);
ensureColumn(
db,
"policies",
"verifier_meta_json",
"TEXT NOT NULL DEFAULT 'null' CHECK (json_valid(verifier_meta_json))",
);
ensureColumn(
db,
"policies",
"skill_eligible",
"INTEGER NOT NULL DEFAULT 1 CHECK (skill_eligible IN (0,1))",
);
db.exec(`CREATE INDEX IF NOT EXISTS idx_policies_experience ON policies(experience_type, evidence_polarity, updated_at DESC)`);
db.exec(`CREATE INDEX IF NOT EXISTS idx_policies_skill_eligible ON policies(skill_eligible, status, updated_at DESC)`);
}
function ensureNamespaceIndexesAndBackfill(db: StorageDb): void {
// Backfill share_scope in batches so each chunk is its own transaction.
for (const table of SHARE_TABLES) {
if (!tableExists(db, table) || !columnExists(db, table, "share_scope")) continue;
const stmt = db.prepare(
`UPDATE ${table} SET share_scope = 'private'
WHERE share_scope IS NULL
AND rowid IN (SELECT rowid FROM ${table} WHERE share_scope IS NULL LIMIT 2000)`,
);
let total = 0;
for (;;) {
const result = stmt.run() as { changes: number };
if (result.changes === 0) break;
total += result.changes;
}
if (total > 0) log.info("migration.backfill", { table, rows: total });
}

function ensureHubSharingSearchColumns(db: StorageDb): void {
if (!tableExists(db, "hub_shared_memories")) return;
ensureColumn(db, "hub_shared_memories", "embedding", "BLOB");
ensureColumn(db, "hub_shared_memories", "embedding_norm2", "REAL");
ensureColumn(
db,
"hub_shared_memories",
"visible",
"INTEGER NOT NULL DEFAULT 1 CHECK (visible IN (0,1))",
);
ensureColumn(db, "hub_shared_memories", "deleted_at", "INTEGER");
db.exec(
`CREATE INDEX IF NOT EXISTS idx_hub_shared_memories_deleted
ON hub_shared_memories(visible, deleted_at)
WHERE visible = 0 AND deleted_at IS NOT NULL`,
);
// Create owner/share indexes matching the full v2.0.5 schema.
// IF NOT EXISTS makes each call idempotent; we log duration so a slow
// build is visible in the agent log for future diagnosis.
const indexes = [
{ index: "idx_sessions_owner", table: "sessions", ddl: `CREATE INDEX IF NOT EXISTS idx_sessions_owner ON sessions(owner_agent_kind, owner_profile_id, last_seen_at DESC)` },
{ index: "idx_episodes_owner", table: "episodes", ddl: `CREATE INDEX IF NOT EXISTS idx_episodes_owner ON episodes(owner_agent_kind, owner_profile_id, started_at DESC)` },
{ index: "idx_episodes_share", table: "episodes", ddl: `CREATE INDEX IF NOT EXISTS idx_episodes_share ON episodes(share_scope, started_at DESC)` },
{ index: "idx_traces_owner", table: "traces", ddl: `CREATE INDEX IF NOT EXISTS idx_traces_owner ON traces(owner_agent_kind, owner_profile_id, ts DESC)` },
{ index: "idx_traces_share", table: "traces", ddl: `CREATE INDEX IF NOT EXISTS idx_traces_share ON traces(share_scope, ts DESC)` },
{ index: "idx_policies_owner", table: "policies", ddl: `CREATE INDEX IF NOT EXISTS idx_policies_owner ON policies(owner_agent_kind, owner_profile_id, updated_at DESC)` },
{ index: "idx_policies_share", table: "policies", ddl: `CREATE INDEX IF NOT EXISTS idx_policies_share ON policies(share_scope, updated_at DESC)` },
{ index: "idx_world_owner", table: "world_model", ddl: `CREATE INDEX IF NOT EXISTS idx_world_owner ON world_model(owner_agent_kind, owner_profile_id, updated_at DESC)` },
{ index: "idx_world_share", table: "world_model", ddl: `CREATE INDEX IF NOT EXISTS idx_world_share ON world_model(share_scope, updated_at DESC)` },
{ index: "uq_skills_owner_name", table: "skills", ddl: `CREATE UNIQUE INDEX IF NOT EXISTS uq_skills_owner_name ON skills(owner_agent_kind, owner_profile_id, name)` },
{ index: "idx_skills_owner", table: "skills", ddl: `CREATE INDEX IF NOT EXISTS idx_skills_owner ON skills(owner_agent_kind, owner_profile_id, updated_at DESC)` },
{ index: "idx_skills_share", table: "skills", ddl: `CREATE INDEX IF NOT EXISTS idx_skills_share ON skills(share_scope, updated_at DESC)` },
{ index: "idx_feedback_owner", table: "feedback", ddl: `CREATE INDEX IF NOT EXISTS idx_feedback_owner ON feedback(owner_agent_kind, owner_profile_id, ts DESC)` },
{ index: "idx_repairs_owner", table: "decision_repairs", ddl: `CREATE INDEX IF NOT EXISTS idx_repairs_owner ON decision_repairs(owner_agent_kind, owner_profile_id, ts DESC)` },
{ index: "idx_l2_candidate_owner", table: "l2_candidate_pool", ddl: `CREATE INDEX IF NOT EXISTS idx_l2_candidate_owner ON l2_candidate_pool(owner_agent_kind, owner_profile_id, expires_at)` },
{ index: "idx_skill_trials_owner", table: "skill_trials", ddl: `CREATE INDEX IF NOT EXISTS idx_skill_trials_owner ON skill_trials(owner_agent_kind, owner_profile_id, created_at DESC)` },
{ index: "idx_api_logs_owner", table: "api_logs", ddl: `CREATE INDEX IF NOT EXISTS idx_api_logs_owner ON api_logs(owner_agent_kind, owner_profile_id, called_at DESC)` },
{ index: "idx_audit_owner", table: "audit_events", ddl: `CREATE INDEX IF NOT EXISTS idx_audit_owner ON audit_events(owner_agent_kind, owner_profile_id, ts DESC)` },
];
for (const { index, table, ddl } of indexes) {
if (!tableExists(db, table) || !columnExists(db, table, "owner_agent_kind")) continue;
const t0 = now();
db.exec(ddl);
log.info("migration.index", { index, durationMs: now() - t0 });
}
}

function execIfTable(db: StorageDb, table: string, sql: string): void {
if (tableExists(db, table)) db.exec(sql);
}
// ── Helpers ───────────────────────────────────────────────────────────────────

function tableExists(db: StorageDb, table: string): boolean {
return Boolean(
db.prepare<{ name: string }, { name: string }>(
`SELECT name FROM sqlite_master WHERE type='table' AND name=@name`,
).get({ name: table }),
);
return !!db
.prepare<unknown, { name: string }>(
`SELECT name FROM sqlite_master WHERE type='table' AND name=?`,
)
.get(table);
}

function columnExists(db: StorageDb, table: string, column: string): boolean {
// table names here are internal constants — interpolation is safe.
const rows = db
.prepare<unknown, { name: string }>(`PRAGMA table_info(${table})`)
.all();
return rows.some((r) => r.name === column);
}

function ensureColumn(db: StorageDb, table: string, column: string, definition: string): void {
const columns = new Set(
db.prepare<unknown, { name: string }>(`PRAGMA table_info(${table})`)
.all()
.map((row) => row.name),
);
if (!columns.has(column)) {
db.exec(`ALTER TABLE ${table} ADD COLUMN ${column} ${definition}`);
}
if (!tableExists(db, table) || columnExists(db, table, column)) return;
db.exec(`ALTER TABLE ${table} ADD COLUMN ${column} ${definition}`);
}

// ─────────────────────────────────────────────────────────────────────────────

function ensureSchemaMigrationsTable(db: StorageDb): void {
db.exec(
`CREATE TABLE IF NOT EXISTS schema_migrations (
Expand Down