Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@
# PUBLISH_AZURE_SERVICEBUS_TOPIC="outpost-publish"
# PUBLISH_AZURE_SERVICEBUS_SUBSCRIPTION="outpost-publish-sub"

## NATS JetStream
# Comma-separated NATS server URLs (cluster-aware).
# PUBLISH_NATS_SERVERS="nats://nats:4222"
# Per-tenant accounts live in subdirectories of this dir. Watched at runtime.
# Each subdirectory contains a meta.yaml plus an optional user.creds file.
# PUBLISH_NATS_ACCOUNTS_DIR="/etc/outpost/nats-accounts"

# ============================== Others ==============================

# Portal
Expand Down
1 change: 1 addition & 0 deletions .env.test
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ TEST_CLICKHOUSE_URL="localhost:39000"
# MQs
TEST_RABBITMQ_URL="localhost:35672"
TEST_KAFKA_URL="localhost:39092"
TEST_NATS_URL=""
TEST_LOCALSTACK_URL="localhost:34566"
TEST_GCP_URL="localhost:38085"
TEST_AZURE_SB_CONNSTRING="Endpoint=sb://localhost;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;"
Expand Down
22 changes: 22 additions & 0 deletions .outpost.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,28 @@ mqs:
delivery_queue: "outpost-delivery" # SQS queue name for delivery events
log_queue: "outpost-log" # SQS queue name for log events

# Publish Message Queue (optional)
# Configure one provider to ingest events from an external source instead
# of (or in addition to) the Publish API. Only one provider may be set.
# publishmq:
# rabbitmq:
# server_url: "amqp://guest:guest@localhost:5673"
# exchange: "outpost"
# queue: "publish"
# nats:
# servers:
# - "nats://nats:4222"
# # Watched directory of per-tenant account subdirectories.
# # Each subdirectory holds a meta.yaml and optional user.creds.
# accounts_dir: "/etc/outpost/nats-accounts"
# # Static list of accounts (combined with accounts_dir if both are set).
# accounts:
# - name: "acme"
# credentials_file: "/etc/outpost/acme.creds"
# stream: "events"
# consumer: "outpost"
# tenant_id: "acme" # Outpost stamps this on every event; overrides payload.

# Application Configuration
aes_encryption_secret: "" # Secret for AES encryption
topics: # List of topics to subscribe to
Expand Down
2 changes: 2 additions & 0 deletions cmd/publish/declare_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ func handleDeclare(w http.ResponseWriter, r *http.Request) {
err = declareGCP()
case "rabbitmq":
err = declareRabbitMQ()
case "nats":
err = declareNATS()
case "http":
log.Println("[*] Declare HTTP - nothing to declare")
default:
Expand Down
2 changes: 2 additions & 0 deletions cmd/publish/publish_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ func handlePublish(w http.ResponseWriter, r *http.Request) {
err = publishGCP(body)
case "rabbitmq":
err = publishRabbitMQ(body)
case "nats":
err = publishNATS(body)
case "http":
fallthrough
default:
Expand Down
123 changes: 123 additions & 0 deletions cmd/publish/publish_nats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package main

import (
"context"
"encoding/json"
"log"
"os"
"time"

"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)

// Defaults match examples/docker-compose/compose-publish-nats.yml.
const (
natsDefaultURL = "nats://localhost:4222"
natsDefaultStream = "outpost-publish"
natsDefaultSubject = "outpost.publish"
natsDefaultConsumer = "outpost"
)

func natsURL() string {
if v := os.Getenv("PUBLISH_NATS_URL"); v != "" {
return v
}
return natsDefaultURL
}

func natsSubject() string {
if v := os.Getenv("PUBLISH_NATS_SUBJECT"); v != "" {
return v
}
return natsDefaultSubject
}

func natsStream() string {
if v := os.Getenv("PUBLISH_NATS_STREAM"); v != "" {
return v
}
return natsDefaultStream
}

func natsConsumer() string {
if v := os.Getenv("PUBLISH_NATS_CONSUMER"); v != "" {
return v
}
return natsDefaultConsumer
}

func natsCredsFile() string {
return os.Getenv("PUBLISH_NATS_CREDS")
}

func publishNATS(body map[string]interface{}) error {
log.Printf("[x] Publishing NATS JetStream")

opts := []nats.Option{nats.Name("outpost-publish-dev")}
if creds := natsCredsFile(); creds != "" {
opts = append(opts, nats.UserCredentials(creds))
}

nc, err := nats.Connect(natsURL(), opts...)
if err != nil {
return err
}
defer nc.Drain()

js, err := jetstream.New(nc)
if err != nil {
return err
}

payload, err := json.Marshal(body)
if err != nil {
return err
}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

_, err = js.Publish(ctx, natsSubject(), payload)
return err
}

func declareNATS() error {
log.Printf("[*] Declaring NATS JetStream Publish infra")

opts := []nats.Option{nats.Name("outpost-publish-dev-declare")}
if creds := natsCredsFile(); creds != "" {
opts = append(opts, nats.UserCredentials(creds))
}

nc, err := nats.Connect(natsURL(), opts...)
if err != nil {
return err
}
defer nc.Drain()

js, err := jetstream.New(nc)
if err != nil {
return err
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

stream, err := js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
Name: natsStream(),
Subjects: []string{natsSubject()},
Retention: jetstream.WorkQueuePolicy,
Storage: jetstream.FileStorage,
})
if err != nil {
return err
}

_, err = stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: natsConsumer(),
AckPolicy: jetstream.AckExplicitPolicy,
FilterSubject: natsSubject(),
})
return err
}
27 changes: 27 additions & 0 deletions contributing/mq.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ This document will focus on the first two types of integrations. The third type,
publishmq only:

- [ ] Kafka
- [x] NATS JetStream

## Configuration

Expand Down Expand Up @@ -116,3 +117,29 @@ RabbitMQ's concept of visibility timeout is called [acknowledgement timeout](htt
**Retry Behavior**:

**When a message is nacked, it is retried immediately.** After reaching the retry limit, the message will be sent to the DLX (dead-lettered exchange) and it then routed to the DLQ.

### NATS JetStream (publishmq only)

**Scope**: NATS JetStream is supported as a publish-mq only. Outpost reads events from one or more pre-provisioned JetStream consumers. It is **not** used as an internal MQ.

**Configuration** (per account):

- Servers (required, NATS cluster URLs)
- Credentials file (optional, NATS `.creds` JWT + NKey seed)
- Stream name (required)
- Durable consumer name (required)
- Tenant ID (optional; when set, Outpost stamps it on every event from this account)

**Multi-Tenancy**: the driver supports multiple NATS Accounts on one queue instance. Each account is dialled on its own connection with its own credentials, and the recommended pattern is one Account per Outpost tenant. Accounts can be provided either as a static list or via a watched directory (see `docs/content/publishing/publish-from-nats.mdoc`).

**Infrastructure**:

Outpost does **not** create the JetStream stream or consumer. The operator is expected to provision them — typically alongside the NATS Account itself — and to provide them to Outpost via the credentials file plus `meta.yaml`. Outpost verifies that both stream and consumer exist on startup and fails loudly if either is missing.

**Visibility Timeout**:

NATS JetStream's equivalent is the consumer's `AckWait`. Configure it on the consumer; Outpost does not override it.

**Retry Behavior**:

When a message is nacked or AckWait elapses without ack, JetStream redelivers up to the consumer's `MaxDeliver`. DLQ semantics are operator-owned; the JetStream consumer can be configured to republish max-delivery-exceeded messages to a dedicated DLQ stream via the consumer's `republish` policy or by observing `$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.>` advisories.
117 changes: 117 additions & 0 deletions docs/content/publishing/publish-from-nats.mdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
---
title: "Publish from NATS JetStream"
description: "Configure Outpost to ingest published events from one or more NATS JetStream streams, with per-tenant NATS Account isolation."
---

This guide describes how to publish events into Outpost from NATS JetStream. The NATS source is **publish-mq only** — Outpost reads events from one or more pre-provisioned JetStream consumers, and never creates or modifies streams itself.

A common architecture is one NATS Account per Outpost tenant: each tenant publishes into their own account (isolated subjects, isolated JetStream quota, separate credentials), and Outpost holds one connection per account.

## Message Structure

Events published to the JetStream subject should match the [Publish API endpoint](/docs/outpost/publishing/events) payload:

```json
{
"tenant_id": "<TENANT_ID>",
"destination_id": "<DESTINATION_ID>", // Optional. Provide a way of routing events to a specific destination
"topic": "topic.name", // Topic defined in TOPICS environment variable
"eligible_for_retry": true | false, // Should event delivery be retried? Default is true.
"metadata": Payload, // can be any JSON payload,
"data": Payload // can be any JSON payload
}
```

When an account has `tenant_id` configured (recommended), Outpost overrides the payload's `tenant_id` with the configured value. This guarantees that a publisher with credentials for Account A can only ever produce events for the tenant mapped to A.

## Prerequisites

- A NATS server (2.10+) with JetStream enabled.
- One JetStream **stream** per account, holding the events Outpost should consume.
- One durable JetStream **consumer** per stream, with `AckPolicy: explicit`.

Outpost does **not** create streams or consumers — the operator is expected to provision these alongside the account itself. A typical provisioning flow is:

1. Mint the NATS Account JWT (e.g. using `nsc` or the `github.com/nats-io/jwt/v2` Go library).
2. Push the JWT to the NATS resolver via `$SYS.REQ.ACCOUNT.<id>.CLAIMS.UPDATE`.
3. Mint a User JWT and write the resulting `.creds` file.
4. Create the JetStream stream + durable consumer in that account.
5. Drop the `.creds` plus a `meta.yaml` into Outpost's `accounts_dir` (or restart Outpost with an updated static `accounts` list).

## Configuration

### Environment Variables

```
PUBLISH_NATS_SERVERS="nats://nats:4222"
PUBLISH_NATS_ACCOUNTS_DIR="/etc/outpost/nats-accounts"
```

`PUBLISH_NATS_SERVERS` accepts a comma-separated list of URLs to support NATS clusters.

### YAML

```yaml
publishmq:
nats:
servers:
- nats://nats:4222
accounts_dir: /etc/outpost/nats-accounts
# Optional inline accounts (combined with accounts_dir if both are set):
accounts:
- name: static-tenant
credentials_file: /etc/outpost/static.creds
stream: events
consumer: outpost
tenant_id: static-tenant
```

### Accounts Directory Layout

When `accounts_dir` is set, Outpost watches it for tenant subdirectories and reconciles its connections at runtime — new accounts are added without restart, removed accounts have their connections drained.

```
/etc/outpost/nats-accounts/
├── acme/
│ ├── user.creds # NATS .creds (JWT + NKey seed)
│ └── meta.yaml
└── globex/
├── user.creds
└── meta.yaml
```

Each `meta.yaml`:

```yaml
name: acme # optional; defaults to the directory name
credentials_file: "" # optional; defaults to <dir>/user.creds when present
stream: events # required
consumer: outpost # required (durable consumer name)
tenant_id: acme # optional; when set, stamped on every event
```

`credentials_file` may also be an absolute path or a path relative to the account directory — useful when you mount credentials read-only from a different source than the metadata.

## Multi-Tenancy via NATS Accounts

The recommended pattern is **one NATS Account per Outpost tenant**:

- Each tenant publishes with credentials for their own account.
- Subjects, JetStream quota, and credentials are isolated by the NATS server.
- Outpost holds one connection per account. The pull-loops run in parallel and feed into a single delivery pipeline.
- Setting `tenant_id` on each account guarantees Outpost stamps the correct tenant on every event, regardless of payload content. A publisher cannot spoof events for a different tenant.

For dynamic tenant provisioning, run NATS in operator/JWT-resolver mode and update the resolver via `$SYS.REQ.ACCOUNT.…CLAIMS.UPDATE` — no NATS-server restart required.

## Operational Notes

- **At-least-once delivery**: messages are Ack'd only after Outpost has successfully ingested them. On failure (or restart) the consumer's `AckWait` triggers a redelivery.
- **Retry limit / DLQ**: configure `MaxDeliver` and a republish target on your JetStream consumer per your retention policy. Outpost does not own DLQ semantics.
- **Credential rotation**: replace the `.creds` file and re-create or rename the account directory to force Outpost to reconnect. Hot-reloading a single file without re-creating the directory is not currently supported.
- **No publish from Outpost**: this source is read-only. Outpost will not produce messages back into the JetStream stream.

### Troubleshooting

- [Ask a question](https://github.com/hookdeck/outpost/discussions/new?category=q-a)
- [Report a bug](https://github.com/hookdeck/outpost/issues/new?assignees=&labels=bug&projects=&template=bug_report.md&title=%F0%9F%90%9B+Bug+Report%3A+)
- [Request a feature](https://github.com/hookdeck/outpost/issues/new?assignees=&labels=enhancement&projects=&template=feature_request.md&title=%F0%9F%9A%80+Feature%3A+)
19 changes: 19 additions & 0 deletions examples/docker-compose/compose-publish-nats.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
name: outpost-publish-nats
services:
publish_nats:
container_name: publish_nats
image: nats:2.10-alpine
command:
- "-js"
- "-m"
- "8222"
ports:
- "4222:4222"
- "8222:8222"
healthcheck:
test: ["CMD", "wget", "-q", "-O", "-", "http://localhost:8222/healthz"]
interval: 5s
timeout: 3s
retries: 5
volumes:
- ./data/publish-nats:/data
25 changes: 25 additions & 0 deletions examples/docker-compose/start-nats-publish.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#!/bin/bash

# Make the script executable with: chmod +x start-nats-publish.sh

echo "Starting publish NATS JetStream container..."
docker-compose -f compose-publish-nats.yml down
docker-compose -f compose-publish-nats.yml up -d

echo "Waiting for NATS to start..."
sleep 3

echo "Checking container status..."
docker ps | grep publish_nats

echo "NATS should now be available at:"
echo "- Client: nats://localhost:4222"
echo "- Monitoring: http://localhost:8222"

echo "Provision the stream + consumer used by the publish dev helper:"
echo " curl -X POST 'http://localhost:5555/declare?method=nats'"
echo ""
echo "Publish a test event:"
echo " curl -X POST 'http://localhost:5555/publish?method=nats' \\"
echo " -H 'Content-Type: application/json' \\"
echo " -d '{\"tenant_id\":\"acme\",\"topic\":\"user.created\",\"data\":{\"id\":1}}'"
Loading