feat(publishmq): NATS JetStream source with per-account multi-tenancy #910

Open
michaeldoehler wants to merge 7 commits into hookdeck:main from michaeldoehler:feat/publishmq-nats-jetstream

Conversation

@michaeldoehler

Adds NATS JetStream as a publish-mq source for Outpost. Outpost reads
events from one or more pre-provisioned JetStream consumers; streams
and consumers are operator-owned. The driver is built for the common
SaaS pattern of one NATS Account per Outpost tenant.

Why

NATS JetStream is a popular event bus in self-hosted Kubernetes / on-prem
setups, and a natural source for Outpost's webhook-fanout role. The driver
slots into the existing PublishMQ interface alongside RabbitMQ, AWS SQS,
GCP Pub/Sub and Azure Service Bus.

Scope is intentionally limited to publish-mq only — Outpost does not run
its internal delivery/log queues on NATS. That keeps the surface area
small and avoids dragging JetStream into Outpost's auto-provisioning
contract.

Design highlights

  • Multi-account: each `PublishNATSAccountConfig` holds its own credentials
    file and is dialled on its own NATS connection. Pull loops run in
    parallel and feed into a single `Subscription.Receive` channel.
  • Tenant isolation: when an account sets `tenant_id` (recommended),
    Outpost overrides the payload's `tenant_id` so a publisher with creds
    for Account A can only produce events for the mapped tenant.
  • Dynamic accounts: an optional `accounts_dir` is watched via
    `fsnotify` with a 250ms debounce. New tenant subdirectories trigger a
    new connection without restarting Outpost; removed dirs drain the
    connection. Provisioning flow: mint NATS account JWT → push to resolver
    → drop `meta.yaml` + `.creds` into `accounts_dir` → Outpost picks it up.
  • Auth: `credentials_file` (.creds) is the primary mode. It is
    optional: empty means no-auth (valid for trusted-network setups or
    token-via-URL).
  • Operator-owned infra: streams and durable consumers must exist
    before Outpost starts. `Init` verifies both and fails loudly on any
    missing piece. `AckWait` / `MaxDeliver` are configured on the consumer.
  • No publish from Outpost: `Publish()` intentionally returns an error;
    JetStream is read-only from Outpost's side.
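
To make the tenant-isolation bullet concrete, here is a minimal sketch of how a payload's `tenant_id` could be overwritten with the account's configured value. The function name `overrideTenantID` and the assumption that events are flat JSON objects are illustrative only; the PR's actual implementation may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// overrideTenantID rewrites the "tenant_id" field of a JSON object payload
// with the tenant configured on the account. An empty accountTenantID leaves
// the payload untouched. (Hypothetical helper, not taken from the PR.)
func overrideTenantID(payload []byte, accountTenantID string) ([]byte, error) {
	if accountTenantID == "" {
		return payload, nil
	}
	// Keep every other field as raw JSON so it is passed through unchanged.
	var event map[string]json.RawMessage
	if err := json.Unmarshal(payload, &event); err != nil {
		return nil, fmt.Errorf("decode event payload: %w", err)
	}
	if event == nil { // the payload was the JSON literal null
		return nil, fmt.Errorf("event payload is not a JSON object")
	}
	tenant, err := json.Marshal(accountTenantID)
	if err != nil {
		return nil, fmt.Errorf("encode tenant_id: %w", err)
	}
	// Whatever the publisher sent is replaced by the account's tenant.
	event["tenant_id"] = tenant
	return json.Marshal(event)
}
```

With this shape, a publisher holding credentials for the `acme` account that sends `{"tenant_id":"evil",...}` would still end up with `"tenant_id":"acme"` in the event Outpost processes.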

Configuration

```yaml
publishmq:
  nats:
    servers:
      - nats://nats:4222
    accounts_dir: /etc/outpost/nats-accounts
    accounts:
      - name: acme
        credentials_file: /etc/outpost/acme.creds
        stream: events
        consumer: outpost
        tenant_id: acme
```

Env vars: `PUBLISH_NATS_SERVERS` (comma-separated cluster URLs),
`PUBLISH_NATS_ACCOUNTS_DIR`.
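
As a rough sketch of how a comma-separated `PUBLISH_NATS_SERVERS` value might be split into individual server URLs (the helper name `parseServers` is hypothetical and not taken from the PR):

```go
package main

import "strings"

// parseServers splits a comma-separated list of NATS URLs, trimming
// whitespace and dropping empty entries such as a trailing comma.
func parseServers(raw string) []string {
	var servers []string
	for _, s := range strings.Split(raw, ",") {
		if s = strings.TrimSpace(s); s != "" {
			servers = append(servers, s)
		}
	}
	return servers
}
```

For example, `parseServers("nats://a:4222, nats://b:4222,")` yields the two URLs `nats://a:4222` and `nats://b:4222`.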

Accounts-dir layout:

```
/etc/outpost/nats-accounts/
├── acme/
│   ├── user.creds   # optional NATS .creds (JWT + NKey seed)
│   └── meta.yaml    # name/stream/consumer/tenant_id
└── globex/
    ├── user.creds
    └── meta.yaml
```
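
The layout above could be discovered with a directory scan along the following lines. This is a sketch under stated assumptions: the type `discoveredAccount` and the functions `discoverAccounts` and `exampleLayout` are hypothetical names, and parsing of `meta.yaml` is left out. It mirrors two behaviours described in this PR: a subdirectory without `meta.yaml` is skipped while other stat errors are surfaced, and `user.creds` is only used when the file actually exists.

```go
package main

import (
	"errors"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
)

// discoveredAccount is a hypothetical stand-in for the driver's account config.
type discoveredAccount struct {
	Name            string
	MetaPath        string
	CredentialsFile string // empty means no-auth
}

// discoverAccounts scans dir for <account-name>/meta.yaml subdirectories.
func discoverAccounts(dir string) ([]discoveredAccount, error) {
	entries, err := os.ReadDir(dir) // entries come back sorted by name
	if err != nil {
		return nil, fmt.Errorf("read accounts dir: %w", err)
	}
	var accounts []discoveredAccount
	for _, e := range entries {
		if !e.IsDir() {
			continue
		}
		accountDir := filepath.Join(dir, e.Name())
		metaPath := filepath.Join(accountDir, "meta.yaml")
		if _, err := os.Stat(metaPath); err != nil {
			if errors.Is(err, fs.ErrNotExist) {
				continue // no meta.yaml (yet): not an account directory
			}
			return nil, fmt.Errorf("stat %s: %w", metaPath, err)
		}
		acc := discoveredAccount{Name: e.Name(), MetaPath: metaPath}
		credsPath := filepath.Join(accountDir, "user.creds")
		if _, err := os.Stat(credsPath); err == nil {
			acc.CredentialsFile = credsPath
		} else if !errors.Is(err, fs.ErrNotExist) {
			return nil, fmt.Errorf("stat %s: %w", credsPath, err)
		}
		accounts = append(accounts, acc)
	}
	return accounts, nil
}

// exampleLayout builds a small accounts directory in a temp dir and scans it.
// Unlike the tree above, globex has no creds here, to show the no-auth case.
func exampleLayout() ([]discoveredAccount, error) {
	root, err := os.MkdirTemp("", "nats-accounts-")
	if err != nil {
		return nil, err
	}
	defer os.RemoveAll(root)
	files := []string{
		"acme/user.creds", "acme/meta.yaml",
		"globex/meta.yaml",   // no creds: treated as a no-auth account
		"pending/user.creds", // no meta.yaml yet: skipped
	}
	for _, rel := range files {
		path := filepath.Join(root, filepath.FromSlash(rel))
		if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
			return nil, err
		}
		if err := os.WriteFile(path, nil, 0o600); err != nil {
			return nil, err
		}
	}
	return discoverAccounts(root)
}
```

Calling `exampleLayout()` returns two accounts, `acme` (with a credentials path) and `globex` (without); the half-provisioned `pending` directory is ignored until its `meta.yaml` appears.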

Full guide added at `docs/content/publishing/publish-from-nats.mdoc`.

Commit walk-through

  • `feat(mqs): add NATS JetStream queue driver`: driver skeleton,
    `NATSConfig` / `NATSAccountConfig`, pull-consumer subscription, tenant
    override, wired into `mqs.QueueConfig` and `NewQueue`.
  • `feat(mqs): NATS accounts directory watcher`: refactors connection
    storage to a name-keyed map and adds the `fsnotify`-based watcher
    with dynamic add/remove.
  • `feat(config): wire PublishNATSConfig into publishmq`: config
    surface area plus `GetInfraType` / `GetQueueConfig` branches.
  • `feat(cmd/publish): NATS JetStream dev publish helper`: adds
    `method=nats` to the existing publish dev service.
  • `test(mqs): NATS JetStream integration tests + testinfra`: four
    integration tests (basic, tenant-override, multi-account, watcher),
    `nats:2.10-alpine` testcontainer, declare/teardown helpers.
  • `docs: publish-from-nats guide + env/yaml examples + compose`:
    user-facing guide, `.env.example`, `.outpost.yaml.example`,
    `contributing/mq.md` update, and a docker-compose example with
    `nats:2.10-alpine` + helper script.

Verification

  • `go build ./...` + `go vet ./...` clean.
  • `go test -run TestIntegrationMQ_NATS -count=1 ./internal/mqs/...`
    passes locally (testcontainer `nats:2.10-alpine -js`):
    • `TestIntegrationMQ_NATS`: basic publish/receive/ack
    • `TestIntegrationMQ_NATS_TenantOverride`: payload tenant_id is
      overridden by account config
    • `TestIntegrationMQ_NATS_MultiAccount`: two accounts consumed in
      parallel, each tagged with its own tenant
    • `TestIntegrationMQ_NATS_AccountsDir`: watcher picks up an
      account dir created after Init

Dependencies

Promotes `github.com/nats-io/nats.go` from indirect to direct (already
present in `go.sum` via transitive deps; no version bump). Promotes
`github.com/fsnotify/fsnotify` from indirect to direct.

Out of scope

  • Internal MQ support (delivery/log). Could be added later but is a
    separate decision around JetStream's storage/replication model.
  • Kafka publish-mq (separately tracked in `contributing/mq.md`).
  • Credentials rotation without re-creating the account directory.

Happy to split commits differently or rework any part — the staging is
designed to keep each commit independently reviewable.

Michael Döhler added 6 commits May 23, 2026 00:56

feat(mqs): add NATS JetStream queue driver

Adds a publish-mq-only driver for NATS JetStream. Outpost reads events
from one or more pre-provisioned JetStream consumers via a pull-based
multiplexed subscription.
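
The multiplexing described above (one pull loop per account, merged into a single receive channel) follows a standard fan-in shape. The sketch below is an assumption-laden illustration: `message` and `mergeAccounts` are hypothetical names, and plain channels stand in for the per-account JetStream pull loops.

```go
package main

import "sync"

// message tags a payload with the account it was pulled from.
type message struct {
	Account string
	Data    []byte
}

// mergeAccounts starts one forwarding goroutine per account source and
// funnels everything into a single channel. The returned channel is closed
// once every source is closed or done is closed.
func mergeAccounts(done <-chan struct{}, sources map[string]<-chan []byte) <-chan message {
	out := make(chan message)
	var wg sync.WaitGroup
	for name, src := range sources {
		wg.Add(1)
		go func(name string, src <-chan []byte) {
			defer wg.Done()
			for {
				select {
				case <-done:
					return
				case data, ok := <-src:
					if !ok {
						return // this account's source was drained
					}
					select {
					case out <- message{Account: name, Data: data}:
					case <-done:
						return
					}
				}
			}
		}(name, src)
	}
	// Close the merged channel only after all per-account loops have exited.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}
```

A consumer then ranges over the merged channel and never needs to know how many accounts are connected; each message still carries the account it came from.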

Key design points:
- Multi-account: one NATS Account per Outpost tenant. Each account gets
  its own connection and pull loop; messages are merged into a single
  Receive channel.
- Account.TenantID overrides the tenant_id field on incoming payloads,
  so an Account can only ever produce events for its mapped tenant.
- Stream and Consumer are operator-provisioned. Outpost only verifies
  existence on Init and fails loudly if either is missing.
- Auth via credentials_file (.creds, Operator/JWT-resolver mode).
- ConcurrentSubscription: pull concurrency is bounded by PullMaxMessages
  per account; upstream consumer skips its own semaphore.
- Publish() is intentionally unimplemented; JetStream is read-only here.

feat(mqs): NATS accounts directory watcher

Adds dynamic add/remove of NATS Accounts at runtime via a watched
directory. Layout under accounts_dir:

  <account-name>/
    user.creds      NATS .creds (JWT + NKey seed)
    meta.yaml       stream/consumer/tenant_id metadata

The watcher debounces filesystem events (250ms) and triggers a
reconcile against the current connection set. Static accounts from
config.Accounts are preserved across reconciles; only dir-derived
accounts are added or removed.
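
The reconcile rule in the paragraph above can be sketched as a pure function. The name `planReconcile` and the `map[string]bool` representation (account name mapped to whether it was derived from the directory) are assumptions made for illustration, not the PR's data structures.

```go
package main

import "sort"

// planReconcile compares the current connection set with the account names
// found in the watched directory. current maps account name to true when the
// account came from the directory and false when it came from static config.
// Static accounts are never removed; names already connected are not re-added.
// inDir is assumed to contain unique names (one per subdirectory).
func planReconcile(current map[string]bool, inDir []string) (add, remove []string) {
	want := make(map[string]struct{}, len(inDir))
	for _, name := range inDir {
		want[name] = struct{}{}
		if _, have := current[name]; !have {
			add = append(add, name)
		}
	}
	for name, fromDir := range current {
		if _, keep := want[name]; fromDir && !keep {
			remove = append(remove, name)
		}
	}
	// Sort for deterministic output; map iteration order is random.
	sort.Strings(add)
	sort.Strings(remove)
	return add, remove
}
```

For example, with a static account `static`, dir-derived accounts `acme` and `old`, and a directory now containing `acme`, `globex` and `static`, the plan is to add `globex` and remove `old`.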

Refactors NATSQueue internals to keep connections in a map keyed by
account name, with safe add/remove that also starts/stops the
per-account pump when a subscription is active.

feat(config): wire PublishNATSConfig into publishmq

Adds PublishNATSConfig + PublishNATSAccountConfig to the PublishMQ
config, plus GetInfraType / GetQueueConfig branches that map them
onto the mqs.NATSConfig the driver expects.

Static account lists and the watched accounts_dir can be used
independently or combined; the queue treats them as additive.

feat(cmd/publish): NATS JetStream dev publish helper

Adds a small NATS publisher to the local dev publish service, matching
the existing rabbitmq/aws_sqs/gcp_pubsub helpers. Reads URL/subject/
stream/consumer/creds from environment with defaults matching the
docker-compose example.

declareNATS() creates a work-queue stream + durable consumer so a
fresh local NATS server is usable in seconds.

test(mqs): NATS JetStream integration tests + testinfra

Adds four integration tests covering the NATS driver:

- TestIntegrationMQ_NATS: basic publish + receive + ack via JetStream
- TestIntegrationMQ_NATS_TenantOverride: account.TenantID rewrites
  the payload's tenant_id field even when payload contains a value
- TestIntegrationMQ_NATS_MultiAccount: two accounts consumed in
  parallel, each tagged with its own tenant_id
- TestIntegrationMQ_NATS_AccountsDir: directory watcher picks up
  an account directory created after Init and starts consuming
  from it within a few seconds

Supporting infrastructure:

- internal/util/testinfra/nats.go: nats:2.10-alpine testcontainer
  with JetStream enabled
- internal/util/testutil/nats.go: stream/consumer declare + teardown
  helpers plus a small publish helper for injecting test events
- testinfra.Config.NATSURL + TEST_NATS_URL in .env.test

Drive-by: relax NATSAccountConfig validation so credentials_file is
optional (no-auth and token-via-URL deployments are legitimate);
the accounts-dir loader only defaults to user.creds when the file
actually exists in the account directory.

docs: publish-from-nats guide + env/yaml examples + compose

- docs/content/publishing/publish-from-nats.mdoc: new guide covering
  message structure, prerequisites, configuration (env + yaml), the
  accounts-dir layout, and the multi-tenancy / NATS-account pattern.
- .env.example: PUBLISH_NATS_SERVERS / PUBLISH_NATS_ACCOUNTS_DIR.
- .outpost.yaml.example: full publishmq.nats block under publishmq.
- contributing/mq.md: tick NATS in the supported-MQ list and add
  a section describing scope, configuration, infra ownership, and
  retry/visibility behavior.
- examples/docker-compose/compose-publish-nats.yml + helper script:
  single-node JetStream container for local development, paired with
  the existing publish dev service (method=nats).

Copilot AI review requested due to automatic review settings May 22, 2026 23:11
Contributor

Copilot AI left a comment


Pull request overview

Note

Copilot was unable to run its full agentic suite in this review.

Adds NATS JetStream as a publish-mq source, including runtime account discovery via a watched accounts directory, integration test coverage, and developer tooling/docs for local setup.

Changes:

  • Implement NATS JetStream queue driver with multi-account support and optional accounts_dir hot-add/remove.
  • Add test infrastructure utilities (testcontainers + JetStream provisioning helpers) and integration tests for NATS behaviors.
  • Extend publishmq config, docs, and examples to support NATS JetStream.

Reviewed changes

Copilot reviewed 19 out of 20 changed files in this pull request and generated 7 comments.

| File | Description |
| --- | --- |
| internal/util/testutil/nats.go | Test helper to provision/teardown JetStream streams/consumers and publish test messages. |
| internal/util/testinfra/testinfra.go | Adds TEST_NATS_URL support to test infra config. |
| internal/util/testinfra/nats.go | Testcontainers-based NATS JetStream bring-up + per-test queue config generator. |
| internal/mqs/queue_nats_test.go | Integration tests for NATS driver: basic consume/ack, tenant overrides, multi-account, accounts_dir watcher. |
| internal/mqs/queue_nats.go | New NATS JetStream queue driver implementation with per-account connections and multiplexed subscription. |
| internal/mqs/queue.go | Wires NATS into NewQueue selection logic. |
| internal/mqs/nats_accounts.go | Accounts directory loader + fsnotify watcher with debounce. |
| internal/config/publishmq.go | Adds publishmq NATS config schema and mapping to mqs.QueueConfig. |
| go.mod | Adds direct dependencies for fsnotify and nats.go. |
| go.sum | Adds checksums for new NATS dependencies. |
| examples/docker-compose/start-nats-publish.sh | Convenience script to start a local JetStream container for publishing examples. |
| examples/docker-compose/compose-publish-nats.yml | Docker Compose service definition for local NATS JetStream. |
| docs/content/publishing/publish-from-nats.mdoc | End-user documentation for configuring publish-from-NATS JetStream. |
| contributing/mq.md | Documents NATS JetStream support in contributor MQ guide. |
| cmd/publish/publish_nats.go | Adds dev publish/declare helper for NATS JetStream. |
| cmd/publish/publish_handler.go | Routes method=nats publish requests to NATS implementation. |
| cmd/publish/declare_handler.go | Routes method=nats declare requests to NATS implementation. |
| .outpost.yaml.example | Adds example publishmq NATS config block. |
| .env.test | Adds TEST_NATS_URL placeholder. |
| .env.example | Adds example publishmq NATS env vars. |


Comment thread internal/mqs/queue_nats.go
Comment on lines +432 to +438
jmsg, err := iter.Next()
if err != nil {
	if errors.Is(err, jetstream.ErrMsgIteratorClosed) {
		return
	}
	continue
}
Comment thread internal/util/testinfra/nats.go
Comment on lines +84 to +89
cfg.NATSURL = endpoint
cfg.cleanupFns = append(cfg.cleanupFns, func() {
	if err := container.Terminate(ctx); err != nil {
		log.Printf("failed to terminate nats container: %s", err)
	}
})
Comment thread internal/mqs/nats_accounts.go Outdated
accountDir := filepath.Join(dir, e.Name())
metaPath := filepath.Join(accountDir, "meta.yaml")
if _, err := os.Stat(metaPath); err != nil {
	continue
Comment thread internal/mqs/nats_accounts.go Outdated
Comment on lines +146 to +155
defer w.w.Close()

var (
	timer    *time.Timer
	timerC   <-chan time.Time
	armTimer = func() {
		if timer != nil {
			timer.Stop()
		}
		timer = time.NewTimer(debounceWindow)
Comment thread internal/util/testutil/nats.go
Comment on lines +65 to +70
if err := js.DeleteStream(ctx, acc.Stream); err != nil {
	// Best-effort teardown; ignore "not found" so re-runs don't fail.
	if !strings.Contains(err.Error(), "stream not found") {
		return err
	}
}
Comment thread internal/util/testinfra/testinfra.go Outdated
	mockServerURL = "http://" + mockServerURL
}
natsURL := v.GetString("TEST_NATS_URL")
if natsURL != "" && !strings.Contains(natsURL, "nats://") {
Comment thread internal/mqs/queue_nats.go Outdated
if _, alreadyHave := current[acc.Name]; alreadyHave {
	continue
}
_ = q.addAccount(context.Background(), acc)

- queue_nats.go pump: add 250ms backoff between non-fatal iter.Next()
  errors so the loop doesn't busy-spin during transient consumer
  unavailability (e.g. leadership change, connection blip).
- queue_nats.go reconcileFromDir: log addAccount failures instead of
  silently dropping them, so operators can spot bad creds / missing
  streams / unreachable servers when a tenant directory lands.
- nats_accounts.go loadAccountsFromDir: surface non-ENOENT os.Stat
  errors (permission, transient IO) instead of treating them as
  'subdirectory has no meta.yaml'.
- nats_accounts.go watcher: switch to a single reusable timer with
  Reset/Stop and proper channel drain, eliminating per-event timer
  allocations and the Stop-without-drain edge case.
- testutil/nats.go: replace strings.Contains substring matching on
  'stream not found' with errors.Is(err, jetstream.ErrStreamNotFound).
- testinfra/nats.go EnsureNATS: move the cfg.NATSURL check inside
  sync.Once.Do to close a data race between concurrent t.Parallel()
  callers and the container-start write path. Verified with -race.
- testinfra/testinfra.go: use strings.HasPrefix (nats:// and tls://)
  instead of strings.Contains for the scheme normalization.
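
The "single reusable timer with Reset/Stop and proper channel drain" approach mentioned above can be sketched roughly as follows. This is not the PR's watcher code: `debounce` is a hypothetical function, and a plain channel of events stands in for `fsnotify` notifications.

```go
package main

import "time"

// debounce calls fire once per burst of events, after window has elapsed
// with no further event. It returns when events is closed; a burst that is
// still pending at that point is dropped.
func debounce(events <-chan struct{}, window time.Duration, fire func()) {
	var (
		timer  *time.Timer
		timerC <-chan time.Time // nil while idle; a nil channel never becomes ready
	)
	for {
		select {
		case _, ok := <-events:
			if !ok {
				if timer != nil {
					timer.Stop()
				}
				return
			}
			if timer == nil {
				// Allocate the timer once, on the first event.
				timer = time.NewTimer(window)
			} else {
				// Only stop/drain while armed, so the drain can never
				// block on a timer whose value was already received.
				if timerC != nil && !timer.Stop() {
					<-timer.C
				}
				timer.Reset(window)
			}
			timerC = timer.C
		case <-timerC:
			timerC = nil // disarm until the next event
			fire()
		}
	}
}
```

With the window from this PR it would be invoked as `debounce(events, 250*time.Millisecond, reconcile)`, where `reconcile` is whatever callback rescans the accounts directory.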
@michaeldoehler
Author

Thanks for the review. Pushed 59fd97b8 addressing six of the seven points:

  • queue_nats.go:438 busy-spin: added a 250ms backoff between non-fatal iter.Next() errors. The loop stays responsive to shutdown via select on s.done.
  • nats_accounts.go:49 silent stat errors: only os.ErrNotExist is treated as "skip"; other errors propagate so operators see permission/IO issues.
  • nats_accounts.go:155 timer leak: switched to a single reusable timer with Reset/Stop + proper channel drain.
  • testutil/nats.go:70 substring error match: replaced with errors.Is(err, jetstream.ErrStreamNotFound).
  • testinfra.go:72 scheme detection: now uses strings.HasPrefix, checking for nats:// and tls://.
  • queue_nats.go:278 dropped reconcile errors: addAccount failures during directory reconciliation are now logged with the account name so bad creds / missing streams don't silently disappear.
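
The backoff described in the first bullet might look something like the sketch below. All names are stand-ins: `pump`, `errIteratorClosed` (playing the role of `jetstream.ErrMsgIteratorClosed`) and `errTransient` are hypothetical, and `next` abstracts the iterator's `Next()` call.

```go
package main

import (
	"errors"
	"time"
)

// Stand-ins for illustration only; the real driver checks
// jetstream.ErrMsgIteratorClosed and treats other errors as non-fatal.
var (
	errIteratorClosed = errors.New("iterator closed")
	errTransient      = errors.New("transient pull error")
)

// pump forwards messages from next to handle. Non-fatal errors are followed
// by a backoff wait so the loop does not busy-spin, and the wait is
// interruptible via done so shutdown stays responsive.
func pump(next func() ([]byte, error), handle func([]byte), done <-chan struct{}, backoff time.Duration) {
	for {
		msg, err := next()
		if err != nil {
			if errors.Is(err, errIteratorClosed) {
				return
			}
			timer := time.NewTimer(backoff)
			select {
			case <-timer.C:
				continue // retry after the backoff
			case <-done:
				timer.Stop()
				return
			}
		}
		handle(msg)
	}
}
```

The `select` is what keeps the loop responsive: a plain `time.Sleep(backoff)` would also stop the busy-spin but would delay shutdown by up to one backoff interval.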

On the remaining point:

  • testinfra/nats.go:89 unsynchronized cfg.cleanupFns append — the race detector actually flagged a sibling issue: a classic double-checked-locking race in EnsureNATS (read of cfg.NATSURL outside sync.Once.Do). That's now fixed by moving the check inside Once.Do, and tests pass under -race. The broader concern about concurrent append(cfg.cleanupFns, …) across different Ensure* helpers (RabbitMQ + Kafka + NATS in parallel) is a real pre-existing pattern in this file — same shape in rabbitmq.go, kafka.go, etc. Happy to follow up with a separate PR that mutex-protects Config.cleanupFns (or replaces it with a registration method) so all the test-infra helpers benefit consistently — felt out of scope for this one.
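
The double-checked-locking fix described above comes down to doing both the "is it already set?" read and the write inside `sync.Once.Do`. The sketch below illustrates that shape; `infra` and `ensure` are hypothetical names, and `start` stands in for bringing up the test container.

```go
package main

import "sync"

// infra is a hypothetical stand-in for the shared test-infra config.
type infra struct {
	once sync.Once
	url  string
	err  error
}

// ensure returns the NATS URL, starting the backing service at most once.
// The check for a preconfigured URL sits inside Do, so no caller reads
// i.url while another goroutine may still be writing it.
func (i *infra) ensure(start func() (string, error)) (string, error) {
	i.once.Do(func() {
		if i.url != "" {
			return // preconfigured, e.g. via TEST_NATS_URL
		}
		i.url, i.err = start()
	})
	// Do returns only after the first call has completed, so these reads
	// are ordered after the write above.
	return i.url, i.err
}
```

Reading `i.url` before calling `Do` would reintroduce the race: one goroutine could observe the field while another is assigning it inside `Do`.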
