diff --git a/adapter/redis_compat_helpers.go b/adapter/redis_compat_helpers.go index 61426b78..e1dfa2fe 100644 --- a/adapter/redis_compat_helpers.go +++ b/adapter/redis_compat_helpers.go @@ -61,6 +61,23 @@ func normalizeStartTS(ts uint64) uint64 { // correctly detect collections whose fields were all deleted (metadata key exists but // no member keys) or newly created collections that only have delta keys. func (r *RedisServer) detectWideColumnType(ctx context.Context, key []byte, readTS uint64) (redisValueType, error) { + if typ, err := r.detectWideColumnTypeSkipZSet(ctx, key, readTS); err != nil || typ != redisTypeNone { + return typ, err + } + if found, err := r.wideColumnTypeExists(ctx, key, readTS, store.ZSetMemberScanPrefix, store.ZSetMetaKey, store.ZSetMetaDeltaScanPrefix); err != nil { + return redisTypeNone, err + } else if found { + return redisTypeZSet, nil + } + return redisTypeNone, nil +} + +// detectWideColumnTypeSkipZSet runs the wide-column hash / set probes +// only. Callers that have already eliminated ZSet (e.g. +// rawZSetPhysTypeAt's fallback after the member-prefix and meta/delta +// scans came back empty) use this to avoid re-issuing the three +// ZSet-side probes detectWideColumnType would otherwise repeat. +func (r *RedisServer) detectWideColumnTypeSkipZSet(ctx context.Context, key []byte, readTS uint64) (redisValueType, error) { if found, err := r.wideColumnTypeExists(ctx, key, readTS, store.HashFieldScanPrefix, store.HashMetaKey, store.HashMetaDeltaScanPrefix); err != nil { return redisTypeNone, err } else if found { @@ -71,11 +88,6 @@ func (r *RedisServer) detectWideColumnType(ctx context.Context, key []byte, read } else if found { return redisTypeSet, nil } - if found, err := r.wideColumnTypeExists(ctx, key, readTS, store.ZSetMemberScanPrefix, store.ZSetMetaKey, store.ZSetMetaDeltaScanPrefix); err != nil { - return redisTypeNone, err - } else if found { - return redisTypeZSet, nil - } return redisTypeNone, nil } @@ -110,6 +122,94 @@ func (r *RedisServer) prefixExistsAt(ctx context.Context, prefix []byte, readTS return len(kvs) > 0, nil } +// zsetStorageHint bundles the storage-probe results needed by zsetState so that +// the member-prefix scan is performed at most once. +type zsetStorageHint struct { + physType redisValueType // type ignoring TTL expiry + logType redisValueType // type after TTL check (redisTypeNone if expired) + memberFound bool // true when the member-prefix scan returned ≥1 key +} + +// zsetStorageHintAt probes storage for ZSet data at readTS. +// It performs the ZSetMemberScanPrefix scan only once, so callers (zsetState) +// do not need a second ScanAt to determine wide-column vs legacy-blob format. +// For non-ZSet keys the full rawKeyTypeAt path is used for correctness. +func (r *RedisServer) zsetStorageHintAt(ctx context.Context, key []byte, readTS uint64) (zsetStorageHint, error) { + physType, memberFound, err := r.rawZSetPhysTypeAt(ctx, key, readTS) + if err != nil { + return zsetStorageHint{}, err + } + h := zsetStorageHint{physType: physType, logType: physType, memberFound: memberFound} + if physType != redisTypeNone { + // Known-physType TTL probe: for collection types the embedded + // TTL only lives under the collection-side key, so we can skip + // the `!redis|str|` probe (nonStringOnly=true). For any string + // types we reach via rawZSetPhysTypeAt's fallback (mixed + // corruption), we still need the string-side check too. + expired, err := r.hasExpired(ctx, key, readTS, isNonStringCollectionType(physType)) + if err != nil { + return zsetStorageHint{}, err + } + if expired { + h.logType = redisTypeNone + } + } + return h, nil +} + +// rawZSetPhysTypeAt detects whether a ZSet exists physically at readTS (ignoring +// TTL) and whether the detection was via the member-prefix scan (memberFound). +// For non-ZSet keys it falls back to rawKeyTypeAt. +func (r *RedisServer) rawZSetPhysTypeAt(ctx context.Context, key []byte, readTS uint64) (redisValueType, bool, error) { + // Single scan: probe member prefix (common path). + memberFound, err := r.prefixExistsAt(ctx, store.ZSetMemberScanPrefix(key), readTS) + if err != nil { + return redisTypeNone, false, err + } + if memberFound { + return redisTypeZSet, true, nil + } + // No member rows — check meta/delta for a memberless wide-column ZSet. + zsetOnly, err := r.zsetMetaOrDeltaExistsAt(ctx, key, readTS) + if err != nil { + return redisTypeNone, false, err + } + if zsetOnly { + return redisTypeZSet, false, nil + } + // Not a wide-column ZSet — probe other types without re-scanning + // the three ZSet-side prefixes we already ruled out above. + physType, err := r.rawKeyTypeAtSkipZSet(ctx, key, readTS) + return physType, false, err +} + +// rawKeyTypeAtSkipZSet is rawKeyTypeAt minus the ZSet wide-column +// probes. Used by rawZSetPhysTypeAt when the caller has already +// confirmed no ZSet member / meta / delta rows exist, so the three +// ZSet probes inside detectWideColumnType would be pure redundant I/O. +func (r *RedisServer) rawKeyTypeAtSkipZSet(ctx context.Context, key []byte, readTS uint64) (redisValueType, error) { + if typ, found, err := r.probeStringTypes(ctx, key, readTS); err != nil || found { + return typ, err + } + if typ, found, err := r.probeListType(ctx, key, readTS); err != nil || found { + return typ, err + } + if typ, err := r.detectWideColumnTypeSkipZSet(ctx, key, readTS); err != nil || typ != redisTypeNone { + return typ, err + } + return r.probeLegacyCollectionTypes(ctx, key, readTS) +} + +// zsetMetaOrDeltaExistsAt reports whether a ZSet meta key or delta prefix exists. +func (r *RedisServer) zsetMetaOrDeltaExistsAt(ctx context.Context, key []byte, readTS uint64) (bool, error) { + if exists, err := r.store.ExistsAt(ctx, store.ZSetMetaKey(key), readTS); err != nil { + return false, errors.WithStack(err) + } else if exists { + return true, nil + } + return r.prefixExistsAt(ctx, store.ZSetMetaDeltaScanPrefix(key), readTS) +} + // rawKeyTypeAt classifies the Redis encoding under which key is // currently stored. Probes run string-first because real workloads are // dominated by string keys: a live new-format string resolves in 1 diff --git a/adapter/redis_lua_context.go b/adapter/redis_lua_context.go index b9436136..35470d04 100644 --- a/adapter/redis_lua_context.go +++ b/adapter/redis_lua_context.go @@ -669,44 +669,47 @@ func (c *luaScriptContext) zsetState(key []byte) (*luaZSetState, error) { } c.zsets[k] = st - typ, err := c.keyType(key) - if errors.Is(err, store.ErrKeyNotFound) { - st.loaded = true - return st, nil + // Script-local type override: if a prior DEL / SET / type-change + // in this Eval has cached a different type for this key, the + // pre-script storage probe in zsetStorageHintAt would leak stale + // state (e.g. returning a live ZSet after an in-script SET). + // Mirror the keyType() fallback order: cachedType first, storage + // second. + if st, handled, err := c.zsetStateFromCachedType(key, st); handled { + return st, err } + + // zsetStorageHintAt performs the member-prefix scan once, so we never need + // a second ScanAt after type detection. + h, err := c.server.zsetStorageHintAt(context.Background(), key, c.startTS) if err != nil { return nil, err } - if typ == redisTypeNone { - // Check whether physical ZSet data exists despite the key being logically - // absent (TTL-expired). If so, mark physicallyExistsAtStart so that - // zsetCommitPlan can force a full commit to clean up stale storage rows. - rawTyp, rawErr := c.server.rawKeyTypeAt(context.Background(), key, c.startTS) - if rawErr != nil { - return nil, rawErr - } - st.physicallyExistsAtStart = rawTyp == redisTypeZSet + switch { + case h.physType == redisTypeNone: + // Truly absent: brand-new key. st.loaded = true return st, nil - } - if typ != redisTypeZSet { + case h.logType == redisTypeNone: + // TTL-expired: physical ZSet data exists but the key is logically absent. + // physicallyExistsAtStart tells zsetCommitPlan to force a full commit so + // deleteLogicalKeyElems can remove the stale storage rows. + st.physicallyExistsAtStart = true + st.loaded = true + return st, nil + case h.logType != redisTypeZSet: return nil, wrongTypeError() } - // Probe for wide-column format with a single seek instead of a full scan. - prefix := store.ZSetMemberScanPrefix(key) - kvs, err := c.server.store.ScanAt(context.Background(), prefix, store.PrefixScanEnd(prefix), 1, c.startTS) - if err != nil { - return nil, errors.WithStack(err) - } + // Key is a live ZSet. st.loaded = true st.exists = true - if len(kvs) > 0 { + if h.memberFound { + // Member keys present → wide-column format (not legacy blob). return st, nil } - // No !zs|mem| rows does not imply legacy-blob: a wide-column ZSet that had - // all members deleted leaves only meta/delta keys behind. Probe the legacy - // blob key directly to distinguish these cases. + // No !zs|mem| rows: either wide-column with all members deleted (meta/delta + // only) or legacy blob. Probe the legacy blob key to distinguish. blobExists, err := c.server.store.ExistsAt(context.Background(), redisZSetKey(key), c.startTS) if err != nil { return nil, errors.WithStack(err) @@ -715,6 +718,29 @@ func (c *luaScriptContext) zsetState(key []byte) (*luaZSetState, error) { return st, nil } +// zsetStateFromCachedType applies the script-local cached type (if +// any) and returns (state, handled=true, err) when the answer is +// determined entirely by in-script mutations. Returns handled=false +// when the caller must fall through to the storage probe. +func (c *luaScriptContext) zsetStateFromCachedType(key []byte, st *luaZSetState) (*luaZSetState, bool, error) { + typ, cached := c.cachedType(key) + if !cached { + return nil, false, nil + } + if typ == redisTypeNone { + st.loaded = true + return st, true, nil + } + if typ == redisTypeZSet { + // Live ZSet via in-script mutation — loadZSetAt under + // ensureZSetLoaded will surface the pre-script storage rows. + st.loaded = true + st.exists = true + return st, true, nil + } + return nil, true, wrongTypeError() +} + // ensureZSetLoaded loads all ZSet members from storage if not already loaded, // merging any in-script delta (added/removed) into st.members. func (c *luaScriptContext) ensureZSetLoaded(st *luaZSetState, key []byte) error {