From 40ab7582e4e62309e9b8094515b12b9afd5ca079 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Tue, 5 May 2026 01:58:30 +0900 Subject: [PATCH 1/6] feat(sqs): Jepsen HT-FIFO workload (Phase 3.D PR 7b) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the Jepsen workload that stresses partitioned-FIFO (HT-FIFO) queues against the three contracts AWS HT-FIFO is supposed to honour even under partition and node-loss faults — within-group ordering, no message loss, no duplicates. Pattern follows aphyr's classic Jepsen RabbitMQ analysis (the user pointed at https://aphyr.com/posts/315-jepsen-rabbitmq as a starting point): track every :send and :recv in the history, then a custom checker verifies the contracts against the recorded events. Files: - jepsen/project.clj — adds com.cognitect.aws/sqs at the same version as the existing dynamodb dep, so the SDK wire protocol (auth, retry classification, error parsing) is exercised end-to-end against elastickv rather than a hand-rolled HTTP layer. - jepsen/src/elastickv/db.clj — extends start-node! to accept :sqs-port (port spec like dynamo-port) and :sqs-region. Both are optional so existing dynamodb / s3 / redis test specs are byte-identical at the args level when sqs-port is absent. - jepsen/src/elastickv/jepsen_test.clj — registers the new workload under elastickv-sqs-htfifo-test alongside the other workloads. - jepsen/src/elastickv/sqs_htfifo_workload.clj — the workload itself (~430 lines). Uses cognitect/aws-api SQS, creates an HT-FIFO queue with PartitionCount=4 + ContentBasedDeduplication, runs sends and receives across N MessageGroupId values, and the custom ht-fifo-checker validates the three contracts. 
- jepsen/test/elastickv/sqs_htfifo_workload_test.clj — pure-function tests for the checker (synthetic histories pin clean / loss / duplicate / ordering-violation / cross-group / info-send-not-loss / failed-send-not-counted / empty-receive cases) and integration smoke tests for the test-spec builder. Open-endpoint setup: the elastickv server starts without --sqsCredentialsFile so the SQS adapter accepts any signed request (matching how the S3 adapter is wired in jepsen today). The SDK client signs with dummy credentials so the SigV4 path still exercises end-to-end at the protocol level. Tests run: lein test elastickv.sqs-htfifo-workload-test passes 11 tests / 27 assertions. Full non-redis suite (sqs / dynamodb / dynamodb-types / s3 / cli) passes 21 tests / 41 assertions. The elastickv.redis-workload namespace fails to load due to the empty redis/src/ tree, which is pre-existing on main and unrelated to this PR. Out of scope for this PR (next milestones): - Wiring the workload into scripts/run-jepsen-local.sh — the existing script is dynamodb-only; an sqs counterpart can land as a follow-up. - Multi-shard cluster topology that lands distinct partitions on distinct Raft groups. This PR's PartitionCount=4 routes to the default group on a single-shard cluster — partitioning logic (different keys per partition, ordering preserved within group) is fully exercised, but the cross-shard scaling story is gated on separate work. - Design-doc lifecycle rename (proposed → partial) — that is §11 PR 8 in the design doc and is tracked separately. Refs: docs/design/2026_04_26_proposed_sqs_split_queue_fifo.md §11 PR 7. 
--- jepsen/project.clj | 1 + jepsen/src/elastickv/db.clj | 6 +- jepsen/src/elastickv/jepsen_test.clj | 5 + jepsen/src/elastickv/sqs_htfifo_workload.clj | 523 ++++++++++++++++++ .../elastickv/sqs_htfifo_workload_test.clj | 136 +++++ 5 files changed, 670 insertions(+), 1 deletion(-) create mode 100644 jepsen/src/elastickv/sqs_htfifo_workload.clj create mode 100644 jepsen/test/elastickv/sqs_htfifo_workload_test.clj diff --git a/jepsen/project.clj b/jepsen/project.clj index 5a4fab3f4..4d5ebe1cc 100644 --- a/jepsen/project.clj +++ b/jepsen/project.clj @@ -13,5 +13,6 @@ [com.cognitect.aws/api "0.8.692"] [com.cognitect.aws/endpoints "1.1.12.626"] [com.cognitect.aws/dynamodb "847.2.1365.0"] + [com.cognitect.aws/sqs "847.2.1365.0"] [org.slf4j/slf4j-nop "2.0.9"]] :main elastickv.jepsen-test) diff --git a/jepsen/src/elastickv/db.clj b/jepsen/src/elastickv/db.clj index 84bb4ad87..33130023d 100644 --- a/jepsen/src/elastickv/db.clj +++ b/jepsen/src/elastickv/db.clj @@ -97,7 +97,7 @@ (clojure.string/join ",")))) (defn- start-node! - [test node {:keys [bootstrap-node grpc-port redis-port dynamo-port s3-port data-dir raft-groups shard-ranges raft-engine]}] + [test node {:keys [bootstrap-node grpc-port redis-port dynamo-port s3-port sqs-port sqs-region data-dir raft-groups shard-ranges raft-engine]}] (when (and (seq raft-groups) (> (count raft-groups) 1) (nil? shard-ranges)) @@ -110,6 +110,8 @@ (node-addr node (port-for dynamo-port node))) s3 (when s3-port (node-addr node (port-for s3-port node))) + sqs (when sqs-port + (node-addr node (port-for sqs-port node))) raft-redis-map (build-raft-redis-map (:nodes test) grpc-port redis-port raft-groups) bootstrap? 
(= node bootstrap-node) args (cond-> [server-bin @@ -121,6 +123,8 @@ "--raftRedisMap" raft-redis-map] dynamo (conj "--dynamoAddress" dynamo) s3 (conj "--s3Address" s3) + sqs (conj "--sqsAddress" sqs) + (and sqs sqs-region) (conj "--sqsRegion" sqs-region) (seq raft-groups) (conj "--raftGroups" (build-raft-groups-arg node raft-groups)) (seq shard-ranges) (conj "--shardRanges" shard-ranges) bootstrap? (conj "--raftBootstrap"))] diff --git a/jepsen/src/elastickv/jepsen_test.clj b/jepsen/src/elastickv/jepsen_test.clj index 01b9a9300..b368b716c 100644 --- a/jepsen/src/elastickv/jepsen_test.clj +++ b/jepsen/src/elastickv/jepsen_test.clj @@ -4,6 +4,7 @@ [elastickv.dynamodb-workload :as dynamodb-workload] [elastickv.dynamodb-types-workload :as dynamodb-types-workload] [elastickv.s3-workload :as s3-workload] + [elastickv.sqs-htfifo-workload :as sqs-htfifo-workload] [jepsen.cli :as cli])) (defn elastickv-test [] @@ -19,6 +20,10 @@ (defn elastickv-s3-test [] (s3-workload/elastickv-s3-test {})) +(defn elastickv-sqs-htfifo-test + ([] (elastickv-sqs-htfifo-test {})) + ([opts] (sqs-htfifo-workload/elastickv-sqs-htfifo-test opts))) + (defn -main [& args] (cli/run! (cli/single-test-cmd {:test-fn elastickv-test}) args)) diff --git a/jepsen/src/elastickv/sqs_htfifo_workload.clj b/jepsen/src/elastickv/sqs_htfifo_workload.clj new file mode 100644 index 000000000..04a54dee5 --- /dev/null +++ b/jepsen/src/elastickv/sqs_htfifo_workload.clj @@ -0,0 +1,523 @@ +(ns elastickv.sqs-htfifo-workload + "Jepsen workload for elastickv's HT-FIFO (high-throughput FIFO) SQS-compatible + queues — partitioned FIFO queues created with PartitionCount > 1. + + Pattern follows aphyr's classic Jepsen RabbitMQ analysis: track every + :send and :recv in the history, then a custom checker verifies three + contracts that AWS HT-FIFO is supposed to honour even under partition + and node-loss faults: + + 1. 
Within-group ordering — for any MessageGroupId, the sequence of + received seq values (sorted by global completion time across all + consumers) is monotonically non-decreasing. + 2. No loss — every (group, seq) successfully :sent eventually appears + in the :recv history. Sends with :info status are treated as + possibly-committed and not counted as lost. + 3. No duplicates — every (group, seq) appears at most once in the + :recv history. ContentBasedDeduplication on the queue + a unique + (group, seq) body is what enforces this server-side, so a duplicate + here is a real bug (e.g. a deletion that did not commit). + + Each MessageGroupId is hashed by partitionFor (FNV-1a) onto one of N + partitions; with several distinct groups the workload exercises + cross-partition delivery, while ContentBasedDeduplication + per-group + monotonic seqs keeps the assertions checkable from the client side." + (:gen-class) + (:require [clojure.set :as cset] + [clojure.string :as str] + [clojure.tools.logging :refer [info]] + [cognitect.aws.client.api :as aws] + [cognitect.aws.credentials :as creds] + [elastickv.cli :as cli] + [elastickv.db :as ekdb] + [jepsen [checker :as checker] + [client :as client] + [generator :as gen] + [net :as net]] + [jepsen.control :as control] + [jepsen.db :as jdb] + [jepsen.nemesis :as nemesis] + [jepsen.nemesis.combined :as combined] + [jepsen.os :as os] + [jepsen.os.debian :as debian])) + +;; --------------------------------------------------------------------------- +;; Constants +;; --------------------------------------------------------------------------- + +(def ^:private default-sqs-port 9324) +(def ^:private queue-name "jepsen-htfifo.fifo") +(def ^:private default-partition-count 4) +(def ^:private default-group-count 8) +(def ^:private receive-batch-size 10) +;; WaitTimeSeconds=1: elastickv's receive path is short-poll-only today, so +;; this is a no-op at the wire but keeps clients SDK-portable if long-poll +;; lands later. 
Visibility 30s is long enough for delete to land between +;; receive and the next pass even under partition. +(def ^:private receive-wait-seconds 1) +(def ^:private visibility-timeout-seconds 30) + +;; --------------------------------------------------------------------------- +;; SQS client construction +;; --------------------------------------------------------------------------- + +(defn- make-sqs-client + "Returns a cognitect/aws-api SQS client pointed at http://host:port. + Dummy credentials match the elastickv server's open-endpoint mode (no + --sqsCredentialsFile passed → adapter accepts any signed request)." + [host port region] + (aws/client + {:api :sqs + :region (or region "us-east-1") + :credentials-provider (creds/basic-credentials-provider + {:access-key-id "dummy" + :secret-access-key "dummy"}) + :endpoint-override {:protocol :http + :hostname host + :port port}})) + +(defn- anomaly? [resp] + (contains? resp :cognitect.anomalies/category)) + +(defn- sqs-invoke! + "Invoke op against sqs-client, returning the parsed response. + Throws ex-info on any error (SQS API error or transport failure). + ex-data: :type (SQS __type), :category (anomaly), :resp (raw)." + [sqs op request] + (let [resp (aws/invoke sqs {:op op :request request})] + (if (anomaly? resp) + (let [err-type (:__type resp) + category (:cognitect.anomalies/category resp) + msg (or (:message resp) + (:Message resp) + (:cognitect.anomalies/message resp) + "")] + (throw (ex-info (str "SQS " (or err-type category) ": " msg) + {:type err-type + :category category + :resp resp}))) + resp))) + +;; --------------------------------------------------------------------------- +;; Queue setup +;; --------------------------------------------------------------------------- + +(defn- create-htfifo-queue! + "Idempotently create the HT-FIFO test queue. Returns the QueueUrl. + Tolerates QueueAlreadyExists (the test queue may survive across restarts + of the same workload run)." 
+ [sqs partition-count] + (let [attrs {"FifoQueue" "true" + "ContentBasedDeduplication" "true" + "PartitionCount" (str partition-count) + "FifoThroughputLimit" "perMessageGroupId" + "DeduplicationScope" "messageGroup"} + resp (try + (sqs-invoke! sqs :CreateQueue + {:QueueName queue-name + :Attributes attrs}) + (catch clojure.lang.ExceptionInfo e + (let [err-type (:type (ex-data e))] + (if (or (= "QueueAlreadyExists" err-type) + (= "QueueNameExists" err-type)) + (sqs-invoke! sqs :GetQueueUrl {:QueueName queue-name}) + (throw e)))))] + (or (:QueueUrl resp) + (throw (ex-info "CreateQueue did not return QueueUrl" {:resp resp}))))) + +;; --------------------------------------------------------------------------- +;; Per-group sequence counters (shared across all client workers) +;; +;; A single test-wide atom maps group-id → next sequence number. The atom is +;; constructed fresh per test run via fresh-seq-counters and shared with all +;; ClientRecord instances via the workload map's :seq-counters field. +;; --------------------------------------------------------------------------- + +(defn- fresh-seq-counters + "Build the shared seq-counter atom for the workload. Each group-id maps + to a long (next seq to assign)." + [groups] + (atom (zipmap groups (repeat 0)))) + +(defn- next-seq! + "Atomically increment the counter for `group` and return the previous + value. Stable monotonic seqs across all workers." + [counters group] + (let [next-state (swap! counters update group inc)] + ;; Returned seq = post-state - 1 = the value that was assigned. + (dec (get next-state group)))) + +(defn- encode-body + "Encode (group, seq) into the message body. Uses a simple `g:s` form + (no JSON to avoid an extra dep). The encoding is the only thing the + server sees; the checker decodes it on receive to reconstruct the + logical (group, seq) tuple." + [group seq-num] + (str group ":" seq-num)) + +(defn- decode-body + "Decode a body produced by encode-body. 
Returns nil if the payload + doesn't match the expected shape so a corrupted body surfaces as a + single failed assertion instead of crashing the checker." + [body] + (when (string? body) + (when-let [[group seq-str] (str/split body #":" 2)] + (when (and (not (str/blank? group)) + (not (str/blank? seq-str))) + (try + {:group group + :seq (Long/parseLong seq-str)} + (catch NumberFormatException _ nil)))))) + +;; --------------------------------------------------------------------------- +;; Jepsen client +;; --------------------------------------------------------------------------- + +(defrecord HTFIFOClient [node->port region groups seq-counters sqs queue-url partition-count] + client/Client + + (open! [this test node] + (let [port (get node->port node default-sqs-port) + host (or (:sqs-host test) (name node))] + (assoc this :sqs (make-sqs-client host port region)))) + + (setup! [this _test] + (let [url (create-htfifo-queue! sqs partition-count)] + (info "HT-FIFO test queue ready" url "partitions=" partition-count) + (assoc this :queue-url url))) + + (teardown! [_this _test] + ;; Leave the queue around — the test cluster is torn down by db/teardown!. + ;; A best-effort DeleteQueue here would race the partition-isolated nodes. + nil) + + (close! [this _test] + (when sqs (aws/stop sqs)) + (assoc this :sqs nil :queue-url nil)) + + (invoke! [_this _test op] + (try + (case (:f op) + :send + (let [group (rand-nth groups) + seq-num (next-seq! seq-counters group) + body (encode-body group seq-num)] + (sqs-invoke! sqs :SendMessage + {:QueueUrl queue-url + :MessageBody body + :MessageGroupId group}) + (assoc op :type :ok :value [group seq-num])) + + :recv + (let [resp (sqs-invoke! 
sqs :ReceiveMessage + {:QueueUrl queue-url + :MaxNumberOfMessages receive-batch-size + :VisibilityTimeout visibility-timeout-seconds + :WaitTimeSeconds receive-wait-seconds}) + msgs (or (:Messages resp) []) + parsed (keep (fn [m] + (when-let [decoded (decode-body (:Body m))] + (assoc decoded + :receipt-handle (:ReceiptHandle m) + :message-id (:MessageId m)))) + msgs)] + (doseq [{:keys [receipt-handle]} parsed] + (try + (sqs-invoke! sqs :DeleteMessage + {:QueueUrl queue-url + :ReceiptHandle receipt-handle}) + (catch clojure.lang.ExceptionInfo _ + ;; A failed delete leaves the message visible after the + ;; visibility window — the next receive will see it again. + ;; The checker will count it as a duplicate, which is the + ;; correct signal: an at-least-once delivery on a FIFO + ;; queue indicates a delete-side bug. + nil))) + (assoc op :type :ok + :value (mapv (fn [{:keys [group seq]}] [group seq]) parsed)))) + + (catch clojure.lang.ExceptionInfo e + (let [data (ex-data e) + err-type (:type data) + category (:category data)] + (cond + ;; Transport faults (network partition, kill, peer down). + ;; :info: the operation may or may not have committed. + (and (nil? err-type) + (#{:cognitect.anomalies/fault + :cognitect.anomalies/unavailable + :cognitect.anomalies/interrupted} category)) + (assoc op :type :info :error :network-error) + + ;; Server-side InternalFailure / 5xx — possibly committed. + (#{"InternalFailure" "InternalServerError" "ServiceUnavailable"} err-type) + (assoc op :type :info :error (str err-type)) + + ;; Definite client-side rejection — operation did not commit. 
+ (#{"InvalidParameterValue" "QueueDoesNotExist" + "ReceiptHandleIsInvalid" "InvalidIdFormat"} err-type) + (assoc op :type :fail :error (str err-type)) + + :else + (assoc op :type :info :error (or err-type + category + (.getMessage e)))))) + + (catch Exception e + (assoc op :type :info :error (.getMessage e)))))) + +;; --------------------------------------------------------------------------- +;; Checker — within-group ordering + no loss + no duplicates +;; --------------------------------------------------------------------------- + +(defn- collect-sends + "Return the set of (group, seq) tuples successfully :sent. :info sends + are returned separately as the in-flight set (their commit status is + unknown)." + [history] + (let [sends (filter #(= :send (:f %)) history)] + {:committed (->> sends + (filter #(= :ok (:type %))) + (map :value) + set) + :in-flight (->> sends + (filter #(= :info (:type %))) + (map :value) + set)})) + +(defn- collect-receives + "Return a list of {:group g :seq s :time t} maps in completion-time + order, one per (group, seq) tuple actually surfaced by a successful + :recv op. Each tuple carries the op's :time so per-group ordering can + be checked against a globally-consistent timeline." + [history] + (->> history + (filter #(and (= :recv (:f %)) (= :ok (:type %)))) + (mapcat (fn [op] + (map (fn [[g s]] + {:group g :seq s :time (:time op)}) + (:value op)))) + (sort-by :time))) + +(defn- ordering-violations + "For each group, return the list of out-of-order pairs in the + completion-time-ordered receive sequence. Returns a map of + group → [{:prev p :curr c} ...] (empty if no violation)." 
+ [received-events] + (let [per-group (group-by :group received-events)] + (->> per-group + (keep (fn [[group events]] + (let [seqs (mapv :seq events) + pairs (map vector seqs (rest seqs)) + breaks (filter (fn [[p c]] (>= p c)) pairs)] + (when (seq breaks) + [group (mapv (fn [[p c]] {:prev p :curr c}) breaks)])))) + (into {})))) + +(defn- duplicate-receives + "Return the set of (group, seq) tuples that appeared more than once in + the receive history (FIFO contract violation)." + [received-events] + (->> received-events + (group-by (juxt :group :seq)) + (keep (fn [[k events]] (when (> (count events) 1) k))) + set)) + +(defn ht-fifo-checker + "Custom Jepsen checker for the HT-FIFO contract. Returns + {:valid? bool :sent N :received N :lost #{...} :duplicates #{...} + :ordering-violations {...}}." + [] + (reify checker/Checker + (check [_ _test history _opts] + (let [{:keys [committed in-flight]} (collect-sends history) + received-events (collect-receives history) + received-tuples (set (map (fn [{:keys [group seq]}] [group seq]) + received-events)) + ;; "lost" = committed sends that never arrived AND were not + ;; in-flight at the end. We exclude in-flight since their + ;; commit status is undefined. + lost (cset/difference committed received-tuples in-flight) + dups (duplicate-receives received-events) + ord (ordering-violations received-events)] + {:valid? (and (empty? lost) + (empty? dups) + (empty? ord)) + :committed-sends (count committed) + :in-flight-sends (count in-flight) + :received (count received-tuples) + :lost lost + :duplicates dups + :ordering-violations ord})))) + +;; --------------------------------------------------------------------------- +;; Generator +;; --------------------------------------------------------------------------- + +(defn- send-op [_t _p] {:f :send}) +(defn- recv-op [_t _p] {:f :recv}) + +(defn- mixed-generator + "Mix sends and receives. send-fraction in [0, 1] picks a :send with + that probability. Default 0.5. 
Receives are essential to drain the + queue; too low a fraction starves consumers and inflates the + in-flight count." + [send-fraction] + (gen/mix + (concat (repeat (max 1 (Math/round (* 10.0 (double send-fraction)))) send-op) + (repeat (max 1 (- 10 (Math/round (* 10.0 (double send-fraction))))) recv-op)))) + +(defn- drain-generator + "Drain phase: only :recv ops, run after the main generator finishes so + the checker sees the in-flight messages get delivered." + [] + (gen/repeat {:f :recv})) + +;; --------------------------------------------------------------------------- +;; Workload & Test builders +;; --------------------------------------------------------------------------- + +(def default-nodes ["n1" "n2" "n3" "n4" "n5"]) + +(defn- group-ids + "Return [g0 g1 ... g(n-1)] used as MessageGroupId values." + [n] + (mapv #(str "g" %) (range n))) + +(defn sqs-htfifo-workload + "Builds the HT-FIFO workload map with custom client, generator, and + checker. Shared seq-counters atom is constructed here so every client + worker increments the same per-group counter." + [opts] + (let [partition-count (or (:partition-count opts) default-partition-count) + group-count (or (:group-count opts) default-group-count) + send-fraction (or (:send-fraction opts) 0.5) + groups (group-ids group-count) + seq-counters (fresh-seq-counters groups) + client (->HTFIFOClient (or (:node->port opts) + (zipmap default-nodes (repeat default-sqs-port))) + (:sqs-region opts) + groups + seq-counters + nil ; sqs (per-worker) + nil ; queue-url + partition-count)] + {:client client + :generator (mixed-generator send-fraction) + :checker (ht-fifo-checker)})) + +(defn elastickv-sqs-htfifo-test + "Builds a Jepsen test map that drives elastickv's HT-FIFO SQS endpoint." 
+ ([] (elastickv-sqs-htfifo-test {})) + ([opts] + (let [nodes (or (:nodes opts) default-nodes) + sqs-ports (or (:sqs-ports opts) + (repeat (count nodes) (or (:sqs-port opts) default-sqs-port))) + node->port (or (:node->port opts) (cli/ports->node-map sqs-ports nodes)) + sqs-region (or (:sqs-region opts) "us-east-1") + local? (:local opts) + db (if local? + jdb/noop + (ekdb/db {:grpc-port (or (:grpc-port opts) 50051) + :redis-port (or (:redis-port opts) 6379) + :sqs-port node->port + :sqs-region sqs-region + :raft-groups (:raft-groups opts) + :shard-ranges (:shard-ranges opts)})) + rate (double (or (:rate opts) 5)) + time-limit (or (:time-limit opts) 30) + drain-time (or (:drain-time opts) (max 5 (quot time-limit 6))) + faults (if local? + [] + (cli/normalize-faults (or (:faults opts) [:partition :kill]))) + nemesis-p (when-not local? + (combined/nemesis-package {:db db + :faults faults + :interval (or (:fault-interval opts) 40)})) + nemesis-gen (if nemesis-p + (:generator nemesis-p) + (gen/once {:type :info :f :noop})) + workload (sqs-htfifo-workload (assoc opts :node->port node->port)) + main-gen (->> (:generator workload) + (gen/nemesis nemesis-gen) + (gen/stagger (/ rate)) + (gen/time-limit time-limit)) + drain-gen (->> (drain-generator) + (gen/stagger (/ rate)) + (gen/time-limit drain-time))] + (merge workload + {:name (or (:name opts) "elastickv-sqs-htfifo") + :nodes nodes + :db db + :sqs-host (:sqs-host opts) + :os (if local? os/noop debian/os) + :net (if local? net/noop net/iptables) + :ssh (merge {:username "vagrant" + :private-key-path "/home/vagrant/.ssh/id_rsa" + :strict-host-key-checking false} + (when local? 
{:dummy true}) + (:ssh opts)) + :remote control/ssh + :nemesis (if nemesis-p (:nemesis nemesis-p) nemesis/noop) + :final-generator nil + :concurrency (or (:concurrency opts) 8) + :generator (gen/phases main-gen drain-gen)})))) + +;; --------------------------------------------------------------------------- +;; CLI +;; --------------------------------------------------------------------------- + +(def sqs-cli-opts + "SQS-specific CLI options, appended to common opts." + [[nil "--sqs-ports PORTS" "Comma-separated SQS ports (one per node)." + :default nil + :parse-fn (fn [s] + (->> (str/split s #",") + (remove str/blank?) + (mapv #(Integer/parseInt %))))] + [nil "--sqs-port PORT" "SQS port (applied to all nodes)." + :default default-sqs-port + :parse-fn #(Integer/parseInt %)] + [nil "--sqs-region REGION" "AWS region the SDK signs against." + :default "us-east-1"] + [nil "--redis-port PORT" "Redis port." + :default 6379 + :parse-fn #(Integer/parseInt %)] + [nil "--partition-count N" "PartitionCount for the test queue (1, 2, 4, 8, 16, 32)." + :default default-partition-count + :parse-fn #(Integer/parseInt %)] + [nil "--group-count N" "Number of distinct MessageGroupId values to spread sends across." + :default default-group-count + :parse-fn #(Integer/parseInt %)] + [nil "--send-fraction F" "Probability a generator op is :send (rest are :recv)." + :default 0.5 + :parse-fn #(Double/parseDouble %)] + [nil "--drain-time SECONDS" "Receive-only drain phase after the main generator finishes." + :default nil + :parse-fn #(Integer/parseInt %)]]) + +(defn- prepare-sqs-opts + "Translate parsed CLI options into the map elastickv-sqs-htfifo-test wants." 
+ [options] + (let [sqs-ports (:sqs-ports options) + options (cli/parse-common-opts options sqs-ports) + node->port (if sqs-ports + (cli/ports->node-map sqs-ports (:nodes options)) + (zipmap (:nodes options) (repeat (:sqs-port options))))] + (assoc options + :sqs-host (:host options) + :node->port node->port + :sqs-port (:sqs-port options) + :sqs-region (:sqs-region options) + :redis-port (:redis-port options) + :partition-count (:partition-count options) + :group-count (:group-count options) + :send-fraction (:send-fraction options) + :drain-time (:drain-time options)))) + +(defn -main + [& args] + (cli/run-workload! args + (into cli/common-cli-opts sqs-cli-opts) + prepare-sqs-opts + elastickv-sqs-htfifo-test)) diff --git a/jepsen/test/elastickv/sqs_htfifo_workload_test.clj b/jepsen/test/elastickv/sqs_htfifo_workload_test.clj new file mode 100644 index 000000000..54f7ce2fa --- /dev/null +++ b/jepsen/test/elastickv/sqs_htfifo_workload_test.clj @@ -0,0 +1,136 @@ +(ns elastickv.sqs-htfifo-workload-test + (:require [clojure.test :refer :all] + [jepsen.checker :as checker] + [jepsen.client :as client] + [elastickv.sqs-htfifo-workload :as workload])) + +(deftest builds-test-spec + (let [test-map (workload/elastickv-sqs-htfifo-test {})] + (is (map? test-map)) + (is (= "elastickv-sqs-htfifo" (:name test-map))) + (is (= ["n1" "n2" "n3" "n4" "n5"] (:nodes test-map))))) + +(deftest custom-options-override-defaults + (let [test-map (workload/elastickv-sqs-htfifo-test + {:time-limit 60 + :concurrency 12 + :sqs-port 12345 + :partition-count 8 + :group-count 12})] + (is (= 12 (:concurrency test-map))))) + +(deftest host-override-creates-client + ;; Verify open! produces an HTFIFOClient with a live cognitect/aws-api + ;; SQS client when a host/port override is supplied. + (let [test-map (workload/elastickv-sqs-htfifo-test + {:sqs-host "127.0.0.1" + :node->port {"n1" 9324 "n2" 9325}}) + c (:client test-map) + opened (client/open! c test-map "n1")] + (is (some? 
(:sqs opened))))) + +;; --------------------------------------------------------------------------- +;; Checker tests — pure-function pinning of the three contracts +;; --------------------------------------------------------------------------- + +(defn- check-history + "Run the workload's checker against a synthetic history and return the + result map. Mirrors how Jepsen drives checker/check at the end of a run." + [history] + (checker/check (workload/ht-fifo-checker) {} history {})) + +(defn- send-op [t group seq-num & {:keys [type] :or {type :ok}}] + {:type type :f :send :time t :value [group seq-num]}) + +(defn- recv-op [t tuples & {:keys [type] :or {type :ok}}] + {:type type :f :recv :time t :value tuples}) + +(deftest checker-clean-history-is-valid + (let [hist [(send-op 100 "g0" 0) + (send-op 200 "g0" 1) + (send-op 300 "g1" 0) + (recv-op 400 [["g0" 0] ["g0" 1]]) + (recv-op 500 [["g1" 0]])] + r (check-history hist)] + (is (:valid? r)) + (is (= 3 (:committed-sends r))) + (is (= 3 (:received r))) + (is (empty? (:lost r))) + (is (empty? (:duplicates r))) + (is (empty? (:ordering-violations r))))) + +(deftest checker-detects-loss + ;; g0:1 sent OK but never received — must show up as :lost. + (let [hist [(send-op 100 "g0" 0) + (send-op 200 "g0" 1) + (recv-op 400 [["g0" 0]])] + r (check-history hist)] + (is (false? (:valid? r))) + (is (= #{["g0" 1]} (:lost r))))) + +(deftest checker-info-send-is-not-loss + ;; A :send with :info status (network failure mid-send) is not counted + ;; as :lost even if it never arrives — its commit status is undefined. + (let [hist [(send-op 100 "g0" 0) + (send-op 200 "g0" 1 :type :info) + (recv-op 400 [["g0" 0]])] + r (check-history hist)] + (is (:valid? r)) + (is (empty? (:lost r))) + (is (= 1 (:in-flight-sends r))))) + +(deftest checker-detects-duplicates + ;; The same (group, seq) appearing twice in the receive history is a + ;; FIFO contract violation (delete-side bug). 
+ (let [hist [(send-op 100 "g0" 0) + (recv-op 200 [["g0" 0]]) + (recv-op 300 [["g0" 0]])] + r (check-history hist)] + (is (false? (:valid? r))) + (is (= #{["g0" 0]} (:duplicates r))))) + +(deftest checker-detects-within-group-ordering-violation + ;; g0 receives seq=1 BEFORE seq=0 — within-group ordering broken. + (let [hist [(send-op 100 "g0" 0) + (send-op 200 "g0" 1) + (recv-op 300 [["g0" 1]]) + (recv-op 400 [["g0" 0]])] + r (check-history hist)] + (is (false? (:valid? r))) + (is (contains? (:ordering-violations r) "g0")))) + +(deftest checker-cross-group-receives-do-not-violate-ordering + ;; Different groups can interleave freely — only WITHIN-group ordering + ;; is constrained. The receive sequence below is fine even though g0 + ;; and g1 messages alternate. + (let [hist [(send-op 100 "g0" 0) + (send-op 110 "g1" 0) + (send-op 200 "g0" 1) + (send-op 210 "g1" 1) + (recv-op 300 [["g1" 0]]) + (recv-op 400 [["g0" 0]]) + (recv-op 500 [["g1" 1]]) + (recv-op 600 [["g0" 1]])] + r (check-history hist)] + (is (:valid? r)) + (is (empty? (:ordering-violations r))))) + +(deftest checker-failed-sends-are-not-counted + ;; A :send op with :type :fail did not commit; it should not appear in + ;; the committed-sends count and the receiver isn't expected to see it. + (let [hist [(send-op 100 "g0" 0) + (send-op 200 "g0" 1 :type :fail) + (recv-op 400 [["g0" 0]])] + r (check-history hist)] + (is (:valid? r)) + (is (= 1 (:committed-sends r))) + (is (empty? (:lost r))))) + +(deftest checker-empty-receives-do-not-pollute + ;; A :recv that returned 0 messages is a no-op for the checker. + (let [hist [(send-op 100 "g0" 0) + (recv-op 200 []) + (recv-op 300 [["g0" 0]])] + r (check-history hist)] + (is (:valid? 
r)) + (is (= 1 (:received r))))) From a46f8128197d413d71f738a73fc444fe90587a63 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Tue, 5 May 2026 02:35:16 +0900 Subject: [PATCH 2/6] jepsen(sqs): polish per review (PR #738, round 1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses CodeRabbit / Gemini / Claude findings on PR #738. Severity- ordered: Major (CodeRabbit + Gemini both flagged): 1. Drain phase shorter than visibility timeout. Default drain-time was max(5, time-limit/6); for the default 30s test that's 5s, while visibility-timeout-seconds is 30s. A delete that fails or a worker killed mid-receive leaves messages invisible for 30s; if the drain phase is only 5s, those messages can never reappear before the checker runs and they get reported as :lost. Fix: max with visibility-timeout-seconds. High (Gemini): 2. DeleteMessage failure was silently swallowed and the recv op marked :ok, so the undeleted message reappearing in a later :recv was scored as a duplicate. Real SQS contract: a message becomes "received" (and the no-duplicate-delivery contract activates) only AFTER a successful DeleteMessage acks it. Fix: only successfully-deleted tuples enter :value (so the checker sees them), failed deletes are logged via clojure.tools.logging/warn and dropped from :value (the message will reappear naturally — that's correct SQS redelivery, not a duplicate the checker should flag). Major (CodeRabbit) — defensive, no immediate bug: 3. :send exceptions previously dropped the [group seq-num] tuple from the op's :value. Today the checker's "lost" formula is committed-only (committed and in-flight are disjoint by construction), so this didn't false-positive — but the standard Jepsen pattern is to keep :value on every op so the history is reasonable for future analysis. 
Fix: localize the :send try/catch so the op carries :value [group seq-num] through both :ok and :info paths; on exception, re-raise with the enriched op stashed in ex-data so the outer error-classification cond preserves the tuple while still routing through the existing :info / :fail branches. P2 polish (Claude): 4. Dead in-flight subtraction in the loss formula. committed and in-flight are disjoint by construction (next-seq! assigns each tuple exactly once and an op is either :ok or :info), so the subtraction was a no-op. Removed; comment now explains the disjoint invariant rather than implying the subtraction does work. 5. Docstring said "monotonically non-decreasing" while the checker uses `(>= p c)`, which enforces strictly increasing. Updated the docstring to match the implementation and noted that equal seqs would already be flagged as duplicates by contract 3. 6. collect-receives sort key was just :time. Two workers polling at the same wall-clock nanosecond, OR two messages within one batch sharing the op's :time, could produce a non-deterministic order that spuriously trips the per-group ordering check. Added :process and a per-batch :index to the sort key (juxt :time :process :index) so within-batch order is preserved (matches the server's FIFO response order) and cross-process ties are stable. Test additions: - TestSQSHTFIFO_checker-info-recv-is-ignored — pin that an :info recv (delete failed mid-batch) is dropped by collect-receives so the next :ok recv records the redelivery without a duplicate flag. - TestSQSHTFIFO_checker-same-time-batch-preserves-within-group-order — pin the new :index sort key by feeding a batch of three same-time same-group messages and asserting ordering-violations stays empty. - send-op / recv-op test helpers now include :process so synthetic histories satisfy the new sort key. 
Caller audit per the standing semantic-change rule: - :recv :value semantics now mean "tuples whose delete acked"; consumer is collect-receives only, single caller. Aligns with the SQS at-least-once-delivery / at-most-once-after-ack model. - collect-sends in-flight set semantics unchanged; the checker reports its size for diagnostics but does not subtract it from loss anymore. - :send exception path now uses ex-data :enriched-op as a stash; the outer catch is the only consumer. Tests: lein test elastickv.sqs-htfifo-workload-test passes 13/32. Full non-redis suite (sqs / dynamodb / dynamodb-types / s3 / cli) passes 23/46. Refs: PR #738 review thread; Claude review at run 25331884279. --- jepsen/src/elastickv/sqs_htfifo_workload.clj | 144 +++++++++++++----- .../elastickv/sqs_htfifo_workload_test.clj | 33 +++- 2 files changed, 131 insertions(+), 46 deletions(-) diff --git a/jepsen/src/elastickv/sqs_htfifo_workload.clj b/jepsen/src/elastickv/sqs_htfifo_workload.clj index 04a54dee5..a85a676b4 100644 --- a/jepsen/src/elastickv/sqs_htfifo_workload.clj +++ b/jepsen/src/elastickv/sqs_htfifo_workload.clj @@ -9,7 +9,10 @@ 1. Within-group ordering — for any MessageGroupId, the sequence of received seq values (sorted by global completion time across all - consumers) is monotonically non-decreasing. + consumers) is strictly increasing. Strict (rather than merely + non-decreasing) is what the checker enforces, since seqs are + assigned monotonically by next-seq! and equal seqs would already + be flagged as duplicates by contract 3. 2. No loss — every (group, seq) successfully :sent eventually appears in the :recv history. Sends with :info status are treated as possibly-committed and not counted as lost. 
@@ -25,7 +28,7 @@ (:gen-class) (:require [clojure.set :as cset] [clojure.string :as str] - [clojure.tools.logging :refer [info]] + [clojure.tools.logging :refer [info warn]] [cognitect.aws.client.api :as aws] [cognitect.aws.credentials :as creds] [elastickv.cli :as cli] @@ -199,14 +202,33 @@ (try (case (:f op) :send + ;; Compute (group, seq, body) BEFORE the SQS call so the + ;; op carries :value [group seq-num] regardless of whether + ;; the call succeeded — :info sends with their tuple intact + ;; let a future checker reason about in-flight messages + ;; (today's "lost" formula is committed-only, but matching + ;; Jepsen's standard convention keeps the history + ;; interpretable). Re-throw on send failure so the outer + ;; catch performs the existing error classification, but + ;; with the enriched op as context (passed via ex-data so + ;; the outer catch can pull it out). (let [group (rand-nth groups) seq-num (next-seq! seq-counters group) - body (encode-body group seq-num)] - (sqs-invoke! sqs :SendMessage - {:QueueUrl queue-url - :MessageBody body - :MessageGroupId group}) - (assoc op :type :ok :value [group seq-num])) + body (encode-body group seq-num) + op' (assoc op :value [group seq-num])] + (try + (sqs-invoke! sqs :SendMessage + {:QueueUrl queue-url + :MessageBody body + :MessageGroupId group}) + (assoc op' :type :ok) + (catch clojure.lang.ExceptionInfo e + ;; Re-raise with the enriched op stashed so the outer + ;; catch returns it instead of the bare op. The outer + ;; catch checks for :enriched-op in ex-data first. + (throw (ex-info (.getMessage e) + (merge (ex-data e) {:enriched-op op'}) + e))))) :recv (let [resp (sqs-invoke! sqs :ReceiveMessage @@ -221,25 +243,37 @@ :receipt-handle (:ReceiptHandle m) :message-id (:MessageId m)))) msgs)] - (doseq [{:keys [receipt-handle]} parsed] - (try - (sqs-invoke! 
sqs :DeleteMessage - {:QueueUrl queue-url - :ReceiptHandle receipt-handle}) - (catch clojure.lang.ExceptionInfo _ - ;; A failed delete leaves the message visible after the - ;; visibility window — the next receive will see it again. - ;; The checker will count it as a duplicate, which is the - ;; correct signal: an at-least-once delivery on a FIFO - ;; queue indicates a delete-side bug. - nil))) - (assoc op :type :ok - :value (mapv (fn [{:keys [group seq]}] [group seq]) parsed)))) + ;; SQS contract: a message is "received" (and the duplicate- + ;; detection contract activates) only after a successful + ;; DeleteMessage acks it. A failed delete (transport fault, + ;; partition mid-ack) leaves the message visible — the next + ;; receive WILL see it again, and that re-delivery is correct + ;; SQS behaviour, not a duplicate the checker should flag. + ;; So: include only successfully-deleted tuples in :value. + ;; Tuples whose delete failed are dropped here and naturally + ;; reappear in a subsequent :recv. We log the failure so a + ;; spike in the warn rate (vs duplicate signal in the report) + ;; is the right triage cue. + (let [acked (volatile! [])] + (doseq [{:keys [group seq receipt-handle]} parsed] + (try + (sqs-invoke! sqs :DeleteMessage + {:QueueUrl queue-url + :ReceiptHandle receipt-handle}) + (vswap! acked conj [group seq]) + (catch clojure.lang.ExceptionInfo e + (warn e "DeleteMessage failed; tuple will be redelivered" + {:group group :seq seq})))) + (assoc op :type :ok :value @acked)))) (catch clojure.lang.ExceptionInfo e (let [data (ex-data e) err-type (:type data) - category (:category data)] + category (:category data) + ;; If the :send branch attached an enriched op (with + ;; [group seq-num] :value), use it so the resulting :info + ;; / :fail op still carries the tuple the checker can see. + base (or (:enriched-op data) op)] (cond ;; Transport faults (network partition, kill, peer down). ;; :info: the operation may or may not have committed. 
@@ -247,21 +281,21 @@ (#{:cognitect.anomalies/fault :cognitect.anomalies/unavailable :cognitect.anomalies/interrupted} category)) - (assoc op :type :info :error :network-error) + (assoc base :type :info :error :network-error) ;; Server-side InternalFailure / 5xx — possibly committed. (#{"InternalFailure" "InternalServerError" "ServiceUnavailable"} err-type) - (assoc op :type :info :error (str err-type)) + (assoc base :type :info :error (str err-type)) ;; Definite client-side rejection — operation did not commit. (#{"InvalidParameterValue" "QueueDoesNotExist" "ReceiptHandleIsInvalid" "InvalidIdFormat"} err-type) - (assoc op :type :fail :error (str err-type)) + (assoc base :type :fail :error (str err-type)) :else - (assoc op :type :info :error (or err-type - category - (.getMessage e)))))) + (assoc base :type :info :error (or err-type + category + (.getMessage e)))))) (catch Exception e (assoc op :type :info :error (.getMessage e)))))) @@ -286,18 +320,31 @@ set)})) (defn- collect-receives - "Return a list of {:group g :seq s :time t} maps in completion-time - order, one per (group, seq) tuple actually surfaced by a successful - :recv op. Each tuple carries the op's :time so per-group ordering can - be checked against a globally-consistent timeline." + "Return a list of {:group g :seq s :time t :process p :index i} maps + in completion-time order, one per (group, seq) tuple actually + surfaced by a successful :recv op. Each tuple carries the op's + :time / :process / its position within the batch so per-group + ordering can be checked against a globally-consistent timeline. + + Sort key is (juxt :time :process :index) — :time is the primary, + :process tiebreaks two workers polling at the same nanosecond + (rare but possible), and :index preserves the within-batch order + the server returned (which is the FIFO order for messages in the + same batch). 
Without :index, sort-by would only be stable across + the input ordering and the per-group seqs from one batch could + appear out-of-order in the sorted output." [history] (->> history (filter #(and (= :recv (:f %)) (= :ok (:type %)))) (mapcat (fn [op] - (map (fn [[g s]] - {:group g :seq s :time (:time op)}) - (:value op)))) - (sort-by :time))) + (map-indexed + (fn [i [g s]] + {:group g :seq s + :time (:time op) + :process (:process op) + :index i}) + (:value op)))) + (sort-by (juxt :time :process :index)))) (defn- ordering-violations "For each group, return the list of out-of-order pairs in the @@ -331,13 +378,18 @@ (reify checker/Checker (check [_ _test history _opts] (let [{:keys [committed in-flight]} (collect-sends history) + ;; in-flight is reported in the result map for diagnostics + ;; (operators want to see how many sends were ambiguous); + ;; the loss formula does not subtract it because + ;; committed/in-flight are disjoint by construction. received-events (collect-receives history) received-tuples (set (map (fn [{:keys [group seq]}] [group seq]) received-events)) - ;; "lost" = committed sends that never arrived AND were not - ;; in-flight at the end. We exclude in-flight since their - ;; commit status is undefined. - lost (cset/difference committed received-tuples in-flight) + ;; "lost" = committed sends that never arrived. :info sends + ;; (in-flight) are excluded from `committed` at collection + ;; time, not subtracted here, so committed and in-flight are + ;; always disjoint. + lost (cset/difference committed received-tuples) dups (duplicate-receives received-events) ord (ordering-violations received-events)] {:valid? (and (empty? 
lost) @@ -426,7 +478,15 @@ :shard-ranges (:shard-ranges opts)})) rate (double (or (:rate opts) 5)) time-limit (or (:time-limit opts) 30) - drain-time (or (:drain-time opts) (max 5 (quot time-limit 6))) + ;; Drain must outlast the visibility-timeout window, otherwise + ;; a message that sat invisible past the main phase (worker + ;; killed mid-receive, delete that didn't commit) can never + ;; reappear before the checker runs and gets reported as :lost + ;; even though the server still owns it. Also keeps a floor of + ;; (max 5, time-limit/6) so short tests still drain meaningfully. + drain-time (or (:drain-time opts) + (max visibility-timeout-seconds + (max 5 (quot time-limit 6)))) faults (if local? [] (cli/normalize-faults (or (:faults opts) [:partition :kill]))) diff --git a/jepsen/test/elastickv/sqs_htfifo_workload_test.clj b/jepsen/test/elastickv/sqs_htfifo_workload_test.clj index 54f7ce2fa..5837485b2 100644 --- a/jepsen/test/elastickv/sqs_htfifo_workload_test.clj +++ b/jepsen/test/elastickv/sqs_htfifo_workload_test.clj @@ -39,11 +39,11 @@ [history] (checker/check (workload/ht-fifo-checker) {} history {})) -(defn- send-op [t group seq-num & {:keys [type] :or {type :ok}}] - {:type type :f :send :time t :value [group seq-num]}) +(defn- send-op [t group seq-num & {:keys [type process] :or {type :ok process 0}}] + {:type type :f :send :time t :process process :value [group seq-num]}) -(defn- recv-op [t tuples & {:keys [type] :or {type :ok}}] - {:type type :f :recv :time t :value tuples}) +(defn- recv-op [t tuples & {:keys [type process] :or {type :ok process 0}}] + {:type type :f :recv :time t :process process :value tuples}) (deftest checker-clean-history-is-valid (let [hist [(send-op 100 "g0" 0) @@ -134,3 +134,28 @@ r (check-history hist)] (is (:valid? r)) (is (= 1 (:received r))))) + +(deftest checker-info-recv-is-ignored + ;; A :recv with :info status (delete failed mid-batch) is dropped by + ;; collect-receives. 
The message will be redelivered, the next :ok + ;; recv records it, and the checker sees no duplicate. + (let [hist [(send-op 100 "g0" 0) + (recv-op 200 [["g0" 0]] :type :info) + (recv-op 300 [["g0" 0]])] + r (check-history hist)] + (is (:valid? r)) + (is (= 1 (:received r))) + (is (empty? (:duplicates r))))) + +(deftest checker-same-time-batch-preserves-within-group-order + ;; A single :recv batch may carry multiple messages from the same + ;; group with the SAME completion time. The checker must respect + ;; the within-batch order (FIFO of the server's response) rather + ;; than reordering them by sort-by stability alone. + (let [hist [(send-op 100 "g0" 0) + (send-op 110 "g0" 1) + (send-op 120 "g0" 2) + (recv-op 200 [["g0" 0] ["g0" 1] ["g0" 2]])] + r (check-history hist)] + (is (:valid? r) (str "got " (pr-str r))) + (is (empty? (:ordering-violations r))))) From 1bb55e6b9017dc7591b9f26a02564800e25e4faf Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Tue, 5 May 2026 02:43:35 +0900 Subject: [PATCH 3/6] jepsen(sqs): polish per Claude review (PR #738, round 2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three findings on round 2; all addressed plus a docstring nit. Major: 1. Drain window equals (not exceeds) the visibility timeout. With defaults (time-limit=30, visibility-timeout-seconds=30), drain was max(30, max(5, 5)) = 30s. A message that becomes invisible right at drain-start reappears at exactly drain-end (or later), and gen/time-limit can fire before the next :recv surfaces it — the checker would then report :lost while the server still owns the message. Fix: drain-time = visibility-timeout-seconds + 10s. The +10s buffer also dominates the (max 5, time-limit/6) short- test floor at any reasonable time-limit, so the floor is gone. Major (latent, --no-cluster mode): 2. Test queue persists across runs. 
The previous queue-name was a top-level constant ("jepsen-htfifo.fifo"), so re-running the workload against the same cluster (e.g. --no-rebuild --no-cluster) left old messages in the queue. They'd appear in the receive history without corresponding committed sends in the current run, inflating :received and — because seqs reset to 0 each run — potentially trip per-group ordering checks. Worse, the 5-minute ContentBasedDeduplication window could block current- run sends that match prior-run bodies. Fix: per-run unique queue name "jepsen-htfifo-<millis>.fifo" generated at workload construction time and stashed on the HTFIFOClient record so every worker's setup! converges on the same name. New :queue-name opt provides an explicit override knob. Caller audit per the standing semantic-change rule: - queue-name was a private def used only in create-htfifo-queue! and the setup! info log. Both are inside the same namespace. After the change, both read the value from the client record. No external callers; no test changes needed (the unit tests exercise the checker on synthetic histories, not the queue setup path). - create-htfifo-queue! gained a queue-name parameter; the only caller is setup!. P3 polish: 3. when-let on (str/split body #":" 2) was misleading — str/split never returns nil, so when-let always entered the body. The actual nil-safety lived in the str/blank? checks below. Replaced with plain let so the intent is explicit (str/blank? handles the nil seq-str from a no-colon body correctly). Nit (Claude): 4. Added a docstring to elastickv-sqs-htfifo-test in jepsen_test.clj pointing at the workload's own -main as the entry point, matching the existing dynamodb / s3 pattern. Top-level -main still dispatches Redis only; this is consistent and intentional. Tests: lein test elastickv.sqs-htfifo-workload-test passes 13/32. Full non-redis suite (sqs / dynamodb / s3 / cli) passes 23/46. Refs: PR #738 round 2 review at run 25333566777. 
--- jepsen/src/elastickv/jepsen_test.clj | 4 ++ jepsen/src/elastickv/sqs_htfifo_workload.clj | 56 ++++++++++++++------ 2 files changed, 43 insertions(+), 17 deletions(-) diff --git a/jepsen/src/elastickv/jepsen_test.clj b/jepsen/src/elastickv/jepsen_test.clj index b368b716c..0468a8681 100644 --- a/jepsen/src/elastickv/jepsen_test.clj +++ b/jepsen/src/elastickv/jepsen_test.clj @@ -21,6 +21,10 @@ (s3-workload/elastickv-s3-test {})) (defn elastickv-sqs-htfifo-test + "HT-FIFO Jepsen test (PR 7b). Run via the workload's own -main: + `lein run -m elastickv.sqs-htfifo-workload [opts]`. Same pattern + as elastickv-dynamodb-test / elastickv-s3-test — each workload + exposes its own -main so this -main only dispatches Redis." ([] (elastickv-sqs-htfifo-test {})) ([opts] (sqs-htfifo-workload/elastickv-sqs-htfifo-test opts))) diff --git a/jepsen/src/elastickv/sqs_htfifo_workload.clj b/jepsen/src/elastickv/sqs_htfifo_workload.clj index a85a676b4..889994c9c 100644 --- a/jepsen/src/elastickv/sqs_htfifo_workload.clj +++ b/jepsen/src/elastickv/sqs_htfifo_workload.clj @@ -49,7 +49,6 @@ ;; --------------------------------------------------------------------------- (def ^:private default-sqs-port 9324) -(def ^:private queue-name "jepsen-htfifo.fifo") (def ^:private default-partition-count 4) (def ^:private default-group-count 8) (def ^:private receive-batch-size 10) @@ -60,6 +59,17 @@ (def ^:private receive-wait-seconds 1) (def ^:private visibility-timeout-seconds 30) +(defn- fresh-queue-name + "Build a per-run unique queue name. Includes a millisecond timestamp + so re-running the workload against an already-running cluster + (--no-rebuild --no-cluster) starts with a fresh queue: prior-run + messages cannot inflate :received, drift the per-group seq numbering, + or block sends via the 5-minute ContentBasedDeduplication window. + AWS SQS queue names admit [A-Za-z0-9_-] plus the .fifo suffix; the + timestamp sits inside that alphabet." 
+ [] + (str "jepsen-htfifo-" (System/currentTimeMillis) ".fifo")) + ;; --------------------------------------------------------------------------- ;; SQS client construction ;; --------------------------------------------------------------------------- @@ -107,9 +117,10 @@ (defn- create-htfifo-queue! "Idempotently create the HT-FIFO test queue. Returns the QueueUrl. - Tolerates QueueAlreadyExists (the test queue may survive across restarts - of the same workload run)." - [sqs partition-count] + Tolerates QueueAlreadyExists so concurrent workers calling setup! + in parallel converge on the same QueueUrl (each worker calls + setup! independently in Jepsen's lifecycle)." + [sqs queue-name partition-count] (let [attrs {"FifoQueue" "true" "ContentBasedDeduplication" "true" "PartitionCount" (str partition-count) @@ -164,7 +175,12 @@ single failed assertion instead of crashing the checker." [body] (when (string? body) - (when-let [[group seq-str] (str/split body #":" 2)] + ;; str/split always returns a vector — never nil — so the + ;; nil-safety here lives in the str/blank? checks below, not + ;; in a when-let on the destructure. Plain let makes that + ;; explicit; "g0" with no colon binds seq-str=nil, which + ;; str/blank? treats as blank and the when guards skip. + (let [[group seq-str] (str/split body #":" 2)] (when (and (not (str/blank? group)) (not (str/blank? seq-str))) (try @@ -176,7 +192,7 @@ ;; Jepsen client ;; --------------------------------------------------------------------------- -(defrecord HTFIFOClient [node->port region groups seq-counters sqs queue-url partition-count] +(defrecord HTFIFOClient [node->port region groups seq-counters queue-name sqs queue-url partition-count] client/Client (open! [this test node] @@ -185,7 +201,7 @@ (assoc this :sqs (make-sqs-client host port region)))) (setup! [this _test] - (let [url (create-htfifo-queue! sqs partition-count)] + (let [url (create-htfifo-queue! 
sqs queue-name partition-count)] (info "HT-FIFO test queue ready" url "partitions=" partition-count) (assoc this :queue-url url))) @@ -438,19 +454,23 @@ (defn sqs-htfifo-workload "Builds the HT-FIFO workload map with custom client, generator, and - checker. Shared seq-counters atom is constructed here so every client - worker increments the same per-group counter." + checker. Shared seq-counters atom and per-run queue name are + constructed here once so every client worker sees the same values + (workers fan out via Jepsen's open!/setup! lifecycle, all reading + the same record fields)." [opts] (let [partition-count (or (:partition-count opts) default-partition-count) group-count (or (:group-count opts) default-group-count) send-fraction (or (:send-fraction opts) 0.5) groups (group-ids group-count) seq-counters (fresh-seq-counters groups) + queue-name (or (:queue-name opts) (fresh-queue-name)) client (->HTFIFOClient (or (:node->port opts) (zipmap default-nodes (repeat default-sqs-port))) (:sqs-region opts) groups seq-counters + queue-name nil ; sqs (per-worker) nil ; queue-url partition-count)] @@ -478,15 +498,17 @@ :shard-ranges (:shard-ranges opts)})) rate (double (or (:rate opts) 5)) time-limit (or (:time-limit opts) 30) - ;; Drain must outlast the visibility-timeout window, otherwise - ;; a message that sat invisible past the main phase (worker - ;; killed mid-receive, delete that didn't commit) can never - ;; reappear before the checker runs and gets reported as :lost - ;; even though the server still owns it. Also keeps a floor of - ;; (max 5, time-limit/6) so short tests still drain meaningfully. + ;; Drain must outlast the visibility-timeout window plus a + ;; safety buffer. Otherwise a message that becomes invisible + ;; right at drain start reappears at exactly drain-end (or + ;; later) and gen/time-limit can fire before the next :recv + ;; surfaces it — the checker would then report a :lost that + ;; the server still owns. 
visibility-timeout + 10s closes + ;; that race. The +10s also dominates the (max 5, + ;; time-limit/6) short-test floor at any reasonable + ;; time-limit, so the floor is no longer needed. drain-time (or (:drain-time opts) - (max visibility-timeout-seconds - (max 5 (quot time-limit 6)))) + (+ visibility-timeout-seconds 10)) faults (if local? [] (cli/normalize-faults (or (:faults opts) [:partition :kill]))) From 8b237a68e52a4f729c7dae1fee7694763f1a20d7 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Tue, 5 May 2026 02:51:01 +0900 Subject: [PATCH 4/6] jepsen(sqs): show drain-time default in CLI help (PR #738, round 3) Round 3 review verdict: ready to merge. One cosmetic nit: the --drain-time CLI option had :default nil in the spec, so the computed default (visibility-timeout + 10s = 40s) was invisible to users running --help. Updated the description string to spell it out. No behaviour change. Refs: PR #738 round 3 review at run 25333949895. --- jepsen/src/elastickv/sqs_htfifo_workload.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jepsen/src/elastickv/sqs_htfifo_workload.clj b/jepsen/src/elastickv/sqs_htfifo_workload.clj index 889994c9c..d16750724 100644 --- a/jepsen/src/elastickv/sqs_htfifo_workload.clj +++ b/jepsen/src/elastickv/sqs_htfifo_workload.clj @@ -574,7 +574,7 @@ [nil "--send-fraction F" "Probability a generator op is :send (rest are :recv)." :default 0.5 :parse-fn #(Double/parseDouble %)] - [nil "--drain-time SECONDS" "Receive-only drain phase after the main generator finishes." + [nil "--drain-time SECONDS" "Receive-only drain phase after the main generator finishes (default: visibility-timeout + 10s = 40s)." 
:default nil :parse-fn #(Integer/parseInt %)]]) From 52c6e41056bef5609a7863fff141437e858726a1 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Tue, 5 May 2026 03:05:04 +0900 Subject: [PATCH 5/6] ci(jepsen): wire HT-FIFO workload into per-push and VM CI MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the new SQS HT-FIFO workload (PR #738) to the GitHub Actions Jepsen runs so it gets exercised on every push and on the self-hosted VM lane. - .github/workflows/jepsen-test.yml (per-push, ubuntu-latest): - Cluster startup adds --sqsAddress 127.0.0.1:6350N for each of the 3 nodes plus --raftSqsMap for the leader-SQS lookup table. New port band 63501-63503 keeps SQS clear of redis (63791-3), dynamo (63801-3), and s3 (63901-3). - Wait-for-listeners loop now also checks the SQS ports. - New step "Run SQS HT-FIFO Jepsen workload against elastickv" runs `lein run -m elastickv.sqs-htfifo-workload --local --time-limit 5 --rate 5 --concurrency 5 --partition-count 4 --group-count 6 --sqs-ports 63501,63502,63503 --host 127.0.0.1` with a 120s outer timeout (3min job timeout). Same shape as the redis / dynamodb / s3 workload steps. - .github/workflows/jepsen.yml (VM-based, self-hosted): - New step "Run SQS HT-FIFO Jepsen workload" inside the Vagrant ctrl node, using the same nodes/time-limit/rate/ faults dispatch inputs as the redis / dynamodb / s3 steps. Routing flows through jepsen/src/elastickv/db.clj's existing :sqs-port + :sqs-region wiring (added in PR #738). Not in scope (follow-up): - .github/workflows/jepsen-test-scheduled.yml (the 6-hourly stress run) currently launches via cmd/server/demo.go, which doesn't speak SQS yet. Wiring HT-FIFO into the scheduled stress run would require either adding the SQS adapter to demo.go or switching the scheduled lane to the manual binary launch pattern that jepsen-test.yml uses. That change is larger than this PR's CI-wiring scope and is left as a follow-up. 
The capability gate (PR 5b-3, #734) accepts CreateQueue with PartitionCount > 1 on this single-shard 3-node cluster because no --sqsFifoPartitionMap is supplied: validateHTFIFORoutingCoverage returns nil when partitionResolver is nil, and the peer poll succeeds because every node is the same binary. The data plane exercises all the partitioned-FIFO key shapes (msg, vis, group, dedup) and the new partition-aware dispatch helpers; the storage groups still consolidate to the default Raft group on this topology. Refs: docs/design/2026_04_26_proposed_sqs_split_queue_fifo.md §11 PR 7. --- .github/workflows/jepsen-test.yml | 20 ++++++++++++++++++-- .github/workflows/jepsen.yml | 16 ++++++++++++++++ 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/.github/workflows/jepsen-test.yml b/.github/workflows/jepsen-test.yml index e6424c907..93346d316 100644 --- a/.github/workflows/jepsen-test.yml +++ b/.github/workflows/jepsen-test.yml @@ -49,6 +49,7 @@ jobs: RAFT_REDIS_MAP="127.0.0.1:50051=127.0.0.1:63791,127.0.0.1:50052=127.0.0.1:63792,127.0.0.1:50053=127.0.0.1:63793" RAFT_S3_MAP="127.0.0.1:50051=127.0.0.1:63901,127.0.0.1:50052=127.0.0.1:63902,127.0.0.1:50053=127.0.0.1:63903" RAFT_DYNAMO_MAP="127.0.0.1:50051=127.0.0.1:63801,127.0.0.1:50052=127.0.0.1:63802,127.0.0.1:50053=127.0.0.1:63803" + RAFT_SQS_MAP="127.0.0.1:50051=127.0.0.1:63501,127.0.0.1:50052=127.0.0.1:63502,127.0.0.1:50053=127.0.0.1:63503" : > /tmp/elastickv-demo.pid for node in 1 2 3; do @@ -57,6 +58,7 @@ jobs: --redisAddress "127.0.0.1:6379${node}" \ --dynamoAddress "127.0.0.1:6380${node}" \ --s3Address "127.0.0.1:6390${node}" \ + --sqsAddress "127.0.0.1:6350${node}" \ --metricsAddress "" \ --pprofAddress "" \ --raftId "n${node}" \ @@ -65,15 +67,17 @@ jobs: --raftRedisMap "$RAFT_REDIS_MAP" \ --raftS3Map "$RAFT_S3_MAP" \ --raftDynamoMap "$RAFT_DYNAMO_MAP" \ + --raftSqsMap "$RAFT_SQS_MAP" \ > "/tmp/elastickv-demo-n${node}.log" 2>&1 & echo $! 
>> /tmp/elastickv-demo.pid done - echo "Waiting for redis (63791-63793), dynamo (63801-63803), and s3 (63901-63903) listeners..." + echo "Waiting for redis (63791-63793), dynamo (63801-63803), s3 (63901-63903), and sqs (63501-63503) listeners..." for i in {1..90}; do if nc -z 127.0.0.1 63791 && nc -z 127.0.0.1 63792 && nc -z 127.0.0.1 63793 \ && nc -z 127.0.0.1 63801 && nc -z 127.0.0.1 63802 && nc -z 127.0.0.1 63803 \ - && nc -z 127.0.0.1 63901 && nc -z 127.0.0.1 63902 && nc -z 127.0.0.1 63903; then + && nc -z 127.0.0.1 63901 && nc -z 127.0.0.1 63902 && nc -z 127.0.0.1 63903 \ + && nc -z 127.0.0.1 63501 && nc -z 127.0.0.1 63502 && nc -z 127.0.0.1 63503; then echo "Cluster is up" exit 0 fi @@ -142,6 +146,18 @@ jobs: timeout-minutes: 3 run: | timeout 120 ~/lein run -m elastickv.s3-workload --local --time-limit 5 --rate 10 --concurrency 10 --s3-ports 63901,63902,63903 --host 127.0.0.1 + - name: Run SQS HT-FIFO Jepsen workload against elastickv + working-directory: jepsen + # The HT-FIFO workload runs sends and receives across a 4-partition + # FIFO queue with content-based deduplication. The custom checker + # validates within-group ordering, no loss, and no duplicates. + # See jepsen/src/elastickv/sqs_htfifo_workload.clj. 
+ timeout-minutes: 3 + run: | + timeout 120 ~/lein run -m elastickv.sqs-htfifo-workload --local \ + --time-limit 5 --rate 5 --concurrency 5 \ + --partition-count 4 --group-count 6 \ + --sqs-ports 63501,63502,63503 --host 127.0.0.1 - name: Stop demo cluster if: always() run: | diff --git a/.github/workflows/jepsen.yml b/.github/workflows/jepsen.yml index c749a5a66..54fece225 100644 --- a/.github/workflows/jepsen.yml +++ b/.github/workflows/jepsen.yml @@ -84,6 +84,22 @@ jobs: --faults ${{ github.event.inputs['faults'] || github.event.inputs.faults }} \ --concurrency 10" + - name: Run SQS HT-FIFO Jepsen workload + working-directory: jepsen + env: + HOME: ${{ github.workspace }}/jepsen/tmp-home + LEIN_HOME: ${{ github.workspace }}/jepsen/.lein + LEIN_JVM_OPTS: -Duser.home=${{ github.workspace }}/jepsen/tmp-home + run: | + vagrant ssh ctrl -c "cd ~/elastickv/jepsen && \ + lein run -m elastickv.sqs-htfifo-workload \ + --nodes n1,n2,n3,n4,n5 \ + --time-limit ${{ github.event.inputs['time-limit'] || github.event.inputs.time-limit }} \ + --rate ${{ github.event.inputs['rate'] || github.event.inputs.rate }} \ + --faults ${{ github.event.inputs['faults'] || github.event.inputs.faults }} \ + --partition-count 4 --group-count 8 \ + --concurrency 8" + - name: Collect Jepsen artifacts if: always() working-directory: jepsen From de3eb7c2d15fa062ec55490bdb539d7bbc1e707b Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Tue, 5 May 2026 03:28:08 +0900 Subject: [PATCH 6/6] jepsen(sqs): polish per Claude review (PR #738, round 5) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Round 5 verdict: Ready to merge. Two findings, both addressed: 1. (Actionable) CI shell timeout was tight against the workload's default 40s drain phase. Active test window was time-limit (5s) + drain-time (40s) = 45s, plus ~15-20s Lein JVM startup ≈ 60s inside a 120s shell timeout. 
Pass --drain-time 15 in the CI step: in --local mode the nemesis is a no-op so no message can become invisible due to partition/kill, making the 40s default (which exists to outlast the visibility-timeout window during real faults) overkill. 15s gives the same correctness with comfortable headroom under the 120s shell timeout. 2. (Pre-existing minor) --send-fraction CLI description said "Probability a generator op is :send" but the implementation clamps both ends with (max 1 ...) to prevent generator starvation, so 0.0 still emits ~9% sends and 1.0 still emits ~9% receives. Updated the description string to spell out the clamping at the boundaries. Tests: lein test elastickv.sqs-htfifo-workload-test passes 13/32. YAML lint clean. Refs: PR #738 round 5 review at run 25335805377. --- .github/workflows/jepsen-test.yml | 8 ++++++++ jepsen/src/elastickv/sqs_htfifo_workload.clj | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/.github/workflows/jepsen-test.yml b/.github/workflows/jepsen-test.yml index 93346d316..67e70aeb1 100644 --- a/.github/workflows/jepsen-test.yml +++ b/.github/workflows/jepsen-test.yml @@ -152,11 +152,19 @@ jobs: # FIFO queue with content-based deduplication. The custom checker # validates within-group ordering, no loss, and no duplicates. # See jepsen/src/elastickv/sqs_htfifo_workload.clj. + # + # --drain-time 15: in --local mode the nemesis is a no-op, so no + # message can become invisible due to partition/kill — the 40s + # default drain (which protects against fault-induced + # visibility-timeout races) is overkill here. 15s leaves ample + # headroom under the 120s shell timeout against JVM startup and + # the 5s main phase. 
timeout-minutes: 3 run: | timeout 120 ~/lein run -m elastickv.sqs-htfifo-workload --local \ --time-limit 5 --rate 5 --concurrency 5 \ --partition-count 4 --group-count 6 \ + --drain-time 15 \ --sqs-ports 63501,63502,63503 --host 127.0.0.1 - name: Stop demo cluster if: always() diff --git a/jepsen/src/elastickv/sqs_htfifo_workload.clj b/jepsen/src/elastickv/sqs_htfifo_workload.clj index d16750724..9942972a5 100644 --- a/jepsen/src/elastickv/sqs_htfifo_workload.clj +++ b/jepsen/src/elastickv/sqs_htfifo_workload.clj @@ -571,7 +571,7 @@ [nil "--group-count N" "Number of distinct MessageGroupId values to spread sends across." :default default-group-count :parse-fn #(Integer/parseInt %)] - [nil "--send-fraction F" "Probability a generator op is :send (rest are :recv)." + [nil "--send-fraction F" "Probability a generator op is :send (rest are :recv). 0.0 and 1.0 still emit at least 1 op of each kind to prevent generator starvation; pass values in (0, 1) for a true mix." :default 0.5 :parse-fn #(Double/parseDouble %)] [nil "--drain-time SECONDS" "Receive-only drain phase after the main generator finishes (default: visibility-timeout + 10s = 40s)."