diff --git a/drift/instrumentation/redis/e2e-tests/docker-compose.yml b/drift/instrumentation/redis/e2e-tests/docker-compose.yml index 84b269c..d39c3ae 100644 --- a/drift/instrumentation/redis/e2e-tests/docker-compose.yml +++ b/drift/instrumentation/redis/e2e-tests/docker-compose.yml @@ -7,6 +7,46 @@ services: timeout: 5s retries: 5 + redis-node-1: + image: redis:7-alpine + command: redis-server --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --cluster-announce-hostname redis-node-1 --appendonly yes + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 5s + timeout: 5s + retries: 5 + + redis-node-2: + image: redis:7-alpine + command: redis-server --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --cluster-announce-hostname redis-node-2 --appendonly yes + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 5s + timeout: 5s + retries: 5 + + redis-node-3: + image: redis:7-alpine + command: redis-server --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --cluster-announce-hostname redis-node-3 --appendonly yes + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 5s + timeout: 5s + retries: 5 + + redis-cluster-init: + image: redis:7-alpine + depends_on: + redis-node-1: + condition: service_healthy + redis-node-2: + condition: service_healthy + redis-node-3: + condition: service_healthy + command: > + sh -c "redis-cli --cluster create redis-node-1:6379 redis-node-2:6379 redis-node-3:6379 --cluster-replicas 0 --cluster-yes" + restart: "no" + app: build: context: ../../../.. @@ -21,10 +61,14 @@ services: depends_on: redis: condition: service_healthy + redis-cluster-init: + condition: service_completed_successfully environment: - PORT=8000 - REDIS_HOST=redis - REDIS_PORT=6379 + - REDIS_CLUSTER_HOST=redis-node-1 + - REDIS_CLUSTER_PORT=6379 - TUSK_ANALYTICS_DISABLED=1 - PYTHONUNBUFFERED=1 - TUSK_USE_RUST_CORE=${TUSK_USE_RUST_CORE:-0} diff --git a/drift/instrumentation/redis/e2e-tests/src/app.py b/drift/instrumentation/redis/e2e-tests/src/app.py index 112ab11..c148910 100644 --- a/drift/instrumentation/redis/e2e-tests/src/app.py +++ b/drift/instrumentation/redis/e2e-tests/src/app.py @@ -15,11 +15,27 @@ app = Flask(__name__) -# Initialize Redis client +# Initialize Redis client (standalone) redis_client = redis.Redis( host=os.getenv("REDIS_HOST", "redis"), port=int(os.getenv("REDIS_PORT", "6379")), db=0, decode_responses=True ) +# Lazy-initialized RedisCluster client. +# Initialized on first use so that cluster connection failures don't prevent +# the rest of the app from starting (avoids breaking non-cluster tests). +_cluster_client = None + + +def get_cluster_client(): + global _cluster_client + if _cluster_client is None: + _cluster_client = redis.RedisCluster( + host=os.getenv("REDIS_CLUSTER_HOST", "redis-node-1"), + port=int(os.getenv("REDIS_CLUSTER_PORT", "6379")), + decode_responses=True, + ) + return _cluster_client + @app.route("/health") def health(): @@ -245,6 +261,85 @@ def test_transaction_watch(): return jsonify({"error": str(e)}), 500 +@app.route("/test/cluster-set-get", methods=["GET"]) +def test_cluster_set_get(): + """Test SET/GET through RedisCluster. + + RedisCluster.execute_command is a separate method from Redis.execute_command + """ + try: + cluster = get_cluster_client() + cluster.set("test:cluster:key1", "cluster_value1") + cluster.set("test:cluster:key2", "cluster_value2") + val1 = cluster.get("test:cluster:key1") + val2 = cluster.get("test:cluster:key2") + cluster.delete("test:cluster:key1", "test:cluster:key2") + return jsonify({"success": True, "val1": val1, "val2": val2}) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/test/cluster-incr", methods=["GET"]) +def test_cluster_incr(): + """Test INCR through RedisCluster. + + Exercises the cluster's execute_command path for a simple write operation. + """ + try: + cluster = get_cluster_client() + cluster.set("test:cluster:counter", "0") + cluster.incr("test:cluster:counter") + cluster.incr("test:cluster:counter") + cluster.incr("test:cluster:counter") + val = cluster.get("test:cluster:counter") + cluster.delete("test:cluster:counter") + return jsonify({"success": True, "value": int(val)}) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/test/cluster-pipeline", methods=["GET"]) +def test_cluster_pipeline(): + """Test pipeline through RedisCluster (ClusterPipeline). + + ClusterPipeline.execute_command is also a separate path. + All keys use the same hash tag {cp} to ensure they land on the same slot, + which is required for cluster pipelines. + """ + try: + cluster = get_cluster_client() + pipe = cluster.pipeline() + pipe.set("{cp}:key1", "pipe_val1") + pipe.set("{cp}:key2", "pipe_val2") + pipe.get("{cp}:key1") + pipe.get("{cp}:key2") + results = pipe.execute() + cluster.delete("{cp}:key1", "{cp}:key2") + return jsonify({"success": True, "results": results}) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route("/test/cluster-pipeline-transaction", methods=["GET"]) +def test_cluster_pipeline_transaction(): + """Test ClusterPipeline with transaction mode. + + Uses TransactionStrategy internally. All keys must be on the same slot. + """ + try: + cluster = get_cluster_client() + pipe = cluster.pipeline(transaction=True) + pipe.set("{tx}:key1", "txval1") + pipe.set("{tx}:key2", "txval2") + pipe.get("{tx}:key1") + pipe.get("{tx}:key2") + results = pipe.execute() + cluster.delete("{tx}:key1", "{tx}:key2") + return jsonify({"success": True, "results": results}) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + if __name__ == "__main__": sdk.mark_app_as_ready() app.run(host="0.0.0.0", port=8000, debug=False) diff --git a/drift/instrumentation/redis/e2e-tests/src/test_requests.py b/drift/instrumentation/redis/e2e-tests/src/test_requests.py index b923668..01698a4 100644 --- a/drift/instrumentation/redis/e2e-tests/src/test_requests.py +++ b/drift/instrumentation/redis/e2e-tests/src/test_requests.py @@ -44,4 +44,12 @@ make_request("GET", "/test/transaction-watch") + # RedisCluster operations + make_request("GET", "/test/cluster-set-get") + make_request("GET", "/test/cluster-incr") + make_request("GET", "/test/cluster-pipeline") + + # RedisCluster + make_request("GET", "/test/cluster-pipeline-transaction") + print_request_summary() diff --git a/drift/instrumentation/redis/instrumentation.py b/drift/instrumentation/redis/instrumentation.py index 0b671da..5b30362 100644 --- a/drift/instrumentation/redis/instrumentation.py +++ b/drift/instrumentation/redis/instrumentation.py @@ -212,6 +212,110 @@ async def patched_async_pipeline_immediate_execute(pipeline_self, *args, **kwarg except ImportError: logger.debug("redis.asyncio not available") + # Patch RedisCluster.execute_command (separate class, NOT a subclass of Redis) + try: + from redis.cluster import ClusterPipeline as SyncClusterPipeline + from redis.cluster import RedisCluster + + if hasattr(RedisCluster, "execute_command"): + original_cluster_execute = RedisCluster.execute_command + instrumentation = self + + def patched_cluster_execute_command(cluster_self, *args, **kwargs): + """Patched RedisCluster.execute_command method.""" + sdk = TuskDrift.get_instance() + + if sdk.mode == TuskDriftMode.DISABLED: + return original_cluster_execute(cluster_self, *args, **kwargs) + + return instrumentation._traced_execute_command( + cluster_self, + original_cluster_execute, + sdk, + args, + kwargs, + ) + + RedisCluster.execute_command = patched_cluster_execute_command + logger.debug("redis.cluster.RedisCluster.execute_command instrumented") + + # Patch ClusterPipeline.execute + if hasattr(SyncClusterPipeline, "execute"): + original_cluster_pipeline_execute = SyncClusterPipeline.execute + instrumentation = self + + def patched_cluster_pipeline_execute(pipeline_self, *args, **kwargs): + """Patched ClusterPipeline.execute method.""" + sdk = TuskDrift.get_instance() + + if sdk.mode == TuskDriftMode.DISABLED: + return original_cluster_pipeline_execute(pipeline_self, *args, **kwargs) + + return instrumentation._traced_cluster_pipeline_execute( + pipeline_self, + original_cluster_pipeline_execute, + sdk, + args, + kwargs, + ) + + SyncClusterPipeline.execute = patched_cluster_pipeline_execute + logger.debug("redis.cluster.ClusterPipeline.execute instrumented") + except ImportError: + logger.debug("redis.cluster not available") + + # Patch async RedisCluster.execute_command + try: + from redis.asyncio.cluster import ClusterPipeline as AsyncClusterPipeline + from redis.asyncio.cluster import RedisCluster as AsyncRedisCluster + + if hasattr(AsyncRedisCluster, "execute_command"): + original_async_cluster_execute = AsyncRedisCluster.execute_command + instrumentation = self + + async def patched_async_cluster_execute_command(cluster_self, *args, **kwargs): + """Patched async RedisCluster.execute_command method.""" + sdk = TuskDrift.get_instance() + + if sdk.mode == TuskDriftMode.DISABLED: + return await original_async_cluster_execute(cluster_self, *args, **kwargs) + + return await instrumentation._traced_async_execute_command( + cluster_self, + original_async_cluster_execute, + sdk, + args, + kwargs, + ) + + AsyncRedisCluster.execute_command = patched_async_cluster_execute_command + logger.debug("redis.asyncio.cluster.RedisCluster.execute_command instrumented") + + # Patch async ClusterPipeline.execute + if hasattr(AsyncClusterPipeline, "execute"): + original_async_cluster_pipeline_execute = AsyncClusterPipeline.execute + instrumentation = self + + async def patched_async_cluster_pipeline_execute(pipeline_self, *args, **kwargs): + """Patched async ClusterPipeline.execute method.""" + sdk = TuskDrift.get_instance() + + if sdk.mode == TuskDriftMode.DISABLED: + return await original_async_cluster_pipeline_execute(pipeline_self, *args, **kwargs) + + return await instrumentation._traced_async_cluster_pipeline_execute( + pipeline_self, + original_async_cluster_pipeline_execute, + sdk, + args, + kwargs, + ) + + AsyncClusterPipeline.execute = patched_async_cluster_pipeline_execute + logger.debug("redis.asyncio.cluster.ClusterPipeline.execute instrumented") + except ImportError: + logger.debug("redis.asyncio.cluster not available") + def _traced_execute_command( self, redis_client: Any, original_execute: Any, sdk: TuskDrift, args: tuple, kwargs: dict ) -> Any: @@ -519,6 +623,59 @@ async def _traced_async_pipeline_execute( pipeline, original_execute, sdk, args, kwargs, command_str, command_stack ) + def _traced_cluster_pipeline_execute( + self, pipeline: Any, original_execute: Any, sdk: TuskDrift, args: tuple, kwargs: dict + ) -> Any: + """Traced ClusterPipeline.execute method. + + Must snapshot the command queue before calling original_execute because + ClusterPipeline.execute resets the queue in its finally block. + """ + if sdk.mode == TuskDriftMode.DISABLED: + return original_execute(pipeline, *args, **kwargs) + + command_stack = list(self._get_pipeline_commands(pipeline)) + command_str = self._format_pipeline_commands(command_stack) + + def original_call(): + return original_execute(pipeline, *args, **kwargs) + + if sdk.mode == TuskDriftMode.REPLAY: + return handle_replay_mode( + replay_mode_handler=lambda: self._replay_pipeline_execute(sdk, command_str, command_stack), + no_op_request_handler=lambda: [], + is_server_request=False, + ) + + return handle_record_mode( + original_function_call=original_call, + record_mode_handler=lambda is_pre_app_start: self._record_pipeline_execute( + pipeline, original_execute, sdk, args, kwargs, command_str, command_stack, is_pre_app_start + ), + span_kind=OTelSpanKind.CLIENT, + ) + + async def _traced_async_cluster_pipeline_execute( + self, pipeline: Any, original_execute: Any, sdk: TuskDrift, args: tuple, kwargs: dict + ) -> Any: + """Traced async ClusterPipeline.execute method.""" + if sdk.mode == TuskDriftMode.DISABLED: + return await original_execute(pipeline, *args, **kwargs) + + command_stack = list(self._get_pipeline_commands(pipeline)) + command_str = self._format_pipeline_commands(command_stack) + + if sdk.mode == TuskDriftMode.REPLAY: + return handle_replay_mode( + replay_mode_handler=lambda: self._replay_pipeline_execute(sdk, command_str, command_stack), + no_op_request_handler=lambda: [], + is_server_request=False, + ) + + return await self._record_async_pipeline_execute( + pipeline, original_execute, sdk, args, kwargs, command_str, command_stack + ) + async def _record_async_pipeline_execute( self, pipeline: Any, @@ -693,11 +850,28 @@ def _format_command(self, args: tuple) -> str: return " ".join(parts) def _get_pipeline_commands(self, pipeline: Any) -> list: - """Extract commands from pipeline.""" + """Extract commands from pipeline. + + ClusterPipeline has an always-empty ``command_stack`` attribute while + the real commands live in ``_execution_strategy._command_queue``, so we + check ``_execution_strategy`` first to avoid returning the empty list. + """ try: + # ClusterPipeline stores commands in _execution_strategy._command_queue. + # Must be checked before command_stack because ClusterPipeline also + # has a command_stack attr that is always empty (redis-py #3703). + if hasattr(pipeline, "_execution_strategy"): + strategy = pipeline._execution_strategy + if hasattr(strategy, "_command_queue"): + return strategy._command_queue + if hasattr(strategy, "command_queue"): + return strategy.command_queue + # Async ClusterPipeline stores commands in _command_queue directly + if hasattr(pipeline, "_command_queue"): + return pipeline._command_queue if hasattr(pipeline, "command_stack"): return pipeline.command_stack - elif hasattr(pipeline, "_command_stack"): + if hasattr(pipeline, "_command_stack"): return pipeline._command_stack except AttributeError: pass @@ -917,7 +1091,9 @@ def _serialize_value(self, value: Any) -> Any: return {"__bytes__": True, "encoding": "utf8", "value": decoded} except UnicodeDecodeError: return {"__bytes__": True, "encoding": "hex", "value": value.hex()} - elif isinstance(value, (str, int, float, bool, type(None))): + elif isinstance(value, float): + return {"__float__": True, "value": value} + elif isinstance(value, (str, int, bool, type(None))): return value elif isinstance(value, (list, tuple)): return [self._serialize_value(v) for v in value] @@ -935,7 +1111,6 @@ def _serialize_response(self, response: Any) -> Any: def _deserialize_value(self, value: Any) -> Any: """Deserialize a value, converting typed wrappers back to original types.""" if isinstance(value, dict): - # Check for bytes wrapper if value.get("__bytes__") is True: encoding = value.get("encoding") data = value.get("value", "") @@ -944,10 +1119,14 @@ def _deserialize_value(self, value: Any) -> Any: elif encoding == "hex": return bytes.fromhex(data) return data # fallback + if value.get("__float__") is True: + return float(value.get("value", 0.0)) # Recursively deserialize dict values return {k: self._deserialize_value(v) for k, v in value.items()} elif isinstance(value, list): return [self._deserialize_value(v) for v in value] + elif isinstance(value, float) and value.is_integer(): + return int(value) return value def _deserialize_response(self, mock_data: dict[str, Any]) -> Any: