Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions .agents/skills/redis-use-case-ports/assets/audit-checklist.md
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,55 @@ The recommended cross-client idiom is to **bypass the library wrapper** and send

---

## 22. Typed `XAUTOCLAIM` wrappers that silently drop the deleted-IDs slot

**What to scan for:** any helper that calls the client library's typed `xautoclaim` / `XAutoClaim` / `StreamAutoClaim` wrapper. Look at the return-type binding: does it expose a third slot (deleted IDs / `deleted_messages` / `deletedIds`) alongside the next-cursor and claimed-messages?

**Pass criterion:** the helper must surface the third slot of the Redis 7+ `XAUTOCLAIM` reply (the IDs whose stream payload was trimmed out before the claim ran). The reference helper's API is `(claimed, deleted_ids)` — and the caller is expected to log/route the deleted IDs to a dead-letter store. If the client library's typed wrapper hides the third slot (extremely common), the helper must drop to a raw-command path (`client.Do("XAUTOCLAIM", ...)`, `Jedis.sendCommand(XAUTOCLAIM, ...)`, `connection.dispatch(CommandType.XAUTOCLAIM, NestedMultiOutput, ...)`, `redis.call('XAUTOCLAIM', ...)`, `redis::cmd("XAUTOCLAIM").query_async(...)`) and parse the three-element reply by hand. **A wrapper that returns `(cursor, messages)` only — with no compile-time hint that a third slot exists — silently makes the dead-letter path invisible.**

**Sample audit prompt:**

> Audit every `XAUTOCLAIM` call site across the 9 client implementations under `content/develop/use-cases/{{USE_CASE_NAME}}/`. For each, identify whether the helper goes through the client library's typed wrapper or through a raw command. For the typed wrappers, verify against the library's documentation or source whether the wrapper surfaces all three reply elements (next-cursor, claimed-messages, deleted-IDs). Flag any helper that uses a typed wrapper whose return type omits the deleted-IDs slot — that helper has silently lost the dead-letter signalling path. Cross-check the helper's `_index.md` "Production usage" prose to confirm the deleted-IDs handling is documented for the reader.

**Why on list:** Streaming use case, Phase 2 cross-port finding. Confirmed in **five** independent ports:

- **go-redis v9.18.0** — `client.XAutoClaim(...)` and `XAutoClaimJustID(...)` both parse the reply and call `rd.DiscardNext()` on the third element. Workaround: `client.Do(ctx, "XAUTOCLAIM", ...)` with manual parsing.
- **Jedis 5.0.1 and 6.2.0** — `xautoclaim(...)` returns `Map.Entry<StreamEntryID, List<StreamEntry>>` (only 2 slots). Workaround: `Jedis.sendCommand(STREAM_AUTOCLAIM, ...)` with manual decode.
- **Lettuce 6.5.0** — `RedisCommands.xautoclaim(...)` returns `ClaimedMessages<K,V>` exposing only the cursor and claimed messages. Workaround: `connection.dispatch(CommandType.XAUTOCLAIM, new NestedMultiOutput<>(...), args)`.
- **redis-rb 5.x** — typed `redis.xautoclaim` is decoded via the generic `HashifyStreamAutoclaim` proc, which drops the third element. Workaround: `redis.call('XAUTOCLAIM', ...)` with manual parsing.
- **redis-rs 0.24** — no typed `xautoclaim` wrapper exists at all, so the helper must use `redis::cmd("XAUTOCLAIM").arg(...).query_async()` directly.

This is the most common class of finding in streaming-style ports. The reference's `(claimed, deleted_ids)` API surface assumed wrappers preserve all three reply elements; they don't. Every future port must verify whether its library's typed wrapper has caught up before relying on it.

---

## 23. Handover-then-delete safety on consumer removal

**What to scan for:** any helper / demo path that removes a consumer from a consumer group. Look for the sequence (a) handover the consumer's pending entries to a peer, then (b) `XGROUP DELCONSUMER`. The handover is typically a per-consumer `XPENDING ... CONSUMER` walk plus `XCLAIM` at `MIN-IDLE-TIME 0`.

**Pass criterion:** the `XGROUP DELCONSUMER` call must run **only after the handover has provably succeeded**. Specifically:

- Every error from the handover path (`XPENDING` failure, `XCLAIM` failure, partial-batch break, deadline timeout, etc.) must abort the removal. Do not log-and-continue.
- The handover must verify the source consumer's PEL is empty before deletion, OR the caller must surface the partial-handover failure so the user can retry.
- The registry-removal step (popping from the in-process workers map) must happen **after** the destructive `DELCONSUMER`, not before — otherwise a thrown exception between map-pop and DELCONSUMER leaves a half-removed worker.

A naked `try { handover() } catch { ignore } finally { delete_consumer() }` is the **wrong shape**. `XGROUP DELCONSUMER` destroys the PEL of the deleted consumer — any entries the handover failed to move are unreachable by `XAUTOCLAIM` afterwards. The destruction is silent: no error, no log on the Redis side, no count of lost messages.

**Sample audit prompt:**

> Audit every consumer-removal path in the 9 client implementations under `content/develop/use-cases/{{USE_CASE_NAME}}/`. For each port's `remove_worker` (or equivalent) helper, trace the error-handling boundary between the `handover_pending` (or equivalent) call and the `XGROUP DELCONSUMER` call. Flag any port where: (a) handover errors are silently swallowed before delete fires; (b) the in-process registry entry is removed before delete fires (so a thrown exception between the two leaves a half-removed worker); (c) a partial-handover return value is accepted without verifying the source consumer's PEL is empty. Cross-check the demo's HTTP `/remove-worker` handler — if it returns 200 on a failed handover, the bug is user-visible.

**Why on list:** Streaming use case, Phase 4b Codex independent review. Targeted Phase 4 audits cleared `remove_worker` paths in `rust`, `go`, `nodejs`, and `dotnet`; Codex's fresh-context review then found that all four shipped variants of the same pattern:

- **rust** ([`demo_server.rs:154-160`](../../../content/develop/use-cases/streaming/rust/demo_server.rs)) — `handover_pending(...).await.unwrap_or(0)` swallows errors, then `delete_consumer` runs unconditionally. `event_stream.rs:367-376` discards `XCLAIM` failures as an empty claim list.
- **go** ([`demo_server.go:187-193`](../../../content/develop/use-cases/streaming/go/demo_server.go)) — `HandoverPending` correctly returns errors, but the caller logs them and continues to `DeleteConsumer`.
- **nodejs** ([`demoServer.js:635-649`](../../../content/develop/use-cases/streaming/nodejs/demoServer.js)) — `handoverPending` breaks and returns a partial count on `xPendingRange` or `xClaim` errors (`eventStream.js:365-399`). `removeWorker` then deletes regardless.
- **dotnet** ([`Program.cs:429-433`](../../../content/develop/use-cases/streaming/dotnet/Program.cs)) — `HandoverPending` catches `RedisServerException` and breaks early (`EventStream.cs:321-333`), returning whatever count it has. The caller stops the worker and deletes the consumer; if `StreamClaim` threw, the worker is already gone from `_workers` before `DELCONSUMER` runs.

The reference (`redis-py/demo_server.py:590-598` + `event_stream.py:263-274`) aborts on handover errors before `delete_consumer` is reached, but the reference's `handover_pending` raises rather than returning partial counts — so the safe pattern is implicit and easy to miss when porting to languages where errors are returned values.

---

## How to add a new row

When a bug class is identified after this skill has been used:
Expand Down
139 changes: 139 additions & 0 deletions content/develop/use-cases/streaming/_index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
---
categories:
- docs
- develop
- stack
- oss
- rs
- rc
description: Process ordered event streams with consumer groups, replay, and configurable retention.
hideListLinks: true
linkTitle: Streaming
title: Redis streaming
weight: 5
---

## When to use Redis streaming

Use Redis streaming when you need to process and deliver ordered event streams — user actions, telemetry, transactions, inter-service messages — with consumer groups, replay, and configurable retention, without standing up a dedicated streaming platform.

## Why the problem is hard

Continuous event flows pushed through primary databases or ad-hoc queues add latency on the request path, make backpressure hard to control, and tightly couple producers to consumers. Some of the obvious workarounds have real drawbacks:

- **A dedicated streaming platform** (Kafka, Pulsar) solves all of this but adds significant
operational overhead — separate clusters, partition management, consumer rebalancing — that's
disproportionate when retention windows are hours or days, not months.
- **Pub/sub** ([Redis Pub/Sub]({{< relref "/develop/pubsub" >}}), MQTT) is fire-and-forget
transport: messages are delivered to whoever is connected and discarded, with no persistence,
replay, or consumer tracking.
- **Polling a primary database for new rows** generates constant load on the system of record,
struggles to order events from concurrent writers, and offers no replay or per-consumer cursor.

A workable streaming layer needs an ordered, durable log, independent consumer tracking with
acknowledgment, at-least-once delivery, and retention controls — all without introducing a
separate broker for moderate-scale workloads.

This pattern is distinct from [pub/sub]({{< relref "/develop/use-cases/pub-sub" >}}), which is
at-most-once transport with no history: a subscriber that's offline when a message is published
misses it for good. It is also distinct from a
[job queue]({{< relref "/develop/use-cases/job-queue" >}}), where each task is claimed by exactly
one worker and discarded after it completes. Streaming retains the ordered history, so many
independent consumer groups can read the same events at their own pace and replay from any point.

## What you can expect from a Redis solution

You can:

- Deliver ordered events to multiple independent consumer groups, each processing the full
stream at its own pace.
- Scale consumers horizontally within a group to share work across workers, with at-least-once
delivery and per-consumer tracking.
- Replay historical events for debugging, bootstrapping a new projection, or rebuilding a
downstream system from scratch.
- Bound memory by retaining events by length or by minimum ID, without a separate cleanup job.
- Recover unacknowledged entries from crashed consumers, so a worker dying mid-message does not
silently lose work (entries trimmed by `MAXLEN ~` before they are acked are surfaced in
`XAUTOCLAIM`'s deleted-IDs list, so the caller can route them to a dead-letter store rather
than retry against a missing payload).
- Partition streams by tenant, region, or entity for load distribution and per-entity event
sourcing.
- Replace a dedicated Kafka deployment for moderate-scale, short-retention streaming workloads
using infrastructure you already run.

## How Redis supports the solution

In practice, producers append events to a stream with
[`XADD`]({{< relref "/commands/xadd" >}}) and Redis assigns each entry an auto-generated,
time-ordered ID. Consumers either read the stream directly with
[`XREAD`]({{< relref "/commands/xread" >}}), or join a *consumer group* and read with
[`XREADGROUP`]({{< relref "/commands/xreadgroup" >}}). Each consumer gets its own
pending-entries list of in-flight messages, while the group as a whole tracks a single
`last-delivered-id` cursor that advances as entries are handed out to any consumer. Once a consumer finishes processing an
entry, it acknowledges it with [`XACK`]({{< relref "/commands/xack" >}}); entries left
unacknowledged past a timeout can be reassigned to a healthy consumer with
[`XCLAIM`]({{< relref "/commands/xclaim" >}}) or
[`XAUTOCLAIM`]({{< relref "/commands/xautoclaim" >}}).

Redis provides the following features that make it a good fit for streaming:

- [Streams]({{< relref "/develop/data-types/streams" >}})
([`XADD`]({{< relref "/commands/xadd" >}}),
[`XLEN`]({{< relref "/commands/xlen" >}})) provide an append-only log with auto-generated
time-ordered IDs, so ordering is intrinsic to the data structure rather than something the
application has to maintain.
- [Consumer groups]({{< relref "/develop/data-types/streams#consumer-groups" >}})
([`XREADGROUP`]({{< relref "/commands/xreadgroup" >}}),
[`XACK`]({{< relref "/commands/xack" >}})) give at-least-once delivery with per-consumer
cursors and acknowledgment, so workers in a group share the stream's work and multiple groups
read the same stream independently.
- [`XRANGE`]({{< relref "/commands/xrange" >}}) and
[`XREVRANGE`]({{< relref "/commands/xrevrange" >}}) support replay and range queries —
bootstrap a new projection from the start of the stream, audit recent events, or run
point-in-time reads by ID range.
- [`XPENDING`]({{< relref "/commands/xpending" >}}),
[`XCLAIM`]({{< relref "/commands/xclaim" >}}), and
[`XAUTOCLAIM`]({{< relref "/commands/xautoclaim" >}}) recover messages a crashed consumer
left in flight, so no event sits invisibly past its processing window.
- Retention controls — [`XADD ... MAXLEN ~ n`]({{< relref "/commands/xadd" >}}) and
[`XTRIM MINID ~ id`]({{< relref "/commands/xtrim" >}}) — bound stream size by length or by
oldest event, so memory stays bounded as the stream rolls forward.
- Sub-millisecond reads and writes from memory, so streaming runs on the same Redis instance
already handling cache, sessions, or rate limiting at zero marginal cost.

## Ecosystem

The following libraries and frameworks use Redis Streams for event-driven workloads:

- **Java**:
[Spring Data Redis Streams](https://docs.spring.io/spring-data/redis/reference/redis/redis-streams.html)
for consumer-group processing with producer/consumer abstractions and pending-entries handling.
- **Node.js**: [`node-redis`](https://github.com/redis/node-redis) and
[`ioredis`](https://github.com/redis/ioredis) for stream producers and consumers in
event-driven APIs.
- **Python**: [`redis-py`](https://redis.readthedocs.io/) with
[FastAPI](https://fastapi.tiangolo.com/) or [Django](https://www.djangoproject.com/) for
microservice event pipelines.
- **Infrastructure**:
[Active-Active geo-distribution]({{< relref "/operate/rs/databases/active-active" >}}) on
Redis Enterprise / Redis Cloud for cross-region stream replication;
[Azure Managed Redis](https://azure.microsoft.com/en-us/products/managed-redis) with
[Azure Functions](https://azure.microsoft.com/en-us/products/functions) for serverless event
backbones.

## Code examples to build your own Redis streaming pipeline

The following guides show how to build a simple Redis-backed event stream with producers and
consumer groups. Each guide includes a runnable interactive demo that lets you produce events,
scale consumers within a group, replay history from any point, and watch independent groups
read the same stream at their own pace.

* [redis-py (Python)]({{< relref "/develop/use-cases/streaming/redis-py" >}})
* [node-redis (Node.js)]({{< relref "/develop/use-cases/streaming/nodejs" >}})
* [go-redis (Go)]({{< relref "/develop/use-cases/streaming/go" >}})
* [Jedis (Java)]({{< relref "/develop/use-cases/streaming/java-jedis" >}})
* [Lettuce (Java)]({{< relref "/develop/use-cases/streaming/java-lettuce" >}})
* [StackExchange.Redis (C#)]({{< relref "/develop/use-cases/streaming/dotnet" >}})
* [Predis (PHP)]({{< relref "/develop/use-cases/streaming/php" >}})
* [redis-rb (Ruby)]({{< relref "/develop/use-cases/streaming/ruby" >}})
* [redis-rs (Rust)]({{< relref "/develop/use-cases/streaming/rust" >}})
Loading
Loading