diff --git a/.github/workflows/jepsen-test-scheduled.yml b/.github/workflows/jepsen-test-scheduled.yml index ad925f8f..46a8129e 100644 --- a/.github/workflows/jepsen-test-scheduled.yml +++ b/.github/workflows/jepsen-test-scheduled.yml @@ -101,6 +101,16 @@ jobs: --max-txn-length ${{ inputs.max-txn-length || '4' }} \ --ports 63791,63792,63793 \ --host 127.0.0.1 + - name: Run Redis ZSet safety Jepsen workload against elastickv + working-directory: jepsen + timeout-minutes: 10 + run: | + timeout 480 ~/lein run -m elastickv.redis-zset-safety-workload \ + --time-limit ${{ inputs.time-limit || '150' }} \ + --rate ${{ inputs.rate || '10' }} \ + --concurrency ${{ inputs.concurrency || '8' }} \ + --ports 63791,63792,63793 \ + --host 127.0.0.1 - name: Run DynamoDB Jepsen workload against elastickv working-directory: jepsen timeout-minutes: 10 diff --git a/.github/workflows/jepsen-test.yml b/.github/workflows/jepsen-test.yml index aeac9354..aeb4e2d4 100644 --- a/.github/workflows/jepsen-test.yml +++ b/.github/workflows/jepsen-test.yml @@ -90,6 +90,11 @@ jobs: timeout-minutes: 3 run: | timeout 120 ~/lein run -m elastickv.redis-workload --time-limit 5 --rate 5 --concurrency 5 --ports 63791,63792,63793 --host 127.0.0.1 + - name: Run Redis ZSet safety Jepsen workload against elastickv + working-directory: jepsen + timeout-minutes: 3 + run: | + timeout 120 ~/lein run -m elastickv.redis-zset-safety-workload --time-limit 5 --rate 5 --concurrency 5 --ports 63791,63792,63793 --host 127.0.0.1 - name: Run DynamoDB Jepsen workload against elastickv working-directory: jepsen timeout-minutes: 3 diff --git a/jepsen/src/elastickv/jepsen_test.clj b/jepsen/src/elastickv/jepsen_test.clj index 8b9df329..7999c89c 100644 --- a/jepsen/src/elastickv/jepsen_test.clj +++ b/jepsen/src/elastickv/jepsen_test.clj @@ -1,6 +1,7 @@ (ns elastickv.jepsen-test (:gen-class) (:require [elastickv.redis-workload :as redis-workload] + [elastickv.redis-zset-safety-workload :as zset-safety-workload] [elastickv.dynamodb-workload :as dynamodb-workload] [elastickv.s3-workload :as s3-workload] [jepsen.cli :as cli])) @@ -14,6 +15,53 @@ (defn elastickv-s3-test [] (s3-workload/elastickv-s3-test {})) +(defn elastickv-zset-safety-test [] + (zset-safety-workload/elastickv-zset-safety-test {})) + +(def ^:private test-fns + "Map of user-facing test names to their constructor fns. The first + positional CLI arg selects which workload runs; if absent or unknown, + we default to `elastickv-test` for backward compatibility with + pre-existing invocations." + {"elastickv-test" elastickv-test + "elastickv-zset-safety-test" elastickv-zset-safety-test + "elastickv-dynamodb-test" elastickv-dynamodb-test + "elastickv-s3-test" elastickv-s3-test}) + (defn -main + "Dispatch to a named workload. Usage: + + lein run -m elastickv.jepsen-test [jepsen-subcmd] [jepsen-opts ...] + + Supported s: elastickv-test, elastickv-zset-safety-test, + elastickv-dynamodb-test, elastickv-s3-test. When the first positional + arg is not a known test name, we default to `elastickv-test` for + backward compatibility and forward ALL args to jepsen.cli/run!. + + The jepsen subcommand (`test` or `analyze`) is auto-prepended when + missing, so `lein run elastickv-zset-safety-test --nodes n1,n2` works + without the user repeating `test`." [& args] - (cli/run! (cli/single-test-cmd {:test-fn elastickv-test}) args)) + (let [[head & tail] args + [selected-fn remaining-args] (if-let [f (get test-fns head)] + [f tail] + [elastickv-test args]) + ;; jepsen.cli/run! requires a subcommand ("test" or "analyze") + ;; as the first arg. Insert "test" only when the user clearly + ;; did NOT supply a subcommand: + ;; - remaining-args is empty, OR + ;; - the first token is an option (starts with "-") + ;; If the first token looks like a subcommand (any non-option + ;; word, e.g. "test", "analyze", "serve", or a future jepsen.cli + ;; subcommand we don't hard-code), leave it alone and let + ;; jepsen.cli/run! handle it (including producing a better + ;; error message for unknown subcommands than we could here). + [next-head & _] remaining-args + prepend-test? (or (empty? remaining-args) + (and (string? next-head) + (.startsWith ^String next-head "-"))) + final-args (if prepend-test? + (cons "test" remaining-args) + remaining-args)] + (cli/run! (cli/single-test-cmd {:test-fn selected-fn}) + final-args))) diff --git a/jepsen/src/elastickv/redis_zset_safety_workload.clj b/jepsen/src/elastickv/redis_zset_safety_workload.clj new file mode 100644 index 00000000..0478967e --- /dev/null +++ b/jepsen/src/elastickv/redis_zset_safety_workload.clj @@ -0,0 +1,953 @@ +(ns elastickv.redis-zset-safety-workload + "Jepsen workload verifying stronger safety properties of elastickv's + Redis ZSet (sorted set) implementation under faults. + + Beyond the simple visibility check in redis-zset-workload, this workload + exercises score correctness, ordering, range queries, phantom-member + freedom, and atomicity of compound ZSet mutations by using a custom, + model-based Checker. + + Operations (all target a single well-known key): + + {:f :zadd :value [member score]} ZADD key score member + {:f :zincrby :value [member delta]} ZINCRBY key delta member + {:f :zrem :value member} ZREM key member + {:f :zrange-all} ZRANGE key 0 -1 WITHSCORES + {:f :zrangebyscore :value [lo hi]} ZRANGEBYSCORE key lo hi WITHSCORES + + Semantics checked (see `zset-safety-checker`): + + 1. Score correctness: the score of any member observed by a :zrange-all + read must match the model's latest committed score for that member, + OR must match a score written by an operation that is concurrent with + the read (we cannot linearize concurrent writes to the same member, + so any such \"in-flight\" value is permitted). + 2. Order preservation: the result of :zrange-all must be sorted by + (score ascending, member lexicographically ascending). + 3. ZRANGEBYSCORE correctness: every member in a score-range read must + have a latest committed (or concurrent) score within [lo, hi]; and + every model member with a score in [lo, hi] must either be present + or be subject to a concurrent mutation. + 4. No phantom members: every member observed by a read must have been + introduced by some successful (or in-flight) operation. + 5. Atomicity: there is no explicit \"partial\" state to probe from the + client, but the checker treats every :ok operation as atomic — any + visible inconsistency (member present with no matching op, score + disagreeing with any known write, etc.) is reported." + (:require [clojure.string :as str] + [clojure.tools.logging :refer [warn]] + [elastickv.cli :as cli] + [elastickv.db :as ekdb] + [jepsen.db :as jdb] + [jepsen [checker :as checker] + [client :as client] + [generator :as gen] + [net :as net]] + [jepsen.checker.timeline :as timeline] + [jepsen.control :as control] + [jepsen.nemesis :as nemesis] + [jepsen.nemesis.combined :as combined] + [jepsen.os :as os] + [jepsen.os.debian :as debian] + [taoensso.carmine :as car])) + +;; --------------------------------------------------------------------------- +;; Constants +;; --------------------------------------------------------------------------- + +(def ^:private zset-key "jepsen-zset-safety") + +(def default-nodes ["n1" "n2" "n3" "n4" "n5"]) + +;; A small, fixed universe of members keeps contention high and makes the +;; model's state small enough to enumerate. +(def ^:private members + (mapv #(str "m" %) (range 16))) + +;; --------------------------------------------------------------------------- +;; Client +;; --------------------------------------------------------------------------- + +(defn- parse-double-safe + "Parse a Redis score string into a Double. Redis serializes infinite + scores as \"inf\" / \"+inf\" / \"-inf\", which Java's Double/parseDouble + does not accept (it expects \"Infinity\" / \"-Infinity\"). Handle both + encodings so the checker doesn't throw on infinite ZSET scores." + [s] + (let [raw (str s) + lower (str/lower-case raw)] + (cond + (or (= lower "inf") (= lower "+inf") (= lower "infinity") (= lower "+infinity")) + Double/POSITIVE_INFINITY + + (or (= lower "-inf") (= lower "-infinity")) + Double/NEGATIVE_INFINITY + + :else + (Double/parseDouble raw)))) + +(defn- coerce-zincrby-score + "Carmine's ZINCRBY reply is normally a score string, but under error / + timeout / protocol edge cases it may be nil, a numeric value, or + something else entirely. Stringifying nil produces \"nil\", which + parse-double-safe would then hand to Double/parseDouble and throw. + Explicitly classify the response so the invoke! path can record + :unknown-response as :info instead of masking it in a catch-all. + + Returns one of: + [:ok (double score)] + [:nil] ; nil response + [:error ] ; Carmine error reply + [:unexpected ] ; anything else" + [response] + (cond + (nil? response) + [:nil] + + (number? response) + [:ok (double response)] + + (string? response) + (try + [:ok (parse-double-safe response)] + (catch NumberFormatException _ + [:unexpected response])) + + ;; Carmine surfaces Redis error replies as exceptions by default, + ;; but some codepaths wrap them in an ex-info / Throwable value. + (instance? Throwable response) + [:error (or (.getMessage ^Throwable response) (str response))] + + :else + [:unexpected response])) + +(defn- coerce-zrem-count + "Carmine's ZREM reply is normally a Long (count of removed members), + but under protocol edge cases / Carmine versions / RESP2 vs RESP3 + differences it can also arrive as a numeric string (\"1\") or raw + bytes. Blindly calling `(long reply)` on those forms throws + ClassCastException, which would fall through to the general exception + handler and mask the real signal. + + Returns a non-negative long count. Unparseable or unexpected values + are treated as 0 (i.e. \"nothing removed\") so the op still resolves + as :ok -- matching the existing nil-guard behaviour. + " + [response] + (cond + (nil? response) + 0 + + (number? response) + (long response) + + (string? response) + (try + (Long/parseLong ^String response) + (catch NumberFormatException _ 0)) + + (bytes? response) + (try + (Long/parseLong (String. ^bytes response "UTF-8")) + (catch NumberFormatException _ 0)) + + :else + 0)) + +(defn- parse-withscores + "Carmine returns a flat [member score member score ...] vector for + ZRANGE WITHSCORES. Convert to a sorted vector of [member (double score)] + preserving server-returned order (score ascending, then member). + + Throws on odd-length payloads: a WITHSCORES reply with a dangling member + is a protocol violation and this workload is meant to surface exactly + that kind of anomaly, not silently drop evidence." + [flat] + (when (odd? (count flat)) + (throw (ex-info "WITHSCORES reply has odd element count" + {:count (count flat) + :payload flat}))) + (->> flat + (partition 2) + (mapv (fn [[m s]] + [(if (bytes? m) (String. ^bytes m "UTF-8") (str m)) + (parse-double-safe s)])))) + +(defn- zincrby! + "Executes a ZINCRBY against conn-spec and returns Carmine's raw reply + (normally a score string). Extracted so tests can stub the Redis call + without going through the `car/wcar` macro." + [conn-spec key delta member] + (car/wcar conn-spec (car/zincrby key (double delta) member))) + +(defn- zrem! + "Executes a ZREM against conn-spec and returns Carmine's raw reply + (normally an integer count of removed members). Extracted so tests + can stub the Redis call without going through the `car/wcar` macro." + [conn-spec key member] + (car/wcar conn-spec (car/zrem key member))) + +(defrecord ElastickvRedisZSetSafetyClient [node->port conn-spec] + client/Client + + (open! [this test node] + (let [port (get node->port node 6379) + host (or (:redis-host test) (name node))] + (assoc this :conn-spec {:pool {} :spec {:host host + :port port + :timeout-ms 10000}}))) + + (close! [this _test] this) + + (setup! [this _test] + ;; Hard-fail when :conn-spec is missing after open!. Silently (or + ;; even loudly) proceeding would leave stale data from a previous + ;; run under zset-key and risk false-positive checker results from + ;; that dirty state. Better to abort the run and surface the + ;; configuration problem. + (let [cs (or (:conn-spec this) + (throw (ex-info + (str "ZSet safety setup! cannot clear prior state:" + " :conn-spec is missing on client (open! did" + " not populate it). Aborting to avoid running" + " against stale data under " zset-key ".") + {:type ::missing-conn-spec + :zset-key zset-key})))] + ;; The cleanup DEL MUST succeed. If it fails (connection refused, + ;; Redis error reply, timeout, whatever), stale data from a prior + ;; run survives under zset-key and can produce false-positive + ;; safety verdicts in the checker. Log loudly AND re-throw so + ;; Jepsen aborts the run instead of silently running against + ;; dirty state. + (try + (car/wcar cs (car/del zset-key)) + (catch Throwable t + (warn t "ZSet safety setup! DEL failed -- aborting to avoid stale data") + (throw (ex-info + (str "ZSet safety setup! failed to clear prior state at " + zset-key ": " (or (.getMessage t) (str t)) + ". Refusing to run against potentially stale data.") + {:type ::cleanup-failed + :zset-key zset-key} + t))))) + this) + + (teardown! [this _test] this) + + (invoke! [_ _test op] + (let [cs conn-spec] + (try + (case (:f op) + :zadd + (let [[member score] (:value op)] + (car/wcar cs (car/zadd zset-key (double score) member)) + (assoc op :type :ok)) + + :zincrby + (let [[member delta] (:value op) + new-score (zincrby! cs zset-key delta member) + [tag v] (coerce-zincrby-score new-score)] + (case tag + :ok (assoc op :type :ok :value [member v]) + :nil (do (warn (str "ZSet safety ZINCRBY returned nil for " member)) + (assoc op :type :info + :error :nil-response)) + :error (do (warn (str "ZSet safety ZINCRBY returned error reply: " v)) + (assoc op :type :info + :error {:kind :error-response + :message v})) + :unexpected (do (warn (str "ZSet safety ZINCRBY returned unexpected reply: " (pr-str v))) + (assoc op :type :info + :error {:kind :unexpected-response + :value (pr-str v)})))) + + :zrem + (let [member (:value op) + ;; Carmine normally returns an integer count. Guard + ;; against nil / missing reply (protocol edge, closed + ;; connection, etc.) AND against non-numeric shapes + ;; (string "1", raw bytes) that some Carmine versions + ;; or RESP3 codepaths surface. A naked `(long reply)` + ;; would NPE on nil and ClassCastException on + ;; string/bytes, falling through to the general + ;; Exception handler and masking the real signal. + removed (zrem! cs zset-key member) + n (coerce-zrem-count removed)] + (assoc op :type :ok :value [member (pos? n)])) + + :zrange-all + (let [flat (car/wcar cs (car/zrange zset-key 0 -1 "WITHSCORES"))] + (assoc op :type :ok :value (parse-withscores flat))) + + :zrangebyscore + (let [[lo hi] (:value op) + flat (car/wcar cs (car/zrangebyscore zset-key + (double lo) + (double hi) + "WITHSCORES"))] + (assoc op :type :ok :value {:bounds [lo hi] + :members (parse-withscores flat)}))) + (catch Throwable t + (warn t (str "ZSet safety op failed: " (:f op))) + (assoc op :type :info :error (or (.getMessage ^Throwable t) (str t)))))))) + +;; --------------------------------------------------------------------------- +;; Generator +;; --------------------------------------------------------------------------- + +(defn- rand-member [] (rand-nth members)) + +(defn- gen-op [] + (let [roll (rand)] + (cond + (< roll 0.35) + {:f :zadd :value [(rand-member) (double (- (rand-int 200) 100))]} + + (< roll 0.55) + {:f :zincrby :value [(rand-member) + (double (- (rand-int 20) 10))]} + + (< roll 0.65) + {:f :zrem :value (rand-member)} + + (< roll 0.90) + {:f :zrange-all} + + :else + (let [a (- (rand-int 200) 100) + b (- (rand-int 200) 100)] + {:f :zrangebyscore :value [(double (min a b)) (double (max a b))]})))) + +(defn- op-generator [] + (reify gen/Generator + (op [this test ctx] + [(gen/fill-in-op (gen-op) ctx) this]) + (update [this _ _ _] this))) + +;; --------------------------------------------------------------------------- +;; Checker +;; --------------------------------------------------------------------------- + +(defn- sorted-by-score-then-member? + "Validates the zset invariant: (score, member) ascending, strict." + [entries] + (loop [prev nil + es entries] + (cond + (empty? es) true + (nil? prev) (recur (first es) (rest es)) + :else + (let [[pm ps] prev + [cm cs] (first es)] + (cond + (< ps cs) (recur (first es) (rest es)) + (> ps cs) false + ;; equal score: members must be strictly lexicographically ordered + (neg? (compare pm cm)) (recur (first es) (rest es)) + :else false))))) + +(defn- index-by-time + "Return a vector of ops sorted by :index." + [ops] + (vec (sort-by :index ops))) + +(defn- pair-invokes-with-completions + "Returns a sequence of {:invoke inv :complete cmp} pairs for each + completed op (ok/fail/info). Invokes without a matching completion are + paired with nil (still in flight at history end)." + [history] + (let [by-process (group-by :process history)] + (mapcat + (fn [[_p ops]] + (let [ops (index-by-time ops)] + (loop [ops ops acc []] + (if (empty? ops) acc + (let [[o & rest-ops] ops] + (cond + (= :invoke (:type o)) + (let [c (first rest-ops)] + (if (and c (#{:ok :fail :info} (:type c))) + (recur (drop 1 rest-ops) (conj acc {:invoke o :complete c})) + (recur rest-ops (conj acc {:invoke o :complete nil})))) + :else (recur rest-ops acc))))))) + by-process))) + +(defn- mutation? + [op] + (#{:zadd :zincrby :zrem} (:f op))) + +(defn- completed-mutation-window + "For each completed mutation, produce + {:member m :score s :zrem? bool? :unknown-score? bool? :invoke-idx i + :complete-idx j :type t}. + - :zadd: :score is the requested score (always known). + - :zincrby: when :ok, :score is the server-returned final score. When + :info or pending, the resulting score is unknown (depends on which + other ops were applied first); :unknown-score? is set so allowed- + scores-for-member can short-circuit the strict score check. + - :zrem: :removed? is the boolean returned by ZREM (true iff the + member existed). A no-op ZREM (returns 0) does NOT mutate state, so + the model must not treat it as a deletion. + :info / :pending mutations are still emitted so concurrent windows + account for their possible effect." + [pairs] + (keep + (fn [{:keys [invoke complete]}] + (when (and invoke (mutation? invoke)) + (let [f (:f invoke) + t (if complete (:type complete) :pending) + inv-idx (:index invoke) + cmp-idx (when complete (:index complete))] + (case f + :zadd + (let [[m s] (:value invoke)] + {:f :zadd :member m :score (double s) + :type t :invoke-idx inv-idx :complete-idx cmp-idx}) + + :zincrby + (let [[m _delta] (:value invoke) + ;; ZINCRBY's resulting score is only knowable from the + ;; server reply. For :info/:pending we don't have it. + ok? (= :ok t) + s (when (and ok? (vector? (:value complete))) + (second (:value complete)))] + {:f :zincrby :member m :score (some-> s double) + :unknown-score? (not (and ok? (some? s))) + :type t :invoke-idx inv-idx :complete-idx cmp-idx}) + + :zrem + (let [m (:value invoke) + ;; invoke! returns [member removed?]. For :info we don't + ;; know whether the member was removed. + removed? (cond + (and (= :ok t) + (vector? (:value complete))) + (boolean (second (:value complete))) + ;; pending / info: assume removal could have + ;; happened; the checker treats it as a + ;; possibly-concurrent deletion via the + ;; concurrent window. + :else true)] + {:f :zrem :member m :score nil + :zrem? true :removed? removed? + :type t :invoke-idx inv-idx :complete-idx cmp-idx}))))) + pairs)) + +(defn- mutations-by-member + [mutations] + (group-by :member mutations)) + +(defn- concurrent? + "A mutation m is concurrent with a read r iff m's invoke precedes r's + completion AND m's completion (or end-of-history) follows r's invoke." + [m read-inv-idx read-cmp-idx] + (and (<= (:invoke-idx m) read-cmp-idx) + (or (nil? (:complete-idx m)) + (>= (:complete-idx m) read-inv-idx)))) + +(defn- apply-mutation-to-state + "Fold one mutation into a per-member state {:present? bool :score s}. + A no-op ZREM (member did not exist; :removed? false) leaves state + unchanged so the checker doesn't falsely conclude the member is gone." + [st m] + (case (:f m) + :zadd {:present? true :score (:score m)} + :zincrby {:present? true :score (:score m)} + :zrem (if (:removed? m) + {:present? false :score nil} + st))) + +(defn- model-before + "Construct authoritative per-member state from mutations whose + completions strictly precede read-inv-idx. Returns + {member -> {:present? bool :score s}}. Only :ok mutations contribute; + :info / :pending are deferred to the concurrent-window check." + [mutations-by-m read-inv-idx] + (reduce-kv + (fn [model member muts] + (let [applied (->> muts + (filter #(and (= :ok (:type %)) + (some? (:complete-idx %)) + (< (:complete-idx %) read-inv-idx))) + (sort-by :complete-idx)) + state (reduce apply-mutation-to-state nil applied)] + (if state + (assoc model member state) + model))) + {} + mutations-by-m)) + +(defn- concurrent-mutations-for-member + "All mutations concurrent with the read window that could have taken + effect. :fail completions are excluded: in Jepsen, :fail means the op + definitively did NOT execute, so it contributes neither an allowed + score nor uncertainty about presence. :ok and :info/:pending are + included (either may be visible to the read)." + [muts read-inv-idx read-cmp-idx] + (filter #(and (not= :fail (:type %)) + (concurrent? % read-inv-idx read-cmp-idx)) + muts)) + +(defn- write-op? + "True iff the mutation adds/updates the member's score (i.e. would + make the member present). :zrem is NOT a write-op here." + [m] + (#{:zadd :zincrby} (:f m))) + +(defn- allowed-scores-for-member + "Compute the set of scores considered valid for `member` by a read + whose window is [read-inv-idx, read-cmp-idx], based on committed state + and any concurrent/uncertain mutations. + + Linearizability demands a read observes either (a) the latest committed + state in real-time order, or (b) the effect of a write still concurrent + with the read. We therefore restrict the committed score set to + 'candidates' — committed mutations NOT strictly followed in real time + by another committed mutation (i.e. no other committed op's invoke + comes after this op's completion). Scores from strictly superseded + committed mutations are NOT admissible. + + When multiple candidates remain (their windows overlap), they can + serialize in any real-time-consistent order: the read may legitimately + observe the outcome of any of them. Thus presence is required only + when EVERY admissible serialization leaves the member present; presence + is forbidden only when EVERY admissible serialization leaves it absent. + + Returns: + :scores - set of acceptable scores (from candidate + committed ops + pre-read :info + concurrent + writes with a known score). + :unknown-score? - true iff any concurrent / pre-read :info + ZINCRBY's resulting score is unknown. When set, + the caller MUST skip the strict score-membership + check to stay sound. + :can-be-present? - true iff SOME admissible linearization leaves + the member present. + :must-be-present? - true iff EVERY admissible linearization leaves + the member present (i.e. some candidate is a + write, no candidate is a ZREM, and no uncertain + ZREM could have applied before the read)." + [mutations-by-m member read-inv-idx read-cmp-idx] + (let [muts (get mutations-by-m member []) + ;; :ok mutations that completed strictly before the read. + preceding (->> muts + (filter #(and (= :ok (:type %)) + (some? (:complete-idx %)) + (< (:complete-idx %) read-inv-idx)))) + ;; Real-time "last-wins" / chain-tail candidate filter: a + ;; preceding mutation m is admissible iff no OTHER preceding + ;; mutation m' has m'.invoke-idx > m.complete-idx (i.e. m' + ;; strictly follows m in real time). Equivalent: + ;; m.complete-idx >= max(invoke-idx) over preceding. + ;; + ;; Importantly this applies to :zincrby as well: a sequentially + ;; committed ZINCRBY chain has a forced linearization (each + ;; :ok :zincrby pins the pre-op and post-op states), so only + ;; the latest chain tail's return value is a valid final score + ;; for a post-chain read. An intermediate ZINCRBY's return + ;; value is NOT admissible once another mutation strictly + ;; follows it and commits before the read. A ZADD that strictly + ;; follows a ZINCRBY likewise resets the chain (the ZADD's + ;; absolute score becomes the only candidate). + ;; + ;; When multiple candidates remain (their invoke/complete + ;; windows overlap), they may serialize in any real-time- + ;; consistent order and any of their return values is a valid + ;; final state. + max-inv (reduce max -1 (map :invoke-idx preceding)) + candidates (filterv #(>= (:complete-idx %) max-inv) preceding) + ;; :info mutations that completed before the read: they may or + ;; may not have taken effect server-side. + pre-read-info (->> muts + (filter #(and (= :info (:type %)) + (some? (:complete-idx %)) + (< (:complete-idx %) read-inv-idx)))) + ;; Concurrent mutations: windows overlap the read. Include both + ;; :ok and :info since either may have taken effect. + concurrent (concurrent-mutations-for-member muts read-inv-idx read-cmp-idx) + ;; Uncertain mutations: anything whose effect on the read is not + ;; fully determined by committed real-time order alone. + uncertain (concat pre-read-info concurrent) + + add-scores (fn [acc m] + (case (:f m) + :zadd (conj acc (:score m)) + :zincrby (cond-> acc (some? (:score m)) (conj (:score m))) + :zrem acc)) + ;; Admissible scores: candidate committed + pre-read :info + + ;; concurrent writes (with a known score). + scores (as-> #{} s + (reduce add-scores s candidates) + (reduce add-scores s uncertain)) + + has-unknown-incr? (fn [coll] + (some #(and (= :zincrby (:f %)) + (:unknown-score? %)) + coll)) + ;; Classify uncertain ZINCRBYs by whether their resulting score + ;; is known. The resulting score of a read relative to ZINCRBYs + ;; depends only on which of them took effect before the read + ;; observed state AND whether each such ZINCRBY's return value + ;; is recorded. + ;; * Any uncertain ZINCRBY with UNKNOWN score (:info/:pending): + ;; the post-op score is not recoverable from the history, so + ;; we must relax the strict score check -- any numeric score + ;; is admissible. + ;; * All uncertain ZINCRBYs :ok with known return values: + ;; every recorded return pins the ZINCRBY's post-op state + ;; and (because ZINCRBY reads-then-writes atomically) its + ;; pre-op state. Any real-time consistent linearization + ;; therefore ends on one of those known return values (or + ;; on a candidate's score). :scores already contains all + ;; of them via the add-scores reduction over `uncertain`, + ;; so the strict score check is sound. Intermediate + ;; "prefix-sum" values (pre + delta_i for just one of + ;; several concurrent zincrbys) are NOT admissible final + ;; states: the return values constrain the serialization + ;; order, and no legitimate read can observe a partial sum + ;; that doesn't match any recorded post-op score. + unknown-score? (has-unknown-incr? uncertain) + + any-candidate-write? (some write-op? candidates) + any-candidate-zrem? (some #(= :zrem (:f %)) candidates) + any-uncertain-write? (some write-op? uncertain) + any-uncertain-zrem? (some #(= :zrem (:f %)) uncertain) + + ;; Some linearization of candidates ends with the member + ;; present. Because candidates have overlapping windows (they + ;; all share the same max-inv), any of them can serialize last. + ;; So presence is allowed iff at least one candidate is a write. + candidate-can-be-present? (boolean any-candidate-write?) + ;; Some linearization of candidates ends with the member absent. + candidate-can-be-absent? (or (empty? candidates) + (boolean any-candidate-zrem?)) + + ;; can-be-present?: at least one admissible linearization + ;; (candidates + uncertain) ends with the member present. + ;; Presence REQUIRES a write-op (ZADD / ZINCRBY) somewhere in + ;; the admissible set -- either a candidate committed write or + ;; an uncertain concurrent/pre-read :info write. ZREM never + ;; contributes existence evidence: since `setup!` clears the + ;; key at test start, an observed member that never had a ZADD + ;; or ZINCRBY touch it must be a phantom regardless of any + ;; ZREM's :removed? flag (which may be defaulted to true on + ;; :info for uncertainty accounting only). + can-be-present? (or candidate-can-be-present? + any-uncertain-write?) + + ;; must-be-present?: EVERY admissible linearization ends with + ;; the member present. Requires the candidate outcome to be + ;; always-present (candidate write, no candidate zrem) AND no + ;; uncertain zrem that could reorder last to remove it. + must-be-present? (boolean (and any-candidate-write? + (not candidate-can-be-absent?) + (not any-uncertain-zrem?)))] + {:scores scores + :unknown-score? (boolean unknown-score?) + :can-be-present? (boolean can-be-present?) + :must-be-present? must-be-present?})) + +(defn- score-definitely-in-range? + "True iff the member's committed score is definitively in [lo, hi] + for the purposes of completeness: every candidate score is inside the + range AND no uncertain/concurrent mutation could have produced an + unknown or out-of-range score. Used by ZRANGEBYSCORE completeness." + [scores unknown-score? lo hi] + (boolean (and (not unknown-score?) + (seq scores) + (every? #(<= lo % hi) scores)))) + +(defn- duplicate-members + "Return the set of members that appear more than once in entries." + [entries] + (->> entries + (map first) + frequencies + (keep (fn [[m n]] (when (> n 1) m))) + set)) + +(defn- check-zrange-all + [mutations-by-m {:keys [invoke complete] :as _pair}] + (let [entries (:value complete) + inv-idx (:index invoke) + cmp-idx (:index complete) + errors (atom [])] + ;; 1. Ordering + (when-not (sorted-by-score-then-member? entries) + (swap! errors conj {:kind :unsorted + :index cmp-idx + :entries entries})) + ;; 1b. No duplicate members: a ZSet read must return each member at + ;; most once. A duplicate-member result could otherwise satisfy + ;; ordering and score-membership checks while hiding a real bug. + (let [dupes (duplicate-members entries)] + (when (seq dupes) + (swap! errors conj {:kind :duplicate-members + :index cmp-idx + :members dupes}))) + ;; 2. For each observed (member,score): validate presence + score. + ;; can-be-present? catches both phantoms (member never existed) + ;; and stale reads (member committed-removed before the read + ;; with no concurrent re-add). + (doseq [[member score] entries] + (let [{:keys [scores can-be-present? unknown-score?]} + (allowed-scores-for-member mutations-by-m member inv-idx cmp-idx)] + (cond + (not can-be-present?) + (swap! errors conj {:kind :unexpected-presence + :index cmp-idx + :member member + :score score}) + ;; Skip the strict score check when any concurrent ZINCRBY's + ;; resulting score is unknown: the read could legitimately + ;; observe any value the in-flight increment produces. + unknown-score? nil + (not (contains? scores score)) + (swap! errors conj {:kind :score-mismatch + :index cmp-idx + :member member + :observed score + :allowed scores})))) + ;; 3. Completeness: model-required members must appear. + ;; A member is required-present only if every admissible + ;; linearization leaves it present (must-be-present?). This + ;; correctly skips members that an :info or concurrent ZREM + ;; might have removed before the read. + (let [model (model-before mutations-by-m inv-idx) + observed-members (into #{} (map first) entries)] + (doseq [[member _] model] + (let [{:keys [must-be-present?]} + (allowed-scores-for-member mutations-by-m member inv-idx cmp-idx)] + (when (and must-be-present? + (not (contains? observed-members member))) + (swap! errors conj {:kind :missing-member + :index cmp-idx + :member member}))))) + @errors)) + +(defn- check-zrangebyscore + [mutations-by-m {:keys [invoke complete] :as _pair}] + (let [{:keys [bounds members]} (:value complete) + [lo hi] bounds + inv-idx (:index invoke) + cmp-idx (:index complete) + errors (atom [])] + (when-not (sorted-by-score-then-member? members) + (swap! errors conj {:kind :unsorted-range + :index cmp-idx + :bounds bounds + :members members})) + (let [dupes (duplicate-members members)] + (when (seq dupes) + (swap! errors conj {:kind :duplicate-members-range + :index cmp-idx + :bounds bounds + :members dupes}))) + ;; Observed members must be within bounds AND have a known allowed score. + (doseq [[member score] members] + (when (or (< score lo) (> score hi)) + (swap! errors conj {:kind :out-of-range + :index cmp-idx + :bounds bounds + :member member + :score score})) + (let [{:keys [scores can-be-present? unknown-score?]} + (allowed-scores-for-member mutations-by-m member inv-idx cmp-idx)] + (cond + (not can-be-present?) + (swap! errors conj {:kind :unexpected-presence-range + :index cmp-idx + :member member + :score score}) + unknown-score? nil + (not (contains? scores score)) + (swap! errors conj {:kind :score-mismatch-range + :index cmp-idx + :member member + :observed score + :allowed scores})))) + ;; Completeness within bounds: a model member must appear only when + ;; (a) every admissible linearization leaves it present + ;; (must-be-present?), AND + ;; (b) its score is definitively within [lo, hi] across all + ;; admissible linearizations (no uncertain ZINCRBY, every + ;; candidate score inside the bounds). + ;; Uncertain scores (concurrent/:info ZINCRBY) must NOT cause + ;; completeness failures when the resulting score is unknown. + (let [model (model-before mutations-by-m inv-idx) + observed-members (into #{} (map first) members)] + (doseq [[member _] model] + (let [{:keys [must-be-present? scores unknown-score?]} + (allowed-scores-for-member mutations-by-m member inv-idx cmp-idx)] + (when (and must-be-present? + (score-definitely-in-range? scores unknown-score? lo hi) + (not (contains? observed-members member))) + ;; Report the full set of admissible scores (:allowed), not + ;; just an arbitrary first element -- picking `(first + ;; scores)` on a multi-element set is misleading when + ;; concurrent writers leave several linearizations valid. + ;; :allowed matches the convention used by the sibling + ;; :score-mismatch-range error above. :expected-score is + ;; retained (as `(first scores)` for a single-element set, + ;; nil otherwise) for backward compatibility with any + ;; out-of-tree consumers. + (swap! errors conj {:kind :missing-member-range + :index cmp-idx + :bounds bounds + :member member + :allowed scores + :expected-score (when (= 1 (count scores)) + (first scores))}))))) + @errors)) + +(defn zset-safety-checker + "Custom Jepsen checker: validates ZSet safety properties using a + last-writer model combined with a concurrent-write relaxation." + [] + (reify checker/Checker + (check [_ _test history _opts] + (let [pairs (pair-invokes-with-completions history) + mutations (completed-mutation-window pairs) + mutations-by-m (mutations-by-member mutations) + read-pairs (filter (fn [{:keys [invoke complete]}] + (and invoke complete + (= :ok (:type complete)) + (#{:zrange-all :zrangebyscore} + (:f invoke)))) + pairs) + all-errors (reduce + (fn [acc {:keys [invoke] :as pair}] + (into acc + (case (:f invoke) + :zrange-all (check-zrange-all mutations-by-m pair) + :zrangebyscore (check-zrangebyscore mutations-by-m pair)))) + [] + read-pairs) + by-kind (group-by :kind all-errors) + ;; Vacuous-pass guard: if the run produced zero + ;; successful reads, we have no evidence that the system + ;; under test actually satisfies ZSet safety -- every op + ;; may have been downgraded to :info because Redis was + ;; unreachable or every read timed out. Returning + ;; `:valid? true` in that case would be a false-green. + ;; Emit `:valid? :unknown` with a diagnostic reason; the + ;; cli's `fail-on-invalid!` treats anything other than + ;; `true` as a failure (see elastickv.cli/fail-on-invalid!). + no-successful-reads? (zero? (count read-pairs)) + valid? (cond + (seq all-errors) false + no-successful-reads? :unknown + :else true)] + (cond-> {:valid? valid? + :reads (count read-pairs) + :mutations (count mutations) + :error-count (count all-errors) + :errors-by-kind (into {} (map (fn [[k v]] [k (count v)]) by-kind)) + :first-errors (take 20 all-errors)} + no-successful-reads? + (assoc :reason + (str "No successful :zrange-all / :zrangebyscore reads" + " completed -- cannot assert ZSet safety. Likely" + " Redis was unreachable or every read timed out;" + " re-run against a healthy cluster."))))))) + +;; --------------------------------------------------------------------------- +;; Workload +;; --------------------------------------------------------------------------- + +(defn elastickv-zset-safety-workload + [opts] + (let [node->port (or (:node->port opts) + (zipmap default-nodes (repeat 6379))) + client (->ElastickvRedisZSetSafetyClient node->port nil)] + {:client client + :checker (checker/compose + {:zset-safety (zset-safety-checker) + :timeline (timeline/html)}) + :generator (op-generator) + :final-generator (gen/once {:f :zrange-all})})) + +(defn elastickv-zset-safety-test + "Builds a Jepsen test map that drives elastickv's Redis ZSet safety + workload." + ([] (elastickv-zset-safety-test {})) + ([opts] + (let [nodes (or (:nodes opts) default-nodes) + redis-ports (or (:redis-ports opts) + (repeat (count nodes) (or (:redis-port opts) 6379))) + node->port (or (:node->port opts) + (cli/ports->node-map redis-ports nodes)) + local? (:local opts) + db (if local? + jdb/noop + (ekdb/db {:grpc-port (or (:grpc-port opts) 50051) + :redis-port node->port + :raft-groups (:raft-groups opts) + :shard-ranges (:shard-ranges opts)})) + rate (double (or (:rate opts) 10)) + time-limit (or (:time-limit opts) 60) + faults (if local? + [] + (cli/normalize-faults (or (:faults opts) [:partition :kill]))) + nemesis-p (when-not local? + (combined/nemesis-package {:db db + :faults faults + :interval (or (:fault-interval opts) 40)})) + nemesis-gen (if nemesis-p + (:generator nemesis-p) + (gen/once {:type :info :f :noop})) + workload (elastickv-zset-safety-workload + (assoc opts :node->port node->port))] + (merge workload + {:name (or (:name opts) "elastickv-redis-zset-safety") + :nodes nodes + :db db + :redis-host (:redis-host opts) + :os (if local? os/noop debian/os) + :net (if local? net/noop net/iptables) + :ssh (merge {:username "vagrant" + :private-key-path "/home/vagrant/.ssh/id_rsa" + :strict-host-key-checking false} + (when local? {:dummy true}) + (:ssh opts)) + :remote control/ssh + :nemesis (if nemesis-p (:nemesis nemesis-p) nemesis/noop) + ;; The inner workload's :final-generator is the trivially- + ;; serializable (gen/once {:f :zrange-all}) -- a single + ;; Limit defrecord wrapping a plain map. It round-trips + ;; through Jepsen 0.3.x's Fressian store cleanly + ;; (verified at 86 bytes), so we don't override it here. + :concurrency (or (:concurrency opts) 5) + :generator (->> (:generator workload) + (gen/nemesis nemesis-gen) + (gen/stagger (/ rate)) + (gen/time-limit time-limit))})))) + +;; --------------------------------------------------------------------------- +;; CLI +;; --------------------------------------------------------------------------- + +(def zset-safety-cli-opts + [[nil "--ports PORTS" "Comma-separated Redis ports (per node)." + :default nil + :parse-fn (fn [s] + (->> (str/split s #",") + (remove str/blank?) + (mapv #(Integer/parseInt %))))] + [nil "--redis-port PORT" "Redis port applied to all nodes." + :default 6379 + :parse-fn #(Integer/parseInt %)]]) + +(defn- prepare-zset-safety-opts [options] + (let [ports (or (:ports options) nil) + options (cli/parse-common-opts options ports)] + (assoc options + :redis-host (:host options) + :redis-ports ports + :redis-port (:redis-port options)))) + +(defn -main [& args] + (cli/run-workload! args + (into cli/common-cli-opts zset-safety-cli-opts) + prepare-zset-safety-opts + elastickv-zset-safety-test)) diff --git a/jepsen/test/elastickv/redis_zset_safety_workload_test.clj b/jepsen/test/elastickv/redis_zset_safety_workload_test.clj new file mode 100644 index 00000000..a6eae2ae --- /dev/null +++ b/jepsen/test/elastickv/redis_zset_safety_workload_test.clj @@ -0,0 +1,864 @@ +(ns elastickv.redis-zset-safety-workload-test + "Unit tests for the ZSet safety workload's test-spec construction and + the model-based checker's edge cases (no-op ZREM, :info ZINCRBY)." + (:require [clojure.test :refer :all] + [jepsen.checker :as checker] + [jepsen.client :as client] + [elastickv.redis-zset-safety-workload :as workload])) + +;; --------------------------------------------------------------------------- +;; Test-spec construction +;; --------------------------------------------------------------------------- + +(deftest builds-test-spec + (let [t (workload/elastickv-zset-safety-test {})] + (is (map? t)) + (is (= "elastickv-redis-zset-safety" (:name t))) + (is (= ["n1" "n2" "n3" "n4" "n5"] (:nodes t))) + (is (some? (:client t))) + (is (some? (:checker t))) + (is (some? (:generator t))))) + +(deftest custom-options-override-defaults + (let [t (workload/elastickv-zset-safety-test + {:time-limit 30 + :concurrency 8 + :rate 4})] + (is (= 8 (:concurrency t))))) + +;; --------------------------------------------------------------------------- +;; Checker edge cases +;; --------------------------------------------------------------------------- + +(defn- run-checker + "Run the workload's safety checker against an in-memory history. + Bypasses the composed timeline.html checker (which writes files to + the test store) so tests stay hermetic." + [history] + (checker/check (workload/zset-safety-checker) + (workload/elastickv-zset-safety-test {}) + history + nil)) + +(deftest noop-zrem-does-not-flag-correct-read + ;; ZREM of a member that was never added returns 0 (no-op). The model + ;; must not treat it as a deletion. A subsequent read showing the + ;; absence of that member is correct. + (let [history [{:type :invoke :process 0 :f :zrem :value "ghost" :index 0} + {:type :ok :process 0 :f :zrem :value ["ghost" false] :index 1} + {:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 2} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 3} + {:type :invoke :process 0 :f :zrange-all :index 4} + {:type :ok :process 0 :f :zrange-all :value [["m1" 1.0]] :index 5}] + result (run-checker history)] + (is (:valid? result) (str "expected valid, got: " result)))) + +(deftest info-zincrby-skips-strict-score-check + ;; ZINCRBY whose response was lost (:info) leaves the resulting score + ;; unknown. A read concurrent with such an op observing some derived + ;; score must NOT be flagged as a score mismatch. + (let [history [{:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + {:type :invoke :process 1 :f :zincrby :value ["m1" 5] :index 2} + {:type :invoke :process 0 :f :zrange-all :index 3} + {:type :ok :process 0 :f :zrange-all :value [["m1" 6.0]] :index 4} + {:type :info :process 1 :f :zincrby :value ["m1" 5] :index 5}] + result (run-checker history)] + (is (:valid? result) (str "expected valid, got: " result)))) + +(deftest score-mismatch-is-detected-when-no-uncertainty + ;; Sanity check: with all ops :ok and no concurrency, an obviously + ;; wrong observed score IS flagged. + (let [history [{:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + {:type :invoke :process 0 :f :zrange-all :index 2} + {:type :ok :process 0 :f :zrange-all :value [["m1" 999.0]] :index 3}] + result (run-checker history)] + (is (not (:valid? result)) (str "expected mismatch, got: " result)))) + +(deftest single-ok-concurrent-zincrby-still-validates-scores + ;; :unknown-score? must NOT be set when exactly one concurrent + ;; ZINCRBY is :ok (and therefore has a known resulting score). The + ;; read may observe either the pre-op score or the post-op score, + ;; both of which are in :scores. An arbitrary impossible score + ;; (e.g. 999.0) must still be flagged as a :score-mismatch, not + ;; waved through by `:unknown-score?`. + (let [history [{:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + {:type :invoke :process 1 :f :zincrby :value ["m1" 5] :index 2} + {:type :invoke :process 0 :f :zrange-all :index 3} + ;; Read observes 999.0 — not 1.0 (pre) or 6.0 (post). + {:type :ok :process 0 :f :zrange-all + :value [["m1" 999.0]] :index 4} + ;; ZINCRBY eventually completes :ok with known score 6. + {:type :ok :process 1 :f :zincrby :value ["m1" 6.0] :index 5}] + result (run-checker history) + kinds (set (map :kind (:first-errors result)))] + (is (not (:valid? result)) + (str "expected score-mismatch to still fire, got: " result)) + (is (contains? kinds :score-mismatch) + (str "expected :score-mismatch, got kinds=" kinds)))) + +(deftest two-concurrent-zincrbys-relax-score-check + ;; Prefix-sum ordering matters: with two concurrent ZINCRBYs, the + ;; intermediate score (pre + one delta) is reachable and need not be + ;; in :scores. The checker must relax the strict score check. + (let [history [{:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + {:type :invoke :process 1 :f :zincrby :value ["m1" 2] :index 2} + {:type :invoke :process 2 :f :zincrby :value ["m1" 3] :index 3} + {:type :invoke :process 0 :f :zrange-all :index 4} + ;; Intermediate 3.0 = 1 + 2 (before +3 applied). + {:type :ok :process 0 :f :zrange-all + :value [["m1" 3.0]] :index 5} + {:type :ok :process 1 :f :zincrby :value ["m1" 3.0] :index 6} + {:type :ok :process 2 :f :zincrby :value ["m1" 6.0] :index 7}] + result (run-checker history)] + (is (:valid? result) + (str "expected relaxation for >=2 concurrent ZINCRBYs, got: " result)))) + +(deftest no-op-zrem-alone-does-not-false-positive + ;; CI-observed false positive: a member whose only prior ops are no-op + ;; ZREMs was classified as :score-mismatch with :allowed #{} instead + ;; of treated as never-existed (:phantom candidate, empty read -> OK). + ;; A read that observes NO such member must be accepted as valid. + (let [history [{:type :invoke :process 0 :f :zrem :value "never-added" :index 0} + {:type :invoke :process 1 :f :zrange-all :index 1} + {:type :ok :process 1 :f :zrange-all :value [] :index 2} + {:type :ok :process 0 :f :zrem :value ["never-added" false] :index 3}] + result (run-checker history)] + (is (:valid? result) (str "expected valid, got: " result)))) + +(deftest duplicate-members-are-flagged + ;; ZRANGE must not return the same member twice. + ;; With a hypothetical committed + concurrent score for the same + ;; member, a duplicate could sneak past sort + score-membership + ;; checks. Enforce distinctness explicitly. + (let [history [{:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + {:type :invoke :process 0 :f :zrange-all :index 2} + {:type :ok :process 0 :f :zrange-all + :value [["m1" 1.0] ["m1" 1.0]] :index 3}] + result (run-checker history)] + (is (not (:valid? result)) (str "expected duplicate-members error, got: " result)))) + +(deftest overlapping-committed-zadds-allow-either-score + ;; Two :ok ZADDs for the same member whose + ;; invoke/complete windows overlap have ambiguous serialization + ;; order. Either's resulting score is a valid post-state; the checker + ;; must not pin to the higher :complete-idx value only. + ;; Timeline (overlap between A's [invoke=0, complete=3] and + ;; B's [invoke=1, complete=2]): + (let [history [{:type :invoke :process 0 :f :zadd :value ["m1" 5] :index 0} + {:type :invoke :process 1 :f :zadd :value ["m1" 9] :index 1} + {:type :ok :process 1 :f :zadd :value ["m1" 9] :index 2} + {:type :ok :process 0 :f :zadd :value ["m1" 5] :index 3} + ;; Post-commit: either 5 or 9 is a valid final score. + ;; A read observing 5 must NOT be flagged as mismatch. + {:type :invoke :process 2 :f :zrange-all :index 4} + {:type :ok :process 2 :f :zrange-all + :value [["m1" 5.0]] :index 5}] + result (run-checker history)] + (is (:valid? result) + (str "expected valid under overlapping-writes relaxation, got: " result)))) + +(deftest info-before-read-is-considered-uncertain + ;; An :info mutation that completed before a + ;; later read may have taken effect. It must be considered a possible + ;; source of state for that read, rather than being ignored by both + ;; model-before and the concurrent window. + (let [history [;; Add m1 with score 1. + {:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + ;; ZINCRBY m1 by 5 -- response lost, recorded :info. + {:type :invoke :process 1 :f :zincrby :value ["m1" 5] :index 2} + {:type :info :process 1 :f :zincrby :value ["m1" 5] :index 3} + ;; Later read observes m1 at score 6 (increment applied + ;; server-side before the response was lost). Valid. + {:type :invoke :process 2 :f :zrange-all :index 4} + {:type :ok :process 2 :f :zrange-all + :value [["m1" 6.0]] :index 5}] + result (run-checker history)] + (is (:valid? result) + (str "expected :info-before-read to skip strict score check, got: " result)))) + +;; --------------------------------------------------------------------------- +;; Stale-read / phantom / superseded-committed checks +;; --------------------------------------------------------------------------- + +(deftest phantom-member-is-flagged + ;; A read that observes a member which was never added + ;; (no ZADD/ZINCRBY/true-ZREM anywhere) must be rejected. + (let [history [{:type :invoke :process 0 :f :zrange-all :index 0} + {:type :ok :process 0 :f :zrange-all + :value [["never-added" 42.0]] :index 1}] + result (run-checker history) + kinds (set (map :kind (:first-errors result)))] + (is (not (:valid? result)) (str "expected phantom error, got: " result)) + (is (contains? kinds :unexpected-presence) + (str "expected :unexpected-presence, got kinds=" kinds)))) + +(deftest phantom-from-info-zrem-still-flagged + ;; An :info ZREM is the ONLY history contact + ;; with a member (no ZADD/ZINCRBY ever). Because completed-mutation- + ;; window defaults :removed? to true on :info ZREMs (for uncertainty + ;; accounting), the checker must NOT treat ZREM as proof the member + ;; ever existed. A read observing the member present must be flagged + ;; as :unexpected-presence. Since setup! clears the key at test + ;; start, every observed member must trace back to a successful (or + ;; in-flight) ZADD/ZINCRBY -- never to a ZREM. + (let [history [;; ZREM of a member that was never added. Invoked + ;; concurrently with the read, response eventually + ;; lost (:info). No ZADD/ZINCRBY anywhere in history. + {:type :invoke :process 0 :f :zrem :value "phantom" :index 0} + {:type :invoke :process 1 :f :zrange-all :index 1} + ;; Read observes the phantom present at some score. + {:type :ok :process 1 :f :zrange-all + :value [["phantom" 7.0]] :index 2} + {:type :info :process 0 :f :zrem :value "phantom" :index 3}] + result (run-checker history) + kinds (set (map :kind (:first-errors result)))] + (is (not (:valid? result)) + (str "expected :unexpected-presence for phantom, got: " result)) + (is (contains? kinds :unexpected-presence) + (str "expected :unexpected-presence, got kinds=" kinds)))) + +(deftest stale-read-after-committed-zrem-is-flagged + ;; Once a ZADD and a subsequent real (:removed? true) ZREM + ;; have BOTH committed (with no concurrent re-add), a later read that + ;; still sees the member must be rejected as a stale read. + (let [history [;; Add then remove m1 — both committed before any read. + {:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + {:type :invoke :process 0 :f :zrem :value "m1" :index 2} + {:type :ok :process 0 :f :zrem :value ["m1" true] :index 3} + ;; Stale read: m1 somehow still appears. + {:type :invoke :process 1 :f :zrange-all :index 4} + {:type :ok :process 1 :f :zrange-all + :value [["m1" 1.0]] :index 5}] + result (run-checker history) + kinds (set (map :kind (:first-errors result)))] + (is (not (:valid? result)) (str "expected stale-read error, got: " result)) + (is (contains? kinds :unexpected-presence) + (str "expected :unexpected-presence, got kinds=" kinds)))) + +(deftest superseded-committed-score-is-not-allowed + ;; A ZADD committed BEFORE another ZADD for the same + ;; member whose invoke strictly followed it should not be treated as + ;; a valid post-state score. Only the latest committed score (plus + ;; concurrent in-flight) may be observed. + (let [history [;; ZADD m1 1 commits first ... + {:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + ;; ... then ZADD m1 2 is invoked strictly after, and + ;; also commits before the read. + {:type :invoke :process 0 :f :zadd :value ["m1" 2] :index 2} + {:type :ok :process 0 :f :zadd :value ["m1" 2] :index 3} + ;; Read observing the SUPERSEDED score 1.0 — invalid. + {:type :invoke :process 1 :f :zrange-all :index 4} + {:type :ok :process 1 :f :zrange-all + :value [["m1" 1.0]] :index 5}] + result (run-checker history)] + (is (not (:valid? result)) + (str "expected superseded-score mismatch, got: " result)))) + +;; --------------------------------------------------------------------------- +;; Infinity score parsing +;; --------------------------------------------------------------------------- + +;; --------------------------------------------------------------------------- +;; Linearization of concurrent ops / uncertain mutations +;; --------------------------------------------------------------------------- + +(deftest concurrent-zadd-zrem-both-completed-accepts-either-outcome + ;; ZADD and ZREM for the same member whose invoke/complete + ;; windows overlap (both commit before the read) have ambiguous + ;; linearization. A linearizable store may serialize either one last, + ;; so the read legitimately observes EITHER [["m1" 1.0]] OR []. + ;; Windows: ZADD=[inv=0, cmp=3], ZREM=[inv=1, cmp=2] — overlap. + (let [base [{:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :invoke :process 1 :f :zrem :value "m1" :index 1} + {:type :ok :process 1 :f :zrem :value ["m1" true] :index 2} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 3}] + hist-present (conj base + {:type :invoke :process 2 :f :zrange-all :index 4} + {:type :ok :process 2 :f :zrange-all + :value [["m1" 1.0]] :index 5}) + hist-absent (conj base + {:type :invoke :process 2 :f :zrange-all :index 4} + {:type :ok :process 2 :f :zrange-all + :value [] :index 5})] + (is (:valid? (run-checker hist-present)) + "expected read observing ZADD's outcome to be accepted") + (is (:valid? (run-checker hist-absent)) + "expected read observing ZREM's outcome (absent) to be accepted"))) + +(deftest info-zrem-concurrent-with-read-allows-missing-member + ;; An :info ZREM that might have applied before a read + ;; leaves the member's presence uncertain. A ZRANGE that omits the + ;; member must NOT be flagged as a completeness failure. + (let [history [;; ZADD m1 committed before the read. + {:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + ;; ZREM m1 is invoked, then the read runs, then the + ;; ZREM response is lost (:info). The ZREM may or may + ;; not have applied server-side. + {:type :invoke :process 1 :f :zrem :value "m1" :index 2} + {:type :invoke :process 0 :f :zrange-all :index 3} + {:type :ok :process 0 :f :zrange-all :value [] :index 4} + {:type :info :process 1 :f :zrem :value "m1" :index 5}] + result (run-checker history)] + (is (:valid? result) + (str "expected :info ZREM to make absence acceptable, got: " result)))) + +(deftest info-zincrby-does-not-flag-zrangebyscore-completeness + ;; A pre-read :info / concurrent ZINCRBY leaves the + ;; resulting score unknown. ZRANGEBYSCORE filtering on a specific + ;; range must not flag the member as missing, because its score may + ;; have moved outside [lo, hi]. + (let [history [;; ZADD m1 at score 1 (committed well before read). + {:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + ;; ZINCRBY m1 +100 — response lost (:info) BEFORE read. + {:type :invoke :process 1 :f :zincrby :value ["m1" 100] :index 2} + {:type :info :process 1 :f :zincrby :value ["m1" 100] :index 3} + ;; ZRANGEBYSCORE [0, 10] — m1's score is uncertain; it + ;; may now be 101 (outside range) or still 1. The + ;; checker must not complain about m1's absence. + {:type :invoke :process 2 :f :zrangebyscore :value [0.0 10.0] :index 4} + {:type :ok :process 2 :f :zrangebyscore + :value {:bounds [0.0 10.0] :members []} :index 5}] + result (run-checker history)] + (is (:valid? result) + (str "expected :info ZINCRBY to skip completeness, got: " result)))) + +(deftest zrangebyscore-completeness-still-detects-truly-missing-member + ;; Sanity: when NO uncertainty exists and a model member's committed + ;; score is definitively inside [lo, hi], its absence IS flagged. + (let [history [{:type :invoke :process 0 :f :zadd :value ["m1" 5] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 5] :index 1} + {:type :invoke :process 0 :f :zrangebyscore :value [0.0 10.0] :index 2} + {:type :ok :process 0 :f :zrangebyscore + :value {:bounds [0.0 10.0] :members []} :index 3}] + result (run-checker history) + kinds (set (map :kind (:first-errors result)))] + (is (not (:valid? result)) (str "expected missing-member-range, got: " result)) + (is (contains? kinds :missing-member-range) + (str "expected :missing-member-range, got kinds=" kinds)))) + +(deftest missing-member-range-error-reports-full-allowed-score-set + ;; When a member is missing from ZRANGEBYSCORE and multiple + ;; concurrent writers make several scores admissible, the error map + ;; must surface the FULL admissible set under :allowed (matching + ;; :score-mismatch-range convention) rather than pick an arbitrary + ;; single :expected-score. + (let [history [;; Two concurrent ZADDs for m1, both committed before + ;; the read. Either score (5 or 6) is admissible, both + ;; fall inside [0, 10]. + {:type :invoke :process 0 :f :zadd :value ["m1" 5] :index 0} + {:type :invoke :process 1 :f :zadd :value ["m1" 6] :index 1} + {:type :ok :process 0 :f :zadd :value ["m1" 5] :index 2} + {:type :ok :process 1 :f :zadd :value ["m1" 6] :index 3} + ;; Read sees nothing -- m1 must appear under any + ;; admissible linearization, so :missing-member-range + ;; fires. + {:type :invoke :process 2 :f :zrangebyscore :value [0.0 10.0] :index 4} + {:type :ok :process 2 :f :zrangebyscore + :value {:bounds [0.0 10.0] :members []} :index 5}] + result (run-checker history) + miss (first (filter #(= :missing-member-range (:kind %)) + (:first-errors result)))] + (is (not (:valid? result))) + (is (some? miss) + (str "expected a :missing-member-range error, got: " (:first-errors result))) + (is (contains? miss :allowed) + (str "error map must include :allowed, got: " miss)) + (is (= #{5.0 6.0} (set (:allowed miss))) + (str "expected :allowed to contain both admissible scores, got: " miss)) + ;; :expected-score is retained for backcompat but MUST be nil when + ;; there is more than one admissible score, to avoid misleading + ;; consumers that read it. + (is (nil? (:expected-score miss)) + (str "expected :expected-score nil for multi-score set, got: " miss)))) + +(deftest missing-member-range-error-keeps-expected-score-when-single + ;; Backcompat: when the admissible set has exactly one score, + ;; :expected-score matches it. + (let [history [{:type :invoke :process 0 :f :zadd :value ["m1" 5] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 5] :index 1} + {:type :invoke :process 0 :f :zrangebyscore :value [0.0 10.0] :index 2} + {:type :ok :process 0 :f :zrangebyscore + :value {:bounds [0.0 10.0] :members []} :index 3}] + result (run-checker history) + miss (first (filter #(= :missing-member-range (:kind %)) + (:first-errors result)))] + (is (some? miss)) + (is (= #{5.0} (set (:allowed miss)))) + (is (= 5.0 (:expected-score miss))))) + +(deftest zrange-completeness-still-detects-truly-missing-member + ;; Sanity: no uncertainty, member committed-present. Absence flagged. + (let [history [{:type :invoke :process 0 :f :zadd :value ["m1" 5] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 5] :index 1} + {:type :invoke :process 0 :f :zrange-all :index 2} + {:type :ok :process 0 :f :zrange-all :value [] :index 3}] + result (run-checker history) + kinds (set (map :kind (:first-errors result)))] + (is (not (:valid? result)) (str "expected missing-member, got: " result)) + (is (contains? kinds :missing-member) + (str "expected :missing-member, got kinds=" kinds)))) + +;; --------------------------------------------------------------------------- +;; Failed-concurrent mutations must not contribute to uncertainty +;; --------------------------------------------------------------------------- + +(deftest failed-concurrent-zrem-does-not-relax-must-be-present + ;; A concurrent ZREM that completes with :fail did NOT take + ;; effect. Its window must NOT make the member's presence uncertain, + ;; so a read that omits the member (which was ZADDed and committed + ;; beforehand) must be flagged as :missing-member. + (let [history [;; ZADD m1 committed before the read. + {:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + ;; ZREM m1 is invoked concurrently with the read but + ;; ultimately :fails -- the op definitively did NOT run. + {:type :invoke :process 1 :f :zrem :value "m1" :index 2} + {:type :invoke :process 0 :f :zrange-all :index 3} + ;; Read observes m1 ABSENT -- without the fix, the + ;; failed ZREM would admit this as "possibly removed". + {:type :ok :process 0 :f :zrange-all :value [] :index 4} + {:type :fail :process 1 :f :zrem :value "m1" :index 5}] + result (run-checker history) + kinds (set (map :kind (:first-errors result)))] + (is (not (:valid? result)) + (str "expected :missing-member despite failed ZREM, got: " result)) + (is (contains? kinds :missing-member) + (str "expected :missing-member, got kinds=" kinds)))) + +(deftest failed-concurrent-zadd-does-not-contribute-allowed-score + ;; A concurrent ZADD that completes with :fail did NOT take + ;; effect. Its score must NOT be added to the allowed set. A read + ;; observing that score must be flagged as :score-mismatch rather than + ;; being waved through by the failed ZADD's ghost contribution. + (let [history [;; ZADD m1 at score 1 committed before the read. + {:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + ;; Concurrent ZADD m1 at score 42 ultimately :fails. + {:type :invoke :process 1 :f :zadd :value ["m1" 42] :index 2} + {:type :invoke :process 0 :f :zrange-all :index 3} + ;; Read observes score 42 -- only valid if the failed + ;; ZADD is (incorrectly) admitted as a possible write. + {:type :ok :process 0 :f :zrange-all + :value [["m1" 42.0]] :index 4} + {:type :fail :process 1 :f :zadd :value ["m1" 42] :index 5}] + result (run-checker history) + kinds (set (map :kind (:first-errors result)))] + (is (not (:valid? result)) + (str "expected :score-mismatch ignoring failed ZADD, got: " result)) + (is (contains? kinds :score-mismatch) + (str "expected :score-mismatch, got kinds=" kinds)))) + +;; --------------------------------------------------------------------------- +;; Chained committed ZINCRBYs: only the linearization-chain tail is a +;; valid final score. Earlier intermediate return values are stale. +;; --------------------------------------------------------------------------- + +(deftest chained-committed-zincrby-rejects-stale-intermediate + ;; Sequential committed ZINCRBYs form a forced linearization + ;; chain. The first ZINCRBY's return value is an intermediate that no + ;; post-chain read may observe. Expect :score-mismatch on the stale + ;; intermediate. + (let [history [;; Start with score 1. + {:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + ;; ZINCRBY +2 -> ok=3 (committed). + {:type :invoke :process 0 :f :zincrby :value ["m1" 2] :index 2} + {:type :ok :process 0 :f :zincrby :value ["m1" 3.0] :index 3} + ;; ZINCRBY +3 -> ok=6 (committed). Strictly follows the + ;; previous ZINCRBY in real time (invoke 4 > complete 3). + {:type :invoke :process 0 :f :zincrby :value ["m1" 3] :index 4} + {:type :ok :process 0 :f :zincrby :value ["m1" 6.0] :index 5} + ;; Read AFTER the whole chain observes the stale + ;; intermediate 3.0 -- not admissible under any + ;; linearization. + {:type :invoke :process 1 :f :zrange-all :index 6} + {:type :ok :process 1 :f :zrange-all + :value [["m1" 3.0]] :index 7}] + result (run-checker history) + kinds (set (map :kind (:first-errors result)))] + (is (not (:valid? result)) + (str "expected stale-intermediate to be flagged, got: " result)) + (is (contains? kinds :score-mismatch) + (str "expected :score-mismatch, got kinds=" kinds)))) + +(deftest chained-committed-zincrby-accepts-latest + ;; Same history but the read observes the LATEST chain tail + ;; (6.0) -- accept as valid. + (let [history [{:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + {:type :invoke :process 0 :f :zincrby :value ["m1" 2] :index 2} + {:type :ok :process 0 :f :zincrby :value ["m1" 3.0] :index 3} + {:type :invoke :process 0 :f :zincrby :value ["m1" 3] :index 4} + {:type :ok :process 0 :f :zincrby :value ["m1" 6.0] :index 5} + {:type :invoke :process 1 :f :zrange-all :index 6} + {:type :ok :process 1 :f :zrange-all + :value [["m1" 6.0]] :index 7}] + result (run-checker history)] + (is (:valid? result) + (str "expected chain-tail score to be accepted, got: " result)))) + +(deftest concurrent-zincrby-both-admissible + ;; Two overlapping-in-real-time ZINCRBYs whose returned + ;; scores are BOTH candidate final states under some linearization. + ;; Read observing either value must be accepted. + ;; Overlap: A=[inv=2, cmp=5], B=[inv=3, cmp=4]. + (let [base [{:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + {:type :invoke :process 1 :f :zincrby :value ["m1" 2] :index 2} + {:type :invoke :process 2 :f :zincrby :value ["m1" 3] :index 3} + ;; B completes first with ok=4 (delta applied to score 1). + {:type :ok :process 2 :f :zincrby :value ["m1" 4.0] :index 4} + ;; A completes with ok=6 (delta applied after B). + {:type :ok :process 1 :f :zincrby :value ["m1" 6.0] :index 5}] + read-a (conj base + {:type :invoke :process 3 :f :zrange-all :index 6} + {:type :ok :process 3 :f :zrange-all + :value [["m1" 4.0]] :index 7}) + read-b (conj base + {:type :invoke :process 3 :f :zrange-all :index 6} + {:type :ok :process 3 :f :zrange-all + :value [["m1" 6.0]] :index 7})] + (is (:valid? (run-checker read-a)) + "expected B's return value (4.0) admissible under overlap") + (is (:valid? (run-checker read-b)) + "expected A's return value (6.0) admissible under overlap"))) + +(deftest zadd-resets-zincrby-chain + ;; A committed ZADD between ZINCRBYs resets the chain -- + ;; subsequent ZINCRBYs operate on the new ZADD'd value. The pre-reset + ;; ZINCRBY score is NOT a valid read after the chain completes. + (let [base [;; ZADD m1 1 + {:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + ;; ZINCRBY +2 -> 3 + {:type :invoke :process 0 :f :zincrby :value ["m1" 2] :index 2} + {:type :ok :process 0 :f :zincrby :value ["m1" 3.0] :index 3} + ;; ZADD m1 10 -- chain reset to absolute value. + {:type :invoke :process 0 :f :zadd :value ["m1" 10] :index 4} + {:type :ok :process 0 :f :zadd :value ["m1" 10] :index 5} + ;; ZINCRBY +1 -> 11 + {:type :invoke :process 0 :f :zincrby :value ["m1" 1] :index 6} + {:type :ok :process 0 :f :zincrby :value ["m1" 11.0] :index 7}] + read-ok (conj base + {:type :invoke :process 1 :f :zrange-all :index 8} + {:type :ok :process 1 :f :zrange-all + :value [["m1" 11.0]] :index 9}) + read-bad (conj base + {:type :invoke :process 1 :f :zrange-all :index 8} + {:type :ok :process 1 :f :zrange-all + :value [["m1" 3.0]] :index 9})] + (is (:valid? (run-checker read-ok)) + "expected post-reset chain tail (11.0) to be accepted") + (is (not (:valid? (run-checker read-bad))) + "expected pre-reset intermediate (3.0) to be flagged"))) + +;; --------------------------------------------------------------------------- +;; unknown-score? gate: restricted to :info ZINCRBYs only. Two concurrent +;; :ok ZINCRBYs with known return values do NOT make the score check +;; unknown -- their return values pin the linearization and the +;; admissible score set is constrained by :scores (candidates + uncertain +;; ok return values). +;; --------------------------------------------------------------------------- + +(deftest two-ok-concurrent-zincrbys-reject-impossible-score + ;; Two overlapping :ok ZINCRBYs with known return values + ;; (3 and 6) constrain the admissible post-chain read set to {1,3,6}. + ;; A read of 999 is impossible under any linearization; the checker + ;; must flag it as :score-mismatch (no longer swallowed by the old + ;; "2+ uncertain zincrbys -> unknown-score?" shortcut). + (let [history [{:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + ;; Two concurrent ZINCRBYs. Windows overlap the read. + {:type :invoke :process 1 :f :zincrby :value ["m1" 2] :index 2} + {:type :invoke :process 2 :f :zincrby :value ["m1" 3] :index 3} + {:type :ok :process 1 :f :zincrby :value ["m1" 3.0] :index 4} + {:type :ok :process 2 :f :zincrby :value ["m1" 6.0] :index 5} + ;; Read observes an impossible score. + {:type :invoke :process 3 :f :zrange-all :index 6} + {:type :ok :process 3 :f :zrange-all + :value [["m1" 999.0]] :index 7}] + result (run-checker history) + kinds (set (map :kind (:first-errors result)))] + (is (not (:valid? result)) + (str "expected impossible score to be flagged, got: " result)) + (is (contains? kinds :score-mismatch) + (str "expected :score-mismatch, got kinds=" kinds)))) + +(deftest two-ok-concurrent-zincrbys-accept-known-chain-tail + ;; Same concurrent :ok ZINCRBY history, but the read + ;; observes one of the recorded return values. Both 3.0 (linearization + ;; where +3 ran first, then +2) and 6.0 (the other order) must be + ;; accepted as valid. + (let [base [{:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + {:type :invoke :process 1 :f :zincrby :value ["m1" 2] :index 2} + {:type :invoke :process 2 :f :zincrby :value ["m1" 3] :index 3} + {:type :ok :process 1 :f :zincrby :value ["m1" 3.0] :index 4} + {:type :ok :process 2 :f :zincrby :value ["m1" 6.0] :index 5}] + read-6 (conj base + {:type :invoke :process 3 :f :zrange-all :index 6} + {:type :ok :process 3 :f :zrange-all + :value [["m1" 6.0]] :index 7}) + read-3 (conj base + {:type :invoke :process 3 :f :zrange-all :index 6} + {:type :ok :process 3 :f :zrange-all + :value [["m1" 3.0]] :index 7})] + (is (:valid? (run-checker read-6)) + "expected 6.0 (one linearization) to be accepted") + (is (:valid? (run-checker read-3)) + "expected 3.0 (other linearization) to be accepted"))) + +(deftest info-plus-ok-concurrent-zincrby-stays-unknown + ;; When at least one concurrent ZINCRBY is :info (unknown + ;; post-op score), the strict score check must be relaxed regardless + ;; of how many other :ok ZINCRBYs are concurrent. Any numeric score + ;; must be accepted for this read. + (let [history [{:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :ok :process 0 :f :zadd :value ["m1" 1] :index 1} + ;; One :info ZINCRBY (unknown outcome). + {:type :invoke :process 1 :f :zincrby :value ["m1" 2] :index 2} + ;; One :ok ZINCRBY with known return value. + {:type :invoke :process 2 :f :zincrby :value ["m1" 3] :index 3} + {:type :ok :process 2 :f :zincrby :value ["m1" 4.0] :index 4} + {:type :info :process 1 :f :zincrby :value ["m1" 2] + :error "conn reset" :index 5} + ;; Read observes an "arbitrary" score -- admissible + ;; because the :info ZINCRBY could have produced any + ;; post-op state visible to the read. + {:type :invoke :process 3 :f :zrange-all :index 6} + {:type :ok :process 3 :f :zrange-all + :value [["m1" 42.0]] :index 7}]] + (is (:valid? (run-checker history)) + "expected any score accepted when :info ZINCRBY is concurrent"))) + +;; --------------------------------------------------------------------------- +;; Infinity score parsing +;; --------------------------------------------------------------------------- + +;; --------------------------------------------------------------------------- +;; Client setup! / invoke! robustness +;; --------------------------------------------------------------------------- + +(deftest setup-bang-hard-fails-when-conn-spec-missing + ;; If open! failed to populate :conn-spec (unresolvable + ;; host, etc.), setup! MUST throw rather than silently proceed. + ;; Continuing with a no-op setup would leave stale data from a prior + ;; run under zset-key and risk false-positive checker verdicts from + ;; that dirty state. We want Jepsen to surface the failure. + (let [client (workload/->ElastickvRedisZSetSafetyClient {} nil)] + (is (thrown-with-msg? clojure.lang.ExceptionInfo + #":conn-spec is missing" + (client/setup! client {})) + "setup! must throw ex-info when :conn-spec is nil"))) + +(deftest setup-bang-hard-fails-when-cleanup-del-errors + ;; Even when :conn-spec is populated, if the actual + ;; cleanup (DEL zset-key) fails or errors, setup! must NOT silently + ;; proceed. Stale data surviving from a prior run under zset-key + ;; would cause false-positive safety verdicts. Propagate the + ;; exception so Jepsen aborts the run. + (let [client (workload/->ElastickvRedisZSetSafetyClient + {} {:pool {} :spec {:host "127.0.0.1" + :port 1 ; guaranteed unreachable + :timeout-ms 100}})] + (is (thrown? Throwable + (client/setup! client {})) + "setup! must propagate cleanup failures, not swallow them"))) + +(deftest zincrby-invoke-handles-nil-response + ;; If car/wcar for ZINCRBY returns nil (error reply + ;; coerced, unexpected protocol edge), the op must complete as :info + ;; with a structured :error, not throw NumberFormatException from + ;; parse-double-safe swallowing (str nil) -> "nil". + (let [client (workload/->ElastickvRedisZSetSafetyClient + {} {:pool {} :spec {:host "localhost" :port 6379 + :timeout-ms 100}}) + op {:type :invoke :f :zincrby :value ["m1" 1.0] :process 0 :index 0}] + (with-redefs [workload/zincrby! (fn [& _] nil)] + (let [result (client/invoke! client {} op)] + (is (= :info (:type result)) + (str "expected :info on nil ZINCRBY reply, got: " result)) + (is (some? (:error result)) + (str "expected :error to be populated, got: " result)))))) + +(deftest zincrby-invoke-handles-unexpected-response + ;; Same guard, but for a non-string / non-number reply. + ;; Must complete :info rather than propagate a parse failure. + (let [client (workload/->ElastickvRedisZSetSafetyClient + {} {:pool {} :spec {:host "localhost" :port 6379 + :timeout-ms 100}}) + op {:type :invoke :f :zincrby :value ["m1" 1.0] :process 0 :index 0}] + (with-redefs [workload/zincrby! (fn [& _] {:unexpected :map})] + (let [result (client/invoke! client {} op)] + (is (= :info (:type result)) + (str "expected :info on unexpected ZINCRBY reply, got: " result)))))) + +(deftest zincrby-invoke-accepts-numeric-response + ;; Sanity: some Carmine versions coerce integer scores to longs. + ;; Must parse cleanly to a Double and complete :ok. + (let [client (workload/->ElastickvRedisZSetSafetyClient + {} {:pool {} :spec {:host "localhost" :port 6379 + :timeout-ms 100}}) + op {:type :invoke :f :zincrby :value ["m1" 1.0] :process 0 :index 0}] + (with-redefs [workload/zincrby! (fn [& _] 7)] + (let [result (client/invoke! client {} op)] + (is (= :ok (:type result)) + (str "expected :ok on numeric reply, got: " result)) + (is (= ["m1" 7.0] (:value result))))))) + +;; --------------------------------------------------------------------------- +;; Vacuous-pass guard +;; --------------------------------------------------------------------------- + +(deftest empty-history-is-unknown-not-valid + ;; An empty history (e.g. Redis unreachable, all ops + ;; downgraded to :info) produces zero successful reads. The checker + ;; MUST NOT return :valid? true in that case -- that would be a + ;; false-green. Expect :valid? :unknown plus a diagnostic :reason. + (let [result (run-checker [])] + (is (= :unknown (:valid? result)) + (str "expected :unknown on empty history, got: " result)) + (is (string? (:reason result)) + (str "expected :reason to be populated, got: " result)) + (is (zero? (:reads result))))) + +(deftest all-info-history-is-unknown-not-valid + ;; A run where every operation was downgraded to :info + ;; (Redis unreachable / every read timed out) still has read-pairs + ;; filtered down to zero :ok reads. Must surface as :valid? :unknown. + (let [history [{:type :invoke :process 0 :f :zadd :value ["m1" 1] :index 0} + {:type :info :process 0 :f :zadd :value ["m1" 1] :index 1 + :error "conn refused"} + {:type :invoke :process 0 :f :zrange-all :index 2} + {:type :info :process 0 :f :zrange-all :index 3 + :error "conn refused"}] + result (run-checker history)] + (is (= :unknown (:valid? result)) + (str "expected :unknown when all ops are :info, got: " result)) + (is (string? (:reason result))))) + +(deftest one-successful-read-is-enough-to-validate + ;; Sanity: the vacuous-pass guard must only kick in when there are + ;; ZERO successful reads. A single :ok read with no errors is a + ;; legitimate :valid? true. + (let [history [{:type :invoke :process 0 :f :zrange-all :index 0} + {:type :ok :process 0 :f :zrange-all :value [] :index 1}] + result (run-checker history)] + (is (true? (:valid? result)) + (str "expected :valid? true with one :ok read, got: " result)))) + +(deftest zrem-invoke-handles-nil-response + ;; If car/wcar for ZREM returns nil (protocol edge, + ;; closed connection, etc.), `(long nil)` would throw NPE and the + ;; op would be logged as a generic failure via the general Exception + ;; handler. Guard with `(or removed 0)` so the op resolves cleanly + ;; as :ok [member false]. + (let [client (workload/->ElastickvRedisZSetSafetyClient + {} {:pool {} :spec {:host "localhost" :port 6379 + :timeout-ms 100}}) + op {:type :invoke :f :zrem :value "ghost" :process 0 :index 0}] + (with-redefs [workload/zrem! (fn [& _] nil)] + (let [result (client/invoke! client {} op)] + (is (= :ok (:type result)) + (str "expected :ok on nil ZREM reply, got: " result)) + (is (= ["ghost" false] (:value result)) + (str "expected removed? false on nil reply, got: " result)))))) + +(deftest zrem-invoke-handles-numeric-response + ;; Sanity: ZREM's normal reply is an integer count. + (let [client (workload/->ElastickvRedisZSetSafetyClient + {} {:pool {} :spec {:host "localhost" :port 6379 + :timeout-ms 100}}) + op {:type :invoke :f :zrem :value "m1" :process 0 :index 0}] + (with-redefs [workload/zrem! (fn [& _] 1)] + (let [result (client/invoke! client {} op)] + (is (= :ok (:type result))) + (is (= ["m1" true] (:value result))))))) + +(deftest zrem-invoke-handles-string-response + ;; Some Carmine versions / RESP3 codepaths surface ZREM's count as a + ;; numeric string rather than a Long. `(long \"1\")` would throw + ;; ClassCastException; the coerce-zrem-count helper must parse the + ;; string and the op must still resolve as :ok with removed? true. + (let [client (workload/->ElastickvRedisZSetSafetyClient + {} {:pool {} :spec {:host "localhost" :port 6379 + :timeout-ms 100}}) + op {:type :invoke :f :zrem :value "m1" :process 0 :index 0}] + (with-redefs [workload/zrem! (fn [& _] "1")] + (let [result (client/invoke! client {} op)] + (is (= :ok (:type result)) + (str "expected :ok on string ZREM reply, got: " result)) + (is (= ["m1" true] (:value result)) + (str "expected removed? true on string \"1\", got: " result)))))) + +(deftest zrem-invoke-handles-string-zero-response + ;; String "0" must be parsed as removed? false (not truthy because it + ;; is a non-empty string). + (let [client (workload/->ElastickvRedisZSetSafetyClient + {} {:pool {} :spec {:host "localhost" :port 6379 + :timeout-ms 100}}) + op {:type :invoke :f :zrem :value "ghost" :process 0 :index 0}] + (with-redefs [workload/zrem! (fn [& _] "0")] + (let [result (client/invoke! client {} op)] + (is (= :ok (:type result))) + (is (= ["ghost" false] (:value result))))))) + +(deftest zrem-invoke-handles-bytes-response + ;; Raw-bytes numeric reply (RESP binary-safe path) must be decoded as + ;; UTF-8 and parsed. "1" => removed? true. + (let [client (workload/->ElastickvRedisZSetSafetyClient + {} {:pool {} :spec {:host "localhost" :port 6379 + :timeout-ms 100}}) + op {:type :invoke :f :zrem :value "m1" :process 0 :index 0}] + (with-redefs [workload/zrem! (fn [& _] (.getBytes "1" "UTF-8"))] + (let [result (client/invoke! client {} op)] + (is (= :ok (:type result))) + (is (= ["m1" true] (:value result))))))) + +(deftest zrem-invoke-handles-unparseable-response + ;; Totally unexpected reply shape: treat as 0 (nothing removed) rather + ;; than throw. Keeps the op :ok and records removed? false. + (let [client (workload/->ElastickvRedisZSetSafetyClient + {} {:pool {} :spec {:host "localhost" :port 6379 + :timeout-ms 100}}) + op {:type :invoke :f :zrem :value "ghost" :process 0 :index 0}] + (with-redefs [workload/zrem! (fn [& _] :weird)] + (let [result (client/invoke! client {} op)] + (is (= :ok (:type result))) + (is (= ["ghost" false] (:value result))))))) + +(deftest parse-withscores-handles-inf-strings + ;; Redis returns "inf" / "+inf" / "-inf" for infinite + ;; ZSET scores. Double/parseDouble expects "Infinity"; the workload's + ;; parser must normalize both encodings instead of throwing. + (let [flat ["m-pos" "inf" + "m-pos2" "+inf" + "m-neg" "-inf" + "m-jvm" "Infinity" + "m-num" "3.5"] + parsed (#'workload/parse-withscores flat)] + (is (= [["m-pos" Double/POSITIVE_INFINITY] + ["m-pos2" Double/POSITIVE_INFINITY] + ["m-neg" Double/NEGATIVE_INFINITY] + ["m-jvm" Double/POSITIVE_INFINITY] + ["m-num" 3.5]] + parsed)))) + +(deftest parse-withscores-rejects-odd-length-payload + ;; A WITHSCORES reply with a dangling member (odd element count) is a + ;; protocol violation. The checker must surface it rather than let + ;; `(partition 2)` silently drop evidence of the anomaly. + (is (thrown-with-msg? + clojure.lang.ExceptionInfo + #"odd element count" + (#'workload/parse-withscores ["m1" "1.0" "m2-dangling"]))))