Skip to content
36 changes: 8 additions & 28 deletions adapter/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
"math/big"
"net"
"net/http"
"net/http/httputil"
"net/url"
"slices"
"sort"
"strconv"
Expand Down Expand Up @@ -294,33 +292,15 @@ func (d *DynamoDBServer) Stop() {
// (either proxied or an error response was written), false if the request
// should be handled locally (i.e. this node is the leader or no leader map is
// configured).
//
// Serving reads or writes locally on a follower would expose G2-item-realtime
// stale reads, so every follower request is forwarded to the leader.
func (d *DynamoDBServer) proxyToLeader(w http.ResponseWriter, r *http.Request) bool {
if len(d.leaderDynamo) == 0 || d.coordinator == nil {
return false
}
if d.coordinator.IsLeader() {
return false
}
// This node is a follower. All requests must be forwarded to the leader to
// preserve linearizability — serving reads or writes locally on a follower
// causes stale-read anomalies (G2-item-realtime).
leader := d.coordinator.RaftLeader()
if leader == "" {
writeDynamoError(w, http.StatusServiceUnavailable, dynamoErrInternal, "no raft leader currently available")
return true
}
targetAddr, ok := d.leaderDynamo[leader]
if !ok || strings.TrimSpace(targetAddr) == "" {
writeDynamoError(w, http.StatusServiceUnavailable, dynamoErrInternal, "leader dynamo address not found")
return true
}
target := &url.URL{Scheme: "http", Host: targetAddr}
proxy := httputil.NewSingleHostReverseProxy(target)
proxy.ErrorHandler = func(rw http.ResponseWriter, _ *http.Request, err error) {
writeDynamoError(rw, http.StatusServiceUnavailable, dynamoErrInternal, "leader proxy error: "+err.Error())
}
proxy.ServeHTTP(w, r)
return true
return proxyHTTPRequestToLeader(d.coordinator, d.leaderDynamo, dynamoLeaderProxyErrorWriter, w, r)
}

func dynamoLeaderProxyErrorWriter(w http.ResponseWriter, status int, message string) {
writeDynamoError(w, status, dynamoErrInternal, message)
}

func (d *DynamoDBServer) handle(w http.ResponseWriter, r *http.Request) {
Expand Down
64 changes: 64 additions & 0 deletions adapter/leader_http_proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package adapter

import (
"net/http"
"net/http/httputil"
"net/url"
"strings"

"github.com/bootjp/elastickv/kv"
)

// httpLeaderErrorWriter writes an adapter-specific error envelope (JSON for
// DynamoDB/SQS, XML for S3, RESP for Redis, ...) when the HTTP leader proxy
// cannot forward a follower request. The adapter owns the shape of the body;
// this helper only decides when to call it.
type httpLeaderErrorWriter func(w http.ResponseWriter, status int, message string)

// proxyHTTPRequestToLeader forwards r to the current Raft leader's HTTP
// adapter endpoint when this node is a follower. It returns true when the
// request was handled (either proxied or an error body was written) and
// false when the caller should serve it locally (no leader map configured,
// no coordinator, or this node is the leader).
//
// Error paths:
// 1. no known Raft leader → 503 via errWriter("no raft leader currently available")
// 2. leader id missing from leaderMap → 503 via errWriter("leader address not found")
// 3. reverse-proxy dial/copy failure → 503 via errWriter("leader proxy error: <reason>")
//
// leaderMap keys are Raft addresses; values are the matching adapter HTTP
// addresses exported on that same node.
func proxyHTTPRequestToLeader(
coordinator kv.Coordinator,
leaderMap map[string]string,
errWriter httpLeaderErrorWriter,
w http.ResponseWriter,
r *http.Request,
) bool {
if len(leaderMap) == 0 || coordinator == nil {
return false
}
if coordinator.IsLeader() {
return false
}
// Follower ingress: forward to the leader so reads and writes both see a
// single serialization point. Serving locally on a follower would expose
// G2-item-realtime stale reads.
leader := coordinator.RaftLeader()
if leader == "" {
errWriter(w, http.StatusServiceUnavailable, "no raft leader currently available")
return true
}
targetAddr, ok := leaderMap[leader]
if !ok || strings.TrimSpace(targetAddr) == "" {
errWriter(w, http.StatusServiceUnavailable, "leader address not found")
return true
}
target := &url.URL{Scheme: "http", Host: targetAddr}
proxy := httputil.NewSingleHostReverseProxy(target)
proxy.ErrorHandler = func(rw http.ResponseWriter, _ *http.Request, err error) {
errWriter(rw, http.StatusServiceUnavailable, "leader proxy error: "+err.Error())
}
proxy.ServeHTTP(w, r)
return true
}
85 changes: 9 additions & 76 deletions adapter/s3_auth.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package adapter

import (
"context"
"crypto/subtle"
"net/http"
"net/url"
Expand All @@ -12,11 +11,12 @@ import (
"github.com/aws/aws-sdk-go-v2/aws"
v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
"github.com/bootjp/elastickv/kv"
"github.com/cockroachdb/errors"
)

const (
s3SigV4Algorithm = "AWS4-HMAC-SHA256"
// s3SigV4Algorithm is an alias of sigv4Algorithm kept for call-site
// readability in the S3 adapter.
s3SigV4Algorithm = sigv4Algorithm
// s3UnsignedPayload / s3Streaming* are sentinel values that AWS SDKs may
// place in X-Amz-Content-Sha256. None of them are a literal SHA-256 hash
// of the body, so the PUT pipeline must skip hash validation when it sees
Expand All @@ -27,8 +27,8 @@ const (
s3StreamingSignedPayload = "STREAMING-AWS4-HMAC-SHA256-PAYLOAD"
s3StreamingSignedPayloadTrailer = "STREAMING-AWS4-HMAC-SHA256-PAYLOAD-TRAILER"
s3EmptyPayloadHash = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
s3DateHeaderFormat = "20060102T150405Z"
s3RequestTimeMaxSkew = 15 * time.Minute
s3DateHeaderFormat = sigv4DateHeaderFormat
s3RequestTimeMaxSkew = sigv4RequestTimeMaxSkew
)

// isS3PayloadMarker reports whether the given X-Amz-Content-Sha256 value is a
Expand Down Expand Up @@ -79,14 +79,6 @@ type s3AuthError struct {
Message string
}

type s3AuthorizationHeader struct {
Algorithm string
AccessKeyID string
Date string
Region string
Service string
}

func WithS3Region(region string) S3ServerOption {
return func(server *S3Server) {
if server == nil || strings.TrimSpace(region) == "" {
Expand Down Expand Up @@ -151,7 +143,7 @@ func (s *S3Server) authorizeRequest(r *http.Request) *s3AuthError {
}
}

parsed, err := parseS3AuthorizationHeader(authHeader)
parsed, err := parseSigV4AuthorizationHeader(authHeader)
if err != nil {
return &s3AuthError{
Status: http.StatusForbidden,
Expand Down Expand Up @@ -224,7 +216,7 @@ func (s *S3Server) authorizeRequest(r *http.Request) *s3AuthError {
}
}

expectedAuth, err := buildS3AuthorizationHeader(r, parsed.AccessKeyID, secretAccessKey, s.effectiveRegion(), signingTime, payloadHash)
expectedAuth, err := buildSigV4AuthorizationHeader(r, parsed.AccessKeyID, secretAccessKey, "s3", s.effectiveRegion(), signingTime, payloadHash)
if err != nil {
return &s3AuthError{
Status: http.StatusForbidden,
Expand All @@ -235,8 +227,8 @@ func (s *S3Server) authorizeRequest(r *http.Request) *s3AuthError {
// Compare only the Signature component to avoid false rejections caused by
// equivalent Authorization headers that differ in whitespace or parameter
// ordering (but carry the same cryptographic signature).
gotSig := extractS3Signature(authHeader)
expectedSig := extractS3Signature(expectedAuth)
gotSig := extractSigV4Signature(authHeader)
expectedSig := extractSigV4Signature(expectedAuth)
if gotSig == "" || subtle.ConstantTimeCompare([]byte(gotSig), []byte(expectedSig)) != 1 {
return &s3AuthError{
Status: http.StatusForbidden,
Expand All @@ -247,65 +239,6 @@ func (s *S3Server) authorizeRequest(r *http.Request) *s3AuthError {
return nil
}

func buildS3AuthorizationHeader(r *http.Request, accessKeyID string, secretAccessKey string, region string, signingTime time.Time, payloadHash string) (string, error) {
if r == nil {
return "", errors.New("request is required")
}
clone := r.Clone(context.Background())
clone.Host = r.Host
clone.Header = clone.Header.Clone()
clone.Header.Del("Authorization")

signer := v4.NewSigner(func(opts *v4.SignerOptions) {
opts.DisableURIPathEscaping = true
})
creds := aws.Credentials{
AccessKeyID: accessKeyID,
SecretAccessKey: secretAccessKey,
Source: "elastickv-s3",
}
if err := signer.SignHTTP(context.Background(), creds, clone, payloadHash, "s3", region, signingTime.UTC()); err != nil {
return "", errors.WithStack(err)
}
return strings.TrimSpace(clone.Header.Get("Authorization")), nil
}

//nolint:cyclop // AWS authorization parsing is branchy because malformed scopes must be rejected precisely.
func parseS3AuthorizationHeader(raw string) (s3AuthorizationHeader, error) {
raw = strings.TrimSpace(raw)
if raw == "" {
return s3AuthorizationHeader{}, errors.New("authorization header is required")
}
algorithm, rest, ok := strings.Cut(raw, " ")
if !ok {
return s3AuthorizationHeader{}, errors.New("authorization header is malformed")
}
out := s3AuthorizationHeader{Algorithm: strings.TrimSpace(algorithm)}
params := strings.Split(rest, ",")
for _, param := range params {
key, value, ok := strings.Cut(strings.TrimSpace(param), "=")
if !ok {
continue
}
if key != "Credential" {
continue
}
scope := strings.Split(value, "/")
if len(scope) != 5 || scope[4] != "aws4_request" {
return s3AuthorizationHeader{}, errors.New("credential scope is malformed")
}
out.AccessKeyID = scope[0]
out.Date = scope[1]
out.Region = scope[2]
out.Service = scope[3]
break
}
if out.AccessKeyID == "" || out.Date == "" || out.Region == "" || out.Service == "" {
return s3AuthorizationHeader{}, errors.New("credential scope is required")
}
return out, nil
}

func normalizeS3PayloadHash(raw string) string {
return strings.TrimSpace(raw)
}
Expand Down
2 changes: 1 addition & 1 deletion adapter/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -874,7 +874,7 @@ func newSignedS3Request(
signingTime,
)
require.NoError(t, err)
expectedAuth, err := buildS3AuthorizationHeader(req, testS3AccessKey, testS3SecretKey, testS3Region, signingTime, payloadHash)
expectedAuth, err := buildSigV4AuthorizationHeader(req, testS3AccessKey, testS3SecretKey, "s3", testS3Region, signingTime, payloadHash)
require.NoError(t, err)
require.Equal(t, strings.TrimSpace(req.Header.Get("Authorization")), expectedAuth)
return req
Expand Down
Loading
Loading