Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions vortex-tensor/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ pub const vortex_tensor::encodings::turboquant::MIN_DIMENSION: u32

pub fn vortex_tensor::encodings::turboquant::tq_validate_vector_dtype(dtype: &vortex_array::dtype::DType) -> vortex_error::VortexResult<vortex_tensor::vector::VectorMatcherMetadata>

pub fn vortex_tensor::encodings::turboquant::turboquant_compress(input: vortex_array::array::erased::ArrayRef, config: &vortex_tensor::encodings::turboquant::TurboQuantConfig, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::array::erased::ArrayRef>

pub fn vortex_tensor::encodings::turboquant::turboquant_encode(ext: vortex_array::array::view::ArrayView<'_, vortex_array::arrays::extension::vtable::Extension>, config: &vortex_tensor::encodings::turboquant::TurboQuantConfig, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::array::erased::ArrayRef>

pub unsafe fn vortex_tensor::encodings::turboquant::turboquant_encode_unchecked(ext: vortex_array::array::view::ArrayView<'_, vortex_array::arrays::extension::vtable::Extension>, config: &vortex_tensor::encodings::turboquant::TurboQuantConfig, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::array::erased::ArrayRef>
Expand Down Expand Up @@ -502,7 +504,7 @@ pub fn vortex_tensor::scalar_fns::sorf_transform::SorfTransform::child_name(&sel

pub fn vortex_tensor::scalar_fns::sorf_transform::SorfTransform::execute(&self, options: &Self::Options, args: &dyn vortex_array::scalar_fn::vtable::ExecutionArgs, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::array::erased::ArrayRef>

pub fn vortex_tensor::scalar_fns::sorf_transform::SorfTransform::fmt_sql(&self, _options: &Self::Options, expr: &vortex_array::expr::expression::Expression, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result
pub fn vortex_tensor::scalar_fns::sorf_transform::SorfTransform::fmt_sql(&self, options: &Self::Options, expr: &vortex_array::expr::expression::Expression, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result

pub fn vortex_tensor::scalar_fns::sorf_transform::SorfTransform::id(&self) -> vortex_array::scalar_fn::ScalarFnId

Expand All @@ -526,6 +528,12 @@ pub fn vortex_tensor::vector::AnyVector::try_match<'a>(ext_dtype: &'a vortex_arr

pub struct vortex_tensor::vector::Vector

impl vortex_tensor::vector::Vector

pub fn vortex_tensor::vector::Vector::constant_array<T: vortex_array::dtype::ptype::NativePType + core::convert::Into<vortex_array::scalar::typed_view::primitive::pvalue::PValue>>(elements: &[T], len: usize) -> vortex_error::VortexResult<vortex_array::array::erased::ArrayRef>

pub fn vortex_tensor::vector::Vector::wrap_storage(storage: vortex_array::array::erased::ArrayRef) -> vortex_error::VortexResult<vortex_array::array::erased::ArrayRef>

impl core::clone::Clone for vortex_tensor::vector::Vector

pub fn vortex_tensor::vector::Vector::clone(&self) -> vortex_tensor::vector::Vector
Expand Down Expand Up @@ -574,6 +582,8 @@ pub fn vortex_tensor::vector::VectorMatcherMetadata::dimensions(&self) -> u32

pub fn vortex_tensor::vector::VectorMatcherMetadata::element_ptype(&self) -> vortex_array::dtype::ptype::PType

pub fn vortex_tensor::vector::VectorMatcherMetadata::list_size(&self) -> usize

pub fn vortex_tensor::vector::VectorMatcherMetadata::try_new(element_ptype: vortex_array::dtype::ptype::PType, dimensions: u32) -> vortex_error::VortexResult<Self>

impl core::clone::Clone for vortex_tensor::vector::VectorMatcherMetadata
Expand All @@ -600,12 +610,8 @@ impl core::marker::StructuralPartialEq for vortex_tensor::vector::VectorMatcherM

pub mod vortex_tensor::vector_search

pub fn vortex_tensor::vector_search::build_constant_query_vector<T: vortex_array::dtype::ptype::NativePType + core::convert::Into<vortex_array::scalar::typed_view::primitive::pvalue::PValue>>(query: &[T], num_rows: usize) -> vortex_error::VortexResult<vortex_array::array::erased::ArrayRef>

pub fn vortex_tensor::vector_search::build_similarity_search_tree<T: vortex_array::dtype::ptype::NativePType + core::convert::Into<vortex_array::scalar::typed_view::primitive::pvalue::PValue>>(data: vortex_array::array::erased::ArrayRef, query: &[T], threshold: T) -> vortex_error::VortexResult<vortex_array::array::erased::ArrayRef>

pub fn vortex_tensor::vector_search::compress_turboquant(data: vortex_array::array::erased::ArrayRef, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::array::erased::ArrayRef>

pub const vortex_tensor::SCALAR_FN_ARRAY_TENSOR_PLUGIN_ENV: &str

pub fn vortex_tensor::initialize(session: &vortex_session::VortexSession)
60 changes: 46 additions & 14 deletions vortex-tensor/src/encodings/turboquant/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,15 @@ use vortex_array::ArrayView;
use vortex_array::ExecutionCtx;
use vortex_array::IntoArray;
use vortex_array::arrays::Extension;
use vortex_array::arrays::ExtensionArray;
use vortex_array::arrays::FixedSizeListArray;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::dict::DictArray;
use vortex_array::arrays::extension::ExtensionArrayExt;
use vortex_array::arrays::fixed_size_list::FixedSizeListArrayExt;
use vortex_array::arrays::scalar_fn::ScalarFnArrayExt;
use vortex_array::dtype::Nullability;
use vortex_array::dtype::extension::ExtDType;
use vortex_array::extension::EmptyMetadata;
use vortex_array::validity::Validity;
use vortex_buffer::Buffer;
use vortex_buffer::BufferMut;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
Expand All @@ -35,6 +34,8 @@ use crate::encodings::turboquant::MIN_DIMENSION;
use crate::encodings::turboquant::centroids::compute_centroid_boundaries;
use crate::encodings::turboquant::centroids::find_nearest_centroid;
use crate::encodings::turboquant::centroids::get_centroids;
use crate::scalar_fns::l2_denorm::L2Denorm;
use crate::scalar_fns::l2_denorm::normalize_as_l2_denorm;
use crate::scalar_fns::l2_denorm::validate_l2_normalized_rows;
use crate::scalar_fns::sorf_transform::SorfMatrix;
use crate::scalar_fns::sorf_transform::SorfOptions;
Expand Down Expand Up @@ -136,10 +137,8 @@ fn build_quantized_fsl(
padded_dim: usize,
) -> VortexResult<ArrayRef> {
let codes = PrimitiveArray::new::<u8>(all_indices.freeze(), Validity::NonNullable);

let mut centroids_buf = BufferMut::<f32>::with_capacity(centroids.len());
centroids_buf.extend_from_slice(centroids);
let centroids_array = PrimitiveArray::new::<f32>(centroids_buf.freeze(), Validity::NonNullable);
let centroids_array =
PrimitiveArray::new::<f32>(Buffer::copy_from(centroids), Validity::NonNullable);

let dict = DictArray::try_new(codes.into_array(), centroids_array.into_array())?;

Expand Down Expand Up @@ -240,7 +239,7 @@ pub unsafe fn turboquant_encode_unchecked(
Validity::NonNullable,
0,
)?;
let empty_padded_vector = wrap_padded_as_vector(empty_fsl.into_array())?;
let empty_padded_vector = Vector::wrap_storage(empty_fsl.into_array())?;

let sorf_options = SorfOptions {
seed,
Expand All @@ -256,7 +255,7 @@ pub unsafe fn turboquant_encode_unchecked(
let core = turboquant_quantize_core(&fsl, seed, config.bit_width, config.num_rounds, ctx)?;
let quantized_fsl =
build_quantized_fsl(num_rows, core.all_indices, &core.centroids, core.padded_dim)?;
let padded_vector = wrap_padded_as_vector(quantized_fsl)?;
let padded_vector = Vector::wrap_storage(quantized_fsl)?;

let sorf_options = SorfOptions {
seed,
Expand All @@ -267,9 +266,42 @@ pub unsafe fn turboquant_encode_unchecked(
Ok(SorfTransform::try_new_array(&sorf_options, padded_vector, num_rows)?.into_array())
}

/// Wrap an `FSL<f32, padded_dim>` in a [`Vector`](crate::vector::Vector) extension so it can be
/// passed as the child of [`SorfTransform`], which expects a `Vector<padded_dim>` input.
fn wrap_padded_as_vector(fsl: ArrayRef) -> VortexResult<ArrayRef> {
let ext_dtype = ExtDType::<Vector>::try_new(EmptyMetadata, fsl.dtype().clone())?.erased();
Ok(ExtensionArray::new(ext_dtype, fsl).into_array())
/// Apply the full TurboQuant compression pipeline to a [`Vector`](crate::vector::Vector)
/// extension array: normalize the rows via [`normalize_as_l2_denorm`], quantize the normalized
/// child via [`turboquant_encode_unchecked`], and reattach the stored norms as the outer
/// [`L2Denorm`] wrapper.
///
/// The returned array has the canonical TurboQuant shape:
///
/// ```text
/// ScalarFnArray(L2Denorm, [
/// ScalarFnArray(SorfTransform, [FSL(Dict(codes, centroids))]),
/// norms,
/// ])
/// ```
///
/// # Errors
///
/// Returns an error if `input` is not a tensor-like extension array, if normalization fails, or
/// if [`turboquant_encode_unchecked`] rejects the input shape.
pub fn turboquant_compress(
input: ArrayRef,
config: &TurboQuantConfig,
ctx: &mut ExecutionCtx,
) -> VortexResult<ArrayRef> {
let l2_denorm = normalize_as_l2_denorm(input, ctx)?;
let normalized = l2_denorm.child_at(0).clone();
let norms = l2_denorm.child_at(1).clone();
let num_rows = l2_denorm.len();

let normalized_ext = normalized
.as_opt::<Extension>()
.vortex_expect("normalize_as_l2_denorm always produces an Extension array child");

// SAFETY: `normalize_as_l2_denorm` guarantees every row is unit-norm (or zero for null rows).
let tq = unsafe { turboquant_encode_unchecked(normalized_ext, config, ctx) }?;

// SAFETY: TurboQuant is a lossy approximation of the normalized child, so we intentionally
// bypass the strict normalized-row validation when reattaching the stored norms.
Ok(unsafe { L2Denorm::new_array_unchecked(tq, norms, num_rows) }?.into_array())
}
26 changes: 6 additions & 20 deletions vortex-tensor/src/encodings/turboquant/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,19 +92,13 @@
//! ```
//! use vortex_array::IntoArray;
//! use vortex_array::VortexSessionExecute;
//! use vortex_array::arrays::ExtensionArray;
//! use vortex_array::arrays::FixedSizeListArray;
//! use vortex_array::arrays::PrimitiveArray;
//! use vortex_array::arrays::Extension;
//! use vortex_array::arrays::scalar_fn::ScalarFnArrayExt;
//! use vortex_array::dtype::extension::ExtDType;
//! use vortex_array::extension::EmptyMetadata;
//! use vortex_array::session::ArraySession;
//! use vortex_array::validity::Validity;
//! use vortex_buffer::BufferMut;
//! use vortex_array::session::ArraySession;
//! use vortex_session::VortexSession;
//! use vortex_tensor::encodings::turboquant::{TurboQuantConfig, turboquant_encode_unchecked};
//! use vortex_tensor::scalar_fns::l2_denorm::normalize_as_l2_denorm;
//! use vortex_tensor::encodings::turboquant::{TurboQuantConfig, turboquant_compress};
//! use vortex_tensor::vector::Vector;
//!
//! // Create a Vector extension array of 100 random 128-d vectors.
Expand All @@ -118,22 +112,13 @@
//! let fsl = FixedSizeListArray::try_new(
//! elements.into_array(), dim, Validity::NonNullable, num_rows,
//! ).unwrap();
//! let ext_dtype = ExtDType::<Vector>::try_new(EmptyMetadata, fsl.dtype().clone())
//! .unwrap().erased();
//! let ext = ExtensionArray::new(ext_dtype, fsl.into_array());
//! let vector = Vector::wrap_storage(fsl.into_array()).unwrap();
//!
//! // Normalize, then quantize the normalized child at 2 bits per coordinate.
//! // Normalize and quantize at 2 bits per coordinate in one pass.
//! let session = VortexSession::empty().with::<ArraySession>();
//! let mut ctx = session.create_execution_ctx();
//! let l2_denorm = normalize_as_l2_denorm(ext.into_array(), &mut ctx).unwrap();
//! let normalized = l2_denorm.child_at(0).clone();
//!
//! let normalized_ext = normalized.as_opt::<Extension>().unwrap();
//! let config = TurboQuantConfig { bit_width: 2, seed: Some(42), num_rounds: 3 };
//! // SAFETY: We just normalized the input.
//! let tq = unsafe {
//! turboquant_encode_unchecked(normalized_ext, &config, &mut ctx).unwrap()
//! };
//! let tq = turboquant_compress(vector, &config, &mut ctx).unwrap();
//!
//! // Verify compression: 100 vectors x 128 dims x 4 bytes = 51200 bytes input.
//! assert!(tq.nbytes() < 51200);
Expand All @@ -144,6 +129,7 @@ pub(crate) mod compress;

mod scheme;
pub use compress::TurboQuantConfig;
pub use compress::turboquant_compress;
pub use compress::turboquant_encode;
pub use compress::turboquant_encode_unchecked;
pub use scheme::TurboQuantScheme;
Expand Down
40 changes: 4 additions & 36 deletions vortex-tensor/src/encodings/turboquant/scheme.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@

//! TurboQuant compression scheme.
//!
//! The scheme first normalizes the input via [`normalize_as_l2_denorm`], then encodes the
//! normalized child via [`turboquant_encode_unchecked`]. The result is:
//! The scheme is a thin [`Scheme`] adapter over [`turboquant_compress`], which produces:
//!
//! ```text
//! ScalarFnArray(L2Denorm, [
Expand All @@ -18,14 +17,10 @@
//!
//! Decompression is automatic: executing the outer array walks the ScalarFn tree.
//!
//! [`normalize_as_l2_denorm`]: crate::scalar_fns::l2_denorm::normalize_as_l2_denorm
//! [`turboquant_encode_unchecked`]: crate::encodings::turboquant::turboquant_encode_unchecked
//! [`turboquant_compress`]: crate::encodings::turboquant::turboquant_compress

use vortex_array::ArrayRef;
use vortex_array::Canonical;
use vortex_array::IntoArray;
use vortex_array::arrays::Extension;
use vortex_array::arrays::scalar_fn::ScalarFnArrayExt;
use vortex_compressor::CascadingCompressor;
use vortex_compressor::ctx::CompressorContext;
use vortex_compressor::estimate::CompressionEstimate;
Expand All @@ -38,9 +33,7 @@ use vortex_error::VortexResult;
use crate::encodings::turboquant::MAX_CENTROIDS;
use crate::encodings::turboquant::TurboQuantConfig;
use crate::encodings::turboquant::tq_validate_vector_dtype;
use crate::encodings::turboquant::turboquant_encode_unchecked;
use crate::scalar_fns::l2_denorm::L2Denorm;
use crate::scalar_fns::l2_denorm::normalize_as_l2_denorm;
use crate::encodings::turboquant::turboquant_compress;

/// TurboQuant compression scheme for [`Vector`] extension types.
///
Expand Down Expand Up @@ -105,33 +98,8 @@ impl Scheme for TurboQuantScheme {
data: &mut ArrayAndStats,
_ctx: CompressorContext,
) -> VortexResult<ArrayRef> {
let ext_array = data
.array()
.as_opt::<Extension>()
.vortex_expect("expected an extension array");

let mut ctx = compressor.execution_ctx();

// 1. Normalize: produces L2Denorm(normalized_vectors, norms).
let l2_denorm = normalize_as_l2_denorm(ext_array.as_ref().clone(), &mut ctx)?;
let normalized = l2_denorm.child_at(0).clone();
let norms = l2_denorm.child_at(1).clone();
let num_rows = l2_denorm.len();

// 2. Quantize the normalized child: SorfTransform(FSL(Dict)).
let normalized_ext = normalized
.as_opt::<Extension>()
.vortex_expect("normalized child should be an Extension array");

let config = TurboQuantConfig::default();
// SAFETY: We just normalized the input via `normalize_as_l2_denorm`, so all rows are
// guaranteed to be unit-norm (or zero for originally-null rows).
let sorf_dict = unsafe { turboquant_encode_unchecked(normalized_ext, &config, &mut ctx)? };

// 3. Wrap back in L2Denorm: the SorfTransform is the "normalized" child.
// SAFETY: TurboQuant is a lossy approximation of the normalized child, so we intentionally
// bypass the strict normalized-row validation when reattaching the stored norms.
Ok(unsafe { L2Denorm::new_array_unchecked(sorf_dict, norms, num_rows) }?.into_array())
turboquant_compress(data.array().clone(), &TurboQuantConfig::default(), &mut ctx)
}
}

Expand Down
10 changes: 5 additions & 5 deletions vortex-tensor/src/encodings/turboquant/tests/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ fn slice_preserves_data() -> VortexResult<()> {
num_rounds: 4,
};
let mut ctx = SESSION.create_execution_ctx();
let encoded = normalize_and_encode(&ext, &config, &mut ctx)?;
let encoded = turboquant_compress(ext, &config, &mut ctx)?;

// Full decompress then slice.
let mut ctx = SESSION.create_execution_ctx();
Expand Down Expand Up @@ -89,7 +89,7 @@ fn scalar_at_matches_decompress() -> VortexResult<()> {
num_rounds: 2,
};
let mut ctx = SESSION.create_execution_ctx();
let encoded = normalize_and_encode(&ext, &config, &mut ctx)?;
let encoded = turboquant_compress(ext, &config, &mut ctx)?;

let full_decoded = encoded.clone().execute::<ExtensionArray>(&mut ctx)?;

Expand All @@ -112,7 +112,7 @@ fn l2_norm_readthrough() -> VortexResult<()> {
num_rounds: 5,
};
let mut ctx = SESSION.create_execution_ctx();
let encoded = normalize_and_encode(&ext, &config, &mut ctx)?;
let encoded = turboquant_compress(ext, &config, &mut ctx)?;
let (_sorf_child, norms_child) = unwrap_l2denorm(&encoded);

// Stored norms should match the actual L2 norms of the input.
Expand Down Expand Up @@ -150,7 +150,7 @@ fn l2_norm_readthrough_is_authoritative_for_lossy_storage() -> VortexResult<()>
num_rounds: 3,
};
let mut ctx = SESSION.create_execution_ctx();
let encoded = normalize_and_encode(&ext, &config, &mut ctx)?;
let encoded = turboquant_compress(ext, &config, &mut ctx)?;
let (_sorf_child, norms_child) = unwrap_l2denorm(&encoded);

let stored_norms: PrimitiveArray = norms_child.execute(&mut ctx)?;
Expand Down Expand Up @@ -187,7 +187,7 @@ fn cosine_similarity_readthrough_is_authoritative_for_lossy_storage() -> VortexR
num_rounds: 3,
};
let mut ctx = SESSION.create_execution_ctx();
let encoded = normalize_and_encode(&ext, &config, &mut ctx)?;
let encoded = turboquant_compress(ext, &config, &mut ctx)?;

let encoded_cos =
execute_cosine_similarity(encoded.clone(), encoded.clone(), num_rows, &mut ctx)?;
Expand Down
Loading
Loading