Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions src/cli/commands/daemon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import { printBanner } from "../ascii-banner.js";
import { loadChallengesIntoPKC, formatChallengeNameVersion } from "../../challenge-packages/challenge-utils.js";
import { migrateDataDirectory } from "../../common-utils/data-migration.js";
import { createBsoResolvers, DEFAULT_PROVIDERS } from "../../common-utils/resolvers.js";
import { pruneStaleStates, writeDaemonState, deleteDaemonState, DAEMON_SHUTDOWN_TIMEOUT_MS } from "../../common-utils/daemon-state.js";
import { pruneStaleStates, writeDaemonState, deleteDaemonState, detectSelfSupervisor, DAEMON_SHUTDOWN_TIMEOUT_MS } from "../../common-utils/daemon-state.js";
import { createDaemonFileLogger, type DaemonFileLogger } from "../../common-utils/daemon-file-logger.js";
import fs from "fs";
import fsPromise from "fs/promises";
Expand Down Expand Up @@ -327,13 +327,17 @@ export default class Daemon extends Command {
// Prune stale daemon state files (dead PIDs from crashed daemons)
await pruneStaleStates();

// Persist this daemon's PID and startup args so `bitsocial update install --restart-daemons` can stop and restart it
// Persist this daemon's PID and startup args so `bitsocial update install --restart-daemons` can stop and restart it.
// Also record the supervisor (e.g. systemd) so the updater restarts via the supervisor instead of spawning a
// detached daemon that would compete with it for the RPC port (issue #82).
const daemonArgv = process.argv.slice(process.argv.indexOf("daemon") + 1);
const supervisor = await detectSelfSupervisor();
await writeDaemonState({
pid: process.pid,
startedAt: new Date().toISOString(),
argv: daemonArgv,
pkcRpcUrl: pkcRpcUrl.toString()
pkcRpcUrl: pkcRpcUrl.toString(),
...(supervisor ? { supervisor } : {})
});

// Create BSO name resolvers for .bso/.eth domain resolution
Expand Down
173 changes: 105 additions & 68 deletions src/cli/commands/update/install.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,20 @@ import tcpPortUsed from "tcp-port-used";
import { fetchLatestVersion, installGlobal } from "../../../update/npm-registry.js";
import { fastInstallGlobal } from "../../../update/fast-update.js";
import { compareVersions } from "../../../update/semver.js";
import { getAliveDaemonStates, DAEMON_SHUTDOWN_TIMEOUT_MS, type DaemonState } from "../../../common-utils/daemon-state.js";
import { systemctlRestart } from "../../../update/systemctl.js";
import {
getAliveDaemonStates,
resolveDaemonSupervisor,
DAEMON_SHUTDOWN_TIMEOUT_MS,
type DaemonSupervisor
} from "../../../common-utils/daemon-state.js";
import {
planDaemonRestarts,
stopUnmanagedDaemons,
startUnmanagedDaemons,
restartManagedDaemons,
type DaemonLifecycle
} from "../../../update/restart-orchestration.js";

export default class Install extends Command {
static override description = "Install a specific version of bitsocial from npm";
Expand Down Expand Up @@ -40,48 +53,29 @@ export default class Install extends Command {
async run(): Promise<void> {
const { args, flags } = await this.parse(Install);

// Check for running daemons via state files
// Discover running daemons and split them into supervisor-managed vs. updater-managed (issue #82).
// Supervised daemons (e.g. systemd) are restarted through their supervisor; spawning a detached
// replacement ourselves would create a process the supervisor doesn't own that competes with it
// for the RPC port and triggers a restart loop.
const aliveDaemons = await getAliveDaemonStates();
const plan = await planDaemonRestarts(aliveDaemons, (d) => resolveDaemonSupervisor(d));
const lifecycle = this._daemonLifecycle();

if (aliveDaemons.length > 0) {
if (!flags["restart-daemons"]) {
this.error(
`${aliveDaemons.length} daemon(s) running. Stop them first, then retry.`,
{ exit: 1 }
);
}

// Stop all running daemons
for (const d of aliveDaemons) {
this.log(`Stopping daemon (PID ${d.pid})...`);
try {
process.kill(d.pid, "SIGINT");
} catch (e) {
if ((e as NodeJS.ErrnoException).code === "ESRCH") {
this.log(` PID ${d.pid} already exited.`);
continue;
}
throw e;
}
this.error(`${aliveDaemons.length} daemon(s) running. Stop them first, then retry.`, { exit: 1 });
}

// Wait for each daemon process to fully exit — NOT just for its RPC port to free.
// The daemon releases its RPC port (daemonServer.destroy()) before it finishes killing
// its kubo child, so a port-only wait lets us restart while the old kubo still holds the
// IPFS API port; the new daemon then dies on startup with "IPFS API port already in use"
// (issue #70). The daemon's exit hook kills kubo before the process exits, so waiting for
// the PID to disappear guarantees the kubo port is free before we restart.
for (const d of aliveDaemons) {
this.log(`Waiting for daemon (PID ${d.pid}) to exit...`);
const exited = await this._waitForProcessExit(d.pid, DAEMON_SHUTDOWN_TIMEOUT_MS);
if (!exited) {
this.error(
`Daemon (PID ${d.pid}) did not shut down within ${DAEMON_SHUTDOWN_TIMEOUT_MS / 1000} seconds.`,
{ exit: 1 }
);
}
// Stop only the unsupervised daemons before the binary swap. Supervised daemons keep running
// and are restarted by their supervisor afterwards (see _restartViaSupervisor).
await stopUnmanagedDaemons(plan, lifecycle);
if (plan.unmanaged.length > 0) this.log("All unsupervised daemons stopped.");
for (const { daemon, supervisor } of plan.managed) {
this.log(
`Daemon (PID ${daemon.pid}) is managed by ${supervisor.type} (${supervisor.unit}); ` +
`it will be restarted by its supervisor.`
);
}
this.log("All daemons stopped.");
}

// Resolve the target version
Expand All @@ -101,10 +95,9 @@ export default class Install extends Command {
// Skip if already on this version (unless --force)
if (compareVersions(current, targetVersion) === 0 && !flags.force) {
this.log(`Already on v${current}. Use --force to reinstall.`);
if (aliveDaemons.length > 0 && flags["restart-daemons"]) {
// We stopped daemons but don't need to update — restart them
await this._restartDaemons(aliveDaemons);
}
// We stopped the unsupervised daemons but aren't updating — bring them back. Supervised daemons
// were never stopped, so leave them running (no unnecessary service bounce).
if (flags["restart-daemons"]) await startUnmanagedDaemons(plan, lifecycle);
return;
}

Expand Down Expand Up @@ -132,10 +125,13 @@ export default class Install extends Command {

this.log(`Installed bitsocial v${targetVersion} (was v${current}).`);

// Restart daemons with the new binary
// Restart daemons with the new binary: re-spawn the unsupervised ones we stopped, and ask each
// supervisor to restart its daemon onto the new binary.
if (aliveDaemons.length > 0 && flags["restart-daemons"]) {
await this._restartDaemons(aliveDaemons);
await startUnmanagedDaemons(plan, lifecycle);
await restartManagedDaemons(plan, lifecycle);
this.log("To see the daemon logs run `bitsocial logs --stdout`");
this.log("Check community status with: bitsocial community list");
}
}

Expand Down Expand Up @@ -163,36 +159,77 @@ export default class Install extends Command {
return false;
}

private async _restartDaemons(daemons: DaemonState[]): Promise<void> {
this.log(`Restarting ${daemons.length} daemon(s)...`);

for (const d of daemons) {
const argStr = d.argv.length > 0 ? d.argv.join(" ") : "(defaults)";
this.log(` Starting daemon with args: ${argStr}`);
/** Build the side effects that the restart orchestration drives (split out so the routing is testable). */
private _daemonLifecycle(): DaemonLifecycle {
return {
stopUnmanaged: async (daemon) => {
this.log(`Stopping daemon (PID ${daemon.pid})...`);
try {
process.kill(daemon.pid, "SIGINT");
} catch (e) {
if ((e as NodeJS.ErrnoException).code === "ESRCH") {
this.log(` PID ${daemon.pid} already exited.`);
return;
}
throw e;
}

const child = spawn("bitsocial", ["daemon", ...d.argv], {
detached: true,
stdio: "ignore"
});
child.unref();
// Wait for the process to fully exit — NOT just for its RPC port to free. The daemon
// releases its RPC port (daemonServer.destroy()) before it finishes killing its kubo
// child, so a port-only wait lets us restart while the old kubo still holds the IPFS API
// port; the new daemon then dies on "IPFS API port already in use" (issue #70). The
// daemon's exit hook kills kubo before exiting, so "PID gone" guarantees kubo is free.
this.log(`Waiting for daemon (PID ${daemon.pid}) to exit...`);
const exited = await this._waitForProcessExit(daemon.pid, DAEMON_SHUTDOWN_TIMEOUT_MS);
if (!exited) {
this.error(
`Daemon (PID ${daemon.pid}) did not shut down within ${DAEMON_SHUTDOWN_TIMEOUT_MS / 1000} seconds.`,
{ exit: 1 }
);
}
},
startUnmanaged: async (daemon) => {
const argStr = daemon.argv.length > 0 ? daemon.argv.join(" ") : "(defaults)";
this.log(`Restarting daemon with args: ${argStr}`);

const child = spawn("bitsocial", ["daemon", ...daemon.argv], {
detached: true,
stdio: "ignore"
});
child.unref();

if (!child.pid) {
this.warn(`Failed to spawn daemon for args: ${argStr}`);
return;
}

if (!child.pid) {
this.warn(`Failed to spawn daemon for args: ${argStr}`);
continue;
// Wait briefly for the daemon's RPC port to come up
const port = Number(new URL(daemon.pkcRpcUrl).port);
const started = await tcpPortUsed.waitUntilUsed(port, 500, 30000).then(() => true).catch(() => false);
if (started) {
this.log(` Daemon started (port ${port}).`);
} else {
this.warn(` Daemon may not have started — port ${port} not responding after 30s. Check logs with: bitsocial logs`);
}
},
restartManaged: async (supervisor) => {
await this._restartViaSupervisor(supervisor);
}
};
}

// Wait briefly for the daemon's RPC port to come up
const url = new URL(d.pkcRpcUrl);
const port = Number(url.port);
const started = await tcpPortUsed.waitUntilUsed(port, 500, 30000).then(() => true).catch(() => false);
if (started) {
this.log(` Daemon started (port ${port}).`);
} else {
this.warn(` Daemon may not have started — port ${port} not responding after 30s. Check logs with: bitsocial logs`);
}
/** Restart a supervised daemon onto the new binary by asking its supervisor (e.g. systemd). */
private async _restartViaSupervisor(supervisor: DaemonSupervisor): Promise<void> {
this.log(`Restarting ${supervisor.type} unit ${supervisor.unit} so it picks up the new binary...`);
try {
await systemctlRestart(supervisor.unit);
this.log(` ${supervisor.unit} restarted.`);
} catch (err) {
this.error(
`Updated the binary but failed to restart ${supervisor.type} unit ${supervisor.unit}: ${(err as Error).message}. ` +
`Restart it manually, e.g. 'sudo systemctl restart ${supervisor.unit}'.`,
{ exit: 1 }
);
}

this.log("Check community status with: bitsocial community list");
this.log("Check logs with: bitsocial logs");
}
}
71 changes: 71 additions & 0 deletions src/common-utils/daemon-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,84 @@ const DAEMON_STATES_DIR = path.join(defaults.PKC_DATA_PATH, ".daemon_states");
*/
export const DAEMON_SHUTDOWN_TIMEOUT_MS = 120000;

/**
 * Describes the external supervisor that manages a daemon's lifecycle.
 *
 * The daemon records this at startup so that `update install` can restart it through the
 * supervisor instead of spawning a detached replacement that would compete with the supervisor
 * for the RPC port (issue #82).
 */
export interface DaemonSupervisor {
/** Kind of supervisor managing this daemon. Only systemd is detected today, hence the single literal. */
type: "systemd";
/** Name of the unit that owns the daemon, e.g. "bitsocial.service". */
unit: string;
}

/**
 * Persisted description of a running daemon. The daemon writes one of these at startup so that
 * `bitsocial update install --restart-daemons` can find it, stop it, and start it again.
 */
export interface DaemonState {
/** Process ID of the daemon. */
pid: number;
/** ISO-8601 timestamp taken when the daemon wrote its state. */
startedAt: string;
/** CLI arguments that followed the `daemon` command; replayed when the daemon is re-spawned. */
argv: string[];
/** RPC URL of the daemon as a string; the updater reads its port to check the restarted daemon came up. */
pkcRpcUrl: string;
/** OS-reported process start time, used to detect PID reuse. Absent in legacy state files. */
procStartTime?: string;
/** External supervisor managing this daemon, if any. Absent for standalone or legacy daemons. */
supervisor?: DaemonSupervisor;
}

/**
 * Extract the systemd service unit a process belongs to from the text of its cgroup file
 * (`/proc/<pid>/cgroup`).
 *
 * Every non-blank line has the form `hierarchy-id:controller-list:cgroup-path`:
 *   cgroup v2: a single line, e.g. `0::/system.slice/bitsocial.service`
 *   cgroup v1: one line per hierarchy, e.g. `1:name=systemd:/system.slice/bitsocial.service`;
 *              lines whose path does not end in a `.service` leaf (such as `/`) are skipped
 *
 * The unit is the leaf of the cgroup path when it ends in `.service`. A user session has a `.scope`
 * leaf (e.g. `…/session-36.scope`) — not a service — so it yields undefined, even if a `.service`
 * appears somewhere higher up the path.
 *
 * The path is everything after the SECOND colon, not after the last one: systemd unit names may
 * themselves contain ":", so cutting at the last colon would truncate a unit like `foo:bar.service`
 * to `bar.service` and the updater would restart the wrong unit.
 *
 * @param content Raw contents of a cgroup file (LF or CRLF line endings).
 * @returns The first `.service` leaf found, or undefined when no line has one.
 */
export function parseSystemdUnitFromCgroup(content: string): string | undefined {
    for (const line of content.split(/\r?\n/)) {
        if (!line.trim()) continue;

        // Locate the end of the first two fields (hierarchy id, controller list).
        const firstColon = line.indexOf(":");
        const secondColon = firstColon === -1 ? -1 : line.indexOf(":", firstColon + 1);

        // Well-formed line: the path starts right after the second colon. For a malformed line with
        // fewer than two colons keep the previous lenient behaviour (text after the last colon, or
        // the whole line when there is none).
        const pathStart = secondColon !== -1 ? secondColon + 1 : line.lastIndexOf(":") + 1;
        const cgroupPath = line.slice(pathStart);

        const leaf = cgroupPath.slice(cgroupPath.lastIndexOf("/") + 1);
        if (leaf.endsWith(".service")) return leaf;
    }
    return undefined;
}

/**
 * Look up the systemd unit that owns a process by reading its cgroup file under /proc.
 *
 * @param pid Process ID to inspect, or "self" for the current process.
 * @returns The owning `.service` unit name, or undefined when none is found or the file can't be read.
 */
export async function readSystemdUnit(pid: number | "self"): Promise<string | undefined> {
    const cgroupFile = `/proc/${pid}/cgroup`;
    try {
        return parseSystemdUnitFromCgroup(await fs.readFile(cgroupFile, "utf-8"));
    } catch {
        // No /proc (non-Linux) or the file is unreadable: report the process as unsupervised.
        return undefined;
    }
}

/**
 * Work out whether the CURRENT process is running as a systemd service and, if so, which unit.
 *
 * systemd sets $INVOCATION_ID for every service it spawns, so its absence means "not supervised".
 * When it is present, the unit name is read from this process's own cgroup.
 *
 * @param env Environment to inspect; defaults to `process.env` (injectable for testing).
 * @param readUnit Unit lookup; defaults to `readSystemdUnit` (injectable for testing).
 * @returns The systemd supervisor descriptor, or undefined when not systemd-supervised.
 */
export async function detectSelfSupervisor(
    env: NodeJS.ProcessEnv = process.env,
    readUnit: (pid: number | "self") => Promise<string | undefined> = readSystemdUnit
): Promise<DaemonSupervisor | undefined> {
    const launchedBySystemd = Boolean(env.INVOCATION_ID);
    if (launchedBySystemd) {
        const ownUnit = await readUnit("self");
        if (ownUnit) return { type: "systemd", unit: ownUnit };
    }
    return undefined;
}

/**
 * Determine which supervisor, if any, manages the daemon described by `state`.
 *
 * The `supervisor` the daemon recorded at startup wins. State files from legacy daemons predate
 * that field, so when it is missing the unit is inferred from the live process's cgroup instead.
 *
 * @param state Persisted state of the daemon to resolve.
 * @param readUnit Unit lookup by PID; defaults to `readSystemdUnit` (injectable for testing).
 * @returns The recorded or inferred supervisor, or undefined when none is found.
 */
export async function resolveDaemonSupervisor(
    state: DaemonState,
    readUnit: (pid: number | "self") => Promise<string | undefined> = readSystemdUnit
): Promise<DaemonSupervisor | undefined> {
    const recorded = state.supervisor;
    if (recorded) return recorded;

    // Nothing recorded: fall back to asking the OS which unit owns the PID.
    const inferredUnit = await readUnit(state.pid);
    if (!inferredUnit) return undefined;
    return { type: "systemd", unit: inferredUnit };
}

function stateFilePath(pid: number): string {
Expand Down
Loading
Loading