Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 37 additions & 17 deletions packages/agent/src/embedded-workflow-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type { WorkflowExecutor } from '@forestadmin/workflow-executor';

import path from 'path';

/** Default loopback port for an embedded workflow executor (mirrors the executor CLI's HTTP_PORT). */
/** Default loopback port for an embedded workflow executor. */
const DEFAULT_EMBEDDED_EXECUTOR_PORT = 3400;

/**
Expand Down Expand Up @@ -55,8 +55,14 @@ export default class EmbeddedWorkflowExecutor {
);
}

const port =
embedOptions.port ?? (Number(process.env.HTTP_PORT) || DEFAULT_EMBEDDED_EXECUTOR_PORT);
if (embedOptions.inMemory && embedOptions.database) {
throw new Error(
'addWorkflowExecutor: `inMemory` and `database` are mutually exclusive. The in-memory run ' +
'store persists nothing; drop one of the two options.',
);
}

const port = embedOptions.port ?? DEFAULT_EMBEDDED_EXECUTOR_PORT;
this.config = { ...embedOptions, port };

return `http://127.0.0.1:${port}`;
Expand Down Expand Up @@ -84,19 +90,9 @@ export default class EmbeddedWorkflowExecutor {
);
}

const database =
config.database ?? (process.env.DATABASE_URL ? { uri: process.env.DATABASE_URL } : undefined);
const { buildDatabaseExecutor, buildInMemoryExecutor } = await this.importPackage();

if (!database) {
throw new Error(
'Embedded workflow executor requires a database to persist run state. Pass `database` to ' +
'addWorkflowExecutor() or set the DATABASE_URL environment variable.',
);
}

const { buildDatabaseExecutor } = await this.importPackage();

this.executor = buildDatabaseExecutor({
const commonOptions = {
envSecret: this.options.envSecret,
authSecret: this.options.authSecret,
forestServerUrl: this.options.forestServerUrl,
Expand All @@ -108,8 +104,32 @@ export default class EmbeddedWorkflowExecutor {
// Embedded: the host process owns SIGTERM/SIGINT; the executor must not exit it. agent.stop()
// drains the executor explicitly.
manageProcessSignals: false,
database: database as Parameters<typeof buildDatabaseExecutor>[0]['database'],
});
};

if (config.inMemory) {
this.options.logger(
'Warn',
formatLog(
'Using an in-memory run store: workflow runs are kept in memory and lost on restart. ' +
'Pass a `database` to addWorkflowExecutor() to persist them in production.',
),
);
this.executor = buildInMemoryExecutor(commonOptions);
} else {
const { database } = config;

if (!database) {
throw new Error(
'Embedded workflow executor requires a database to persist run state. Pass `database` ' +
'to addWorkflowExecutor(), or use `inMemory: true`.',
);
}

this.executor = buildDatabaseExecutor({
...commonOptions,
database: database as Parameters<typeof buildDatabaseExecutor>[0]['database'],
});
}

await this.executor.start();
this.options.logger(
Expand Down
12 changes: 9 additions & 3 deletions packages/agent/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,16 @@ export type AgentOptions = {
* through `agent.addWorkflowExecutor()`.
*/
export type WorkflowExecutorEmbedOptions = {
/**
* Use an in-memory run store instead of a database. No database is required, but runs are lost
* when the process restarts, so it is not meant for production. Mutually exclusive with
* `database`.
*/
inMemory?: boolean;
/**
* Database connection used to persist workflow run state. Accepts a connection URI or a
* Sequelize options object. Falls back to the `DATABASE_URL` environment variable when omitted.
* The agent throws at startup if neither is provided.
* Sequelize options object. The agent throws at startup if it is omitted (unless `inMemory` is
* set).
*/
database?: { uri?: string; [option: string]: unknown };
/**
Expand All @@ -77,7 +83,7 @@ export type WorkflowExecutorEmbedOptions = {
agentUrl?: string;
/**
* Loopback port the embedded executor listens on; the agent proxies to it internally.
* Defaults to the `HTTP_PORT` environment variable, or `3400`.
* Defaults to `3400`.
*/
port?: number;
/** Interval in seconds at which the executor polls the orchestrator for pending steps. */
Expand Down
69 changes: 46 additions & 23 deletions packages/agent/test/agent-workflow-executor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,28 @@ jest.mock('@forestadmin/datasource-customizer');
const mockExecutorStart = jest.fn();
const mockExecutorStop = jest.fn();
const mockBuildDatabaseExecutor = jest.fn();
const mockBuildInMemoryExecutor = jest.fn();

jest.mock('@forestadmin/workflow-executor', () => ({
__esModule: true,
buildDatabaseExecutor: (options: unknown) => mockBuildDatabaseExecutor(options),
buildInMemoryExecutor: (options: unknown) => mockBuildInMemoryExecutor(options),
}));

beforeEach(() => {
jest.clearAllMocks();
delete process.env.DATABASE_URL;
delete process.env.HTTP_PORT;

mockMakeRoutes.mockReturnValue([{ setupRoutes: mockSetupRoute, bootstrap: mockBootstrap }]);
mockBuildDatabaseExecutor.mockReturnValue({
start: mockExecutorStart,
stop: mockExecutorStop,
state: 'idle',
});
mockBuildInMemoryExecutor.mockReturnValue({
start: mockExecutorStart,
stop: mockExecutorStop,
state: 'idle',
});
jest
.mocked(DataSourceCustomizer.prototype.getDataSource)
.mockResolvedValue(factories.dataSource.build());
Expand All @@ -64,15 +69,6 @@ describe('Agent.addWorkflowExecutor', () => {
expect((agent as any).options.workflowExecutorUrl).toBe('http://127.0.0.1:5005');
});

test('falls back to the HTTP_PORT environment variable', () => {
process.env.HTTP_PORT = '4567';
const agent = new Agent(buildOptions());

agent.addWorkflowExecutor();

expect((agent as any).options.workflowExecutorUrl).toBe('http://127.0.0.1:4567');
});

test('returns the agent for chaining', () => {
const agent = new Agent(buildOptions());

Expand All @@ -95,6 +91,14 @@ describe('Agent.addWorkflowExecutor', () => {
'Cannot use addWorkflowExecutor together with the workflowExecutorUrl option',
);
});

test('throws when both inMemory and database are provided', () => {
const agent = new Agent(buildOptions());

expect(() =>
agent.addWorkflowExecutor({ inMemory: true, database: { uri: 'postgres://localhost/db' } }),
).toThrow('`inMemory` and `database` are mutually exclusive');
});
});

describe('boot on start()', () => {
Expand Down Expand Up @@ -178,18 +182,6 @@ describe('Agent.addWorkflowExecutor', () => {
);
});

test('falls back to the DATABASE_URL environment variable when no database is provided', async () => {
process.env.DATABASE_URL = 'postgres://env-host/env-db';
const agent = new Agent(buildOptions());
agent.addWorkflowExecutor({ agentUrl: 'http://my-agent' });

await agent.start();

expect(mockBuildDatabaseExecutor).toHaveBeenCalledWith(
expect.objectContaining({ database: { uri: 'postgres://env-host/env-db' } }),
);
});

test('derives the agentUrl from the standalone server port and prefix', async () => {
const agent = new Agent(buildOptions());
agent.addWorkflowExecutor({ database: { uri: 'postgres://localhost/db' } });
Expand Down Expand Up @@ -250,6 +242,37 @@ describe('Agent.addWorkflowExecutor', () => {
expect(mockBuildDatabaseExecutor).not.toHaveBeenCalled();
});

test('builds an in-memory executor without any database when inMemory is set', async () => {
const options = buildOptions();
const agent = new Agent(options);
agent.addWorkflowExecutor({ agentUrl: 'http://my-agent', inMemory: true, port: 4400 });

await agent.start();

expect(mockBuildInMemoryExecutor).toHaveBeenCalledWith(
expect.objectContaining({
envSecret: options.envSecret,
authSecret: options.authSecret,
agentUrl: 'http://my-agent',
httpPort: 4400,
manageProcessSignals: false,
}),
);
// In-memory: the database builder must never be reached.
expect(mockBuildDatabaseExecutor).not.toHaveBeenCalled();
expect(mockExecutorStart).toHaveBeenCalledTimes(1);
});

test('warns that in-memory runs are not persisted', async () => {
const logger = jest.fn();
const agent = new Agent(buildOptions({ logger }));
agent.addWorkflowExecutor({ agentUrl: 'http://my-agent', inMemory: true });

await agent.start();

expect(logger).toHaveBeenCalledWith('Warn', expect.stringContaining('in-memory run store'));
});

test('throws when the agentUrl cannot be derived (not standalone) and is not provided', async () => {
const agent = new Agent(buildOptions());
agent.addWorkflowExecutor({ database: { uri: 'postgres://localhost/db' } });
Expand Down
Loading