From e8057f055dcced20512a6dd5aeef47e4428e3892 Mon Sep 17 00:00:00 2001 From: Sohan Kshirsagar Date: Fri, 10 Apr 2026 16:03:15 -0700 Subject: [PATCH 1/5] feat: instrument RedisCluster and ClusterPipeline for record/replay --- .../redis/e2e-tests/docker-compose.yml | 44 +++++ .../redis/e2e-tests/src/app.py | 77 +++++++- .../redis/e2e-tests/src/test_requests.py | 5 + .../instrumentation/redis/instrumentation.py | 167 ++++++++++++++++++ 4 files changed, 292 insertions(+), 1 deletion(-) diff --git a/drift/instrumentation/redis/e2e-tests/docker-compose.yml b/drift/instrumentation/redis/e2e-tests/docker-compose.yml index 84b269c..d39c3ae 100644 --- a/drift/instrumentation/redis/e2e-tests/docker-compose.yml +++ b/drift/instrumentation/redis/e2e-tests/docker-compose.yml @@ -7,6 +7,46 @@ services: timeout: 5s retries: 5 + redis-node-1: + image: redis:7-alpine + command: redis-server --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --cluster-announce-hostname redis-node-1 --appendonly yes + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 5s + timeout: 5s + retries: 5 + + redis-node-2: + image: redis:7-alpine + command: redis-server --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --cluster-announce-hostname redis-node-2 --appendonly yes + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 5s + timeout: 5s + retries: 5 + + redis-node-3: + image: redis:7-alpine + command: redis-server --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --cluster-announce-hostname redis-node-3 --appendonly yes + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 5s + timeout: 5s + retries: 5 + + redis-cluster-init: + image: redis:7-alpine + depends_on: + redis-node-1: + condition: service_healthy + redis-node-2: + condition: service_healthy + redis-node-3: + condition: service_healthy + command: > + sh -c "redis-cli --cluster create redis-node-1:6379 redis-node-2:6379 redis-node-3:6379 --cluster-replicas 0 --cluster-yes" + restart: "no" + app: build: context: ../../../.. @@ -21,10 +61,14 @@ services: depends_on: redis: condition: service_healthy + redis-cluster-init: + condition: service_completed_successfully environment: - PORT=8000 - REDIS_HOST=redis - REDIS_PORT=6379 + - REDIS_CLUSTER_HOST=redis-node-1 + - REDIS_CLUSTER_PORT=6379 - TUSK_ANALYTICS_DISABLED=1 - PYTHONUNBUFFERED=1 - TUSK_USE_RUST_CORE=${TUSK_USE_RUST_CORE:-0} diff --git a/drift/instrumentation/redis/e2e-tests/src/app.py b/drift/instrumentation/redis/e2e-tests/src/app.py index 112ab11..b673bb1 100644 --- a/drift/instrumentation/redis/e2e-tests/src/app.py +++ b/drift/instrumentation/redis/e2e-tests/src/app.py @@ -15,11 +15,27 @@ app = Flask(__name__) -# Initialize Redis client +# Initialize Redis client (standalone) redis_client = redis.Redis( host=os.getenv("REDIS_HOST", "redis"), port=int(os.getenv("REDIS_PORT", "6379")), db=0, decode_responses=True ) +# Lazy-initialized RedisCluster client. +# Initialized on first use so that cluster connection failures don't prevent +# the rest of the app from starting (avoids breaking non-cluster tests). +_cluster_client = None + + +def get_cluster_client(): + global _cluster_client + if _cluster_client is None: + _cluster_client = redis.RedisCluster( + host=os.getenv("REDIS_CLUSTER_HOST", "redis-node-1"), + port=int(os.getenv("REDIS_CLUSTER_PORT", "6379")), + decode_responses=True, + ) + return _cluster_client + @app.route("/health") def health(): @@ -245,6 +261,65 @@ def test_transaction_watch(): return jsonify({"error": str(e)}), 500 +@app.route("/test/cluster-set-get", methods=["GET"]) +def test_cluster_set_get(): + """Test SET/GET through RedisCluster. + + RedisCluster.execute_command is a separate method from Redis.execute_command + """ + try: + cluster = get_cluster_client() + cluster.set("test:cluster:key1", "cluster_value1") + cluster.set("test:cluster:key2", "cluster_value2") + val1 = cluster.get("test:cluster:key1") + val2 = cluster.get("test:cluster:key2") + cluster.delete("test:cluster:key1", "test:cluster:key2") + return jsonify({"success": True, "val1": val1, "val2": val2}) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/test/cluster-incr", methods=["GET"]) +def test_cluster_incr(): + """Test INCR through RedisCluster. + + Exercises the cluster's execute_command path for a simple write operation. + """ + try: + cluster = get_cluster_client() + cluster.set("test:cluster:counter", "0") + cluster.incr("test:cluster:counter") + cluster.incr("test:cluster:counter") + cluster.incr("test:cluster:counter") + val = cluster.get("test:cluster:counter") + cluster.delete("test:cluster:counter") + return jsonify({"success": True, "value": int(val)}) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/test/cluster-pipeline", methods=["GET"]) +def test_cluster_pipeline(): + """Test pipeline through RedisCluster (ClusterPipeline). + + ClusterPipeline.execute_command is also a separate path. + All keys use the same hash tag {cp} to ensure they land on the same slot, + which is required for cluster pipelines. + """ + try: + cluster = get_cluster_client() + pipe = cluster.pipeline() + pipe.set("{cp}:key1", "pipe_val1") + pipe.set("{cp}:key2", "pipe_val2") + pipe.get("{cp}:key1") + pipe.get("{cp}:key2") + results = pipe.execute() + cluster.delete("{cp}:key1", "{cp}:key2") + return jsonify({"success": True, "results": results}) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + if __name__ == "__main__": sdk.mark_app_as_ready() app.run(host="0.0.0.0", port=8000, debug=False) diff --git a/drift/instrumentation/redis/e2e-tests/src/test_requests.py b/drift/instrumentation/redis/e2e-tests/src/test_requests.py index b923668..95835d3 100644 --- a/drift/instrumentation/redis/e2e-tests/src/test_requests.py +++ b/drift/instrumentation/redis/e2e-tests/src/test_requests.py @@ -44,4 +44,9 @@ make_request("GET", "/test/transaction-watch") + # RedisCluster operations + make_request("GET", "/test/cluster-set-get") + make_request("GET", "/test/cluster-incr") + make_request("GET", "/test/cluster-pipeline") + print_request_summary() diff --git a/drift/instrumentation/redis/instrumentation.py b/drift/instrumentation/redis/instrumentation.py index 0b671da..b5e1cab 100644 --- a/drift/instrumentation/redis/instrumentation.py +++ b/drift/instrumentation/redis/instrumentation.py @@ -212,6 +212,110 @@ async def patched_async_pipeline_immediate_execute(pipeline_self, *args, **kwarg except ImportError: logger.debug("redis.asyncio not available") + # Patch RedisCluster.execute_command (separate class, NOT a subclass of Redis) + try: + from redis.cluster import ClusterPipeline as SyncClusterPipeline + from redis.cluster import RedisCluster + + if hasattr(RedisCluster, "execute_command"): + original_cluster_execute = RedisCluster.execute_command + instrumentation = self + + def patched_cluster_execute_command(cluster_self, *args, **kwargs): + """Patched RedisCluster.execute_command method.""" + sdk = TuskDrift.get_instance() + + if sdk.mode == TuskDriftMode.DISABLED: + return original_cluster_execute(cluster_self, *args, **kwargs) + + return instrumentation._traced_execute_command( + cluster_self, + original_cluster_execute, + sdk, + args, + kwargs, + ) + + RedisCluster.execute_command = patched_cluster_execute_command + logger.debug("redis.cluster.RedisCluster.execute_command instrumented") + + # Patch ClusterPipeline.execute + if hasattr(SyncClusterPipeline, "execute"): + original_cluster_pipeline_execute = SyncClusterPipeline.execute + instrumentation = self + + def patched_cluster_pipeline_execute(pipeline_self, *args, **kwargs): + """Patched ClusterPipeline.execute method.""" + sdk = TuskDrift.get_instance() + + if sdk.mode == TuskDriftMode.DISABLED: + return original_cluster_pipeline_execute(pipeline_self, *args, **kwargs) + + return instrumentation._traced_cluster_pipeline_execute( + pipeline_self, + original_cluster_pipeline_execute, + sdk, + args, + kwargs, + ) + + SyncClusterPipeline.execute = patched_cluster_pipeline_execute + logger.debug("redis.cluster.ClusterPipeline.execute instrumented") + except ImportError: + logger.debug("redis.cluster not available") + + # Patch async RedisCluster.execute_command + try: + from redis.asyncio.cluster import ClusterPipeline as AsyncClusterPipeline + from redis.asyncio.cluster import RedisCluster as AsyncRedisCluster + + if hasattr(AsyncRedisCluster, "execute_command"): + original_async_cluster_execute = AsyncRedisCluster.execute_command + instrumentation = self + + async def patched_async_cluster_execute_command(cluster_self, *args, **kwargs): + """Patched async RedisCluster.execute_command method.""" + sdk = TuskDrift.get_instance() + + if sdk.mode == TuskDriftMode.DISABLED: + return await original_async_cluster_execute(cluster_self, *args, **kwargs) + + return await instrumentation._traced_async_execute_command( + cluster_self, + original_async_cluster_execute, + sdk, + args, + kwargs, + ) + + AsyncRedisCluster.execute_command = patched_async_cluster_execute_command + logger.debug("redis.asyncio.cluster.RedisCluster.execute_command instrumented") + + # Patch async ClusterPipeline.execute + if hasattr(AsyncClusterPipeline, "execute"): + original_async_cluster_pipeline_execute = AsyncClusterPipeline.execute + instrumentation = self + + async def patched_async_cluster_pipeline_execute(pipeline_self, *args, **kwargs): + """Patched async ClusterPipeline.execute method.""" + sdk = TuskDrift.get_instance() + + if sdk.mode == TuskDriftMode.DISABLED: + return await original_async_cluster_pipeline_execute(pipeline_self, *args, **kwargs) + + return await instrumentation._traced_async_cluster_pipeline_execute( + pipeline_self, + original_async_cluster_pipeline_execute, + sdk, + args, + kwargs, + ) + + AsyncClusterPipeline.execute = patched_async_cluster_pipeline_execute + logger.debug("redis.asyncio.cluster.ClusterPipeline.execute instrumented") + except ImportError: + logger.debug("redis.asyncio.cluster not available") + def _traced_execute_command( self, redis_client: Any, original_execute: Any, sdk: TuskDrift, args: tuple, kwargs: dict ) -> Any: @@ -519,6 +623,59 @@ async def _traced_async_pipeline_execute( pipeline, original_execute, sdk, args, kwargs, command_str, command_stack ) + def _traced_cluster_pipeline_execute( + self, pipeline: Any, original_execute: Any, sdk: TuskDrift, args: tuple, kwargs: dict + ) -> Any: + """Traced ClusterPipeline.execute method. + + Must snapshot the command queue before calling original_execute because + ClusterPipeline.execute resets the queue in its finally block. + """ + if sdk.mode == TuskDriftMode.DISABLED: + return original_execute(pipeline, *args, **kwargs) + + command_stack = list(self._get_pipeline_commands(pipeline)) + command_str = self._format_pipeline_commands(command_stack) + + def original_call(): + return original_execute(pipeline, *args, **kwargs) + + if sdk.mode == TuskDriftMode.REPLAY: + return handle_replay_mode( + replay_mode_handler=lambda: self._replay_pipeline_execute(sdk, command_str, command_stack), + no_op_request_handler=lambda: [], + is_server_request=False, + ) + + return handle_record_mode( + original_function_call=original_call, + record_mode_handler=lambda is_pre_app_start: self._record_pipeline_execute( + pipeline, original_execute, sdk, args, kwargs, command_str, command_stack, is_pre_app_start + ), + span_kind=OTelSpanKind.CLIENT, + ) + + async def _traced_async_cluster_pipeline_execute( + self, pipeline: Any, original_execute: Any, sdk: TuskDrift, args: tuple, kwargs: dict + ) -> Any: + """Traced async ClusterPipeline.execute method.""" + if sdk.mode == TuskDriftMode.DISABLED: + return await original_execute(pipeline, *args, **kwargs) + + command_stack = list(self._get_pipeline_commands(pipeline)) + command_str = self._format_pipeline_commands(command_stack) + + if sdk.mode == TuskDriftMode.REPLAY: + return handle_replay_mode( + replay_mode_handler=lambda: self._replay_pipeline_execute(sdk, command_str, command_stack), + no_op_request_handler=lambda: [], + is_server_request=False, + ) + + return await self._record_async_pipeline_execute( + pipeline, original_execute, sdk, args, kwargs, command_str, command_stack + ) + async def _record_async_pipeline_execute( self, pipeline: Any, @@ -699,6 +856,16 @@ def _get_pipeline_commands(self, pipeline: Any) -> list: return pipeline.command_stack elif hasattr(pipeline, "_command_stack"): return pipeline._command_stack + # ClusterPipeline stores commands in _execution_strategy._command_queue + elif hasattr(pipeline, "_execution_strategy"): + strategy = pipeline._execution_strategy + if hasattr(strategy, "_command_queue"): + return strategy._command_queue + elif hasattr(strategy, "command_queue"): + return strategy.command_queue + # Async ClusterPipeline stores commands in _command_queue directly + elif hasattr(pipeline, "_command_queue"): + return pipeline._command_queue except AttributeError: pass return [] From 91f07d5a41ec5bf41f8fa8888098929533b9efd6 Mon Sep 17 00:00:00 2001 From: Sohan Kshirsagar Date: Fri, 10 Apr 2026 16:35:11 -0700 Subject: [PATCH 2/5] Address comment --- .../instrumentation/redis/instrumentation.py | 25 ++++++++++++------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/drift/instrumentation/redis/instrumentation.py b/drift/instrumentation/redis/instrumentation.py index b5e1cab..45680b5 100644 --- a/drift/instrumentation/redis/instrumentation.py +++ b/drift/instrumentation/redis/instrumentation.py @@ -850,22 +850,29 @@ def _format_command(self, args: tuple) -> str: return " ".join(parts) def _get_pipeline_commands(self, pipeline: Any) -> list: - """Extract commands from pipeline.""" + """Extract commands from pipeline. + + ClusterPipeline has an always-empty ``command_stack`` attribute while + the real commands live in ``_execution_strategy._command_queue``, so we + check ``_execution_strategy`` first to avoid returning the empty list. + """ try: - if hasattr(pipeline, "command_stack"): - return pipeline.command_stack - elif hasattr(pipeline, "_command_stack"): - return pipeline._command_stack - # ClusterPipeline stores commands in _execution_strategy._command_queue - elif hasattr(pipeline, "_execution_strategy"): + # ClusterPipeline stores commands in _execution_strategy._command_queue. + # Must be checked before command_stack because ClusterPipeline also + # has a command_stack attr that is always empty (redis-py #3703). + if hasattr(pipeline, "_execution_strategy"): strategy = pipeline._execution_strategy if hasattr(strategy, "_command_queue"): return strategy._command_queue - elif hasattr(strategy, "command_queue"): + if hasattr(strategy, "command_queue"): return strategy.command_queue # Async ClusterPipeline stores commands in _command_queue directly - elif hasattr(pipeline, "_command_queue"): + if hasattr(pipeline, "_command_queue"): return pipeline._command_queue + if hasattr(pipeline, "command_stack"): + return pipeline.command_stack + if hasattr(pipeline, "_command_stack"): + return pipeline._command_stack except AttributeError: pass return [] From ec0858f29b61fe4bac439bd8dcd996edc14b5676 Mon Sep 17 00:00:00 2001 From: Sohan Kshirsagar Date: Fri, 10 Apr 2026 17:34:25 -0700 Subject: [PATCH 3/5] fix float-to-int deserialization in Redis instrumentation for REPLAY mode --- .../redis/e2e-tests/src/app.py | 19 +++++++++++++++++++ .../redis/e2e-tests/src/test_requests.py | 3 +++ .../instrumentation/redis/instrumentation.py | 2 ++ 3 files changed, 24 insertions(+) diff --git a/drift/instrumentation/redis/e2e-tests/src/app.py b/drift/instrumentation/redis/e2e-tests/src/app.py index b673bb1..33846e2 100644 --- a/drift/instrumentation/redis/e2e-tests/src/app.py +++ b/drift/instrumentation/redis/e2e-tests/src/app.py @@ -319,6 +319,25 @@ def test_cluster_pipeline(): except Exception as e: return jsonify({"error": str(e)}), 500 +@app.route("/test/cluster-pipeline-transaction", methods=["GET"]) +def test_cluster_pipeline_transaction(): + """Test ClusterPipeline with transaction mode. + + Uses TransactionStrategy internally. All keys must be on the same slot. + """ + try: + cluster = get_cluster_client() + pipe = cluster.pipeline(transaction=True) + pipe.set("{tx}:key1", "txval1") + pipe.set("{tx}:key2", "txval2") + pipe.get("{tx}:key1") + pipe.get("{tx}:key2") + results = pipe.execute() + cluster.delete("{tx}:key1", "{tx}:key2") + return jsonify({"success": True, "results": results}) + except Exception as e: + return jsonify({"error": str(e)}), 500 + if __name__ == "__main__": sdk.mark_app_as_ready() diff --git a/drift/instrumentation/redis/e2e-tests/src/test_requests.py b/drift/instrumentation/redis/e2e-tests/src/test_requests.py index 95835d3..01698a4 100644 --- a/drift/instrumentation/redis/e2e-tests/src/test_requests.py +++ b/drift/instrumentation/redis/e2e-tests/src/test_requests.py @@ -49,4 +49,7 @@ make_request("GET", "/test/cluster-incr") make_request("GET", "/test/cluster-pipeline") + # RedisCluster + make_request("GET", "/test/cluster-pipeline-transaction") + print_request_summary() diff --git a/drift/instrumentation/redis/instrumentation.py b/drift/instrumentation/redis/instrumentation.py index 45680b5..b3e6558 100644 --- a/drift/instrumentation/redis/instrumentation.py +++ b/drift/instrumentation/redis/instrumentation.py @@ -1122,6 +1122,8 @@ def _deserialize_value(self, value: Any) -> Any: return {k: self._deserialize_value(v) for k, v in value.items()} elif isinstance(value, list): return [self._deserialize_value(v) for v in value] + elif isinstance(value, float) and value.is_integer(): + return int(value) return value def _deserialize_response(self, mock_data: dict[str, Any]) -> Any: From 066eaa2c1be4c8272181ad48a74215f7b5e6b869 Mon Sep 17 00:00:00 2001 From: Sohan Kshirsagar Date: Fri, 10 Apr 2026 18:15:36 -0700 Subject: [PATCH 4/5] fix lint --- drift/instrumentation/redis/e2e-tests/src/app.py | 1 + 1 file changed, 1 insertion(+) diff --git a/drift/instrumentation/redis/e2e-tests/src/app.py b/drift/instrumentation/redis/e2e-tests/src/app.py index 33846e2..c148910 100644 --- a/drift/instrumentation/redis/e2e-tests/src/app.py +++ b/drift/instrumentation/redis/e2e-tests/src/app.py @@ -319,6 +319,7 @@ def test_cluster_pipeline(): except Exception as e: return jsonify({"error": str(e)}), 500 + @app.route("/test/cluster-pipeline-transaction", methods=["GET"]) def test_cluster_pipeline_transaction(): """Test ClusterPipeline with transaction mode. From 0fca7134a3734e7343e7726cd579393fcde8f12b Mon Sep 17 00:00:00 2001 From: Sohan Kshirsagar Date: Fri, 10 Apr 2026 18:44:43 -0700 Subject: [PATCH 5/5] fix --- drift/instrumentation/redis/instrumentation.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/drift/instrumentation/redis/instrumentation.py b/drift/instrumentation/redis/instrumentation.py index b3e6558..5b30362 100644 --- a/drift/instrumentation/redis/instrumentation.py +++ b/drift/instrumentation/redis/instrumentation.py @@ -1091,7 +1091,9 @@ def _serialize_value(self, value: Any) -> Any: return {"__bytes__": True, "encoding": "utf8", "value": decoded} except UnicodeDecodeError: return {"__bytes__": True, "encoding": "hex", "value": value.hex()} - elif isinstance(value, (str, int, float, bool, type(None))): + elif isinstance(value, float): + return {"__float__": True, "value": value} + elif isinstance(value, (str, int, bool, type(None))): return value elif isinstance(value, (list, tuple)): return [self._serialize_value(v) for v in value] @@ -1109,7 +1111,6 @@ def _serialize_response(self, response: Any) -> Any: def _deserialize_value(self, value: Any) -> Any: """Deserialize a value, converting typed wrappers back to original types.""" if isinstance(value, dict): - # Check for bytes wrapper if value.get("__bytes__") is True: encoding = value.get("encoding") data = value.get("value", "") @@ -1118,6 +1119,8 @@ def _deserialize_value(self, value: Any) -> Any: elif encoding == "hex": return bytes.fromhex(data) return data # fallback + if value.get("__float__") is True: + return float(value.get("value", 0.0)) # Recursively deserialize dict values return {k: self._deserialize_value(v) for k, v in value.items()} elif isinstance(value, list):