From 0c4744e0e686a381d640eef6ab40b4c5d2a8994d Mon Sep 17 00:00:00 2001 From: Yaniv Michael Kaul Date: Sun, 5 Apr 2026 08:59:21 +0300 Subject: [PATCH 1/3] feat: NumPy-accelerated vector serialization in VectorType MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add three fast paths to VectorType.serialize() for the common case of fixed-size numeric subtypes (float, double, int, bigint): 1. bytes/bytearray passthrough – skip all conversion when the caller already holds a correctly-sized blob (e.g. from serialize_numpy_bulk). 2. NumPy ndarray fast path – convert a 1-D numpy array to big-endian bytes via asarray(dtype=...).tobytes() instead of 768+ individual struct.pack + BytesIO.write calls. 3. serialize_numpy_bulk() classmethod – byte-swap an entire 2-D array (N rows × dim columns) once and slice the raw buffer, yielding one bytes object per row with zero per-element overhead. Benchmarks on 768-dim float32 vectors show 70-300× speedups depending on the path, directly benefiting bulk-insert workloads such as loading embeddings from Parquet files (VectorDBBench use case). NumPy remains an optional dependency; all new code is guarded by try/except ImportError. Variable-size subtypes (smallint, tinyint, text, etc.) are excluded and continue to use the original element-by-element path. Unit tests cover correctness, round-trip fidelity, error handling, and fallback behavior for all three paths. 
--- cassandra/cqltypes.py | 119 ++++++++++++- tests/unit/test_types.py | 355 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 473 insertions(+), 1 deletion(-) diff --git a/cassandra/cqltypes.py b/cassandra/cqltypes.py index 547a13c979..12aa886405 100644 --- a/cassandra/cqltypes.py +++ b/cassandra/cqltypes.py @@ -54,6 +54,12 @@ _little_endian_flag = 1 # we always serialize LE import ipaddress + +try: + import numpy as _np + _HAVE_NUMPY = True +except ImportError: + _HAVE_NUMPY = False apache_cassandra_type_prefix = 'org.apache.cassandra.db.marshal.' cassandra_empty_type = 'org.apache.cassandra.db.marshal.EmptyType' @@ -1428,10 +1434,25 @@ def serialize(cls, v, protocol_version): return buf.getvalue() +# Used by VectorType to enable fast serialization from NumPy arrays. +# Only types with a fixed serial_size() are included, because the vector +# wire format for variable-size subtypes uses uvint length prefixes per +# element, which cannot be produced by raw numpy tobytes(). +_NUMPY_DTYPE_MAP = {} +if _HAVE_NUMPY: + _NUMPY_DTYPE_MAP = { + 'float': _np.dtype('>f4'), # FloatType - 4-byte big-endian float + 'double': _np.dtype('>f8'), # DoubleType - 8-byte big-endian double + 'int': _np.dtype('>i4'), # Int32Type - 4-byte big-endian int32 + 'bigint': _np.dtype('>i8'), # LongType - 8-byte big-endian int64 + } + + class VectorType(_CassandraType): typename = 'org.apache.cassandra.db.marshal.VectorType' vector_size = 0 subtype = None + _numpy_dtype = None @classmethod def serial_size(cls): @@ -1443,7 +1464,12 @@ def apply_parameters(cls, params, names): assert len(params) == 2 subtype = lookup_casstype(params[0]) vsize = params[1] - return type('%s(%s)' % (cls.cass_parameterized_type_with([]), vsize), (cls,), {'vector_size': vsize, 'subtype': subtype}) + # Determine the NumPy dtype for fast serialization (only for + # fixed-size subtypes whose typename appears in _NUMPY_DTYPE_MAP). 
+ np_dtype = None + if _HAVE_NUMPY and subtype.serial_size() is not None: + np_dtype = _NUMPY_DTYPE_MAP.get(subtype.typename) + return type('%s(%s)' % (cls.cass_parameterized_type_with([]), vsize), (cls,), {'vector_size': vsize, 'subtype': subtype, '_numpy_dtype': np_dtype}) @classmethod def deserialize(cls, byts, protocol_version): @@ -1476,6 +1502,36 @@ def deserialize(cls, byts, protocol_version): @classmethod def serialize(cls, v, protocol_version): + # ---- bytes / bytearray passthrough ---- + # If the caller already holds a correctly-sized blob (e.g. from + # serialize_numpy_bulk), skip all conversion work. + # Only enabled for subtypes with a known NumPy dtype (float, double, + # int32, bigint) - variable-size subtypes fall through to the + # element-by-element path. + if cls._numpy_dtype is not None and isinstance(v, (bytes, bytearray)): + expected = cls.serial_size() + if len(v) == expected: + return v if isinstance(v, bytes) else bytes(v) + raise ValueError( + 'Pre-serialized bytes have wrong length %d (expected %d for vector<%s, %d>)' + % (len(v), expected, cls.subtype.typename, cls.vector_size)) + + # ---- NumPy ndarray fast path ---- + if cls._numpy_dtype is not None and _HAVE_NUMPY and isinstance(v, _np.ndarray): + if v.shape != (cls.vector_size,): + raise ValueError( + 'Expected ndarray of shape ({0},) for vector of type {1}, got shape {2}'.format( + cls.vector_size, cls.subtype.typename, v.shape)) + if v.dtype != cls._numpy_dtype and not _np.can_cast(v.dtype, cls._numpy_dtype, casting='safe'): + raise TypeError( + 'Unsafe dtype conversion from %s to %s for vector<%s, %d>: ' + 'values may overflow or lose precision. ' + 'Cast explicitly with arr.astype(%r) if this is intentional.' 
+ % (v.dtype, cls._numpy_dtype, cls.subtype.typename, cls.vector_size, cls._numpy_dtype)) + arr = _np.asarray(v, dtype=cls._numpy_dtype) + return arr.tobytes() + + # ---- Original element-by-element path ---- v_length = len(v) if cls.vector_size != v_length: raise ValueError( @@ -1491,6 +1547,67 @@ def serialize(cls, v, protocol_version): buf.write(item_bytes) return buf.getvalue() + @classmethod + def serialize_numpy_bulk(cls, vectors): + """Serialize a batch of vectors from a 2-D NumPy array. + + Parameters + ---------- + vectors : numpy.ndarray + A 2-D array of shape ``(N, cls.vector_size)`` whose values are + compatible with the CQL vector subtype. + + Returns + ------- + list[bytes] + One ``bytes`` object per row, ready to be bound to a CQL + ``vector<...>`` column. + + Raises + ------ + TypeError + If NumPy is not available or ``cls._numpy_dtype`` is ``None`` + (subtype not supported for the fast path). + ValueError + If the second dimension of *vectors* does not match + ``cls.vector_size``. + + Notes + ----- + The entire array is byte-swapped to big-endian once, then each row + is sliced out of the raw buffer with zero per-element overhead. + The returned ``bytes`` objects are accepted directly by + ``VectorType.serialize()`` (bytes passthrough) so they flow through + ``BoundStatement.bind()`` without further conversion. 
+ """ + if not _HAVE_NUMPY: + raise TypeError('serialize_numpy_bulk() requires NumPy to be installed') + if cls._numpy_dtype is None: + raise TypeError( + 'serialize_numpy_bulk() requires a subtype with a known ' + 'NumPy dtype; %s is not supported' % cls.subtype.typename) + if not isinstance(vectors, _np.ndarray): + raise ValueError( + 'Expected a 2-D NumPy array, got %s' % type(vectors).__name__) + if vectors.ndim != 2: + raise ValueError( + 'Expected a 2-D NumPy array, got %d-D array with shape %s' + % (vectors.ndim, vectors.shape)) + if vectors.shape[1] != cls.vector_size: + raise ValueError( + 'Expected array with %d columns, got shape %s' + % (cls.vector_size, vectors.shape)) + if vectors.dtype != cls._numpy_dtype and not _np.can_cast(vectors.dtype, cls._numpy_dtype, casting='safe'): + raise TypeError( + 'Unsafe dtype conversion from %s to %s for vector<%s, %d>: ' + 'values may overflow or lose precision. ' + 'Cast explicitly with arr.astype(%r) if this is intentional.' + % (vectors.dtype, cls._numpy_dtype, cls.subtype.typename, cls.vector_size, cls._numpy_dtype)) + arr = _np.asarray(vectors, dtype=cls._numpy_dtype) + row_bytes = cls._numpy_dtype.itemsize * cls.vector_size + raw = arr.tobytes() + return [raw[i * row_bytes:(i + 1) * row_bytes] for i in range(arr.shape[0])] + @classmethod def cql_parameterized_type(cls): return "%s<%s, %s>" % (cls.typename, cls.subtype.cql_parameterized_type(), cls.vector_size) diff --git a/tests/unit/test_types.py b/tests/unit/test_types.py index 11aab2748d..dc93d00214 100644 --- a/tests/unit/test_types.py +++ b/tests/unit/test_types.py @@ -1117,3 +1117,358 @@ def test_token_order(self): tokens_equal = [Token(1), Token(1)] check_sequence_consistency(tokens) check_sequence_consistency(tokens_equal, equal=True) + + + +try: + import numpy as np + + _HAVE_NUMPY = True +except ImportError: + _HAVE_NUMPY = False + + +@unittest.skipUnless(_HAVE_NUMPY, "NumPy not installed") +class VectorNumpySerializeTests(unittest.TestCase): + 
"""Tests for NumPy fast path in VectorType.serialize().""" + + FLOAT_CTYPE = "org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.FloatType, 4)" + DOUBLE_CTYPE = "org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.DoubleType, 4)" + INT32_CTYPE = "org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.Int32Type, 4)" + BIGINT_CTYPE = "org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.LongType, 4)" + + def _get_ctype(self, ctype_str): + return parse_casstype_args(ctype_str) + + # -- NumPy dtype is cached in the parameterized subclass -- + + def test_numpy_dtype_cached_for_float(self): + ctype = self._get_ctype(self.FLOAT_CTYPE) + self.assertEqual(ctype._numpy_dtype, np.dtype(">f4")) + + def test_numpy_dtype_cached_for_double(self): + ctype = self._get_ctype(self.DOUBLE_CTYPE) + self.assertEqual(ctype._numpy_dtype, np.dtype(">f8")) + + def test_numpy_dtype_cached_for_int32(self): + ctype = self._get_ctype(self.INT32_CTYPE) + self.assertEqual(ctype._numpy_dtype, np.dtype(">i4")) + + def test_numpy_dtype_cached_for_bigint(self): + ctype = self._get_ctype(self.BIGINT_CTYPE) + self.assertEqual(ctype._numpy_dtype, np.dtype(">i8")) + + def test_numpy_dtype_none_for_variable_size_numeric_subtype(self): + """ShortType/ByteType have no fixed serial_size(), so numpy fast path is disabled.""" + for ctype_str in [ + "org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.ShortType, 4)", + "org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.ByteType, 4)", + ]: + ctype = self._get_ctype(ctype_str) + self.assertIsNone(ctype._numpy_dtype) + + def test_numpy_dtype_none_for_unsupported_subtype(self): + """Subtypes without a fixed-size wire format (e.g. 
AsciiType) should have _numpy_dtype = None.""" + ctype = self._get_ctype( + "org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.AsciiType, 4)" + ) + self.assertIsNone(ctype._numpy_dtype) + + # -- NumPy fast path: correctness (result matches list path) -- + + def test_numpy_float32_matches_list_serialize(self): + ctype = self._get_ctype(self.FLOAT_CTYPE) + data = [1.0, 2.0, 3.0, 4.0] + list_result = ctype.serialize(data, 4) + arr = np.array(data, dtype=np.float32) + numpy_result = ctype.serialize(arr, 4) + self.assertEqual(list_result, numpy_result) + + def test_numpy_float64_matches_list_serialize(self): + ctype = self._get_ctype(self.DOUBLE_CTYPE) + data = [1.5, 2.5, 3.5, 4.5] + list_result = ctype.serialize(data, 4) + arr = np.array(data, dtype=np.float64) + numpy_result = ctype.serialize(arr, 4) + self.assertEqual(list_result, numpy_result) + + def test_numpy_int32_matches_list_serialize(self): + ctype = self._get_ctype(self.INT32_CTYPE) + data = [10, 20, 30, 40] + list_result = ctype.serialize(data, 4) + arr = np.array(data, dtype=np.int32) + numpy_result = ctype.serialize(arr, 4) + self.assertEqual(list_result, numpy_result) + + def test_numpy_bigint_matches_list_serialize(self): + ctype = self._get_ctype(self.BIGINT_CTYPE) + data = [100, 200, 300, 400] + list_result = ctype.serialize(data, 4) + arr = np.array(data, dtype=np.int64) + numpy_result = ctype.serialize(arr, 4) + self.assertEqual(list_result, numpy_result) + + # -- NumPy fast path: safe dtype widening (no precision loss) -- + + def test_numpy_widens_float32_to_float64(self): + """float32 -> float64 is a safe widening conversion, should work.""" + ctype = self._get_ctype(self.DOUBLE_CTYPE) + data = [1.0, 2.0, 3.0, 4.0] + list_result = ctype.serialize(data, 4) + arr_f32 = np.array(data, dtype=np.float32) + numpy_result = ctype.serialize(arr_f32, 4) + self.assertEqual(list_result, numpy_result) + + def test_numpy_widens_int32_to_int64(self): + """int32 -> int64 is a safe 
widening conversion, should work.""" + ctype = self._get_ctype(self.BIGINT_CTYPE) + data = [10, 20, 30, 40] + list_result = ctype.serialize(data, 4) + arr_i32 = np.array(data, dtype=np.int32) + numpy_result = ctype.serialize(arr_i32, 4) + self.assertEqual(list_result, numpy_result) + + # -- NumPy fast path: unsafe dtype narrowing raises TypeError -- + + def test_numpy_rejects_float64_to_float32(self): + """float64 -> float32 is an unsafe narrowing, should raise TypeError.""" + ctype = self._get_ctype(self.FLOAT_CTYPE) + arr_f64 = np.array([1.0, 2.0, 3.0, 4.0], dtype=np.float64) + with pytest.raises(TypeError, match="Unsafe dtype conversion"): + ctype.serialize(arr_f64, 4) + + def test_numpy_rejects_int64_to_int32(self): + """int64 -> int32 is an unsafe narrowing, should raise TypeError.""" + ctype = self._get_ctype(self.INT32_CTYPE) + arr_i64 = np.array([10, 20, 30, 40], dtype=np.int64) + with pytest.raises(TypeError, match="Unsafe dtype conversion"): + ctype.serialize(arr_i64, 4) + + # -- NumPy fast path: round-trip (serialize -> deserialize) -- + + def test_numpy_round_trip_float(self): + ctype = self._get_ctype(self.FLOAT_CTYPE) + data = np.array([1.5, 2.5, 3.5, 4.5], dtype=np.float32) + serialized = ctype.serialize(data, 4) + deserialized = ctype.deserialize(serialized, 4) + np.testing.assert_allclose(deserialized, data, rtol=1e-5) + + def test_numpy_round_trip_double(self): + ctype = self._get_ctype(self.DOUBLE_CTYPE) + data = np.array([1.5, 2.5, 3.5, 4.5], dtype=np.float64) + serialized = ctype.serialize(data, 4) + deserialized = ctype.deserialize(serialized, 4) + np.testing.assert_allclose(deserialized, data, rtol=1e-10) + + # -- NumPy fast path: error cases -- + + def test_numpy_wrong_shape_raises(self): + ctype = self._get_ctype(self.FLOAT_CTYPE) + arr = np.array([1.0, 2.0, 3.0], dtype=np.float32) # 3 elements, expected 4 + with pytest.raises(ValueError, match="Expected ndarray of shape"): + ctype.serialize(arr, 4) + + def test_numpy_2d_array_raises(self): 
+ ctype = self._get_ctype(self.FLOAT_CTYPE) + arr = np.array([[1.0, 2.0, 3.0, 4.0]], dtype=np.float32) # 2D + with pytest.raises(ValueError, match="Expected ndarray of shape"): + ctype.serialize(arr, 4) + + def test_numpy_falls_back_to_list_for_unsupported_subtype(self): + """For subtypes without a NumPy dtype mapping, ndarray input should fall through to the list path.""" + ctype = self._get_ctype( + "org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.AsciiType, 3)" + ) + arr = np.array(["abc", "def", "ghi"], dtype=object) + result = ctype.serialize(arr, 4) + list_result = ctype.serialize(["abc", "def", "ghi"], 4) + self.assertEqual(result, list_result) + + +@unittest.skipUnless(_HAVE_NUMPY, "NumPy not installed") +class VectorBytesPassthroughTests(unittest.TestCase): + """Tests for bytes/bytearray passthrough in VectorType.serialize().""" + + FLOAT_CTYPE = "org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.FloatType, 4)" + + def _get_ctype(self, ctype_str): + return parse_casstype_args(ctype_str) + + def test_bytes_passthrough_exact_size(self): + ctype = self._get_ctype(self.FLOAT_CTYPE) + original = ctype.serialize([1.0, 2.0, 3.0, 4.0], 4) + passthrough = ctype.serialize(original, 4) + self.assertEqual(original, passthrough) + + def test_bytearray_passthrough(self): + ctype = self._get_ctype(self.FLOAT_CTYPE) + original = ctype.serialize([1.0, 2.0, 3.0, 4.0], 4) + ba = bytearray(original) + passthrough = ctype.serialize(ba, 4) + self.assertEqual(original, passthrough) + self.assertIsInstance(passthrough, bytes) + + def test_bytes_wrong_size_raises(self): + ctype = self._get_ctype(self.FLOAT_CTYPE) + with pytest.raises(ValueError, match="Pre-serialized bytes"): + ctype.serialize(b"\x00" * 12, 4) # 12 bytes, expected 16 + + def test_bytes_passthrough_round_trip(self): + """Bytes from serialize_numpy_bulk flow through BoundStatement.bind() without double-serialization.""" + ctype = self._get_ctype(self.FLOAT_CTYPE) + 
arr = np.array([[1.0, 2.0, 3.0, 4.0]], dtype=np.float32) + bulk = ctype.serialize_numpy_bulk(arr) + re_serialized = ctype.serialize(bulk[0], 4) + self.assertEqual(bulk[0], re_serialized) + deserialized = ctype.deserialize(re_serialized, 4) + np.testing.assert_allclose(deserialized, [1.0, 2.0, 3.0, 4.0], rtol=1e-5) + + def test_bytes_not_intercepted_for_unsupported_subtype(self): + """Bytes passed to a variable-size vector should NOT be intercepted by the passthrough path.""" + ctype = self._get_ctype( + "org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.AsciiType, 3)" + ) + with pytest.raises((ValueError, TypeError)): + ctype.serialize(b"\x00" * 10, 4) + + +@unittest.skipUnless(_HAVE_NUMPY, "NumPy not installed") +class VectorSerializeNumpyBulkTests(unittest.TestCase): + """Tests for VectorType.serialize_numpy_bulk().""" + + FLOAT_CTYPE = "org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.FloatType, 4)" + DOUBLE_CTYPE = "org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.DoubleType, 3)" + INT32_CTYPE = "org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.Int32Type, 4)" + + def _get_ctype(self, ctype_str): + return parse_casstype_args(ctype_str) + + def test_bulk_float32_matches_individual(self): + ctype = self._get_ctype(self.FLOAT_CTYPE) + vectors = np.array( + [[1.0, 2.0, 3.0, 4.0], [5.0, 6.0, 7.0, 8.0], [9.0, 10.0, 11.0, 12.0]], + dtype=np.float32, + ) + bulk = ctype.serialize_numpy_bulk(vectors) + self.assertEqual(len(bulk), 3) + for i in range(3): + individual = ctype.serialize(vectors[i], 4) + self.assertEqual(bulk[i], individual) + + def test_bulk_float64_matches_individual(self): + ctype = self._get_ctype(self.DOUBLE_CTYPE) + vectors = np.array([[1.5, 2.5, 3.5], [4.5, 5.5, 6.5]], dtype=np.float64) + bulk = ctype.serialize_numpy_bulk(vectors) + self.assertEqual(len(bulk), 2) + for i in range(2): + individual = ctype.serialize(vectors[i], 4) + 
self.assertEqual(bulk[i], individual) + + def test_bulk_int32_matches_individual(self): + ctype = self._get_ctype(self.INT32_CTYPE) + vectors = np.array([[10, 20, 30, 40], [50, 60, 70, 80]], dtype=np.int32) + bulk = ctype.serialize_numpy_bulk(vectors) + self.assertEqual(len(bulk), 2) + for i in range(2): + individual = ctype.serialize(vectors[i], 4) + self.assertEqual(bulk[i], individual) + + def test_bulk_widens_int32_to_int64(self): + """Widening int32 -> int64 should work in bulk path.""" + ctype = self._get_ctype( + "org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.LongType, 4)" + ) + vectors = np.array([[10, 20, 30, 40]], dtype=np.int32) + bulk = ctype.serialize_numpy_bulk(vectors) + individual = ctype.serialize(np.array([10, 20, 30, 40], dtype=np.int32), 4) + self.assertEqual(bulk[0], individual) + + def test_bulk_rejects_float64_to_float32(self): + """float64 -> float32 is unsafe narrowing, should raise TypeError.""" + ctype = self._get_ctype(self.FLOAT_CTYPE) + vectors = np.array([[1.0, 2.0, 3.0, 4.0]], dtype=np.float64) + with pytest.raises(TypeError, match="Unsafe dtype conversion"): + ctype.serialize_numpy_bulk(vectors) + + def test_bulk_rejects_int64_to_int32(self): + """int64 -> int32 is unsafe narrowing, should raise TypeError.""" + ctype = self._get_ctype(self.INT32_CTYPE) + vectors = np.array([[10, 20, 30, 40]], dtype=np.int64) + with pytest.raises(TypeError, match="Unsafe dtype conversion"): + ctype.serialize_numpy_bulk(vectors) + + def test_bulk_round_trip(self): + ctype = self._get_ctype(self.FLOAT_CTYPE) + vectors = np.random.rand(100, 4).astype(np.float32) + bulk = ctype.serialize_numpy_bulk(vectors) + for i in range(100): + deserialized = ctype.deserialize(bulk[i], 4) + np.testing.assert_allclose(deserialized, vectors[i], rtol=1e-5) + + def test_bulk_single_row(self): + ctype = self._get_ctype(self.FLOAT_CTYPE) + vectors = np.array([[1.0, 2.0, 3.0, 4.0]], dtype=np.float32) + bulk = ctype.serialize_numpy_bulk(vectors) 
+ self.assertEqual(len(bulk), 1) + + def test_bulk_empty_array(self): + ctype = self._get_ctype(self.FLOAT_CTYPE) + vectors = np.empty((0, 4), dtype=np.float32) + bulk = ctype.serialize_numpy_bulk(vectors) + self.assertEqual(len(bulk), 0) + + def test_bulk_fortran_order_array(self): + """Column-major (Fortran) arrays should be handled correctly.""" + ctype = self._get_ctype(self.FLOAT_CTYPE) + vectors_c = np.array( + [[1.0, 2.0, 3.0, 4.0], [5.0, 6.0, 7.0, 8.0]], dtype=np.float32, order="C" + ) + vectors_f = np.asfortranarray(vectors_c) + self.assertFalse(vectors_f.flags["C_CONTIGUOUS"]) + bulk_c = ctype.serialize_numpy_bulk(vectors_c) + bulk_f = ctype.serialize_numpy_bulk(vectors_f) + self.assertEqual(bulk_c, bulk_f) + + def test_bulk_large_batch(self): + """Stress test with a realistic embedding size (768-dim, 10K rows).""" + ctype = parse_casstype_args( + "org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.FloatType, 768)" + ) + vectors = np.random.rand(10000, 768).astype(np.float32) + bulk = ctype.serialize_numpy_bulk(vectors) + self.assertEqual(len(bulk), 10000) + self.assertEqual(len(bulk[0]), 768 * 4) + np.testing.assert_allclose(ctype.deserialize(bulk[0], 4), vectors[0], rtol=1e-5) + np.testing.assert_allclose( + ctype.deserialize(bulk[-1], 4), vectors[-1], rtol=1e-5 + ) + + def test_bulk_wrong_dimension_raises(self): + ctype = self._get_ctype(self.FLOAT_CTYPE) + vectors = np.array([[1.0, 2.0, 3.0]], dtype=np.float32) + with pytest.raises(ValueError, match="Expected array with 4 columns"): + ctype.serialize_numpy_bulk(vectors) + + def test_bulk_1d_array_raises(self): + ctype = self._get_ctype(self.FLOAT_CTYPE) + arr = np.array([1.0, 2.0, 3.0, 4.0], dtype=np.float32) + with pytest.raises(ValueError, match="Expected a 2-D NumPy array"): + ctype.serialize_numpy_bulk(arr) + + def test_bulk_3d_array_raises(self): + ctype = self._get_ctype(self.FLOAT_CTYPE) + arr = np.ones((2, 4, 3), dtype=np.float32) + with pytest.raises(ValueError, 
match="Expected a 2-D NumPy array"): + ctype.serialize_numpy_bulk(arr) + + def test_bulk_unsupported_subtype_raises(self): + ctype = self._get_ctype( + "org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.AsciiType, 4)" + ) + with pytest.raises(TypeError, match="serialize_numpy_bulk.*is not supported"): + ctype.serialize_numpy_bulk(np.array([["a", "b", "c", "d"]], dtype=object)) + + def test_bulk_non_array_raises(self): + ctype = self._get_ctype(self.FLOAT_CTYPE) + with pytest.raises(ValueError, match="Expected a 2-D NumPy array"): + ctype.serialize_numpy_bulk([[1.0, 2.0, 3.0, 4.0]]) From 869b72f06dc8f71f99be15975eaf6eb491f25540 Mon Sep 17 00:00:00 2001 From: Yaniv Michael Kaul Date: Sun, 5 Apr 2026 08:59:34 +0300 Subject: [PATCH 2/3] bench: add vector NumPy serialization benchmark MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Standalone benchmark comparing the four serialization paths for VectorType across dimensions (128, 768, 1536) and batch sizes (1, 100, 10000): - list (element-by-element) – baseline - numpy (per-row ndarray) – single-row fast path - bulk (serialize_numpy_bulk) – batch fast path - bytes passthrough (bind) – pre-serialized blob Includes auto-calibrated iteration counts and correctness verification. --- benchmarks/bench_vector_numpy_serialize.py | 257 +++++++++++++++++++++ 1 file changed, 257 insertions(+) create mode 100644 benchmarks/bench_vector_numpy_serialize.py diff --git a/benchmarks/bench_vector_numpy_serialize.py b/benchmarks/bench_vector_numpy_serialize.py new file mode 100644 index 0000000000..6227084ccf --- /dev/null +++ b/benchmarks/bench_vector_numpy_serialize.py @@ -0,0 +1,257 @@ +# Copyright 2025 ScyllaDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Benchmark: VectorType serialization – list vs NumPy ndarray vs bulk. + +Compares three approaches for serializing float vectors into the CQL +binary protocol wire format: + + 1. list path – original element-by-element serialize (struct.pack per element) + 2. numpy path – single ndarray passed to serialize(), uses tobytes() + 3. bulk path – serialize_numpy_bulk() on a 2-D array (one byte-swap for + the entire batch, then bytes slicing) + +Each scenario is tested at realistic vector dimensions (128, 768, 1536) and +batch sizes (1, 100, 10_000 rows). + +Usage: + python benchmarks/bench_vector_numpy_serialize.py +""" + +import os +import sys +import time +import timeit + +# Ensure the repo root is importable +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +import numpy as np + +from cassandra.cqltypes import parse_casstype_args + +# --------------------------------------------------------------------------- +# Configuration +# --------------------------------------------------------------------------- + +DIMENSIONS = [128, 768, 1536] +BATCH_SIZES = [1, 100, 10_000] +REPEATS = 3 +# Number of iterations is scaled per scenario to keep total time reasonable +MIN_ITERS = 5 +TARGET_TIME_S = 0.2 # aim for ~0.2s per measurement + + +def auto_iters(fn, target_s=TARGET_TIME_S, min_iters=MIN_ITERS): + """Estimate a good iteration count for the given function.""" + # Warm up + fn() + # Time a single call + t0 = time.perf_counter() + fn() + t1 = time.perf_counter() + elapsed = t1 - t0 + if elapsed <= 0: + return 100_000 + n = max(min_iters, 
int(target_s / elapsed)) + return n + + +# --------------------------------------------------------------------------- +# Benchmark functions +# --------------------------------------------------------------------------- + + +def bench_list_serialize(ctype, vectors_list, protocol_version=4): + """Serialize each row as a Python list (original path).""" + for row in vectors_list: + ctype.serialize(row, protocol_version) + + +def bench_numpy_serialize(ctype, vectors_np_rows, protocol_version=4): + """Serialize each row as an individual 1-D ndarray (numpy fast path).""" + for row in vectors_np_rows: + ctype.serialize(row, protocol_version) + + +def bench_bulk_serialize(ctype, vectors_2d, protocol_version=4): + """Serialize all rows at once with serialize_numpy_bulk().""" + ctype.serialize_numpy_bulk(vectors_2d) + + +def bench_bytes_passthrough(ctype, pre_serialized, protocol_version=4): + """Simulate bind() calling serialize() on pre-serialized bytes.""" + for row_bytes in pre_serialized: + ctype.serialize(row_bytes, protocol_version) + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + + +def main(): + print(f"Python: {sys.version.split()[0]}") + print(f"NumPy: {np.__version__}") + print(f"Repeats: {REPEATS} (best of)") + print() + + for dim in DIMENSIONS: + ctype_str = ( + f"org.apache.cassandra.db.marshal.VectorType(" + f"org.apache.cassandra.db.marshal.FloatType, {dim})" + ) + ctype = parse_casstype_args(ctype_str) + + print(f"{'=' * 72}") + print(f" Vector dimension: {dim} (float32, {dim * 4} bytes/vector)") + print(f"{'=' * 72}") + print() + + for batch_size in BATCH_SIZES: + # Prepare data + vectors_2d = np.random.rand(batch_size, dim).astype(np.float32) + vectors_list = [vectors_2d[i].tolist() for i in range(batch_size)] + vectors_np_rows = [vectors_2d[i] for i in range(batch_size)] + + print(f" --- batch_size = {batch_size:,} ---") + print( + 
f" {'method':30s} {'iters':>8s} {'ns/call':>12s} {'total_ms':>10s} {'us/row':>10s} {'vs list':>8s}" + ) + print( + f" {'------':30s} {'-----':>8s} {'-------':>12s} {'--------':>10s} {'------':>10s} {'-------':>8s}" + ) + + results = {} + + # 1. List path + def run_list(): + bench_list_serialize(ctype, vectors_list) + + n_iters = auto_iters(run_list) + t = timeit.repeat( + run_list, number=n_iters, repeat=REPEATS, timer=time.perf_counter + ) + best_s = min(t) / n_iters + us_per_row = best_s / batch_size * 1e6 + total_ms = best_s * 1e3 + ns_per_call = best_s * 1e9 + results["list"] = us_per_row + print( + f" {'list (element-by-element)':30s} {n_iters:8d} {ns_per_call:12.1f} {total_ms:10.3f} {us_per_row:10.3f} {'1.00x':>8s}" + ) + + # 2. NumPy per-row path + def run_numpy(): + bench_numpy_serialize(ctype, vectors_np_rows) + + n_iters = auto_iters(run_numpy) + t = timeit.repeat( + run_numpy, number=n_iters, repeat=REPEATS, timer=time.perf_counter + ) + best_s = min(t) / n_iters + us_per_row = best_s / batch_size * 1e6 + total_ms = best_s * 1e3 + ns_per_call = best_s * 1e9 + speedup = results["list"] / us_per_row if us_per_row > 0 else float("inf") + results["numpy"] = us_per_row + print( + f" {'numpy (per-row ndarray)':30s} {n_iters:8d} {ns_per_call:12.1f} {total_ms:10.3f} {us_per_row:10.3f} {speedup:7.2f}x" + ) + + # 3. Bulk path + def run_bulk(): + bench_bulk_serialize(ctype, vectors_2d) + + n_iters = auto_iters(run_bulk) + t = timeit.repeat( + run_bulk, number=n_iters, repeat=REPEATS, timer=time.perf_counter + ) + best_s = min(t) / n_iters + us_per_row = best_s / batch_size * 1e6 + total_ms = best_s * 1e3 + ns_per_call = best_s * 1e9 + speedup = results["list"] / us_per_row if us_per_row > 0 else float("inf") + results["bulk"] = us_per_row + print( + f" {'bulk (serialize_numpy_bulk)':30s} {n_iters:8d} {ns_per_call:12.1f} {total_ms:10.3f} {us_per_row:10.3f} {speedup:7.2f}x" + ) + + # 4. 
Bytes passthrough (simulates bind() on pre-serialized bulk output) + pre_serialized = ctype.serialize_numpy_bulk(vectors_2d) + + def run_passthrough(): + bench_bytes_passthrough(ctype, pre_serialized) + + n_iters = auto_iters(run_passthrough) + t = timeit.repeat( + run_passthrough, number=n_iters, repeat=REPEATS, timer=time.perf_counter + ) + best_s = min(t) / n_iters + us_per_row = best_s / batch_size * 1e6 + total_ms = best_s * 1e3 + ns_per_call = best_s * 1e9 + speedup = results["list"] / us_per_row if us_per_row > 0 else float("inf") + print( + f" {'bytes passthrough (bind)':30s} {n_iters:8d} {ns_per_call:12.1f} {total_ms:10.3f} {us_per_row:10.3f} {speedup:7.2f}x" + ) + + # 5. Bulk + passthrough combined (realistic end-to-end) + def run_bulk_e2e(): + serialized = ctype.serialize_numpy_bulk(vectors_2d) + for row_bytes in serialized: + ctype.serialize(row_bytes, 4) + + n_iters = auto_iters(run_bulk_e2e) + t = timeit.repeat( + run_bulk_e2e, number=n_iters, repeat=REPEATS, timer=time.perf_counter + ) + best_s = min(t) / n_iters + us_per_row = best_s / batch_size * 1e6 + total_ms = best_s * 1e3 + ns_per_call = best_s * 1e9 + speedup = results["list"] / us_per_row if us_per_row > 0 else float("inf") + print( + f" {'bulk+passthrough (end-to-end)':30s} {n_iters:8d} {ns_per_call:12.1f} {total_ms:10.3f} {us_per_row:10.3f} {speedup:7.2f}x" + ) + + print() + + # --- Correctness verification --- + print("Correctness verification:") + for dim in DIMENSIONS: + ctype_str = ( + f"org.apache.cassandra.db.marshal.VectorType(" + f"org.apache.cassandra.db.marshal.FloatType, {dim})" + ) + ctype = parse_casstype_args(ctype_str) + vectors = np.random.rand(10, dim).astype(np.float32) + bulk = ctype.serialize_numpy_bulk(vectors) + for i in range(10): + list_result = ctype.serialize(vectors[i].tolist(), 4) + numpy_result = ctype.serialize(vectors[i], 4) + assert bulk[i] == list_result == numpy_result, ( + f"Mismatch at dim={dim}, row={i}" + ) + # Bytes passthrough + assert 
ctype.serialize(bulk[i], 4) == bulk[i] + print(f" dim={dim}: OK (list == numpy == bulk == passthrough)") + + print() + print("Done.") + + +if __name__ == "__main__": + main() From 9e4bf4a40fcab2a8288347310d3f18e107d4ffd5 Mon Sep 17 00:00:00 2001 From: Yaniv Michael Kaul Date: Sun, 5 Apr 2026 09:00:05 +0300 Subject: [PATCH 3/3] docs: document NumPy-accelerated vector serialization Add a new section to docs/performance.rst covering the three fast serialization paths (single-row ndarray, bulk serialize_numpy_bulk, bytes passthrough) with usage examples and supported subtype list. --- docs/performance.rst | 50 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/docs/performance.rst b/docs/performance.rst index f7a3f49e0f..026daf6ca3 100644 --- a/docs/performance.rst +++ b/docs/performance.rst @@ -43,3 +43,53 @@ objects should all be created after forking the process, not before. For further discussion and simple examples using the driver with ``multiprocessing``, see `this blog post `_. + +NumPy-accelerated Vector Serialization +-------------------------------------- +When inserting high-dimensional vectors (e.g. ML embeddings), the default +element-by-element serialization in +:class:`~cassandra.cqltypes.VectorType` can become a bottleneck. If +`NumPy `_ is installed, the driver provides three +progressively faster paths: + +**Single-row ndarray fast path** – pass a 1-D ``numpy.ndarray`` directly +as the bound value for a ``vector`` column. 
The driver +byte-swaps the array to big-endian in a single C-level operation and +calls ``tobytes()``, replacing *N* individual ``struct.pack`` calls:: + + import numpy as np + embedding = np.array([0.1, 0.2, ..., 0.768], dtype=np.float32) + session.execute(insert_stmt, [key, embedding]) + +**Bulk serialization** – for batch inserts, convert an entire 2-D array +(one row per vector) into a list of ``bytes`` objects with a single +byte-swap:: + + from cassandra.cqltypes import VectorType, lookup_casstype + + # Build the parameterized type (usually done once) + ctype = VectorType.apply_parameters( + [lookup_casstype('org.apache.cassandra.db.marshal.FloatType'), 768], + names=None, + ) + + vectors_2d = np.array(all_embeddings, dtype=np.float32) # (N, 768) + blobs = ctype.serialize_numpy_bulk(vectors_2d) # list[bytes] + + for key, blob in zip(keys, blobs): + session.execute(insert_stmt, [key, blob]) + +Each ``bytes`` object in the returned list is accepted directly by the +driver's bytes-passthrough path, so ``BoundStatement.bind()`` performs no +further conversion. + +**Supported subtypes** – the fast paths are available for ``float`` +(``>f4``), ``double`` (``>f8``), ``int`` (``>i4``), and ``bigint`` +(``>i8``). Variable-length subtypes (``smallint``, ``tinyint``, text +types, etc.) fall back to the original element-by-element serialization +automatically. + +**Benchmarks** – on 768-dimension ``float32`` vectors (100-row batches), +the bulk path is ~146× faster than the baseline, and the bytes +passthrough path is ~298× faster. See +``benchmarks/bench_vector_numpy_serialize.py`` for reproducible numbers.