Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion WORKFLOW-EXECUTOR-CONTRACT.md
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,8 @@ interface LoadRelatedRecordPendingData {
{
userConfirmed: boolean;
name?: string; // override relation
displayName?: string;
selectedRecordId?: Array<string | number>; // min 1 element
// displayName is NOT accepted — derived from FieldSchema after resolving name.
}
```

Expand Down
50 changes: 30 additions & 20 deletions packages/workflow-executor/src/executors/base-step-executor.ts
Comment thread
Tonours marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
IStepExecutor,
StepExecutionResult,
} from '../types/execution-context';
import type { StepExecutionData } from '../types/step-execution-data';
import type { StepExecutionData, WithUserConfirmation } from '../types/step-execution-data';
import type { StepDefinition } from '../types/validated/step-definition';
import type { StepStatus } from '../types/validated/step-outcome';
import type {
Expand All @@ -27,7 +27,9 @@
import patchBodySchemas from '../http/pending-data-validators';
import StepSummaryBuilder from './summary/step-summary-builder';

type WithPendingData = StepExecutionData & { pendingData?: object };
type WithPendingData = StepExecutionData & { pendingData?: object } & WithUserConfirmation;

type PatchBody = Record<string, unknown> & { userConfirmed?: boolean };

export default abstract class BaseStepExecutor<TStep extends StepDefinition = StepDefinition>
implements IStepExecutor
Expand Down Expand Up @@ -211,36 +213,44 @@
);
}

// Preserves the AI suggestion in pendingData: only userConfirmed is mirrored there because
// handleConfirmationFlow gates on it. The full parsed PATCH body is stored in userConfirmation.
protected async patchAndReloadPendingData<TExec extends WithPendingData>(
pendingData?: unknown,
): Promise<TExec | undefined> {
const { type } = this.context.stepDefinition;
const execution = await this.findPendingExecution<TExec>(type);

if (pendingData !== undefined && execution) {
const schema = patchBodySchemas[execution.type]!;
const parsed = schema.safeParse(pendingData);
if (!execution) return undefined;

if (!parsed.success) {
throw new StepStateError(
`Invalid pending data: ${parsed.error.issues.map(i => i.message).join(', ')}`,
);
}
if (pendingData === undefined) return execution;

const updated = {
...execution,
pendingData: { ...(execution.pendingData as object), ...(parsed.data as object) },
} as TExec;
const schema = patchBodySchemas[execution.type]!;

Check warning on line 228 in packages/workflow-executor/src/executors/base-step-executor.ts

View workflow job for this annotation

GitHub Actions / Linting & Testing (workflow-executor)

Forbidden non-null assertion
const parsed = schema.safeParse(pendingData);

await this.context.runStore.saveStepExecution(
this.context.runId,
updated as StepExecutionData,
if (!parsed.success) {
throw new StepStateError(
`Invalid pending data: ${parsed.error.issues.map(i => i.message).join(', ')}`,
);

return updated;
}

return execution;
const patchBody = parsed.data as PatchBody;

// Last-write-wins: spread-merging would leak stale keys from prior PATCHes.
const updated: TExec = {
...execution,
pendingData: {
...execution.pendingData,
...(patchBody.userConfirmed !== undefined
? { userConfirmed: patchBody.userConfirmed }
: {}),
},
userConfirmation: patchBody as TExec['userConfirmation'],
};

await this.context.runStore.saveStepExecution(this.context.runId, updated);

return updated;
}

// userConfirmed branches: undefined → re-emit awaiting-input (PATCH not yet called);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,31 +138,31 @@ export default class LoadRelatedRecordStepExecutor extends RecordStepExecutor<Lo
return this.persistAndReturn(record, target, undefined);
}

// Branch A: builds RecordRef from pendingData.selectedRecordId without a new getRelatedData call.
// Re-derives relatedCollectionName so a user-overridden relation name is handled correctly.
// Branch A: builds RecordRef from the user-confirmed selection without a new getRelatedData call.
private async resolveFromSelection(
execution: LoadRelatedRecordStepExecutionData,
): Promise<StepExecutionResult> {
const { selectedRecordRef, pendingData } = execution;
const { selectedRecordRef, pendingData, userConfirmation } = execution;

if (!pendingData) {
throw new StepStateError(`Step at index ${this.context.stepIndex} has no pending data`);
}

const { name, displayName, selectedRecordId } = pendingData;
const name = userConfirmation?.name ?? pendingData.name;
Comment thread
Scra3 marked this conversation as resolved.
const selectedRecordId = userConfirmation?.selectedRecordId ?? pendingData.selectedRecordId;

// Re-derive relatedCollectionName from schema using the (possibly updated) relation name.
// `name` is always a fieldName (set from field.fieldName in buildTarget) — search directly.
// Re-derive relatedCollectionName and displayName because the user may have swapped the relation.
const schema = await this.getCollectionSchema(selectedRecordRef.collectionName);
const field = schema.fields.find(f => f.fieldName === name);
const relatedCollectionName = field?.relatedCollectionName;

if (!relatedCollectionName) {
if (!field?.relatedCollectionName) {
throw new StepStateError(
`Step at index ${this.context.stepIndex} could not resolve relatedCollectionName for relation "${name}"`,
);
}

const { displayName, relatedCollectionName } = field;

const record: RecordRef = {
collectionName: relatedCollectionName,
recordId: selectedRecordId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,11 @@ export default class TriggerRecordActionStepExecutor extends RecordStepExecutor<
return this.handleConfirmationFlow<TriggerRecordActionStepExecutionData>(
pending,
async exec => {
const { selectedRecordRef, pendingData } = exec;
const { selectedRecordRef, pendingData, userConfirmation } = exec;

// The frontend executes the action itself and posts the result back.
// A confirmed step without actionResult is a broken frontend contract.
if (!pendingData || !('actionResult' in pendingData)) {
if (!pendingData || !userConfirmation || !('actionResult' in userConfirmation)) {
throw new StepStateError(
`Frontend confirmed action but did not provide actionResult ` +
`(run "${this.context.runId}", step ${this.context.stepIndex})`,
Expand All @@ -85,7 +85,7 @@ export default class TriggerRecordActionStepExecutor extends RecordStepExecutor<
name: pendingData.name,
};

return this.saveFrontendResult(target, pendingData.actionResult, exec);
return this.saveFrontendResult(target, userConfirmation.actionResult, exec);
},
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,15 @@ export default class UpdateRecordStepExecutor extends RecordStepExecutor<UpdateR

if (pending) {
return this.handleConfirmationFlow<UpdateRecordStepExecutionData>(pending, async exec => {
const { selectedRecordRef, pendingData } = exec;
const { selectedRecordRef, pendingData, userConfirmation } = exec;
const userValue =
userConfirmation && 'value' in userConfirmation
? userConfirmation.value
: (pendingData as { value: unknown })?.value;
const target: UpdateTarget = {
selectedRecordRef,
...(pendingData as FieldRef & { value: unknown }),
...(pendingData as FieldRef),
value: userValue,
};

return this.resolveAndUpdate(target, exec);
Expand Down
109 changes: 64 additions & 45 deletions packages/workflow-executor/src/http/pending-data-validators.ts
Original file line number Diff line number Diff line change
@@ -1,53 +1,72 @@
import type { StepExecutionData } from '../types/step-execution-data';

import { z } from 'zod';

// Per-step-type schemas for the `pendingData` payload sent by the front via
// POST /runs/:runId/trigger. Consumed by step executors to validate `incomingPendingData`
// before applying user confirmation or override. Schemas use .strict() to reject unknown fields.
const patchBodySchemas: Partial<Record<StepExecutionData['type'], z.ZodTypeAny>> = {
'update-record': z
.object({
userConfirmed: z.boolean(),
value: z.string().optional(), // user may override the AI-proposed value
})
.strict(),

'trigger-action': z
.object({
userConfirmed: z.boolean(),
// Opaque action result from the frontend. Required when userConfirmed=true; the
// presence check lives in the step-executor so a descriptive StepStateError can
// name the runId/stepIndex — not achievable from inside a zod schema.
actionResult: z.unknown().optional(),
})
.strict(),

mcp: z.object({ userConfirmed: z.boolean() }).strict(),

'load-related-record': z
.object({
userConfirmed: z.boolean(),
// User may intentionally switch to a different relation than the one the AI selected.
// The executor re-derives relatedCollectionName from FieldSchema when processing the confirmation.
name: z.string().optional(),
displayName: z.string().optional(),
// User may override the AI-selected record; must be non-empty when provided.
selectedRecordId: z
.array(z.union([z.string(), z.number()]))
.min(1)
.optional(),
})
.strict(),
// relatedCollectionName and suggestedFields are NOT accepted — internal executor data.

guidance: z
.object({
userInput: z.string().optional(),
})
.strict(),

condition: z.object({ selectedOption: z.string() }).strict(),

const updateRecordPatchSchema = z
.object({
userConfirmed: z.boolean(),
value: z.string().optional(), // user may override the AI-proposed value
})
.strict();

const triggerActionPatchSchema = z
.object({
userConfirmed: z.boolean(),
// Opaque action result from the frontend. Required when userConfirmed=true; the
// presence check lives in the step-executor so a descriptive StepStateError can
// name the runId/stepIndex — not achievable from inside a zod schema.
actionResult: z.unknown().optional(),
})
.strict();

const mcpPatchSchema = z.object({ userConfirmed: z.boolean() }).strict();

const loadRelatedRecordPatchSchema = z
.object({
userConfirmed: z.boolean(),
// User may intentionally switch to a different relation than the one the AI selected.
// The executor re-derives relatedCollectionName and displayName from FieldSchema when
// processing the confirmation.
name: z.string().min(1).optional(),
// User may override the AI-selected record; must be non-empty when provided.
// Required when overriding the relation name — the original record ID belongs to a
// different collection and cannot be reused for the new relation.
selectedRecordId: z
.array(z.union([z.string(), z.number()]))
.min(1)
.optional(),
})
.strict()
.refine(data => data.name === undefined || data.selectedRecordId !== undefined, {
message: 'selectedRecordId is required when overriding the relation name',
});
// relatedCollectionName, displayName and suggestedFields are NOT accepted — internal executor data.

const guidancePatchSchema = z
.object({
userInput: z.string().optional(),
})
.strict();

const conditionPatchSchema = z.object({ selectedOption: z.string() }).strict();

// Inferred types — consumed by step-execution-data.ts to type `userConfirmation` precisely,
// removing the need for runtime type guards in executors.
export type UpdateRecordConfirmation = z.infer<typeof updateRecordPatchSchema>;
export type TriggerActionConfirmation = z.infer<typeof triggerActionPatchSchema>;
export type McpConfirmation = z.infer<typeof mcpPatchSchema>;
export type LoadRelatedRecordConfirmation = z.infer<typeof loadRelatedRecordPatchSchema>;
export type GuidanceConfirmation = z.infer<typeof guidancePatchSchema>;

const patchBodySchemas: Partial<Record<string, z.ZodTypeAny>> = {
'update-record': updateRecordPatchSchema,
'trigger-action': triggerActionPatchSchema,
mcp: mcpPatchSchema,
'load-related-record': loadRelatedRecordPatchSchema,
guidance: guidancePatchSchema,
condition: conditionPatchSchema,
};

export default patchBodySchemas;
32 changes: 25 additions & 7 deletions packages/workflow-executor/src/types/step-execution-data.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
/** @draft Types derived from the workflow-executor spec -- subject to change. */

import type { RecordRef } from './validated/collection';
import type {
LoadRelatedRecordConfirmation,
McpConfirmation,
TriggerActionConfirmation,
UpdateRecordConfirmation,
} from '../http/pending-data-validators';

// -- Base --

Expand All @@ -14,6 +20,12 @@ interface MutatingStepExecutionData extends BaseStepExecutionData {
idempotencyPhase?: 'executing' | 'done';
}

// Parsed PATCH body kept beside `pendingData` so executors can read the user's
// final input without overwriting the AI suggestion.
export interface WithUserConfirmation<T extends Record<string, unknown> = Record<string, unknown>> {
userConfirmation?: T;
}

// -- Condition --

export interface ConditionStepExecutionData extends BaseStepExecutionData {
Expand Down Expand Up @@ -50,7 +62,9 @@ export interface ReadRecordStepExecutionData extends BaseStepExecutionData {

// -- Update Record --

export interface UpdateRecordStepExecutionData extends MutatingStepExecutionData {
export interface UpdateRecordStepExecutionData
extends MutatingStepExecutionData,
WithUserConfirmation<UpdateRecordConfirmation> {
type: 'update-record';
executionParams?: FieldRef & { value: unknown };
// User confirmed → values returned by updateRecord. User rejected → skipped.
Expand All @@ -73,13 +87,13 @@ export interface RelationRef {
displayName: string;
}

export interface TriggerRecordActionStepExecutionData extends MutatingStepExecutionData {
export interface TriggerRecordActionStepExecutionData
extends MutatingStepExecutionData,
WithUserConfirmation<TriggerActionConfirmation> {
type: 'trigger-action';
executionParams?: ActionRef;
executionResult?: { success: true; actionResult: unknown } | { skipped: true };
// When userConfirmed=true, actionResult is required: the frontend executes the action and
// posts the result back (the executor never re-executes on confirmation).
pendingData?: ActionRef & { userConfirmed?: boolean; actionResult?: unknown };
pendingData?: ActionRef & { userConfirmed?: boolean };
selectedRecordRef: RecordRef;
}

Expand All @@ -95,7 +109,9 @@ export interface McpToolCall extends McpToolRef {
input: Record<string, unknown>;
}

export interface McpStepExecutionData extends MutatingStepExecutionData {
export interface McpStepExecutionData
extends MutatingStepExecutionData,
WithUserConfirmation<McpConfirmation> {
type: 'mcp';
executionParams?: McpToolCall;
executionResult?:
Expand Down Expand Up @@ -123,7 +139,9 @@ export interface LoadRelatedRecordPendingData extends RelationRef {
userConfirmed?: boolean;
}

export interface LoadRelatedRecordStepExecutionData extends BaseStepExecutionData {
export interface LoadRelatedRecordStepExecutionData
extends BaseStepExecutionData,
WithUserConfirmation<LoadRelatedRecordConfirmation> {
type: 'load-related-record';
pendingData?: LoadRelatedRecordPendingData;
selectedRecordRef: RecordRef;
Expand Down
Loading
Loading