Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 53 additions & 4 deletions sql/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ begin
max_attempts integer,
cancellation jsonb,
parent_task_id uuid,
idempotency_key text,
enqueue_at timestamptz not null default durable.current_time(),
first_started_at timestamptz,
state text not null check (state in (''pending'', ''running'', ''sleeping'', ''completed'', ''failed'', ''cancelled'')),
Expand All @@ -104,10 +105,17 @@ begin
't_' || p_queue_name
);

-- The idempotency_key column may be missing on tables created before this
-- feature existed; add it if absent so both old and new queues work.
execute format(
'alter table durable.%I add column if not exists idempotency_key text',
't_' || p_queue_name
);

execute format('comment on column durable.%I.params is %L', 't_' || p_queue_name, 'User-defined. Task input parameters. Schema depends on Task::Params type.');
execute format('comment on column durable.%I.headers is %L', 't_' || p_queue_name, 'User-defined. Optional key-value metadata as {"key": <any JSON value>}.');
execute format('comment on column durable.%I.retry_strategy is %L', 't_' || p_queue_name, '{"kind": "none"} | {"kind": "fixed", "base_seconds": <u64>} | {"kind": "exponential", "base_seconds": <u64>, "factor": <f64>, "max_seconds": <u64>}');
execute format('comment on column durable.%I.cancellation is %L', 't_' || p_queue_name, '{"max_delay": <seconds>, "max_duration": <seconds>} - both optional. max_delay: cancel if not started within N seconds of enqueue. max_duration: cancel if not completed within N seconds of first start.');
execute format('comment on column durable.%I.idempotency_key is %L', 't_' || p_queue_name, 'Optional dedup key. When set, only one non-terminal task with this key can exist. Set via SpawnOptions.only_once or SpawnOptions.idempotency_key.');
execute format('comment on column durable.%I.completed_payload is %L', 't_' || p_queue_name, 'User-defined. Task return value. Schema depends on Task::Output type.');

execute format(
Expand Down Expand Up @@ -215,6 +223,13 @@ begin
't_' || p_queue_name
);

-- Idempotency key unique index (partial: only non-null keys)
execute format(
'create unique index if not exists %I on durable.%I (idempotency_key) where idempotency_key is not null',
('t_' || p_queue_name) || '_ik',
't_' || p_queue_name
);

-- Speed up claim timeout scans.
execute format(
'create index if not exists %I on durable.%I (claim_expires_at)
Expand Down Expand Up @@ -353,8 +368,10 @@ declare
v_max_attempts integer;
v_cancellation jsonb;
v_parent_task_id uuid;
v_idempotency_key text;
v_now timestamptz := durable.current_time();
v_params jsonb := coalesce(p_params, 'null'::jsonb);
v_existing_task_id uuid;
begin
if p_task_name is null or length(trim(p_task_name)) = 0 then
raise exception 'task_name must be provided';
Expand All @@ -370,16 +387,48 @@ begin
end if;
end if;
v_cancellation := p_options->'cancellation';
-- Extract parent_task_id for subtask tracking
v_parent_task_id := (p_options->>'parent_task_id')::uuid;

-- Resolve idempotency key: explicit key takes precedence over only_once
v_idempotency_key := p_options->>'idempotency_key';
if v_idempotency_key is null and (p_options->>'only_once')::boolean = true then
v_idempotency_key := md5(p_task_name || '::' || v_params::text);
end if;
end if;

-- Idempotency check: return the existing non-terminal task if one already holds this key.
-- NOTE(review): this SELECT-then-INSERT is race-prone — two concurrent spawns with the same
-- key can both miss here, and the loser of the INSERT will hit the partial unique index with
-- a unique_violation instead of getting the existing task back. Consider INSERT ... ON CONFLICT
-- on the partial index (or catching unique_violation and re-selecting) — TODO confirm intended behavior.
if v_idempotency_key is not null then
execute format(
'select t.task_id from durable.%I t
where t.idempotency_key = $1
and t.state not in (''completed'', ''failed'', ''cancelled'')
limit 1',
't_' || p_queue_name
)
into v_existing_task_id
using v_idempotency_key;

if v_existing_task_id is not null then
return query
execute format(
'select t.task_id, r.run_id, r.attempt
from durable.%I t
join durable.%I r on r.task_id = t.task_id and r.run_id = t.last_attempt_run
where t.task_id = $1',
't_' || p_queue_name,
'r_' || p_queue_name
)
using v_existing_task_id;
return;
end if;
end if;

execute format(
'insert into durable.%I (task_id, task_name, params, headers, retry_strategy, max_attempts, cancellation, parent_task_id, enqueue_at, first_started_at, state, attempts, last_attempt_run, completed_payload, cancelled_at)
values ($1, $2, $3, $4, $5, $6, $7, $8, $9, null, ''pending'', $10, $11, null, null)',
'insert into durable.%I (task_id, task_name, params, headers, retry_strategy, max_attempts, cancellation, parent_task_id, idempotency_key, enqueue_at, first_started_at, state, attempts, last_attempt_run, completed_payload, cancelled_at)
values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, null, ''pending'', $11, $12, null, null)',
't_' || p_queue_name
)
using v_task_id, p_task_name, v_params, v_headers, v_retry_strategy, v_max_attempts, v_cancellation, v_parent_task_id, v_now, v_attempt, v_run_id;
using v_task_id, p_task_name, v_params, v_headers, v_retry_strategy, v_max_attempts, v_cancellation, v_parent_task_id, v_idempotency_key, v_now, v_attempt, v_run_id;

execute format(
'insert into durable.%I (run_id, task_id, attempt, state, available_at, wake_event, event_payload, result, failure_reason)
Expand Down
6 changes: 6 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ struct SpawnOptionsDb<'a> {
cancellation: Option<CancellationPolicyDb>,
#[serde(skip_serializing_if = "Option::is_none")]
parent_task_id: Option<&'a Uuid>,
#[serde(skip_serializing_if = "std::ops::Not::not")]
only_once: bool,
#[serde(skip_serializing_if = "Option::is_none")]
idempotency_key: Option<&'a str>,
}

/// Internal struct for serializing cancellation policy (only non-None fields).
Expand Down Expand Up @@ -597,6 +601,8 @@ where
.as_ref()
.and_then(CancellationPolicyDb::from_policy),
parent_task_id: options.parent_task_id.as_ref(),
only_once: options.only_once,
idempotency_key: options.idempotency_key.as_deref(),
};
serde_json::to_value(db_options)
}
Expand Down
Loading
Loading