Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
330e018
docs: propose SQS-compatible adapter design
bootjp Apr 23, 2026
e474ecb
Merge branch 'main' into feat/sqs_compatible_adapter
bootjp Apr 23, 2026
4f26c07
docs: update stale S3 design doc paths after reorg
bootjp Apr 23, 2026
559dcbe
Merge branch 'main' into feat/sqs_compatible_adapter
bootjp Apr 23, 2026
c615838
docs(sqs): add operator console UI design section
bootjp Apr 24, 2026
e098bcf
docs(sqs): address codex + coderabbit review feedback
bootjp Apr 24, 2026
4206e63
docs(sqs): address codex review round 3
bootjp Apr 24, 2026
559d1d4
docs(sqs): address codex review round 4
bootjp Apr 24, 2026
feec378
docs(sqs): address codex review round 5
bootjp Apr 24, 2026
1d24525
docs(sqs): address codex review round 6
bootjp Apr 24, 2026
9f7e79f
feat(sqs): scaffold SQS-compatible adapter
bootjp Apr 23, 2026
e46ada9
feat(sqs): wire SQS adapter into main.go and test harness
bootjp Apr 24, 2026
9490452
feat(sqs): implement queue catalog CRUD
bootjp Apr 24, 2026
61fa917
feat(sqs): SigV4 verifier shared with S3, wire static credentials
bootjp Apr 24, 2026
387a19c
feat(sqs): SendMessage, ReceiveMessage, DeleteMessage, ChangeMessageV…
bootjp Apr 24, 2026
4bae6d0
fix(sqs): close receipt-token and visibility races under OCC
bootjp Apr 24, 2026
6685fbf
fix(sqs): make DeleteMessage idempotent on stale receipt handles
bootjp Apr 24, 2026
1846fd0
fix(sqs): pin OCC StartTS to validated-read snapshot
bootjp Apr 24, 2026
13e75f4
fix(sqs): pin catalog OCC StartTS to validated-read snapshot
bootjp Apr 24, 2026
c692465
feat(sqs): SQS-compatible adapter Milestone 1 (#610)
bootjp Apr 24, 2026
3b1d429
fix(sqs): implement long polling + require explicit FifoQueue=true
bootjp Apr 24, 2026
0d69c90
fix(sqs): address codex round 9 findings
bootjp Apr 24, 2026
e9f2d0c
Merge branch 'main' into feat/sqs_compatible_adapter
bootjp Apr 24, 2026
698270e
fix(sqs): address codex + coderabbit round 10 findings
bootjp Apr 24, 2026
d2d5696
fix(sqs): fence SendMessage and ChangeMessageVisibility against queue…
bootjp Apr 24, 2026
b9d4727
fix(sqs): address codex review 4171728561 (round 12)
bootjp Apr 24, 2026
94e3a3f
fix(sqs): propagate non-retryable receive rotation failures
bootjp Apr 24, 2026
55fed69
fix(sqs): address codex round 14 findings
bootjp Apr 24, 2026
fa7fdbf
fix(sqs): close DeleteQueue races on receive loop and ChangeMessageVi…
bootjp Apr 24, 2026
818c27c
fix(sqs): reject empty MessageBody + enforce retention on receive
bootjp Apr 24, 2026
2ce85e0
Merge branch 'main' into feat/sqs_compatible_adapter
bootjp Apr 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 8 additions & 28 deletions adapter/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
"math/big"
"net"
"net/http"
"net/http/httputil"
"net/url"
"slices"
"sort"
"strconv"
Expand Down Expand Up @@ -294,33 +292,15 @@ func (d *DynamoDBServer) Stop() {
// (either proxied or an error response was written), false if the request
// should be handled locally (i.e. this node is the leader or no leader map is
// configured).
//
// Serving reads or writes locally on a follower would expose G2-item-realtime
// stale reads, so every follower request is forwarded to the leader.
func (d *DynamoDBServer) proxyToLeader(w http.ResponseWriter, r *http.Request) bool {
if len(d.leaderDynamo) == 0 || d.coordinator == nil {
return false
}
if d.coordinator.IsLeader() {
return false
}
// This node is a follower. All requests must be forwarded to the leader to
// preserve linearizability — serving reads or writes locally on a follower
// causes stale-read anomalies (G2-item-realtime).
leader := d.coordinator.RaftLeader()
if leader == "" {
writeDynamoError(w, http.StatusServiceUnavailable, dynamoErrInternal, "no raft leader currently available")
return true
}
targetAddr, ok := d.leaderDynamo[leader]
if !ok || strings.TrimSpace(targetAddr) == "" {
writeDynamoError(w, http.StatusServiceUnavailable, dynamoErrInternal, "leader dynamo address not found")
return true
}
target := &url.URL{Scheme: "http", Host: targetAddr}
proxy := httputil.NewSingleHostReverseProxy(target)
proxy.ErrorHandler = func(rw http.ResponseWriter, _ *http.Request, err error) {
writeDynamoError(rw, http.StatusServiceUnavailable, dynamoErrInternal, "leader proxy error: "+err.Error())
}
proxy.ServeHTTP(w, r)
return true
return proxyHTTPRequestToLeader(d.coordinator, d.leaderDynamo, dynamoLeaderProxyErrorWriter, w, r)
}

func dynamoLeaderProxyErrorWriter(w http.ResponseWriter, status int, message string) {
writeDynamoError(w, status, dynamoErrInternal, message)
}

func (d *DynamoDBServer) handle(w http.ResponseWriter, r *http.Request) {
Expand Down
64 changes: 64 additions & 0 deletions adapter/leader_http_proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package adapter

import (
"net/http"
"net/http/httputil"
"net/url"
"strings"

"github.com/bootjp/elastickv/kv"
)

// httpLeaderErrorWriter writes an adapter-specific error envelope (JSON for
// DynamoDB/SQS, XML for S3, RESP for Redis, ...) when the HTTP leader proxy
// cannot forward a follower request. The adapter owns the shape of the body;
// this helper only decides when to call it.
type httpLeaderErrorWriter func(w http.ResponseWriter, status int, message string)

// proxyHTTPRequestToLeader forwards r to the current Raft leader's HTTP
// adapter endpoint when this node is a follower. It returns true when the
// request was handled (either proxied or an error body was written) and
// false when the caller should serve it locally (no leader map configured,
// no coordinator, or this node is the leader).
//
// Error paths:
// 1. no known Raft leader → 503 via errWriter("no raft leader currently available")
// 2. leader id missing from leaderMap → 503 via errWriter("leader address not found")
// 3. reverse-proxy dial/copy failure → 503 via errWriter("leader proxy error: <reason>")
//
// leaderMap keys are Raft addresses; values are the matching adapter HTTP
// addresses exported on that same node.
func proxyHTTPRequestToLeader(
coordinator kv.Coordinator,
leaderMap map[string]string,
errWriter httpLeaderErrorWriter,
w http.ResponseWriter,
r *http.Request,
) bool {
if len(leaderMap) == 0 || coordinator == nil {
return false
}
if coordinator.IsLeader() {
return false
}
// Follower ingress: forward to the leader so reads and writes both see a
// single serialization point. Serving locally on a follower would expose
// G2-item-realtime stale reads.
leader := coordinator.RaftLeader()
if leader == "" {
errWriter(w, http.StatusServiceUnavailable, "no raft leader currently available")
return true
}
targetAddr, ok := leaderMap[leader]
if !ok || strings.TrimSpace(targetAddr) == "" {
errWriter(w, http.StatusServiceUnavailable, "leader address not found")
return true
}
target := &url.URL{Scheme: "http", Host: targetAddr}
proxy := httputil.NewSingleHostReverseProxy(target)
proxy.ErrorHandler = func(rw http.ResponseWriter, _ *http.Request, err error) {
errWriter(rw, http.StatusServiceUnavailable, "leader proxy error: "+err.Error())
}
proxy.ServeHTTP(w, r)
return true
}
85 changes: 9 additions & 76 deletions adapter/s3_auth.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package adapter

import (
"context"
"crypto/subtle"
"net/http"
"net/url"
Expand All @@ -12,11 +11,12 @@ import (
"github.com/aws/aws-sdk-go-v2/aws"
v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
"github.com/bootjp/elastickv/kv"
"github.com/cockroachdb/errors"
)

const (
s3SigV4Algorithm = "AWS4-HMAC-SHA256"
// s3SigV4Algorithm is an alias of sigv4Algorithm kept for call-site
// readability in the S3 adapter.
s3SigV4Algorithm = sigv4Algorithm
// s3UnsignedPayload / s3Streaming* are sentinel values that AWS SDKs may
// place in X-Amz-Content-Sha256. None of them are a literal SHA-256 hash
// of the body, so the PUT pipeline must skip hash validation when it sees
Expand All @@ -27,8 +27,8 @@ const (
s3StreamingSignedPayload = "STREAMING-AWS4-HMAC-SHA256-PAYLOAD"
s3StreamingSignedPayloadTrailer = "STREAMING-AWS4-HMAC-SHA256-PAYLOAD-TRAILER"
s3EmptyPayloadHash = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
s3DateHeaderFormat = "20060102T150405Z"
s3RequestTimeMaxSkew = 15 * time.Minute
s3DateHeaderFormat = sigv4DateHeaderFormat
s3RequestTimeMaxSkew = sigv4RequestTimeMaxSkew
)

// isS3PayloadMarker reports whether the given X-Amz-Content-Sha256 value is a
Expand Down Expand Up @@ -79,14 +79,6 @@ type s3AuthError struct {
Message string
}

type s3AuthorizationHeader struct {
Algorithm string
AccessKeyID string
Date string
Region string
Service string
}

func WithS3Region(region string) S3ServerOption {
return func(server *S3Server) {
if server == nil || strings.TrimSpace(region) == "" {
Expand Down Expand Up @@ -151,7 +143,7 @@ func (s *S3Server) authorizeRequest(r *http.Request) *s3AuthError {
}
}

parsed, err := parseS3AuthorizationHeader(authHeader)
parsed, err := parseSigV4AuthorizationHeader(authHeader)
if err != nil {
return &s3AuthError{
Status: http.StatusForbidden,
Expand Down Expand Up @@ -224,7 +216,7 @@ func (s *S3Server) authorizeRequest(r *http.Request) *s3AuthError {
}
}

expectedAuth, err := buildS3AuthorizationHeader(r, parsed.AccessKeyID, secretAccessKey, s.effectiveRegion(), signingTime, payloadHash)
expectedAuth, err := buildSigV4AuthorizationHeader(r, parsed.AccessKeyID, secretAccessKey, "s3", s.effectiveRegion(), signingTime, payloadHash)
if err != nil {
return &s3AuthError{
Status: http.StatusForbidden,
Expand All @@ -235,8 +227,8 @@ func (s *S3Server) authorizeRequest(r *http.Request) *s3AuthError {
// Compare only the Signature component to avoid false rejections caused by
// equivalent Authorization headers that differ in whitespace or parameter
// ordering (but carry the same cryptographic signature).
gotSig := extractS3Signature(authHeader)
expectedSig := extractS3Signature(expectedAuth)
gotSig := extractSigV4Signature(authHeader)
expectedSig := extractSigV4Signature(expectedAuth)
if gotSig == "" || subtle.ConstantTimeCompare([]byte(gotSig), []byte(expectedSig)) != 1 {
return &s3AuthError{
Status: http.StatusForbidden,
Expand All @@ -247,65 +239,6 @@ func (s *S3Server) authorizeRequest(r *http.Request) *s3AuthError {
return nil
}

func buildS3AuthorizationHeader(r *http.Request, accessKeyID string, secretAccessKey string, region string, signingTime time.Time, payloadHash string) (string, error) {
if r == nil {
return "", errors.New("request is required")
}
clone := r.Clone(context.Background())
clone.Host = r.Host
clone.Header = clone.Header.Clone()
clone.Header.Del("Authorization")

signer := v4.NewSigner(func(opts *v4.SignerOptions) {
opts.DisableURIPathEscaping = true
})
creds := aws.Credentials{
AccessKeyID: accessKeyID,
SecretAccessKey: secretAccessKey,
Source: "elastickv-s3",
}
if err := signer.SignHTTP(context.Background(), creds, clone, payloadHash, "s3", region, signingTime.UTC()); err != nil {
return "", errors.WithStack(err)
}
return strings.TrimSpace(clone.Header.Get("Authorization")), nil
}

//nolint:cyclop // AWS authorization parsing is branchy because malformed scopes must be rejected precisely.
func parseS3AuthorizationHeader(raw string) (s3AuthorizationHeader, error) {
raw = strings.TrimSpace(raw)
if raw == "" {
return s3AuthorizationHeader{}, errors.New("authorization header is required")
}
algorithm, rest, ok := strings.Cut(raw, " ")
if !ok {
return s3AuthorizationHeader{}, errors.New("authorization header is malformed")
}
out := s3AuthorizationHeader{Algorithm: strings.TrimSpace(algorithm)}
params := strings.Split(rest, ",")
for _, param := range params {
key, value, ok := strings.Cut(strings.TrimSpace(param), "=")
if !ok {
continue
}
if key != "Credential" {
continue
}
scope := strings.Split(value, "/")
if len(scope) != 5 || scope[4] != "aws4_request" {
return s3AuthorizationHeader{}, errors.New("credential scope is malformed")
}
out.AccessKeyID = scope[0]
out.Date = scope[1]
out.Region = scope[2]
out.Service = scope[3]
break
}
if out.AccessKeyID == "" || out.Date == "" || out.Region == "" || out.Service == "" {
return s3AuthorizationHeader{}, errors.New("credential scope is required")
}
return out, nil
}

func normalizeS3PayloadHash(raw string) string {
return strings.TrimSpace(raw)
}
Expand Down
2 changes: 1 addition & 1 deletion adapter/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -874,7 +874,7 @@ func newSignedS3Request(
signingTime,
)
require.NoError(t, err)
expectedAuth, err := buildS3AuthorizationHeader(req, testS3AccessKey, testS3SecretKey, testS3Region, signingTime, payloadHash)
expectedAuth, err := buildSigV4AuthorizationHeader(req, testS3AccessKey, testS3SecretKey, "s3", testS3Region, signingTime, payloadHash)
require.NoError(t, err)
require.Equal(t, strings.TrimSpace(req.Header.Get("Authorization")), expectedAuth)
return req
Expand Down
Loading
Loading