Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 105 additions & 5 deletions adapter/redis_compat_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,23 @@ func normalizeStartTS(ts uint64) uint64 {
// correctly detect collections whose fields were all deleted (metadata key exists but
// no member keys) or newly created collections that only have delta keys.
func (r *RedisServer) detectWideColumnType(ctx context.Context, key []byte, readTS uint64) (redisValueType, error) {
if typ, err := r.detectWideColumnTypeSkipZSet(ctx, key, readTS); err != nil || typ != redisTypeNone {
return typ, err
}
if found, err := r.wideColumnTypeExists(ctx, key, readTS, store.ZSetMemberScanPrefix, store.ZSetMetaKey, store.ZSetMetaDeltaScanPrefix); err != nil {
return redisTypeNone, err
} else if found {
return redisTypeZSet, nil
}
return redisTypeNone, nil
}

// detectWideColumnTypeSkipZSet runs the wide-column hash / set probes
// only. Callers that have already eliminated ZSet (e.g.
// rawZSetPhysTypeAt's fallback after the member-prefix and meta/delta
// scans came back empty) use this to avoid re-issuing the three
// ZSet-side probes detectWideColumnType would otherwise repeat.
func (r *RedisServer) detectWideColumnTypeSkipZSet(ctx context.Context, key []byte, readTS uint64) (redisValueType, error) {
if found, err := r.wideColumnTypeExists(ctx, key, readTS, store.HashFieldScanPrefix, store.HashMetaKey, store.HashMetaDeltaScanPrefix); err != nil {
return redisTypeNone, err
} else if found {
Expand All @@ -71,11 +88,6 @@ func (r *RedisServer) detectWideColumnType(ctx context.Context, key []byte, read
} else if found {
return redisTypeSet, nil
}
if found, err := r.wideColumnTypeExists(ctx, key, readTS, store.ZSetMemberScanPrefix, store.ZSetMetaKey, store.ZSetMetaDeltaScanPrefix); err != nil {
return redisTypeNone, err
} else if found {
return redisTypeZSet, nil
}
return redisTypeNone, nil
}

Expand Down Expand Up @@ -110,6 +122,94 @@ func (r *RedisServer) prefixExistsAt(ctx context.Context, prefix []byte, readTS
return len(kvs) > 0, nil
}

// zsetStorageHint bundles the storage-probe results needed by zsetState so that
// the member-prefix scan is performed at most once.
type zsetStorageHint struct {
physType redisValueType // type ignoring TTL expiry
logType redisValueType // type after TTL check (redisTypeNone if expired)
memberFound bool // true when the member-prefix scan returned ≥1 key
}

// zsetStorageHintAt probes storage for ZSet data at readTS.
// It performs the ZSetMemberScanPrefix scan only once, so callers (zsetState)
// do not need a second ScanAt to determine wide-column vs legacy-blob format.
// For non-ZSet keys the full rawKeyTypeAt path is used for correctness.
func (r *RedisServer) zsetStorageHintAt(ctx context.Context, key []byte, readTS uint64) (zsetStorageHint, error) {
physType, memberFound, err := r.rawZSetPhysTypeAt(ctx, key, readTS)
if err != nil {
return zsetStorageHint{}, err
}
h := zsetStorageHint{physType: physType, logType: physType, memberFound: memberFound}
if physType != redisTypeNone {
// Known-physType TTL probe: for collection types the embedded
// TTL only lives under the collection-side key, so we can skip
// the `!redis|str|` probe (nonStringOnly=true). For any string
// types we reach via rawZSetPhysTypeAt's fallback (mixed
// corruption), we still need the string-side check too.
expired, err := r.hasExpired(ctx, key, readTS, isNonStringCollectionType(physType))
if err != nil {
return zsetStorageHint{}, err
}
if expired {
h.logType = redisTypeNone
}
}
return h, nil
}

// rawZSetPhysTypeAt detects whether a ZSet exists physically at readTS (ignoring
// TTL) and whether the detection was via the member-prefix scan (memberFound).
// For non-ZSet keys it falls back to rawKeyTypeAt.
func (r *RedisServer) rawZSetPhysTypeAt(ctx context.Context, key []byte, readTS uint64) (redisValueType, bool, error) {
// Single scan: probe member prefix (common path).
memberFound, err := r.prefixExistsAt(ctx, store.ZSetMemberScanPrefix(key), readTS)
if err != nil {
return redisTypeNone, false, err
}
if memberFound {
return redisTypeZSet, true, nil
}
// No member rows — check meta/delta for a memberless wide-column ZSet.
zsetOnly, err := r.zsetMetaOrDeltaExistsAt(ctx, key, readTS)
if err != nil {
return redisTypeNone, false, err
}
if zsetOnly {
return redisTypeZSet, false, nil
}
// Not a wide-column ZSet — probe other types without re-scanning
// the three ZSet-side prefixes we already ruled out above.
physType, err := r.rawKeyTypeAtSkipZSet(ctx, key, readTS)
return physType, false, err
}
Comment on lines +160 to +184
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Look for existing tests that exercise the mixed string+zset corruption path
# to confirm whether the new zsetStorageHintAt ordering is already covered.
rg -nP --type=go -C3 '(string.*wins|mixed.*corrupt|replaceWithStringTxn|redisStrKey.*ZSet|ZSet.*redisStrKey)'
rg -nP --type=go -C2 '\brawKeyTypeAt\b' -g '*_test.go'

Repository: bootjp/elastickv

Length of output: 3049


🏁 Script executed:

# Search for tests involving mixed string and zset scenarios
rg -nP --type=go 'test.*[Zz]set.*[Ss]tring|test.*[Ss]tring.*[Zz]set' -A 5 -B 2

Repository: bootjp/elastickv

Length of output: 42


🏁 Script executed:

# Look for tests that may cover corruption recovery with multiple types
rg -nP --type=go -l 'corruption|mixed.*type' -g '*_test.go'

Repository: bootjp/elastickv

Length of output: 205


🏁 Script executed:

# Find zsetState function in lua_context and check its implementation
rg -nP --type=go -A 15 'func.*zsetState' adapter/

Repository: bootjp/elastickv

Length of output: 3070


🏁 Script executed:

# Compare ordering in rawZSetPhysTypeAt vs rawKeyTypeAt implementations
rg -nP --type=go -A 30 'func.*rawKeyTypeAt\(' adapter/redis_compat_helpers.go

Repository: bootjp/elastickv

Length of output: 1368


🏁 Script executed:

# Check if hasHigherPriorityStringEncoding is used to enforce string priority elsewhere
rg -nP --type=go 'hasHigherPriorityStringEncoding' -B 2 -A 5

Repository: bootjp/elastickv

Length of output: 7127


🏁 Script executed:

# Find zsetStorageHintAt implementation
rg -nP --type=go -A 20 'func.*zsetStorageHintAt'

Repository: bootjp/elastickv

Length of output: 1693


🏁 Script executed:

# Search for ZSet tests that might cover string+zset corruption
rg -nP --type=go -B 3 -A 10 'TestRedis.*[Zz]set.*[Cc]orrup|TestRedis.*[Dd]ual.*[Zz]set' -g '*_test.go'

Repository: bootjp/elastickv

Length of output: 42


🏁 Script executed:

# Check if there are any ZADD tests that exercise mixed encoding
rg -nP --type=go -B 2 -A 10 'ZADD.*string|string.*ZADD|corrupt.*ZADD' -g '*_test.go'

Repository: bootjp/elastickv

Length of output: 42


🏁 Script executed:

# Find all references to rawZSetPhysTypeAt to understand its usage
rg -nP --type=go 'rawZSetPhysTypeAt' -B 3 -A 3

Repository: bootjp/elastickv

Length of output: 3592


ZSet-first probe diverges from documented "string wins" tiebreaker.

rawKeyTypeAt documents the invariant (lines 219–225) that when a key carries both a string AND a collection entry (corruption recovery only), string probes run first so string wins. rawZSetPhysTypeAt inverts this: the ZSetMemberScanPrefix scan runs before rawKeyTypeAtSkipZSet, so when both a ZSet member row and a !redis|str|/bare/HLL entry exist for the same key, rawZSetPhysTypeAt returns redisTypeZSet where rawKeyTypeAt would return redisTypeString.

The comment in zsetStorageHintAt (lines 147–148) acknowledges this: "For any string types we reach via rawZSetPhysTypeAt's fallback (mixed corruption), we still need the string-side check too."

Downstream impact in redis_lua_context.go zsetState: if the hint returns redisTypeZSet for a corrupted key, the cached type bypasses the type-mismatch check that the full keyType() path would catch, allowing ZADD to proceed on a key whose actual keyType is string.

Unlike hash+string (guarded by TestRedis_HGET_FastPathGuardDualEncoding), there is no ZSet+string corruption test. Consider either adding one or reordering the probes in rawZSetPhysTypeAt to check strings before the member-prefix scan to preserve the documented invariant.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@adapter/redis_compat_helpers.go` around lines 160 - 184, rawZSetPhysTypeAt
currently probes ZSet members first (via prefixExistsAt/ ZSetMemberScanPrefix)
which violates the documented "string wins" invariant; fix by checking
string/other non-ZSet key types before the member-prefix scan: call
rawKeyTypeAtSkipZSet (or the string-specific probe path used by rawKeyTypeAt)
prior to prefixExistsAt, keep the existing zsetMetaOrDeltaExistsAt check, and
return redisTypeString if the pre-scan finds a string; update zsetStorageHintAt
and any caching assumptions (e.g., redis_lua_context.go zsetState / keyType())
accordingly and add a unit test for ZSet+string corruption to ensure the
string-wins behavior.


// rawKeyTypeAtSkipZSet is rawKeyTypeAt minus the ZSet wide-column
// probes. Used by rawZSetPhysTypeAt when the caller has already
// confirmed no ZSet member / meta / delta rows exist, so the three
// ZSet probes inside detectWideColumnType would be pure redundant I/O.
func (r *RedisServer) rawKeyTypeAtSkipZSet(ctx context.Context, key []byte, readTS uint64) (redisValueType, error) {
if typ, found, err := r.probeStringTypes(ctx, key, readTS); err != nil || found {
return typ, err
}
if typ, found, err := r.probeListType(ctx, key, readTS); err != nil || found {
return typ, err
}
if typ, err := r.detectWideColumnTypeSkipZSet(ctx, key, readTS); err != nil || typ != redisTypeNone {
return typ, err
}
return r.probeLegacyCollectionTypes(ctx, key, readTS)
}

// zsetMetaOrDeltaExistsAt reports whether a ZSet meta key or delta prefix exists.
func (r *RedisServer) zsetMetaOrDeltaExistsAt(ctx context.Context, key []byte, readTS uint64) (bool, error) {
if exists, err := r.store.ExistsAt(ctx, store.ZSetMetaKey(key), readTS); err != nil {
return false, errors.WithStack(err)
} else if exists {
return true, nil
}
return r.prefixExistsAt(ctx, store.ZSetMetaDeltaScanPrefix(key), readTS)
}

// rawKeyTypeAt classifies the Redis encoding under which key is
// currently stored. Probes run string-first because real workloads are
// dominated by string keys: a live new-format string resolves in 1
Expand Down
76 changes: 51 additions & 25 deletions adapter/redis_lua_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,44 +669,47 @@ func (c *luaScriptContext) zsetState(key []byte) (*luaZSetState, error) {
}
c.zsets[k] = st

typ, err := c.keyType(key)
if errors.Is(err, store.ErrKeyNotFound) {
st.loaded = true
return st, nil
// Script-local type override: if a prior DEL / SET / type-change
// in this Eval has cached a different type for this key, the
// pre-script storage probe in zsetStorageHintAt would leak stale
// state (e.g. returning a live ZSet after an in-script SET).
// Mirror the keyType() fallback order: cachedType first, storage
// second.
if st, handled, err := c.zsetStateFromCachedType(key, st); handled {
return st, err
}

// zsetStorageHintAt performs the member-prefix scan once, so we never need
// a second ScanAt after type detection.
h, err := c.server.zsetStorageHintAt(context.Background(), key, c.startTS)
if err != nil {
return nil, err
}
if typ == redisTypeNone {
// Check whether physical ZSet data exists despite the key being logically
// absent (TTL-expired). If so, mark physicallyExistsAtStart so that
// zsetCommitPlan can force a full commit to clean up stale storage rows.
rawTyp, rawErr := c.server.rawKeyTypeAt(context.Background(), key, c.startTS)
if rawErr != nil {
return nil, rawErr
}
st.physicallyExistsAtStart = rawTyp == redisTypeZSet
switch {
case h.physType == redisTypeNone:
// Truly absent: brand-new key.
st.loaded = true
return st, nil
}
if typ != redisTypeZSet {
case h.logType == redisTypeNone:
// TTL-expired: physical ZSet data exists but the key is logically absent.
// physicallyExistsAtStart tells zsetCommitPlan to force a full commit so
// deleteLogicalKeyElems can remove the stale storage rows.
st.physicallyExistsAtStart = true
st.loaded = true
return st, nil
case h.logType != redisTypeZSet:
return nil, wrongTypeError()
}

// Probe for wide-column format with a single seek instead of a full scan.
prefix := store.ZSetMemberScanPrefix(key)
kvs, err := c.server.store.ScanAt(context.Background(), prefix, store.PrefixScanEnd(prefix), 1, c.startTS)
if err != nil {
return nil, errors.WithStack(err)
}
// Key is a live ZSet.
st.loaded = true
st.exists = true
if len(kvs) > 0 {
if h.memberFound {
// Member keys present → wide-column format (not legacy blob).
return st, nil
}
// No !zs|mem| rows does not imply legacy-blob: a wide-column ZSet that had
// all members deleted leaves only meta/delta keys behind. Probe the legacy
// blob key directly to distinguish these cases.
// No !zs|mem| rows: either wide-column with all members deleted (meta/delta
// only) or legacy blob. Probe the legacy blob key to distinguish.
blobExists, err := c.server.store.ExistsAt(context.Background(), redisZSetKey(key), c.startTS)
if err != nil {
return nil, errors.WithStack(err)
Expand All @@ -715,6 +718,29 @@ func (c *luaScriptContext) zsetState(key []byte) (*luaZSetState, error) {
return st, nil
}

// zsetStateFromCachedType applies the script-local cached type (if
// any) and returns (state, handled=true, err) when the answer is
// determined entirely by in-script mutations. Returns handled=false
// when the caller must fall through to the storage probe.
func (c *luaScriptContext) zsetStateFromCachedType(key []byte, st *luaZSetState) (*luaZSetState, bool, error) {
typ, cached := c.cachedType(key)
if !cached {
return nil, false, nil
}
if typ == redisTypeNone {
st.loaded = true
return st, true, nil
}
if typ == redisTypeZSet {
// Live ZSet via in-script mutation — loadZSetAt under
// ensureZSetLoaded will surface the pre-script storage rows.
st.loaded = true
st.exists = true
return st, true, nil
}
return nil, true, wrongTypeError()
}

// ensureZSetLoaded loads all ZSet members from storage if not already loaded,
// merging any in-script delta (added/removed) into st.members.
func (c *luaScriptContext) ensureZSetLoaded(st *luaZSetState, key []byte) error {
Expand Down
Loading