diff --git a/.env.example b/.env.example index dac8d6603..37b0dc2f2 100644 --- a/.env.example +++ b/.env.example @@ -97,6 +97,13 @@ # PUBLISH_AZURE_SERVICEBUS_TOPIC="outpost-publish" # PUBLISH_AZURE_SERVICEBUS_SUBSCRIPTION="outpost-publish-sub" +## NATS JetStream +# Comma-separated NATS server URLs (cluster-aware). +# PUBLISH_NATS_SERVERS="nats://nats:4222" +# Per-tenant accounts live in subdirectories of this dir. Watched at runtime. +# Each subdirectory contains a meta.yaml plus an optional user.creds file. +# PUBLISH_NATS_ACCOUNTS_DIR="/etc/outpost/nats-accounts" + # ============================== Others ============================== # Portal diff --git a/.env.test b/.env.test index 26e04f292..272070f41 100644 --- a/.env.test +++ b/.env.test @@ -4,6 +4,7 @@ TEST_CLICKHOUSE_URL="localhost:39000" # MQs TEST_RABBITMQ_URL="localhost:35672" TEST_KAFKA_URL="localhost:39092" +TEST_NATS_URL="" TEST_LOCALSTACK_URL="localhost:34566" TEST_GCP_URL="localhost:38085" TEST_AZURE_SB_CONNSTRING="Endpoint=sb://localhost;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;" diff --git a/.outpost.yaml.example b/.outpost.yaml.example index af283e9ad..aef8fe7eb 100644 --- a/.outpost.yaml.example +++ b/.outpost.yaml.example @@ -42,6 +42,28 @@ mqs: delivery_queue: "outpost-delivery" # SQS queue name for delivery events log_queue: "outpost-log" # SQS queue name for log events +# Publish Message Queue (optional) +# Configure one provider to ingest events from an external source instead +# of (or in addition to) the Publish API. Only one provider may be set. +# publishmq: +# rabbitmq: +# server_url: "amqp://guest:guest@localhost:5673" +# exchange: "outpost" +# queue: "publish" +# nats: +# servers: +# - "nats://nats:4222" +# # Watched directory of per-tenant account subdirectories. +# # Each subdirectory holds a meta.yaml and optional user.creds. 
+# accounts_dir: "/etc/outpost/nats-accounts" +# # Static list of accounts (combined with accounts_dir if both are set). +# accounts: +# - name: "acme" +# credentials_file: "/etc/outpost/acme.creds" +# stream: "events" +# consumer: "outpost" +# tenant_id: "acme" # Outpost stamps this on every event; overrides payload. + # Application Configuration aes_encryption_secret: "" # Secret for AES encryption topics: # List of topics to subscribe to diff --git a/cmd/publish/declare_handler.go b/cmd/publish/declare_handler.go index 89053a331..e017bebe2 100644 --- a/cmd/publish/declare_handler.go +++ b/cmd/publish/declare_handler.go @@ -16,6 +16,8 @@ func handleDeclare(w http.ResponseWriter, r *http.Request) { err = declareGCP() case "rabbitmq": err = declareRabbitMQ() + case "nats": + err = declareNATS() case "http": log.Println("[*] Declare HTTP - nothing to declare") default: diff --git a/cmd/publish/publish_handler.go b/cmd/publish/publish_handler.go index e40e96e8f..e8ec376d3 100644 --- a/cmd/publish/publish_handler.go +++ b/cmd/publish/publish_handler.go @@ -23,6 +23,8 @@ func handlePublish(w http.ResponseWriter, r *http.Request) { err = publishGCP(body) case "rabbitmq": err = publishRabbitMQ(body) + case "nats": + err = publishNATS(body) case "http": fallthrough default: diff --git a/cmd/publish/publish_nats.go b/cmd/publish/publish_nats.go new file mode 100644 index 000000000..f365b5336 --- /dev/null +++ b/cmd/publish/publish_nats.go @@ -0,0 +1,123 @@ +package main + +import ( + "context" + "encoding/json" + "log" + "os" + "time" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +// Defaults match examples/docker-compose/compose-publish-nats.yml. 
+const ( + natsDefaultURL = "nats://localhost:4222" + natsDefaultStream = "outpost-publish" + natsDefaultSubject = "outpost.publish" + natsDefaultConsumer = "outpost" +) + +func natsURL() string { + if v := os.Getenv("PUBLISH_NATS_URL"); v != "" { + return v + } + return natsDefaultURL +} + +func natsSubject() string { + if v := os.Getenv("PUBLISH_NATS_SUBJECT"); v != "" { + return v + } + return natsDefaultSubject +} + +func natsStream() string { + if v := os.Getenv("PUBLISH_NATS_STREAM"); v != "" { + return v + } + return natsDefaultStream +} + +func natsConsumer() string { + if v := os.Getenv("PUBLISH_NATS_CONSUMER"); v != "" { + return v + } + return natsDefaultConsumer +} + +func natsCredsFile() string { + return os.Getenv("PUBLISH_NATS_CREDS") +} + +func publishNATS(body map[string]interface{}) error { + log.Printf("[x] Publishing NATS JetStream") + + opts := []nats.Option{nats.Name("outpost-publish-dev")} + if creds := natsCredsFile(); creds != "" { + opts = append(opts, nats.UserCredentials(creds)) + } + + nc, err := nats.Connect(natsURL(), opts...) + if err != nil { + return err + } + defer nc.Drain() + + js, err := jetstream.New(nc) + if err != nil { + return err + } + + payload, err := json.Marshal(body) + if err != nil { + return err + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + _, err = js.Publish(ctx, natsSubject(), payload) + return err +} + +func declareNATS() error { + log.Printf("[*] Declaring NATS JetStream Publish infra") + + opts := []nats.Option{nats.Name("outpost-publish-dev-declare")} + if creds := natsCredsFile(); creds != "" { + opts = append(opts, nats.UserCredentials(creds)) + } + + nc, err := nats.Connect(natsURL(), opts...) 
+ if err != nil { + return err + } + defer nc.Drain() + + js, err := jetstream.New(nc) + if err != nil { + return err + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + stream, err := js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{ + Name: natsStream(), + Subjects: []string{natsSubject()}, + Retention: jetstream.WorkQueuePolicy, + Storage: jetstream.FileStorage, + }) + if err != nil { + return err + } + + _, err = stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ + Durable: natsConsumer(), + AckPolicy: jetstream.AckExplicitPolicy, + FilterSubject: natsSubject(), + }) + return err +} diff --git a/contributing/mq.md b/contributing/mq.md index 79a966e00..d2c8d49cd 100644 --- a/contributing/mq.md +++ b/contributing/mq.md @@ -29,6 +29,7 @@ This document will focus on the first two types of integrations. The third type, publishmq only: - [ ] Kafka +- [x] NATS JetStream ## Configuration @@ -116,3 +117,29 @@ RabbitMQ's concept of visibility timeout is called [acknowledgement timeout](htt **Retry Behavior**: **When a message is nacked, it is retried immediately.** After reaching the retry limit, the message will be sent to the DLX (dead-lettered exchange) and it then routed to the DLQ. + +### NATS JetStream (publishmq only) + +**Scope**: NATS JetStream is supported as a publish-mq only. Outpost reads events from one or more pre-provisioned JetStream consumers. It is **not** used as an internal MQ. + +**Configuration** (per account): + +- Servers (required, NATS cluster URLs) +- Credentials file (optional, NATS `.creds` JWT + NKey seed) +- Stream name (required) +- Durable consumer name (required) +- Tenant ID (optional; when set, Outpost stamps it on every event from this account) + +**Multi-Tenancy**: the driver supports multiple NATS Accounts on one queue instance. Each account is dialled on its own connection with its own credentials, and the recommended pattern is one Account per Outpost tenant. 
Accounts can be provided either as a static list or via a watched directory (see `docs/content/publishing/publish-from-nats.mdoc`). + +**Infrastructure**: + +Outpost does **not** create the JetStream stream or consumer. The operator is expected to provision them — typically alongside the NATS Account itself — and to provide them to Outpost via the credentials file plus `meta.yaml`. Outpost verifies that both stream and consumer exist on startup and fails loudly if either is missing. + +**Visibility Timeout**: + +NATS JetStream's equivalent is the consumer's `AckWait`. Configure it on the consumer; Outpost does not override it. + +**Retry Behavior**: + +When a message is nacked or AckWait elapses without ack, JetStream redelivers up to the consumer's `MaxDeliver`. DLQ semantics are operator-owned; the JetStream consumer can be configured to republish max-delivery-exceeded messages to a dedicated DLQ stream via the consumer's `republish` policy or by observing `$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.>` advisories. diff --git a/docs/content/publishing/publish-from-nats.mdoc b/docs/content/publishing/publish-from-nats.mdoc new file mode 100644 index 000000000..45ade1811 --- /dev/null +++ b/docs/content/publishing/publish-from-nats.mdoc @@ -0,0 +1,117 @@ +--- +title: "Publish from NATS JetStream" +description: "Configure Outpost to ingest published events from one or more NATS JetStream streams, with per-tenant NATS Account isolation." +--- + +This guide describes how to publish events into Outpost from NATS JetStream. The NATS source is **publish-mq only** — Outpost reads events from one or more pre-provisioned JetStream consumers, and never creates or modifies streams itself. + +A common architecture is one NATS Account per Outpost tenant: each tenant publishes into their own account (isolated subjects, isolated JetStream quota, separate credentials), and Outpost holds one connection per account. 
+ +## Message Structure + +Events published to the JetStream subject should match the [Publish API endpoint](/docs/outpost/publishing/events) payload: + +```json +{ + "tenant_id": "<TENANT_ID>", + "destination_id": "<DESTINATION_ID>", // Optional. Provide a way of routing events to a specific destination + "topic": "topic.name", // Topic defined in TOPICS environment variable + "eligible_for_retry": true | false, // Should event delivery be retried? Default is true. + "metadata": Payload, // can be any JSON payload, + "data": Payload // can be any JSON payload +} +``` + +When an account has `tenant_id` configured (recommended), Outpost overrides the payload's `tenant_id` with the configured value. This guarantees that a publisher with credentials for Account A can only ever produce events for the tenant mapped to A. + +## Prerequisites + +- A NATS server (2.10+) with JetStream enabled. +- One JetStream **stream** per account, holding the events Outpost should consume. +- One durable JetStream **consumer** per stream, with `AckPolicy: explicit`. + +Outpost does **not** create streams or consumers — the operator is expected to provision these alongside the account itself. A typical provisioning flow is: + +1. Mint the NATS Account JWT (e.g. using `nsc` or the `github.com/nats-io/jwt/v2` Go library). +2. Push the JWT to the NATS resolver via `$SYS.REQ.ACCOUNT.<account-public-key>.CLAIMS.UPDATE`. +3. Mint a User JWT and write the resulting `.creds` file. +4. Create the JetStream stream + durable consumer in that account. +5. Drop the `.creds` plus a `meta.yaml` into Outpost's `accounts_dir` (or restart Outpost with an updated static `accounts` list). + +## Configuration + +### Environment Variables + +``` +PUBLISH_NATS_SERVERS="nats://nats:4222" +PUBLISH_NATS_ACCOUNTS_DIR="/etc/outpost/nats-accounts" +``` + +`PUBLISH_NATS_SERVERS` accepts a comma-separated list of URLs to support NATS clusters. 
+ +### YAML + +```yaml +publishmq: + nats: + servers: + - nats://nats:4222 + accounts_dir: /etc/outpost/nats-accounts + # Optional inline accounts (combined with accounts_dir if both are set): + accounts: + - name: static-tenant + credentials_file: /etc/outpost/static.creds + stream: events + consumer: outpost + tenant_id: static-tenant +``` + +### Accounts Directory Layout + +When `accounts_dir` is set, Outpost watches it for tenant subdirectories and reconciles its connections at runtime — new accounts are added without restart, removed accounts have their connections drained. + +``` +/etc/outpost/nats-accounts/ +├── acme/ +│ ├── user.creds # NATS .creds (JWT + NKey seed) +│ └── meta.yaml +└── globex/ + ├── user.creds + └── meta.yaml +``` + +Each `meta.yaml`: + +```yaml +name: acme # optional; defaults to the directory name +credentials_file: "" # optional; defaults to <account-dir>/user.creds when present +stream: events # required +consumer: outpost # required (durable consumer name) +tenant_id: acme # optional; when set, stamped on every event +``` + +`credentials_file` may also be an absolute path or a path relative to the account directory — useful when you mount credentials read-only from a different source than the metadata. + +## Multi-Tenancy via NATS Accounts + +The recommended pattern is **one NATS Account per Outpost tenant**: + +- Each tenant publishes with credentials for their own account. +- Subjects, JetStream quota, and credentials are isolated by the NATS server. +- Outpost holds one connection per account. The pull-loops run in parallel and feed into a single delivery pipeline. +- Setting `tenant_id` on each account guarantees Outpost stamps the correct tenant on every event, regardless of payload content. A publisher cannot spoof events for a different tenant. + +For dynamic tenant provisioning, run NATS in operator/JWT-resolver mode and update the resolver via `$SYS.REQ.ACCOUNT.…CLAIMS.UPDATE` — no NATS-server restart required. 
+ +## Operational Notes + +- **At-least-once delivery**: messages are Ack'd only after Outpost has successfully ingested them. On failure (or restart) the consumer's `AckWait` triggers a redelivery. +- **Retry limit / DLQ**: configure `MaxDeliver` and a republish target on your JetStream consumer per your retention policy. Outpost does not own DLQ semantics. +- **Credential rotation**: replace the `.creds` file and re-create or rename the account directory to force Outpost to reconnect. Hot-reloading a single file without re-creating the directory is not currently supported. +- **No publish from Outpost**: this source is read-only. Outpost will not produce messages back into the JetStream stream. + +### Troubleshooting + +- [Ask a question](https://github.com/hookdeck/outpost/discussions/new?category=q-a) +- [Report a bug](https://github.com/hookdeck/outpost/issues/new?assignees=&labels=bug&projects=&template=bug_report.md&title=%F0%9F%90%9B+Bug+Report%3A+) +- [Request a feature](https://github.com/hookdeck/outpost/issues/new?assignees=&labels=enhancement&projects=&template=feature_request.md&title=%F0%9F%9A%80+Feature%3A+) diff --git a/examples/docker-compose/compose-publish-nats.yml b/examples/docker-compose/compose-publish-nats.yml new file mode 100644 index 000000000..fe79e6a9a --- /dev/null +++ b/examples/docker-compose/compose-publish-nats.yml @@ -0,0 +1,19 @@ +name: outpost-publish-nats +services: + publish_nats: + container_name: publish_nats + image: nats:2.10-alpine + command: + - "-js" + - "-m" + - "8222" + ports: + - "4222:4222" + - "8222:8222" + healthcheck: + test: ["CMD", "wget", "-q", "-O", "-", "http://localhost:8222/healthz"] + interval: 5s + timeout: 3s + retries: 5 + volumes: + - ./data/publish-nats:/data diff --git a/examples/docker-compose/start-nats-publish.sh b/examples/docker-compose/start-nats-publish.sh new file mode 100755 index 000000000..72a93edb5 --- /dev/null +++ b/examples/docker-compose/start-nats-publish.sh @@ -0,0 +1,25 @@ 
+#!/bin/bash + +# Make the script executable with: chmod +x start-nats-publish.sh + +echo "Starting publish NATS JetStream container..." +docker-compose -f compose-publish-nats.yml down +docker-compose -f compose-publish-nats.yml up -d + +echo "Waiting for NATS to start..." +sleep 3 + +echo "Checking container status..." +docker ps | grep publish_nats + +echo "NATS should now be available at:" +echo "- Client: nats://localhost:4222" +echo "- Monitoring: http://localhost:8222" + +echo "Provision the stream + consumer used by the publish dev helper:" +echo " curl -X POST 'http://localhost:5555/declare?method=nats'" +echo "" +echo "Publish a test event:" +echo " curl -X POST 'http://localhost:5555/publish?method=nats' \\" +echo " -H 'Content-Type: application/json' \\" +echo " -d '{\"tenant_id\":\"acme\",\"topic\":\"user.created\",\"data\":{\"id\":1}}'" diff --git a/go.mod b/go.mod index f498cf41b..8c78e2a42 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,7 @@ require ( github.com/aws/smithy-go v1.22.4 github.com/brianvoe/gofakeit/v6 v6.28.0 github.com/caarlos0/env/v9 v9.0.0 + github.com/fsnotify/fsnotify v1.7.0 github.com/getsentry/sentry-go v0.31.1 github.com/getsentry/sentry-go/gin v0.31.1 github.com/gin-contrib/static v1.1.2 @@ -36,6 +37,7 @@ require ( github.com/joho/godotenv v1.5.1 github.com/matoous/go-nanoid/v2 v2.1.0 github.com/mikestefanello/batcher v0.1.0 + github.com/nats-io/nats.go v1.34.0 github.com/pkg/errors v0.9.1 github.com/rabbitmq/amqp091-go v1.10.0 github.com/redis/go-redis/extra/redisotel/v9 v9.5.3 @@ -131,7 +133,6 @@ require ( github.com/ebitengine/purego v0.8.2 // indirect github.com/ericlagergren/decimal v0.0.0-20221120152707-495c53812d05 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect - github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/gabriel-vasile/mimetype v1.4.4 // indirect github.com/gin-contrib/sse v0.1.0 // indirect github.com/go-faster/city v1.0.1 // indirect @@ -177,6 +178,8 @@ require ( 
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/morikuni/aec v1.0.0 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/nxadm/tail v1.4.11 // indirect github.com/onsi/ginkgo v1.16.5 // indirect github.com/onsi/gomega v1.18.1 // indirect diff --git a/go.sum b/go.sum index e7deb082b..d83c2180a 100644 --- a/go.sum +++ b/go.sum @@ -1156,6 +1156,12 @@ github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjY github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= +github.com/nats-io/nats.go v1.34.0 h1:fnxnPCNiwIG5w08rlMcEKTUw4AV/nKyGCOJE8TdhSPk= +github.com/nats-io/nats.go v1.34.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/nxadm/tail v1.4.11 h1:8feyoE3OzPrcshW5/MJ4sGESc5cqmGkGCWlco4l0bqY= diff --git a/internal/config/publishmq.go b/internal/config/publishmq.go index d77d4172b..0e1edb0ba 100644 --- a/internal/config/publishmq.go +++ b/internal/config/publishmq.go @@ -33,11 +33,26 @@ type PublishRabbitMQConfig struct { Queue string `yaml:"queue" env:"PUBLISH_RABBITMQ_QUEUE" desc:"Name of the RabbitMQ queue for publishing events. 
Required if RabbitMQ is the chosen publish MQ provider." required:"C"` } +type PublishNATSAccountConfig struct { + Name string `yaml:"name" desc:"Account label used for logging and metrics. Must be unique within the publish source." required:"C"` + CredentialsFile string `yaml:"credentials_file" desc:"Path to the NATS .creds file (JWT + NKey seed) for this account." required:"C"` + Stream string `yaml:"stream" desc:"JetStream stream name to consume from. Must be pre-created on the NATS side." required:"C"` + Consumer string `yaml:"consumer" desc:"Durable JetStream consumer name. Must be pre-created on the NATS side." required:"C"` + TenantID string `yaml:"tenant_id" desc:"Outpost tenant_id to stamp on every event from this account. Recommended for one-account-per-tenant setups; overrides any tenant_id in the payload." required:"N"` +} + +type PublishNATSConfig struct { + Servers []string `yaml:"servers" env:"PUBLISH_NATS_SERVERS" envSeparator:"," desc:"NATS cluster URLs for the publish source (comma-separated, e.g. 'nats://a:4222,nats://b:4222'). Required if NATS is the chosen publish MQ provider." required:"C"` + AccountsDir string `yaml:"accounts_dir" env:"PUBLISH_NATS_ACCOUNTS_DIR" desc:"Directory containing per-tenant NATS account subdirectories (each with a meta.yaml plus credentials file). Watched for runtime changes. Combined with 'accounts' if both are set." required:"N"` + Accounts []PublishNATSAccountConfig `yaml:"accounts" desc:"Static list of NATS accounts to consume from. Alternative or supplement to accounts_dir." required:"N"` +} + type PublishMQConfig struct { AWSSQS PublishAWSSQSConfig `yaml:"aws_sqs" desc:"Configuration for using AWS SQS as the publish message queue. Only one publish MQ provider should be configured." required:"N"` AzureServiceBus PublishAzureServiceBusConfig `yaml:"azure_servicebus" desc:"Configuration for using Azure Service Bus as the publish message queue. Only one publish MQ provider should be configured." 
required:"N"` GCPPubSub PublishGCPPubSubConfig `yaml:"gcp_pubsub" desc:"Configuration for using GCP Pub/Sub as the publish message queue. Only one publish MQ provider should be configured." required:"N"` RabbitMQ PublishRabbitMQConfig `yaml:"rabbitmq" desc:"Configuration for using RabbitMQ as the publish message queue. Only one publish MQ provider should be configured." required:"N"` + NATS PublishNATSConfig `yaml:"nats" desc:"Configuration for using NATS JetStream as the publish message queue. Only one publish MQ provider should be configured." required:"N"` } func (c PublishMQConfig) GetInfraType() string { @@ -53,6 +68,9 @@ func (c PublishMQConfig) GetInfraType() string { if hasPublishRabbitMQConfig(c.RabbitMQ) { return "rabbitmq" } + if hasPublishNATSConfig(c.NATS) { + return "nats" + } return "" } @@ -94,6 +112,24 @@ func (c *PublishMQConfig) GetQueueConfig() *mqs.QueueConfig { Queue: c.RabbitMQ.Queue, }, } + case "nats": + accounts := make([]mqs.NATSAccountConfig, 0, len(c.NATS.Accounts)) + for _, a := range c.NATS.Accounts { + accounts = append(accounts, mqs.NATSAccountConfig{ + Name: a.Name, + CredentialsFile: a.CredentialsFile, + Stream: a.Stream, + Consumer: a.Consumer, + TenantID: a.TenantID, + }) + } + return &mqs.QueueConfig{ + NATS: &mqs.NATSConfig{ + Servers: c.NATS.Servers, + AccountsDir: c.NATS.AccountsDir, + Accounts: accounts, + }, + } default: return nil } @@ -115,3 +151,10 @@ func hasPublishGCPPubSubConfig(config PublishGCPPubSubConfig) bool { func hasPublishRabbitMQConfig(config PublishRabbitMQConfig) bool { return config.ServerURL != "" } + +func hasPublishNATSConfig(config PublishNATSConfig) bool { + if len(config.Servers) == 0 { + return false + } + return config.AccountsDir != "" || len(config.Accounts) > 0 +} diff --git a/internal/mqs/nats_accounts.go b/internal/mqs/nats_accounts.go new file mode 100644 index 000000000..d1da253c8 --- /dev/null +++ b/internal/mqs/nats_accounts.go @@ -0,0 +1,207 @@ +package mqs + +import ( + "errors" + 
"fmt" + "os" + "path/filepath" + "sync" + "time" + + "github.com/fsnotify/fsnotify" + "gopkg.in/yaml.v3" +) + +// accountMetaFile is the on-disk representation of a single tenant's +// NATS account inside AccountsDir. +// +// Layout under AccountsDir: +// +// <tenant>/ +// user.creds NATS .creds (JWT + NKey seed) — default credentials path +// meta.yaml this struct +// +// CredentialsFile may be overridden in meta.yaml (absolute or relative +// to the account directory). The default is "user.creds". +type accountMetaFile struct { + Name string `yaml:"name"` + CredentialsFile string `yaml:"credentials_file"` + Stream string `yaml:"stream"` + Consumer string `yaml:"consumer"` + TenantID string `yaml:"tenant_id"` +} + +// loadAccountsFromDir scans dir for tenant subdirectories that contain a +// meta.yaml file and returns the list of accounts. Subdirectories without +// a meta.yaml are skipped (still being written, perhaps). +func loadAccountsFromDir(dir string) ([]NATSAccountConfig, error) { + entries, err := os.ReadDir(dir) + if err != nil { + return nil, fmt.Errorf("read accounts_dir %q: %w", dir, err) + } + var accounts []NATSAccountConfig + for _, e := range entries { + if !e.IsDir() { + continue + } + accountDir := filepath.Join(dir, e.Name()) + metaPath := filepath.Join(accountDir, "meta.yaml") + if _, err := os.Stat(metaPath); err != nil { + if errors.Is(err, os.ErrNotExist) { + // Subdirectory without meta.yaml: still being provisioned + // or simply not an outpost account. Silently skip. 
+ continue + } + return nil, fmt.Errorf("stat meta for account %q: %w", e.Name(), err) + } + acc, err := loadAccountMeta(metaPath, accountDir, e.Name()) + if err != nil { + return nil, fmt.Errorf("account %q: %w", e.Name(), err) + } + accounts = append(accounts, acc) + } + return accounts, nil +} + +func loadAccountMeta(metaPath, accountDir, dirName string) (NATSAccountConfig, error) { + body, err := os.ReadFile(metaPath) + if err != nil { + return NATSAccountConfig{}, err + } + var meta accountMetaFile + if err := yaml.Unmarshal(body, &meta); err != nil { + return NATSAccountConfig{}, fmt.Errorf("parse meta.yaml: %w", err) + } + + name := meta.Name + if name == "" { + name = dirName + } + + creds := meta.CredentialsFile + if creds == "" { + // Convention: <accountDir>/user.creds when present. If absent the + // account runs with no credentials (valid for trusted-network or + // token-via-URL setups). + candidate := filepath.Join(accountDir, "user.creds") + if _, err := os.Stat(candidate); err == nil { + creds = candidate + } + } else if !filepath.IsAbs(creds) { + creds = filepath.Join(accountDir, creds) + } + + return NATSAccountConfig{ + Name: name, + CredentialsFile: creds, + Stream: meta.Stream, + Consumer: meta.Consumer, + TenantID: meta.TenantID, + }, nil +} + +// natsAccountsWatcher watches a directory for create/remove/rename events +// and invokes onChange (debounced) so the queue can reconcile. +type natsAccountsWatcher struct { + dir string + onChange func() + w *fsnotify.Watcher + + stopCh chan struct{} + once sync.Once +} + +// debounceWindow collapses bursts of FS events (e.g. provisioning writes +// user.creds then meta.yaml) into a single reconcile. 
+const debounceWindow = 250 * time.Millisecond + +func newNATSAccountsWatcher(dir string, onChange func()) (*natsAccountsWatcher, error) { + w, err := fsnotify.NewWatcher() + if err != nil { + return nil, err + } + if err := w.Add(dir); err != nil { + _ = w.Close() + return nil, err + } + + // Also watch existing subdirectories so we catch file changes inside + // an account dir (e.g. credentials rotation, meta.yaml edits). + entries, err := os.ReadDir(dir) + if err == nil { + for _, e := range entries { + if e.IsDir() { + _ = w.Add(filepath.Join(dir, e.Name())) + } + } + } + + return &natsAccountsWatcher{ + dir: dir, + onChange: onChange, + w: w, + stopCh: make(chan struct{}), + }, nil +} + +func (w *natsAccountsWatcher) start() { + go w.run() +} + +func (w *natsAccountsWatcher) run() { + defer w.w.Close() + + // Single reusable timer for debouncing FS events. Initialise stopped + // so timer.C blocks until the first event arms it. + timer := time.NewTimer(debounceWindow) + if !timer.Stop() { + <-timer.C + } + armed := false + + arm := func() { + if armed { + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + } + timer.Reset(debounceWindow) + armed = true + } + + for { + select { + case <-w.stopCh: + if armed { + timer.Stop() + } + return + case ev, ok := <-w.w.Events: + if !ok { + return + } + // Newly created subdirectory: add it to the watch list so we + // also see meta.yaml/credentials writes inside. + if ev.Op&fsnotify.Create != 0 { + if fi, err := os.Stat(ev.Name); err == nil && fi.IsDir() { + _ = w.w.Add(ev.Name) + } + } + // On Remove/Rename fsnotify cleans up its own watch — nothing + // to do here. The next reconcile will catch the diff. + arm() + case <-w.w.Errors: + // Errors are non-fatal; the next event will retry. 
+ case <-timer.C: + armed = false + w.onChange() + } + } +} + +func (w *natsAccountsWatcher) stop() { + w.once.Do(func() { close(w.stopCh) }) +} diff --git a/internal/mqs/queue.go b/internal/mqs/queue.go index 99497412b..cb4492ece 100644 --- a/internal/mqs/queue.go +++ b/internal/mqs/queue.go @@ -17,6 +17,7 @@ type QueueConfig struct { AzureServiceBus *AzureServiceBusConfig GCPPubSub *GCPPubSubConfig RabbitMQ *RabbitMQConfig + NATS *NATSConfig InMemory *InMemoryConfig // mainly for testing purposes VisibilityTimeout time.Duration @@ -94,6 +95,8 @@ func NewQueue(config *QueueConfig) Queue { return NewGCPPubSubQueue(config.GCPPubSub, config.VisibilityTimeout) } else if config.RabbitMQ != nil { return NewRabbitMQQueue(config.RabbitMQ) + } else if config.NATS != nil { + return NewNATSQueue(config.NATS) } else { return NewInMemoryQueue(config.InMemory) } diff --git a/internal/mqs/queue_nats.go b/internal/mqs/queue_nats.go new file mode 100644 index 000000000..093b32d5d --- /dev/null +++ b/internal/mqs/queue_nats.go @@ -0,0 +1,522 @@ +package mqs + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "log" + "strings" + "sync" + "time" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +// pumpBackoff is the cooldown between iter.Next() returns and the next +// fetch when the previous call returned a transient error. Prevents +// busy-spinning when the upstream consumer is briefly unhealthy. +const pumpBackoff = 250 * time.Millisecond + +// NATSConfig configures a NATS JetStream publish source. +// +// NATS JetStream is supported as a publish-mq only (Outpost reads events +// from one or more pre-provisioned JetStream consumers). Outpost does not +// create the underlying stream or consumer — the operator is expected to +// manage that lifecycle, typically alongside per-tenant NATS Account +// provisioning. +// +// One queue instance can consume from multiple NATS Accounts in parallel. 
+// Each account holds its own credentials and is dialled on its own NATS +// connection. With one Account per tenant, set Account.TenantID to make +// Outpost stamp the correct tenant on every event regardless of payload. +type NATSConfig struct { + // Servers is the NATS cluster URL list (e.g. ["nats://a:4222","nats://b:4222"]). + Servers []string + + // Accounts is the static list of accounts the queue should consume from. + // Combined with AccountsDir if both are set. + Accounts []NATSAccountConfig + + // AccountsDir, when set, makes the queue watch a directory for tenant + // subdirectories. Each subdirectory contains a meta.yaml describing the + // account plus a .creds file. Accounts are added and removed at runtime + // as directories appear and disappear, without restarting Outpost. + AccountsDir string +} + +// NATSAccountConfig is a single NATS Account that Outpost consumes from. +type NATSAccountConfig struct { + // Name is a short label used for logging, metrics, and account identity + // inside the queue. Must be unique within a NATSConfig. + Name string + + // CredentialsFile points at a NATS .creds file (JWT + NKey seed). + CredentialsFile string + + // Stream is the JetStream stream name the consumer reads from. + Stream string + + // Consumer is the durable JetStream consumer name. Must be pre-created. + Consumer string + + // TenantID, when set, overrides the tenant_id field on every event + // from this account. Recommended pattern: one Account per Outpost tenant. + // Leave empty to trust whatever tenant_id is in the payload. + TenantID string +} + +// NATSQueue is a publish-mq driver backed by NATS JetStream. 
+type NATSQueue struct {
+	config *NATSConfig
+
+	// mu guards conns and sub.
+	mu    sync.Mutex
+	conns map[string]*natsConn // by account name
+	sub   *natsSubscription    // active subscription, nil before Subscribe
+
+	// servers is config.Servers joined with commas; populated by Init.
+	servers string
+	// watcher is set by Init only when config.AccountsDir is non-empty.
+	watcher *natsAccountsWatcher
+}
+
+// natsConn bundles one account's configuration with the NATS connection
+// dialled for it and the JetStream handle created on that connection.
+type natsConn struct {
+	account NATSAccountConfig
+	nc      *nats.Conn
+	js      jetstream.JetStream
+}
+
+var _ Queue = (*NATSQueue)(nil)
+
+// NewNATSQueue constructs (but does not connect) a NATS JetStream queue.
+// Call Init to open the connections.
+func NewNATSQueue(config *NATSConfig) *NATSQueue {
+	return &NATSQueue{
+		config: config,
+		conns:  make(map[string]*natsConn),
+	}
+}
+
+// Init validates the configuration, opens one NATS connection per account,
+// verifies each stream + consumer exists, and (if AccountsDir is set) starts
+// a filesystem watcher that adds and removes accounts at runtime. The
+// returned cleanup function drains every connection and stops the watcher.
+//
+// Init fails when config is nil, when no servers or no accounts are
+// configured, or when any account cannot be added; connections opened before
+// the failure are closed again.
+func (q *NATSQueue) Init(ctx context.Context) (func(), error) {
+	if q.config == nil {
+		return nil, errors.New("nats: nil config")
+	}
+	if len(q.config.Servers) == 0 {
+		return nil, errors.New("nats: no servers configured")
+	}
+
+	q.servers = strings.Join(q.config.Servers, ",")
+
+	// Copy the static list so appending directory accounts never writes
+	// into the caller's slice.
+	accounts := append([]NATSAccountConfig(nil), q.config.Accounts...)
+	if q.config.AccountsDir != "" {
+		dirAccounts, err := loadAccountsFromDir(q.config.AccountsDir)
+		if err != nil {
+			return nil, fmt.Errorf("nats: %w", err)
+		}
+		accounts = append(accounts, dirAccounts...)
+	}
+	if len(accounts) == 0 {
+		return nil, errors.New("nats: no accounts configured")
+	}
+
+	for _, acc := range accounts {
+		if err := q.addAccount(ctx, acc); err != nil {
+			q.closeAll()
+			return nil, err
+		}
+	}
+
+	if q.config.AccountsDir != "" {
+		w, err := newNATSAccountsWatcher(q.config.AccountsDir, q.reconcileFromDir)
+		if err != nil {
+			q.closeAll()
+			return nil, fmt.Errorf("nats: watch accounts_dir: %w", err)
+		}
+		q.watcher = w
+		w.start()
+	}
+
+	return q.closeAll, nil
+}
+
+// closeAll stops the watcher, stops the active subscription (if any) and
+// drains every registered connection. It is safe to call more than once.
+func (q *NATSQueue) closeAll() {
+	if q.watcher != nil {
+		q.watcher.stop()
+		q.watcher = nil
+	}
+
+	// Detach everything under the lock, then do the blocking work without
+	// it so addAccount/removeAccount are not stalled while pumps wind down.
+	// Clearing q.sub keeps a stopped subscription from being handed to
+	// accounts that are added afterwards.
+	q.mu.Lock()
+	sub := q.sub
+	q.sub = nil
+	conns := q.conns
+	q.conns = make(map[string]*natsConn)
+	q.mu.Unlock()
+
+	// Pumps are stopped before their connections are drained.
+	if sub != nil {
+		sub.stopAll()
+	}
+	for _, c := range conns {
+		if c.nc != nil {
+			// Best-effort: nothing useful can be done with a drain error
+			// while shutting down.
+			_ = c.nc.Drain()
+		}
+	}
+}
+
+// addAccount connects to NATS for a single account, verifies stream and
+// consumer, registers the connection, and (if a subscription is active)
+// starts a pump for it. Caller must not hold q.mu.
+//
+// Adding a name that is already registered is a no-op and returns nil.
+func (q *NATSQueue) addAccount(ctx context.Context, acc NATSAccountConfig) error {
+	if err := acc.validate(); err != nil {
+		return fmt.Errorf("nats: account %q: %w", acc.Name, err)
+	}
+
+	// Cheap early exit so a known account does not cost a dial.
+	q.mu.Lock()
+	_, exists := q.conns[acc.Name]
+	q.mu.Unlock()
+	if exists {
+		return nil
+	}
+
+	opts := []nats.Option{
+		nats.Name(fmt.Sprintf("outpost:%s", acc.Name)),
+		nats.MaxReconnects(-1),
+	}
+	if acc.CredentialsFile != "" {
+		opts = append(opts, nats.UserCredentials(acc.CredentialsFile))
+	}
+	nc, err := nats.Connect(q.servers, opts...)
+	if err != nil {
+		return fmt.Errorf("nats: account %q: connect: %w", acc.Name, err)
+	}
+
+	js, err := jetstream.New(nc)
+	if err != nil {
+		nc.Close()
+		return fmt.Errorf("nats: account %q: jetstream: %w", acc.Name, err)
+	}
+
+	if _, err := js.Stream(ctx, acc.Stream); err != nil {
+		nc.Close()
+		return fmt.Errorf("nats: account %q: stream %q: %w", acc.Name, acc.Stream, err)
+	}
+	if _, err := js.Consumer(ctx, acc.Stream, acc.Consumer); err != nil {
+		nc.Close()
+		return fmt.Errorf("nats: account %q: consumer %q: %w", acc.Name, acc.Consumer, err)
+	}
+
+	conn := &natsConn{account: acc, nc: nc, js: js}
+
+	// The lock was released while dialling, so another caller may have
+	// registered the same name in the meantime. Re-check before inserting:
+	// overwriting the entry would orphan the other connection for good.
+	q.mu.Lock()
+	if _, exists := q.conns[acc.Name]; exists {
+		q.mu.Unlock()
+		nc.Close()
+		return nil
+	}
+	q.conns[acc.Name] = conn
+	sub := q.sub
+	q.mu.Unlock()
+
+	if sub != nil {
+		if err := sub.startPump(ctx, conn); err != nil {
+			q.removeAccount(acc.Name)
+			return err
+		}
+	}
+	return nil
+}
+
+// removeAccount stops the pump (if any) and drains the connection for the
+// named account. No-op if the account is not currently registered.
+func (q *NATSQueue) removeAccount(name string) {
+	q.mu.Lock()
+	conn, ok := q.conns[name]
+	if !ok {
+		q.mu.Unlock()
+		return
+	}
+	delete(q.conns, name)
+	sub := q.sub
+	q.mu.Unlock()
+
+	if sub != nil {
+		sub.stopPump(name)
+	}
+	if conn.nc != nil {
+		// Best-effort drain; the account is already unregistered.
+		_ = conn.nc.Drain()
+	}
+}
+
+// reconcileFromDir is invoked by the watcher on every directory change.
+// It diffs the current set of dir-derived accounts against q.conns and
+// adds/removes as needed. Static accounts (from q.config.Accounts) are
+// preserved.
+func (q *NATSQueue) reconcileFromDir() {
+	desired, err := loadAccountsFromDir(q.config.AccountsDir)
+	if err != nil {
+		// Keep the current set untouched, but say why: a directory that
+		// cannot be read would otherwise look like "nothing changed".
+		log.Printf("nats: reconcile: load accounts_dir %q failed: %v", q.config.AccountsDir, err)
+		return
+	}
+
+	staticNames := make(map[string]struct{}, len(q.config.Accounts))
+	for _, a := range q.config.Accounts {
+		staticNames[a.Name] = struct{}{}
+	}
+
+	desiredSet := make(map[string]NATSAccountConfig, len(desired))
+	for _, a := range desired {
+		desiredSet[a.Name] = a
+	}
+
+	// Snapshot the registered accounts so the add/remove calls below, which
+	// take q.mu themselves, run without the lock held.
+	q.mu.Lock()
+	current := make(map[string]NATSAccountConfig, len(q.conns))
+	for name, c := range q.conns {
+		current[name] = c.account
+	}
+	q.mu.Unlock()
+
+	// Drop accounts whose directory disappeared; static accounts stay.
+	for name := range current {
+		if _, isStatic := staticNames[name]; isStatic {
+			continue
+		}
+		if _, stillThere := desiredSet[name]; !stillThere {
+			q.removeAccount(name)
+		}
+	}
+
+	// Add accounts that are new on disk. Accounts are matched by name only,
+	// so an existing account whose settings changed is left as it is.
+	for _, acc := range desired {
+		if _, alreadyHave := current[acc.Name]; alreadyHave {
+			continue
+		}
+		if err := q.addAccount(context.Background(), acc); err != nil {
+			// Surface the failure so operators can spot bad creds, missing
+			// streams, or unreachable servers. The next FS event re-triggers
+			// reconcile, so a transient failure self-heals.
+			log.Printf("nats: reconcile add account %q failed: %v", acc.Name, err)
+		}
+	}
+}
+
+// Publish is intentionally not implemented. JetStream is a publish-mq
+// source only; events enter Outpost from publishers outside Outpost.
+// It always returns an error.
+func (q *NATSQueue) Publish(ctx context.Context, msg IncomingMessage) error {
+	return errors.New("nats: publish is not supported by the JetStream publish-mq driver")
+}
+
+// Subscribe opens a pull-based JetStream consumer per registered account
+// and fans messages into a single multiplexed Subscription. Accounts added
+// later (via the directory watcher) automatically get a pump started too.
+func (q *NATSQueue) Subscribe(ctx context.Context, opts ...SubscribeOption) (Subscription, error) {
+	q.mu.Lock()
+	if len(q.conns) == 0 {
+		q.mu.Unlock()
+		return nil, errors.New("nats: queue not initialized")
+	}
+	if q.sub != nil {
+		q.mu.Unlock()
+		return nil, errors.New("nats: already subscribed")
+	}
+
+	// Concurrency doubles as the per-account pull batch size; anything
+	// non-positive falls back to one message at a time.
+	options := ApplySubscribeOptions(opts)
+	perAccount := options.Concurrency
+	if perAccount <= 0 {
+		perAccount = 1
+	}
+
+	sub := &natsSubscription{
+		msgs:       make(chan *Message),
+		done:       make(chan struct{}),
+		iters:      make(map[string]jetstream.MessagesContext),
+		perAccount: perAccount,
+	}
+	q.sub = sub
+
+	// Snapshot the connections so pumps are started without holding q.mu.
+	conns := make([]*natsConn, 0, len(q.conns))
+	for _, c := range q.conns {
+		conns = append(conns, c)
+	}
+	q.mu.Unlock()
+
+	for _, c := range conns {
+		if err := sub.startPump(ctx, c); err != nil {
+			_ = sub.Shutdown(context.Background())
+
+			// Unregister the dead subscription. Leaving it in place would
+			// make every retry fail with "already subscribed" and would
+			// hand a closed subscription to accounts added later.
+			q.mu.Lock()
+			if q.sub == sub {
+				q.sub = nil
+			}
+			q.mu.Unlock()
+			return nil, err
+		}
+	}
+
+	return sub, nil
+}
+
+// SupportsConcurrency tells the upstream consumer to skip its own semaphore
+// since pull concurrency is already bounded by PullMaxMessages per account.
+func (q *NATSQueue) SupportsConcurrency() bool { return true }
+
+// natsSubscription multiplexes messages from N per-account pull loops
+// into a single Receive channel.
+type natsSubscription struct {
+	msgs chan *Message
+	done chan struct{}
+	wg   sync.WaitGroup
+
+	// stopOnce guarantees done is closed exactly once, even when stopAll
+	// is reached concurrently from Shutdown and from the queue's cleanup.
+	stopOnce sync.Once
+
+	perAccount int
+
+	// itersMu guards iters and orders pump registration against shutdown.
+	itersMu sync.Mutex
+	iters   map[string]jetstream.MessagesContext // by account name
+}
+
+var (
+	_ Subscription           = (*natsSubscription)(nil)
+	_ ConcurrentSubscription = (*NATSQueue)(nil)
+)
+
+// Receive blocks until a message arrives from any account, ctx is done, or
+// the subscription is shut down.
+func (s *natsSubscription) Receive(ctx context.Context) (*Message, error) {
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	case <-s.done:
+		return nil, errors.New("nats: subscription closed")
+	case msg := <-s.msgs:
+		return msg, nil
+	}
+}
+
+// Shutdown stops every pump and waits for them to exit. The context is not
+// consulted. It always returns nil.
+func (s *natsSubscription) Shutdown(_ context.Context) error {
+	s.stopAll()
+	return nil
+}
+
+// stopAll closes done, stops every registered iterator and waits for all
+// pump goroutines to return. It may be called repeatedly and concurrently.
+func (s *natsSubscription) stopAll() {
+	// A check-then-close on done is not atomic and panics when two callers
+	// race; sync.Once makes the close safe.
+	s.stopOnce.Do(func() { close(s.done) })
+
+	s.itersMu.Lock()
+	for _, iter := range s.iters {
+		iter.Stop()
+	}
+	s.iters = make(map[string]jetstream.MessagesContext)
+	s.itersMu.Unlock()
+
+	s.wg.Wait()
+}
+
+// startPump opens the account's consumer, creates a pull iterator sized by
+// perAccount and launches a pump goroutine for it. An iterator already
+// registered under the same account name is stopped and replaced. It
+// returns an error when the consumer or iterator cannot be opened, or when
+// the subscription has already been shut down.
+func (s *natsSubscription) startPump(ctx context.Context, conn *natsConn) error {
+	consumer, err := conn.js.Consumer(ctx, conn.account.Stream, conn.account.Consumer)
+	if err != nil {
+		return fmt.Errorf("nats: account %q: open consumer: %w", conn.account.Name, err)
+	}
+	iter, err := consumer.Messages(jetstream.PullMaxMessages(s.perAccount))
+	if err != nil {
+		return fmt.Errorf("nats: account %q: messages: %w", conn.account.Name, err)
+	}
+
+	s.itersMu.Lock()
+	select {
+	case <-s.done:
+		// Shutdown has begun. Registering now would leave an iterator that
+		// nobody stops and would call wg.Add while stopAll may be in Wait.
+		s.itersMu.Unlock()
+		iter.Stop()
+		return fmt.Errorf("nats: account %q: subscription closed", conn.account.Name)
+	default:
+	}
+	if existing, ok := s.iters[conn.account.Name]; ok {
+		existing.Stop()
+	}
+	s.iters[conn.account.Name] = iter
+	// Add under the lock: stopAll takes the same lock before it waits, so
+	// this Add is always ordered before that Wait.
+	s.wg.Add(1)
+	s.itersMu.Unlock()
+
+	go s.pump(conn.account, iter)
+	return nil
+}
+
+// stopPump stops and forgets the iterator registered for the named account.
+// It is a no-op for unknown names.
+func (s *natsSubscription) stopPump(name string) {
+	s.itersMu.Lock()
+	iter, ok := s.iters[name]
+	if ok {
+		delete(s.iters, name)
+	}
+	s.itersMu.Unlock()
+	if ok {
+		iter.Stop()
+	}
+}
+
+// pump forwards messages from one account's iterator to the shared msgs
+// channel until the iterator is closed or the subscription is shut down.
+func (s *natsSubscription) pump(account NATSAccountConfig, iter jetstream.MessagesContext) {
+	defer s.wg.Done()
+
+	for {
+		select {
+		case <-s.done:
+			return
+		default:
+		}
+
+		jmsg, err := iter.Next()
+		if err != nil {
+			if errors.Is(err, jetstream.ErrMsgIteratorClosed) {
+				return
+			}
+			// Transient error (connection blip, server restart, consumer
+			// leader change). Back off briefly so we don't peg CPU and
+			// give the cluster room to recover, but stay responsive to
+			// shutdown.
+			select {
+			case <-time.After(pumpBackoff):
+			case <-s.done:
+				return
+			}
+			continue
+		}
+
+		body := jmsg.Data()
+		if account.TenantID != "" {
+			// A payload that cannot be rewritten is forwarded as received;
+			// the override only applies when the rewrite succeeds.
+			if rewritten, rerr := overrideTenantID(body, account.TenantID); rerr == nil {
+				body = rewritten
+			}
+		}
+
+		out := &Message{
+			QueueMessage: &natsQueueMessage{msg: jmsg},
+			LoggableID:   formatLoggableID(account, jmsg),
+			Body:         body,
+		}
+
+		select {
+		case s.msgs <- out:
+		case <-s.done:
+			// Nobody will receive this message any more; hand it back.
+			_ = jmsg.Nak()
+			return
+		}
+	}
+}
+
+// natsQueueMessage adapts a JetStream message to the queue's Ack/Nack
+// interface. Acknowledgement errors are discarded.
+type natsQueueMessage struct {
+	msg jetstream.Msg
+}
+
+func (m *natsQueueMessage) Ack()  { _ = m.msg.Ack() }
+func (m *natsQueueMessage) Nack() { _ = m.msg.Nak() }
+
+// validate reports the first missing mandatory field of the account.
+func (a NATSAccountConfig) validate() error {
+	if a.Name == "" {
+		return errors.New("name is required")
+	}
+	if a.Stream == "" {
+		return errors.New("stream is required")
+	}
+	if a.Consumer == "" {
+		return errors.New("consumer is required")
+	}
+	// credentials_file is optional: a NATS deployment may use no-auth,
+	// nkey-via-server-config, or token-via-URL.
+	return nil
+}
+
+// formatLoggableID renders "<account>/<stream>:<stream sequence>" for log
+// output, falling back to the bare account name when the message carries no
+// usable metadata.
+func formatLoggableID(account NATSAccountConfig, jmsg jetstream.Msg) string {
+	md, err := jmsg.Metadata()
+	if err != nil || md == nil {
+		return account.Name
+	}
+	return fmt.Sprintf("%s/%s:%d", account.Name, md.Stream, md.Sequence.Stream)
+}
+
+// overrideTenantID rewrites the tenant_id field on a JSON event payload.
+// If the body is not a JSON object, it is returned unchanged.
+//
+// Whenever the body is returned unchanged the error is non-nil, so callers
+// can tell that no override took place. That includes a literal JSON null,
+// which decodes into a nil map without any decoding error.
+func overrideTenantID(body []byte, tenantID string) ([]byte, error) {
+	var event map[string]any
+	if err := json.Unmarshal(body, &event); err != nil {
+		return body, err
+	}
+	if event == nil {
+		// JSON null leaves the map nil; assigning into it would panic.
+		return body, errors.New("nats: payload is not a JSON object")
+	}
+	event["tenant_id"] = tenantID
+
+	rewritten, err := json.Marshal(event)
+	if err != nil {
+		return body, err
+	}
+	return rewritten, nil
+}
diff --git a/internal/mqs/queue_nats_test.go b/internal/mqs/queue_nats_test.go
new file mode 100644
index 000000000..49e0ff41a
--- /dev/null
+++ b/internal/mqs/queue_nats_test.go
@@ -0,0 +1,278 @@
+package mqs_test
+
+import (
+	"context"
+	"encoding/json"
+	"os"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"github.com/google/uuid"
+	"github.com/hookdeck/outpost/internal/mqs"
+	"github.com/hookdeck/outpost/internal/util/testinfra"
+	"github.com/hookdeck/outpost/internal/util/testutil"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+// TestIntegrationMQ_NATS verifies that the NATS JetStream queue driver
+// receives messages, surfaces them via Subscribe/Receive, and that Ack
+// removes the message from the work queue.
+func TestIntegrationMQ_NATS(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping integration test")
+	}
+	t.Parallel()
+	t.Cleanup(testinfra.Start(t))
+
+	cfg := testinfra.NewMQNATSConfig(t)
+	q := mqs.NewQueue(&cfg)
+
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	teardown, err := q.Init(ctx)
+	require.NoError(t, err)
+	defer teardown()
+
+	subscription, err := q.Subscribe(ctx)
+	require.NoError(t, err)
+	defer subscription.Shutdown(ctx)
+
+	// Inject a single event into the account's stream.
+	event := map[string]any{
+		"tenant_id": "ignored-from-payload",
+		"topic":     "user.created",
+		"data":      map[string]any{"hello": "world"},
+	}
+	payload, _ := json.Marshal(event)
+	account := cfg.NATS.Accounts[0]
+	publishErr := testutil.PublishToNATSStream(ctx, cfg.NATS.Servers, account.Stream, payload)
+	require.NoError(t, publishErr)
+
+	// The event must come back out of the subscription.
+	received, err := subscription.Receive(ctx)
+	require.NoError(t, err)
+	require.NotNil(t, received)
+
+	var decoded map[string]any
+	require.NoError(t, json.Unmarshal(received.Body, &decoded))
+	assert.Equal(t, "user.created", decoded["topic"])
+	// The account carries no TenantID, so the payload's own value survives.
+	assert.Equal(t, "ignored-from-payload", decoded["tenant_id"])
+	received.Ack()
+}
+
+// TestIntegrationMQ_NATS_TenantOverride verifies that when an account
+// has TenantID set, the queue stamps that tenant_id on every event
+// regardless of what the payload contained.
+func TestIntegrationMQ_NATS_TenantOverride(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping integration test")
+	}
+	t.Parallel()
+	t.Cleanup(testinfra.Start(t))
+
+	streamName := "test-" + uuid.New().String()
+	consumerName := "test-" + uuid.New().String()
+	account := mqs.NATSAccountConfig{
+		Name:     "acme",
+		Stream:   streamName,
+		Consumer: consumerName,
+		TenantID: "acme-tenant",
+	}
+	cfg := mqs.QueueConfig{
+		NATS: &mqs.NATSConfig{
+			Servers:  []string{testinfra.EnsureNATS()},
+			Accounts: []mqs.NATSAccountConfig{account},
+		},
+	}
+
+	// Stream and consumer must exist before the queue is initialised.
+	declareErr := testutil.DeclareTestNATSInfrastructure(context.Background(), cfg.NATS)
+	require.NoError(t, declareErr)
+	t.Cleanup(func() {
+		_ = testutil.TeardownTestNATSInfrastructure(context.Background(), cfg.NATS)
+	})
+
+	q := mqs.NewQueue(&cfg)
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	teardown, err := q.Init(ctx)
+	require.NoError(t, err)
+	defer teardown()
+
+	subscription, err := q.Subscribe(ctx)
+	require.NoError(t, err)
+	defer subscription.Shutdown(ctx)
+
+	// The publisher claims a tenant it does not own.
+	event := map[string]any{
+		"tenant_id": "spoofed-tenant",
+		"topic":     "x.y",
+	}
+	payload, _ := json.Marshal(event)
+	publishErr := testutil.PublishToNATSStream(ctx, cfg.NATS.Servers, streamName, payload)
+	require.NoError(t, publishErr)
+
+	received, err := subscription.Receive(ctx)
+	require.NoError(t, err)
+
+	var decoded map[string]any
+	require.NoError(t, json.Unmarshal(received.Body, &decoded))
+	assert.Equal(t, "acme-tenant", decoded["tenant_id"], "TenantID on account config must override payload")
+	received.Ack()
+}
+
+// TestIntegrationMQ_NATS_MultiAccount verifies that two accounts
+// can be consumed in parallel and events end up tagged with the
+// correct tenant_id.
+func TestIntegrationMQ_NATS_MultiAccount(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping integration test")
+	}
+	t.Parallel()
+	t.Cleanup(testinfra.Start(t))
+
+	acmeStream := "test-a-" + uuid.New().String()
+	globexStream := "test-b-" + uuid.New().String()
+	natsCfg := &mqs.NATSConfig{
+		Servers: []string{testinfra.EnsureNATS()},
+		Accounts: []mqs.NATSAccountConfig{
+			{Name: "acme", Stream: acmeStream, Consumer: "outpost", TenantID: "acme"},
+			{Name: "globex", Stream: globexStream, Consumer: "outpost", TenantID: "globex"},
+		},
+	}
+	cfg := mqs.QueueConfig{NATS: natsCfg}
+
+	declareErr := testutil.DeclareTestNATSInfrastructure(context.Background(), cfg.NATS)
+	require.NoError(t, declareErr)
+	t.Cleanup(func() {
+		_ = testutil.TeardownTestNATSInfrastructure(context.Background(), cfg.NATS)
+	})
+
+	q := mqs.NewQueue(&cfg)
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	teardown, err := q.Init(ctx)
+	require.NoError(t, err)
+	defer teardown()
+
+	subscription, err := q.Subscribe(ctx)
+	require.NoError(t, err)
+	defer subscription.Shutdown(ctx)
+
+	// One event per stream; neither payload names a tenant.
+	acmePayload, _ := json.Marshal(map[string]any{"topic": "a"})
+	globexPayload, _ := json.Marshal(map[string]any{"topic": "b"})
+	require.NoError(t, testutil.PublishToNATSStream(ctx, cfg.NATS.Servers, acmeStream, acmePayload))
+	require.NoError(t, testutil.PublishToNATSStream(ctx, cfg.NATS.Servers, globexStream, globexPayload))
+
+	// Arrival order is not fixed, so index the topics by stamped tenant.
+	topicByTenant := map[string]string{}
+	for i := 0; i < 2; i++ {
+		received, err := subscription.Receive(ctx)
+		require.NoError(t, err)
+		var decoded map[string]any
+		require.NoError(t, json.Unmarshal(received.Body, &decoded))
+		tenant := decoded["tenant_id"].(string)
+		topicByTenant[tenant] = decoded["topic"].(string)
+		received.Ack()
+	}
+
+	assert.Equal(t, "a", topicByTenant["acme"])
+	assert.Equal(t, "b", topicByTenant["globex"])
+}
+
+// TestIntegrationMQ_NATS_AccountsDir exercises the accounts directory:
+//  1. Accounts already present in the directory are loaded by Init.
+//  2. An account directory created after Init leads to a new connection,
+//     and events published for it reach the subscription.
+func TestIntegrationMQ_NATS_AccountsDir(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping integration test")
+	}
+	t.Parallel()
+	t.Cleanup(testinfra.Start(t))
+
+	dir := t.TempDir()
+	natsServers := []string{testinfra.EnsureNATS()}
+
+	// The first account is on disk before Init runs.
+	firstStream := "init-" + uuid.New().String()
+	firstAccount := mqs.NATSAccountConfig{
+		Name:     "initial",
+		Stream:   firstStream,
+		Consumer: "outpost",
+		TenantID: "initial-tenant",
+	}
+	writeAccountDir(t, dir, firstAccount)
+
+	// JetStream resources for both accounts are created up-front, so the
+	// only thing missing for the second one is its directory.
+	secondStream := "late-" + uuid.New().String()
+	secondAccount := mqs.NATSAccountConfig{
+		Name:     "late",
+		Stream:   secondStream,
+		Consumer: "outpost",
+		TenantID: "late-tenant",
+	}
+	provisioning := &mqs.NATSConfig{
+		Servers:  natsServers,
+		Accounts: []mqs.NATSAccountConfig{firstAccount, secondAccount},
+	}
+	require.NoError(t, testutil.DeclareTestNATSInfrastructure(context.Background(), provisioning))
+	t.Cleanup(func() {
+		_ = testutil.TeardownTestNATSInfrastructure(context.Background(), provisioning)
+	})
+
+	cfg := mqs.QueueConfig{
+		NATS: &mqs.NATSConfig{
+			Servers:     natsServers,
+			AccountsDir: dir,
+		},
+	}
+
+	q := mqs.NewQueue(&cfg)
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	teardown, err := q.Init(ctx)
+	require.NoError(t, err)
+	defer teardown()
+
+	subscription, err := q.Subscribe(ctx)
+	require.NoError(t, err)
+	defer subscription.Shutdown(ctx)
+
+	// Step 1: the pre-existing account delivers.
+	firstPayload, _ := json.Marshal(map[string]any{"src": "initial"})
+	require.NoError(t, testutil.PublishToNATSStream(ctx, natsServers, firstStream, firstPayload))
+
+	firstMsg, err := subscription.Receive(ctx)
+	require.NoError(t, err)
+	var firstDecoded map[string]any
+	require.NoError(t, json.Unmarshal(firstMsg.Body, &firstDecoded))
+	assert.Equal(t, "initial-tenant", firstDecoded["tenant_id"])
+	firstMsg.Ack()
+
+	// Step 2: create the second account's directory and let the watcher
+	// notice it.
+	writeAccountDir(t, dir, secondAccount)
+
+	// Publish and poll repeatedly for up to five seconds; each attempt
+	// waits at most one second for a delivery.
+	giveUpAt := time.Now().Add(5 * time.Second)
+	secondPayload, _ := json.Marshal(map[string]any{"src": "late"})
+	var secondMsg *mqs.Message
+	for time.Now().Before(giveUpAt) {
+		require.NoError(t, testutil.PublishToNATSStream(ctx, natsServers, secondStream, secondPayload))
+		attemptCtx, attemptCancel := context.WithTimeout(ctx, 1*time.Second)
+		secondMsg, err = subscription.Receive(attemptCtx)
+		attemptCancel()
+		if err == nil {
+			break
+		}
+	}
+	require.NotNil(t, secondMsg, "expected to receive event from late-added account")
+	var secondDecoded map[string]any
+	require.NoError(t, json.Unmarshal(secondMsg.Body, &secondDecoded))
+	assert.Equal(t, "late-tenant", secondDecoded["tenant_id"])
+	secondMsg.Ack()
+}
+
+// writeAccountDir lays out a single account inside accountsDir using
+// the meta.yaml + (empty) user.creds convention. Tests using no-auth
+// NATS leave CredentialsFile empty in meta.yaml.
+func writeAccountDir(t *testing.T, accountsDir string, acc mqs.NATSAccountConfig) {
+	t.Helper()
+
+	// One subdirectory per account, named after the account.
+	accountPath := filepath.Join(accountsDir, acc.Name)
+	mkdirErr := os.MkdirAll(accountPath, 0o755)
+	require.NoError(t, mkdirErr)
+
+	// meta.yaml carries one "key: value" line per field.
+	var contents string
+	contents += "name: " + acc.Name + "\n"
+	contents += "stream: " + acc.Stream + "\n"
+	contents += "consumer: " + acc.Consumer + "\n"
+	contents += "tenant_id: " + acc.TenantID + "\n"
+
+	metaPath := filepath.Join(accountPath, "meta.yaml")
+	writeErr := os.WriteFile(metaPath, []byte(contents), 0o600)
+	require.NoError(t, writeErr)
+}
diff --git a/internal/util/testinfra/nats.go b/internal/util/testinfra/nats.go
new file mode 100644
index 000000000..6bd8ecced
--- /dev/null
+++ b/internal/util/testinfra/nats.go
@@ -0,0 +1,94 @@
+package testinfra
+
+import (
+	"context"
+	"log"
+	"sync"
+	"testing"
+
+	"github.com/google/uuid"
+	"github.com/hookdeck/outpost/internal/mqs"
+	"github.com/hookdeck/outpost/internal/util/testutil"
+	"github.com/testcontainers/testcontainers-go"
+	"github.com/testcontainers/testcontainers-go/wait"
+)
+
+// NewMQNATSConfig spins up (if necessary) a NATS JetStream test container,
+// provisions one stream + consumer with random names, and returns a
+// QueueConfig with one account pointing at them. The stream/consumer are
+// torn down after the test.
+//
+// A provisioning failure fails the calling test instead of panicking, so
+// other tests in the same binary keep running and report their results.
+func NewMQNATSConfig(t *testing.T) mqs.QueueConfig {
+	t.Helper()
+
+	stream := "test-" + uuid.New().String()
+	consumer := "test-" + uuid.New().String()
+
+	cfg := mqs.QueueConfig{
+		NATS: &mqs.NATSConfig{
+			Servers: []string{EnsureNATS()},
+			Accounts: []mqs.NATSAccountConfig{{
+				Name:     "test",
+				Stream:   stream,
+				Consumer: consumer,
+			}},
+		},
+	}
+
+	ctx := context.Background()
+	if err := testutil.DeclareTestNATSInfrastructure(ctx, cfg.NATS); err != nil {
+		t.Fatalf("declare NATS test infrastructure: %v", err)
+	}
+	t.Cleanup(func() {
+		// Teardown problems are logged only; they must not fail a test
+		// that has otherwise passed.
+		if err := testutil.TeardownTestNATSInfrastructure(ctx, cfg.NATS); err != nil {
+			log.Println("Failed to teardown NATS infrastructure", err)
+		}
+	})
+	return cfg
+}
+
+var natsOnce sync.Once
+
+// EnsureNATS returns a NATS URL, starting a test container on first call
+// unless TEST_NATS_URL is set in the test environment.
+//
+// The check for an existing NATSURL is performed *inside* sync.Once.Do so
+// concurrent callers from t.Parallel() tests don't race against the
+// container-start path (which writes cfg.NATSURL).
+func EnsureNATS() string {
+	cfg := ReadConfig()
+	natsOnce.Do(func() {
+		// An already-populated NATSURL means there is nothing to start.
+		if cfg.NATSURL == "" {
+			startNATSTestContainer(cfg)
+		}
+	})
+	return cfg.NATSURL
+}
+
+// startNATSTestContainer starts a nats:2.10-alpine container with the -js
+// flag, waits until port 4222 is listening, stores the resulting endpoint
+// in cfg.NATSURL and registers a cleanup function that terminates the
+// container. It panics if the container cannot be started or its endpoint
+// cannot be resolved.
+func startNATSTestContainer(cfg *Config) {
+	ctx := context.Background()
+
+	req := testcontainers.ContainerRequest{
+		Image:        "nats:2.10-alpine",
+		ExposedPorts: []string{"4222/tcp"},
+		Cmd:          []string{"-js"},
+		WaitingFor:   wait.ForListeningPort("4222/tcp"),
+	}
+
+	container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
+		ContainerRequest: req,
+		Started:          true,
+	})
+	if err != nil {
+		panic(err)
+	}
+
+	endpoint, err := container.PortEndpoint(ctx, "4222/tcp", "nats")
+	if err != nil {
+		panic(err)
+	}
+	log.Printf("NATS running at %s", endpoint)
+	cfg.NATSURL = endpoint
+	cfg.cleanupFns = append(cfg.cleanupFns, func() {
+		if err := container.Terminate(ctx); err != nil {
+			log.Printf("failed to terminate nats container: %s", err)
+		}
+	})
+}
diff --git a/internal/util/testinfra/testinfra.go b/internal/util/testinfra/testinfra.go
index 6b4ce3f04..411e95fb9 100644
--- a/internal/util/testinfra/testinfra.go
+++ b/internal/util/testinfra/testinfra.go
@@ -27,6 +27,7 @@ type Config struct {
 	LocalStackURL     string
 	RabbitMQURL       string
 	KafkaURL          string
+	NATSURL           string
 	MockServerURL     string
 	GCPURL            string
 	AzureSBConnString string
@@ -67,6 +68,10 @@ func initConfig() {
 		if !strings.Contains(mockServerURL, "http://") {
 			mockServerURL = "http://" + mockServerURL
 		}
+		natsURL := v.GetString("TEST_NATS_URL")
+		if natsURL != "" && !strings.HasPrefix(natsURL, "nats://") && !strings.HasPrefix(natsURL, "tls://") {
+			natsURL = "nats://" + natsURL
+		}
 		cfg = &Config{
 			TestInfra:         v.GetBool("TESTINFRA"),
 			TestAzure:         v.GetBool("TESTAZURE"),
@@ -77,6 +82,7 @@ func initConfig() {
 			AzureSBConnString: v.GetString("TEST_AZURE_SB_CONNSTRING"),
 			RabbitMQURL:       rabbitmqURL,
 			KafkaURL:          v.GetString("TEST_KAFKA_URL"),
+			NATSURL:           natsURL,
 			MockServerURL:     mockServerURL,
 		}
 		return
@@ -92,6 +98,7 @@ func initConfig() {
 		AzureSBConnString: "",
 		RabbitMQURL:       "",
 		KafkaURL:          "",
+		NATSURL:           "",
 		MockServerURL:     "",
 	}
 }
diff --git a/internal/util/testutil/nats.go b/internal/util/testutil/nats.go
new file mode 100644
index 000000000..8a3bea1a3
--- /dev/null
+++ b/internal/util/testutil/nats.go
@@ -0,0 +1,91 @@
+package testutil
+
+import (
+	"context"
+	"errors"
+	"strings"
+
+	"github.com/hookdeck/outpost/internal/mqs"
+	"github.com/nats-io/nats.go"
+	"github.com/nats-io/nats.go/jetstream"
+)
+
+// DeclareTestNATSInfrastructure creates the JetStream stream + durable
+// consumer named in cfg.Accounts[*] so the NATS queue driver can connect.
+// Each account is set up with a single subject "<stream>.events".
+//
+// Streams are created with work-queue retention and in-memory storage;
+// consumers use explicit acks and are filtered to the stream's subject.
+// The first error aborts the loop and is returned as-is.
+func DeclareTestNATSInfrastructure(ctx context.Context, cfg *mqs.NATSConfig) error {
+	servers := strings.Join(cfg.Servers, ",")
+	nc, err := nats.Connect(servers)
+	if err != nil {
+		return err
+	}
+	defer nc.Close()
+
+	js, err := jetstream.New(nc)
+	if err != nil {
+		return err
+	}
+
+	for _, acc := range cfg.Accounts {
+		// PublishToNATSStream builds the same "<stream>.events" subject.
+		subject := acc.Stream + ".events"
+		stream, err := js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
+			Name:      acc.Stream,
+			Subjects:  []string{subject},
+			Retention: jetstream.WorkQueuePolicy,
+			Storage:   jetstream.MemoryStorage,
+		})
+		if err != nil {
+			return err
+		}
+		if _, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
+			Durable:       acc.Consumer,
+			AckPolicy:     jetstream.AckExplicitPolicy,
+			FilterSubject: subject,
+		}); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// TeardownTestNATSInfrastructure removes the streams created by Declare.
+//
+// Teardown is best-effort: every stream is attempted even if an earlier
+// delete failed, streams that no longer exist are skipped silently, and the
+// remaining failures are returned together. The result is nil when nothing
+// failed.
+func TeardownTestNATSInfrastructure(ctx context.Context, cfg *mqs.NATSConfig) error {
+	servers := strings.Join(cfg.Servers, ",")
+	nc, err := nats.Connect(servers)
+	if err != nil {
+		return err
+	}
+	defer nc.Close()
+
+	js, err := jetstream.New(nc)
+	if err != nil {
+		return err
+	}
+
+	var errs []error
+	for _, acc := range cfg.Accounts {
+		err := js.DeleteStream(ctx, acc.Stream)
+		// Ignore "not found" so re-runs don't fail; keep going on anything
+		// else so one stubborn stream does not leave the others behind.
+		if err != nil && !errors.Is(err, jetstream.ErrStreamNotFound) {
+			errs = append(errs, err)
+		}
+	}
+	return errors.Join(errs...)
+}
+
+// PublishToNATSStream publishes a JSON payload to a stream's events subject.
+// Used by tests to inject events for the queue driver to consume.
+//
+// The subject is stream + ".events". A fresh connection is opened for the
+// call and closed before it returns.
+func PublishToNATSStream(ctx context.Context, servers []string, stream string, payload []byte) error {
+	nc, err := nats.Connect(strings.Join(servers, ","))
+	if err != nil {
+		return err
+	}
+	defer nc.Close()
+
+	js, err := jetstream.New(nc)
+	if err != nil {
+		return err
+	}
+	_, err = js.Publish(ctx, stream+".events", payload)
+	return err
+}