diff --git a/packages/agent/src/embedded-workflow-executor.ts b/packages/agent/src/embedded-workflow-executor.ts index 6a7f362a39..8be320bb5a 100644 --- a/packages/agent/src/embedded-workflow-executor.ts +++ b/packages/agent/src/embedded-workflow-executor.ts @@ -3,7 +3,7 @@ import type { WorkflowExecutor } from '@forestadmin/workflow-executor'; import path from 'path'; -/** Default loopback port for an embedded workflow executor (mirrors the executor CLI's HTTP_PORT). */ +/** Default loopback port for an embedded workflow executor. */ const DEFAULT_EMBEDDED_EXECUTOR_PORT = 3400; /** @@ -55,8 +55,14 @@ export default class EmbeddedWorkflowExecutor { ); } - const port = - embedOptions.port ?? (Number(process.env.HTTP_PORT) || DEFAULT_EMBEDDED_EXECUTOR_PORT); + if (embedOptions.inMemory && embedOptions.database) { + throw new Error( + 'addWorkflowExecutor: `inMemory` and `database` are mutually exclusive. The in-memory run ' + + 'store persists nothing; drop one of the two options.', + ); + } + + const port = embedOptions.port ?? DEFAULT_EMBEDDED_EXECUTOR_PORT; this.config = { ...embedOptions, port }; return `http://127.0.0.1:${port}`; @@ -84,19 +90,9 @@ export default class EmbeddedWorkflowExecutor { ); } - const database = - config.database ?? (process.env.DATABASE_URL ? { uri: process.env.DATABASE_URL } : undefined); + const { buildDatabaseExecutor, buildInMemoryExecutor } = await this.importPackage(); - if (!database) { - throw new Error( - 'Embedded workflow executor requires a database to persist run state. Pass `database` to ' + - 'addWorkflowExecutor() or set the DATABASE_URL environment variable.', - ); - } - - const { buildDatabaseExecutor } = await this.importPackage(); - - this.executor = buildDatabaseExecutor({ + const commonOptions = { envSecret: this.options.envSecret, authSecret: this.options.authSecret, forestServerUrl: this.options.forestServerUrl, @@ -108,8 +104,32 @@ export default class EmbeddedWorkflowExecutor { // Embedded: the host process owns SIGTERM/SIGINT; the executor must not exit it. agent.stop() // drains the executor explicitly. manageProcessSignals: false, - database: database as Parameters[0]['database'], - }); + }; + + if (config.inMemory) { + this.options.logger( + 'Warn', + formatLog( + 'Using an in-memory run store: workflow runs are kept in memory and lost on restart. ' + + 'Pass a `database` to addWorkflowExecutor() to persist them in production.', + ), + ); + this.executor = buildInMemoryExecutor(commonOptions); + } else { + const { database } = config; + + if (!database) { + throw new Error( + 'Embedded workflow executor requires a database to persist run state. Pass `database` ' + + 'to addWorkflowExecutor(), or use `inMemory: true`.', + ); + } + + this.executor = buildDatabaseExecutor({ + ...commonOptions, + database: database as Parameters[0]['database'], + }); + } await this.executor.start(); this.options.logger( diff --git a/packages/agent/src/types.ts b/packages/agent/src/types.ts index 37faa57585..6e7a955e4d 100644 --- a/packages/agent/src/types.ts +++ b/packages/agent/src/types.ts @@ -62,10 +62,16 @@ export type AgentOptions = { * through `agent.addWorkflowExecutor()`. */ export type WorkflowExecutorEmbedOptions = { + /** + * Use an in-memory run store instead of a database. No database is required, but runs are lost + * when the process restarts, so it is not meant for production. Mutually exclusive with + * `database`. + */ + inMemory?: boolean; /** * Database connection used to persist workflow run state. Accepts a connection URI or a - * Sequelize options object. Falls back to the `DATABASE_URL` environment variable when omitted. - * The agent throws at startup if neither is provided. + * Sequelize options object. The agent throws at startup if it is omitted (unless `inMemory` is + * set). */ database?: { uri?: string; [option: string]: unknown }; /** @@ -77,7 +83,7 @@ export type WorkflowExecutorEmbedOptions = { agentUrl?: string; /** * Loopback port the embedded executor listens on; the agent proxies to it internally. - * Defaults to the `HTTP_PORT` environment variable, or `3400`. + * Defaults to `3400`. */ port?: number; /** Interval in seconds at which the executor polls the orchestrator for pending steps. */ diff --git a/packages/agent/test/agent-workflow-executor.test.ts b/packages/agent/test/agent-workflow-executor.test.ts index b660bafc04..4fd17504e5 100644 --- a/packages/agent/test/agent-workflow-executor.test.ts +++ b/packages/agent/test/agent-workflow-executor.test.ts @@ -21,16 +21,16 @@ jest.mock('@forestadmin/datasource-customizer'); const mockExecutorStart = jest.fn(); const mockExecutorStop = jest.fn(); const mockBuildDatabaseExecutor = jest.fn(); +const mockBuildInMemoryExecutor = jest.fn(); jest.mock('@forestadmin/workflow-executor', () => ({ __esModule: true, buildDatabaseExecutor: (options: unknown) => mockBuildDatabaseExecutor(options), + buildInMemoryExecutor: (options: unknown) => mockBuildInMemoryExecutor(options), })); beforeEach(() => { jest.clearAllMocks(); - delete process.env.DATABASE_URL; - delete process.env.HTTP_PORT; mockMakeRoutes.mockReturnValue([{ setupRoutes: mockSetupRoute, bootstrap: mockBootstrap }]); mockBuildDatabaseExecutor.mockReturnValue({ @@ -38,6 +38,11 @@ beforeEach(() => { stop: mockExecutorStop, state: 'idle', }); + mockBuildInMemoryExecutor.mockReturnValue({ + start: mockExecutorStart, + stop: mockExecutorStop, + state: 'idle', + }); jest .mocked(DataSourceCustomizer.prototype.getDataSource) .mockResolvedValue(factories.dataSource.build()); @@ -64,15 +69,6 @@ describe('Agent.addWorkflowExecutor', () => { expect((agent as any).options.workflowExecutorUrl).toBe('http://127.0.0.1:5005'); }); - test('falls back to the HTTP_PORT environment variable', () => { - process.env.HTTP_PORT = '4567'; - const agent = new Agent(buildOptions()); - - agent.addWorkflowExecutor(); - - expect((agent as any).options.workflowExecutorUrl).toBe('http://127.0.0.1:4567'); - }); - test('returns the agent for chaining', () => { const agent = new Agent(buildOptions()); @@ -95,6 +91,14 @@ describe('Agent.addWorkflowExecutor', () => { 'Cannot use addWorkflowExecutor together with the workflowExecutorUrl option', ); }); + + test('throws when both inMemory and database are provided', () => { + const agent = new Agent(buildOptions()); + + expect(() => + agent.addWorkflowExecutor({ inMemory: true, database: { uri: 'postgres://localhost/db' } }), + ).toThrow('`inMemory` and `database` are mutually exclusive'); + }); }); describe('boot on start()', () => { @@ -178,18 +182,6 @@ describe('Agent.addWorkflowExecutor', () => { ); }); - test('falls back to the DATABASE_URL environment variable when no database is provided', async () => { - process.env.DATABASE_URL = 'postgres://env-host/env-db'; - const agent = new Agent(buildOptions()); - agent.addWorkflowExecutor({ agentUrl: 'http://my-agent' }); - - await agent.start(); - - expect(mockBuildDatabaseExecutor).toHaveBeenCalledWith( - expect.objectContaining({ database: { uri: 'postgres://env-host/env-db' } }), - ); - }); - test('derives the agentUrl from the standalone server port and prefix', async () => { const agent = new Agent(buildOptions()); agent.addWorkflowExecutor({ database: { uri: 'postgres://localhost/db' } }); @@ -250,6 +242,37 @@ describe('Agent.addWorkflowExecutor', () => { expect(mockBuildDatabaseExecutor).not.toHaveBeenCalled(); }); + test('builds an in-memory executor without any database when inMemory is set', async () => { + const options = buildOptions(); + const agent = new Agent(options); + agent.addWorkflowExecutor({ agentUrl: 'http://my-agent', inMemory: true, port: 4400 }); + + await agent.start(); + + expect(mockBuildInMemoryExecutor).toHaveBeenCalledWith( + expect.objectContaining({ + envSecret: options.envSecret, + authSecret: options.authSecret, + agentUrl: 'http://my-agent', + httpPort: 4400, + manageProcessSignals: false, + }), + ); + // In-memory: the database builder must never be reached. + expect(mockBuildDatabaseExecutor).not.toHaveBeenCalled(); + expect(mockExecutorStart).toHaveBeenCalledTimes(1); + }); + + test('warns that in-memory runs are not persisted', async () => { + const logger = jest.fn(); + const agent = new Agent(buildOptions({ logger })); + agent.addWorkflowExecutor({ agentUrl: 'http://my-agent', inMemory: true }); + + await agent.start(); + + expect(logger).toHaveBeenCalledWith('Warn', expect.stringContaining('in-memory run store')); + }); + test('throws when the agentUrl cannot be derived (not standalone) and is not provided', async () => { const agent = new Agent(buildOptions()); agent.addWorkflowExecutor({ database: { uri: 'postgres://localhost/db' } });