diff --git a/FSSTVIEW_HANDOVER.md b/FSSTVIEW_HANDOVER.md new file mode 100644 index 00000000000..30602f3ac14 --- /dev/null +++ b/FSSTVIEW_HANDOVER.md @@ -0,0 +1,145 @@ +# FSSTView — Handover + +## TL;DR + +Added a new **`FSSTView`** array encoding to Vortex: a ListView-style FSST that addresses its +compressed codes with separate per-element `offsets` + `ends` arrays instead of one monotonic +offsets array. This makes `filter` / `take` / `slice` **metadata-only** (rewrite only small index +arrays, reuse the compressed byte heap), where plain `FSST` rewrites the whole compressed code heap +per op. The decode cost moves to a single canonicalization at the end. + +Storing the per-element **end offset** (rather than the size) makes the `FSST` → `FSSTView` +conversion allocation-free — both addressing arrays are zero-copy slices of the FSST's existing +offsets — which **eliminated the conversion floor** that previously made the view 9–16× slower than +`fsst` on tiny highly selective `url` predicates (see "Conversion floor — resolved" below). + +- **Branch:** `claude/fsstview-conversion-floor-kRAeg` (built on the original + `claude/fsstview-array-listview-TdW45`). +- **Status:** merge-ready. 105 tests pass, `clippy --all-targets --all-features` clean, + `cargo +nightly fmt` clean, `vortex-file` builds, doc tests pass. +- **No PR opened yet** (was waiting on explicit request). +- **Scope:** additive, contained in `encodings/fsst/` plus 2 registration lines in `vortex-file`. 
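To make the "metadata-only" claim in the TL;DR concrete, here is a minimal sketch of the addressing idea, not the real `FSSTViewArray`. The `ToyView` type and its methods are hypothetical names invented for illustration, and the heap holds raw bytes rather than FSST codes. The point it illustrates is that a filter rebuilds only the per-element `offsets`/`ends` vectors and shares the byte heap unchanged.

```rust
use std::sync::Arc;

/// Hypothetical toy stand-in for an FSSTView: a shared byte heap addressed by
/// per-element start and end offsets. Raw bytes stand in for compressed codes.
#[derive(Clone, Debug)]
pub struct ToyView {
    heap: Arc<[u8]>,   // shared heap; never rewritten by `filter`
    offsets: Vec<u32>, // start of element i within `heap`
    ends: Vec<u32>,    // one past the last byte of element i within `heap`
}

impl ToyView {
    /// Pack the values back to back, so element i occupies offsets[i]..ends[i].
    pub fn from_strs(values: &[&str]) -> Self {
        let mut heap = Vec::new();
        let mut offsets = Vec::with_capacity(values.len());
        let mut ends = Vec::with_capacity(values.len());
        for v in values {
            offsets.push(heap.len() as u32);
            heap.extend_from_slice(v.as_bytes());
            ends.push(heap.len() as u32);
        }
        ToyView { heap: Arc::from(heap), offsets, ends }
    }

    /// Metadata-only filter: copy the surviving (offset, end) pairs and
    /// bump the heap's reference count. No heap bytes are moved.
    pub fn filter(&self, mask: &[bool]) -> Self {
        assert_eq!(mask.len(), self.offsets.len());
        let mut offsets = Vec::new();
        let mut ends = Vec::new();
        for (i, &keep) in mask.iter().enumerate() {
            if keep {
                offsets.push(self.offsets[i]);
                ends.push(self.ends[i]);
            }
        }
        ToyView { heap: Arc::clone(&self.heap), offsets, ends }
    }

    /// Bytes of element i, read straight out of the shared heap.
    pub fn get(&self, i: usize) -> &[u8] {
        &self.heap[self.offsets[i] as usize..self.ends[i] as usize]
    }

    pub fn len(&self) -> usize {
        self.offsets.len()
    }

    /// True when both views point at the same heap allocation.
    pub fn shares_heap_with(&self, other: &ToyView) -> bool {
        Arc::ptr_eq(&self.heap, &other.heap)
    }
}
```

After a filter the survivors may be scattered across the heap, which is why the real encoding defers the work to a single canonicalization step at the end of the chain.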
+ +## What landed + +New encoding `vortex.fsstview` in `encodings/fsst/src/fsstview/`: + +| file | role | +| --- | --- | +| `array.rs` | encoding struct, `#[array_slots]` children (uncompressed_lengths, codes_offsets, codes_ends, codes_validity), VTable, serde, allocation-free `fsstview_from_fsst` conversion | +| `compute.rs` | metadata-only `FilterKernel` + `TakeExecute` | +| `ops.rs` | `scalar_at` | +| `slice.rs` | metadata-only `SliceReduce` | +| `from_fsst.rs` | `fsst_filter_to_view` / `fsst_take_to_view` helpers | +| `canonical.rs` | decode → `VarBinViewArray` / `VarBinArray`, with the `Auto` export strategy | +| `kernel.rs` / `rules.rs` | parent kernel + rule registration | +| `tests.rs` | conformance + agreement + nullable/gapped/RunDecode coverage + zero-copy conversion guard | + +Registered in `vortex-file/src/lib.rs` (`register_default_encodings`). Public API: +`FSSTView`, `FSSTViewArray`, `FsstViewCompaction`, `canonicalize_fsstview_with`, +`canonicalize_fsstview_to_varbin`, `fsst_filter_to_view`, `fsst_take_to_view`, `fsstview_from_fsst`. + +## Canonicalization strategy (`FsstViewCompaction::Auto`) + +After metadata-only ops the survivors are scattered in the original heap; `Auto` picks how to +decode from the survivor layout: + +- **Direct** — one contiguous run (untouched / sliced): single bulk decode, no copy. +- **RunDecode** — offsets monotonic, few runs (clustered/range filters, sorted takes): decode each + contiguous run straight into the element-ordered output, no gather copy. Threshold: + `runs <= len / 4`. +- **GatherBulk** — scattered (shuffle take) or fragmented (uniform-random filter): compact live + codes into one buffer, single bulk decode. + +`RunDecode` and the gather coalescing came from the optimization work; `PerElement` and +`RunCoalesce` were explored, proven worse, and removed before merge. + +## Benchmarks & results + +Two benches in `encodings/fsst/benches/` (full write-up in `benches/README.md`). 
All numbers are +divan **medians**, 100 samples, single shared machine — directional, relative ordering stable. + +1. **`fsst_view_compute`** — synthetic, no external data, **runs in CI**. ~2 MiB strings, ManyShort + (~12 B) / FewLong (~256 B). Single filter and a 5-op chain → VarBinView. The chain is where the + view's advantage compounds (each `fsst` op re-rewrites the heap; the view stays metadata-only): + - chain FewLong: fsst 371 µs → view **268 µs** (1.4×); chain ManyShort 4.99 ms → **4.12 ms**. + +2. **`fsst_view_fineweb_queries`** — the real `vortex-bench` query predicates (`dump = ...`, + `date LIKE '2020-10-%'`, `url/text LIKE '%google%'`, `'% vortex %'`, espn filters), evaluated + in DuckDB to authentic per-row masks, then materialize the column → VarBinView. Numbers below + are a same-machine before/after (old `sizes` representation → new `ends` representation): + - text/date_prefix (12%): fsst 69.3 ms vs view **41.4 ms** (1.67×; was 41.0 ms — held) + - text/dump_eq (7%): fsst 42.6 ms vs view **25.3 ms** (1.68×; was 25.3 ms — held) + - url/vortex (0.04%): fsst 8.6 µs vs view **9.1 µs** (was view 140 µs — floor removed) + - url/espn_and (0.08%): fsst 14.5 µs vs view **14.9 µs** (was view 146 µs) + - text/espn_and (0.08%): fsst 284 µs vs view **271 µs** (was view 407 µs — flips to a view win) + +With the `ends` representation the view now **wins or ties every query** in the matrix: the bulk / +clustered / long-`text` cases still win by skipping the per-op heap rewrite (up to 1.68× here), and +the tiny highly selective predicates that used to lose to the conversion floor now match `fsst` to +within noise. Full table in `benches/README.md`. + +### Reproducing the FineWeb queries bench + +The ~2 GB sample is **not** downloaded by the bench. 
Extract columns + query masks once: + +```bash +pip install duckdb +python3 encodings/fsst/benches/fineweb_queries_extract.py # writes /tmp/fw_*.bin +FINEWEB_DIR=/tmp cargo bench -p vortex-fsst --bench fsst_view_fineweb_queries +``` + +The bench no-ops (CI-safe) when `FINEWEB_DIR` is unset. + +## Conversion floor — resolved + +The view's one previous weakness was a **fixed conversion cost on highly selective filters**: the +original `fsstview_from_fsst` derived a full `sizes` array (`offsets[i+1] - offsets[i]` over all +rows) even when a predicate kept <1% of rows. Samply + cachegrind had pinned this as the top +wall-clock cost (~130–150 µs floor) on the `url`-selective queries — a memory-bandwidth-bound loop +streaming `len * 8` bytes. + +**Fix (this branch): store the end offset, not the size.** `codes_sizes` was replaced by +`codes_ends`, where `codes_ends[i] = codes_offsets[i] + size[i]`. Because a freshly converted heap +is contiguous (element `i` occupies `offsets[i]..offsets[i+1]`), **both** addressing arrays are now +zero-copy slices of the FSST's existing monotonic offsets buffer +(`codes_offsets = offsets[0..len]`, `codes_ends = offsets[1..len+1]`). The conversion allocates and +copies nothing; no per-row `sizes` array is materialized, so a selective `filter`/`take` never pays +to derive sizes for the rows it discards. The per-element size is recovered as +`codes_ends[i] - codes_offsets[i]` only at canonicalize / `scalar_at`, over the survivors only. + +This keeps `filter`/`take`/`slice` metadata-only and composable across a chain (they carry +`codes_ends` alongside `codes_offsets`); the conversion is **not** fused into the filter. Measured +result (same-machine before/after, `fsst_view_fineweb_queries`): `url/vortex` 140 µs → **9.1 µs**, +`url/espn_and` 146 µs → **14.9 µs**, and the previously winning clustered cases (`text/dump_eq`, +`text/date_prefix`) held flat. The view now wins or ties every query in the matrix. 
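The zero-copy conversion described above can be sketched as follows. This is an illustrative model under stated assumptions, not the code in `array.rs`: `OffsetsSlice`, `view_addressing`, and `size_of` are hypothetical names, and an `Arc<[u64]>` plus an index range stands in for a sliced Vortex buffer. It shows that both addressing arrays are windows over one monotonic offsets buffer and that sizes are derived only on demand.

```rust
use std::ops::Range;
use std::sync::Arc;

/// Hypothetical stand-in for a sliced buffer: a window into a shared
/// offsets allocation. Cloning it copies no offset values.
#[derive(Clone, Debug)]
pub struct OffsetsSlice {
    buf: Arc<[u64]>,
    range: Range<usize>,
}

impl OffsetsSlice {
    pub fn as_slice(&self) -> &[u64] {
        &self.buf[self.range.clone()]
    }
}

/// Split a monotonic offsets buffer holding `len + 1` entries into
/// (starts, ends) = (offsets[0..len], offsets[1..len + 1]) without copying.
pub fn view_addressing(offsets: Arc<[u64]>) -> (OffsetsSlice, OffsetsSlice) {
    assert!(!offsets.is_empty(), "offsets must hold len + 1 entries");
    let len = offsets.len() - 1;
    let starts = OffsetsSlice { buf: Arc::clone(&offsets), range: 0..len };
    let ends = OffsetsSlice { buf: offsets, range: 1..len + 1 };
    (starts, ends)
}

/// Size of element `i`, computed only when a consumer asks for it
/// (in the real encoding: at canonicalize / `scalar_at`).
pub fn size_of(starts: &OffsetsSlice, ends: &OffsetsSlice, i: usize) -> u64 {
    ends.as_slice()[i] - starts.as_slice()[i]
}
```

Because `ends` is the same allocation shifted by one element, a pointer comparison between the two slices is enough to detect an accidental return to a size-materializing conversion.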
+ +A regression guard (`conversion_shares_offsets_buffer_zero_copy` in `tests.rs`) asserts the +structural invariant the fix relies on: a freshly converted view's `codes_ends` slice begins exactly +one element past `codes_offsets` in the *same allocation*. This catches a silent revert to a +size-materializing conversion — which the value/agreement tests would not, since the decoded values +would still match — without depending on the FineWeb bench (gated out of CI). + +The alternative follow-up (store `sizes` in the narrowest int width) was considered and rejected: +it only halves the *write* traffic, leaving the unavoidable full read of the offsets — whereas the +`ends` representation removes the whole O(rows) pass. Narrowing widths is orthogonal and can still +be layered on the file layer's compression if desired. + +## Verification commands + +```bash +cargo nextest run -p vortex-fsst # (or cargo test -p vortex-fsst) — 105 pass +cargo clippy -p vortex-fsst --all-targets --all-features +cargo clippy -p vortex-file +cargo +nightly fmt --all +``` + +## Methodology notes (for whoever continues) + +- `perf` is unavailable in the dev sandbox (kernel mismatch). Use **samply** (set + `/proc/sys/kernel/perf_event_paranoid` to 1) for wall-clock sampling and **cachegrind** for + cache/instruction modeling. Build the profiled example with + `RUSTFLAGS="-C force-frame-pointers=yes -C debuginfo=2"` and resolve addresses with `addr2line`. +- Caution learned the hard way: **instruction count is not time.** A 12× instruction-count + reduction in the conversion barely moved wall-clock; always confirm with a sampling profiler and + a realistic workload (real FineWeb columns, real query masks), not synthetic micro-loops. 
diff --git a/FSSTVIEW_NEXT_PROMPT.md b/FSSTVIEW_NEXT_PROMPT.md new file mode 100644 index 00000000000..296cc86263a --- /dev/null +++ b/FSSTVIEW_NEXT_PROMPT.md @@ -0,0 +1,42 @@ +# Copy-paste prompt to continue the FSSTView work + +Paste the block below to a fresh agent session in the Vortex repo. + +--- + +Continue work on the `FSSTView` encoding in the Vortex repo. It's already implemented and +merge-ready on branch `claude/fsstview-array-listview-TdW45` (17 commits ahead of `develop`). +Read `FSSTVIEW_HANDOVER.md` and `encodings/fsst/benches/README.md` first for full context and +benchmark numbers. + +Background: `FSSTView` (in `encodings/fsst/src/fsstview/`) is a ListView-style FSST that stores +compressed codes addressed by separate `offsets` + `sizes` arrays, making `filter`/`take`/`slice` +metadata-only (no code-heap rewrite). On real FineWeb data the view wins up to 8.6× on chained +ops over long strings. Its one measured weakness: on highly selective predicates over short +columns it pays a fixed ~130 µs floor because `fsstview_from_fsst` derives the full `sizes` array +(over all rows) even when <1% survive. + +Task: eliminate that conversion floor without regressing the cases the view already wins. Approach: + +1. Confirm the current behaviour first: run + `python3 encodings/fsst/benches/fineweb_queries_extract.py` (needs `pip install duckdb`, network + to HuggingFace), then + `FINEWEB_DIR=/tmp cargo bench -p vortex-fsst --bench fsst_view_fineweb_queries`. Note the + `url/vortex`, `url/google_and`, `url/espn_*` rows where `view` trails `fsst`. +2. Implement a cheaper `sizes` representation so a selective filter doesn't materialize sizes for + discarded rows — e.g. derive `sizes` lazily from `offsets` at canonicalize time, or store it in + the narrowest int width that fits. 
`filter`/`take` currently filter a concrete `codes_sizes` + child array, so whatever you choose must keep those ops metadata-only and still composable + across a chain (do NOT fuse conversion into filter). +3. Prove it with the same methodology, not instruction counts: samply (set + `perf_event_paranoid=1`) for wall-clock and the real `fsst_view_fineweb_queries` bench. Show the + selective `url` queries improve AND the winning cases (`chain text`, `dump_eq`, `date_prefix`) + do not regress. +4. Keep it merge-clean: `cargo test -p vortex-fsst` (107 tests), `cargo clippy -p vortex-fsst + --all-targets --all-features`, `cargo +nightly fmt --all`. Add/adjust tests for any new + representation. Update `benches/README.md` and `FSSTVIEW_HANDOVER.md` with new numbers. Commit + with sign-off `Signed-off-by: Joe Isaacs ` and push to the same branch. + +Be rigorous about measurement: instruction count is not time, and synthetic micro-loops mislead — +always validate on the real FineWeb columns/query masks. If a change doesn't actually help the real +workload, say so and revert it rather than shipping it. diff --git a/encodings/fsst/Cargo.toml b/encodings/fsst/Cargo.toml index 0d722a131d3..e2881ae4742 100644 --- a/encodings/fsst/Cargo.toml +++ b/encodings/fsst/Cargo.toml @@ -56,5 +56,13 @@ name = "chunked_dict_fsst_builder" harness = false required-features = ["_test-harness"] +[[bench]] +name = "fsst_view_compute" +harness = false + +[[bench]] +name = "fsst_view_fineweb_queries" +harness = false + [package.metadata.cargo-machete] ignored = ["fsst-rs"] diff --git a/encodings/fsst/benches/README.md b/encodings/fsst/benches/README.md new file mode 100644 index 00000000000..ec02eeba992 --- /dev/null +++ b/encodings/fsst/benches/README.md @@ -0,0 +1,119 @@ + + +# FSSTView benchmarks + +`FSSTView` is a ListView-style FSST: it addresses its compressed codes with separate +`offsets` + `ends` arrays instead of a single monotonic offsets array. 
That makes +`filter` / `take` / `slice` **metadata-only** (they rewrite only the small +offsets/ends/lengths/validity arrays and reuse the compressed byte heap), whereas plain +`FSST` delegates those ops to `VarBin` and **rewrites the whole compressed code heap** each +time. The cost moves to a single canonicalization (decode → `VarBinViewArray`) at the end. + +These benchmarks quantify that trade-off. All numbers are divan **medians**, 100 samples, on +one shared machine — treat them as directional; the relative ordering is stable. `fsst` = +stay in `FSST` (rewrite heap per op); `view` = convert to `FSSTView`, metadata-only ops, +decode once. + +## 1. `fsst_view_compute` — synthetic shapes + +Self-contained (no external data). ~2 MiB of synthetic strings in two shapes — `ManyShort` +(~12 B) and `FewLong` (~256 B) — with a clustered 10 % filter and a sorted take. Two +workloads, each ending in a `VarBinViewArray`: + +- `single_filter_{fsst,view}` — one filter, then canonicalize. +- `chain_{fsst,view}` — convert once, then 5 alternating filter/take ops, canonicalize once + (the case the view is designed for). + +| workload | shape | fsst | view | speedup | +| --- | --- | --- | --- | --- | +| single_filter | ManyShort | 0.63 ms | 0.62 ms | ~1× | +| single_filter | FewLong | 65 µs | 53 µs | 1.2× | +| chain (5 ops) | ManyShort | 4.99 ms | 4.12 ms | 1.2× | +| chain (5 ops) | FewLong | 371 µs | 268 µs | 1.4× | + +Takeaway: the gap widens with chain length, because each `fsst` op re-rewrites the heap while +the view stays metadata-only and defers the single decode. + +## 2. `fsst_view_fineweb_queries` — real query predicates + +The actual `vortex-bench` FineWeb queries are `SELECT * FROM fineweb WHERE `. Each +predicate is evaluated once in DuckDB against the real sample to produce an authentic per-row +selection mask (recipe: `benches/fineweb_queries_extract.py`); the bench applies that mask to +the FSST-compressed `url`/`text` column and decodes to a `VarBinViewArray`. 
This is the +materialization half of a real query. No-ops unless `FINEWEB_DIR` points at the dumps. + +Mask shapes vary by predicate (over 200 k rows): `dump_eq` 7 %/177 runs and `date_prefix` +12 %/178 runs are clustered; `google_or` 2 %/4046 runs is scattered; `vortex`/`espn` are +~0.04–0.09 % and tiny. + +The `view (before)` column is the original representation, which derived a full `sizes` array in +`fsstview_from_fsst` (one i64 per row, materialized over **all** 200 k rows regardless of +selectivity). The `view` column stores the per-element **end offset** instead — a zero-copy slice +of the FSST's existing monotonic offsets — so the conversion allocates nothing and a selective +predicate never pays to derive sizes for the rows it discards (see "Conversion is allocation-free" +below). `fsst` is unchanged by this work; its small run-to-run drift is machine noise (the two +measurement runs were back-to-back on a shared machine). + +| query (selectivity) | column | fsst | view (before) | view | winner | +| --- | --- | --- | --- | --- | --- | +| date_prefix (12 %) | text | 69.3 ms | 41.0 ms | **41.4 ms** | view 1.67× | +| dump_eq (7 %) | text | 42.6 ms | 25.3 ms | **25.3 ms** | view 1.68× | +| google_or (2 %) | text | 23.9 ms | 23.7 ms | **19.8 ms** | view 1.2× | +| google_and (0.19 %) | text | 708 µs | 782 µs | **642 µs** | view | +| vortex (0.04 %) | text | 529 µs | 606 µs | **456 µs** | view | +| espn_and (0.08 %) | text | 284 µs | 407 µs | **271 µs** | view | +| espn_or (0.09 %) | text | 650 µs* | 418 µs | **281 µs** | view | +| date_prefix (12 %) | url | 1.68 ms | 1.39 ms | **1.25 ms** | view 1.34× | +| dump_eq (7 %) | url | 1.11 ms | 944 µs | **881 µs** | view 1.25× | +| google_or (2 %) | url | 398 µs | 478 µs | **331 µs** | view 1.2× | +| google_and (0.19 %) | url | 30.2 µs | 173 µs | **28.7 µs** | view | +| espn_and (0.08 %) | url | 14.5 µs | 146 µs | **14.9 µs** | ~tie | +| espn_or (0.09 %) | url | 16.4 µs | 152 µs | **16.0 µs** | ~tie | +| vortex (0.04 
%) | url | 8.6 µs | 140 µs | **9.1 µs** | ~tie | + +(divan medians. `*` `text/espn_or` `fsst` was noisy that run — fastest 283 µs, mean 578 µs.) + +Takeaway: + +- **The conversion floor is gone.** Every highly selective `url` predicate that previously trailed + `fsst` by 9–16× — it paid a fixed ~140 µs to walk all 200 k offsets building `sizes` even when + <0.2 % of rows survived — now matches `fsst` to within noise (`url/vortex` 140 µs → **9.1 µs**, + `url/espn_and` 146 µs → **14.9 µs**). The same floor that quietly taxed the *short selective + `text`* predicates (`text/vortex`, `text/espn_*`, `text/google_and`) is also gone, flipping each + of those from an `fsst` win to a `view` win. +- **The winning cases do not regress.** The clustered/bulk selections the view was already built + for hold or improve: `text/dump_eq` and `text/date_prefix` stay at ~1.67–1.68× (the decode, not + the conversion, dominates them), while `url/date_prefix`, `url/dump_eq`, and both `google_or` + columns get a touch faster because the conversion no longer allocates. + +With the floor removed the view now wins or ties **every** query in this matrix. + +## Conversion is allocation-free + +`FSSTView` stores the per-element **end offset** (`codes_ends[i] = offset[i] + size[i]`) rather +than the size. A freshly converted heap is contiguous, so element `i` occupies +`offsets[i]..offsets[i + 1]`, which means **both** addressing arrays are zero-copy slices of the +FSST's existing monotonic offsets buffer: `codes_offsets = offsets[0..len]` and +`codes_ends = offsets[1..len + 1]`. `fsstview_from_fsst` therefore allocates and copies nothing — +in particular it never materializes a per-row `sizes` array, so a selective `filter`/`take` that +keeps a handful of rows no longer pays an O(rows) cost to derive sizes for the rows it discards. +The per-element size is recovered as `codes_ends[i] - codes_offsets[i]` only where it is needed +(canonicalize / `scalar_at`), over the survivors only. 
`filter`/`take`/`slice` stay metadata-only +and compose across a chain exactly as before — they now carry `codes_ends` alongside +`codes_offsets` instead of `codes_sizes`. + +## How `Auto` chooses the decode + +Canonicalization picks a decode strategy from the survivor layout (`FsstViewCompaction::Auto`): + +- **Direct** — survivors are one contiguous run (untouched / sliced): one bulk decode, no copy. +- **RunDecode** — offsets still monotonic with few runs (clustered/range filters, sorted + takes): decode each contiguous run straight into the element-ordered output, no gather copy. +- **GatherBulk** — scattered (shuffle take) or heavily fragmented (uniform-random filter): + compact the live codes into one buffer, then a single bulk decode. + +The threshold (`runs <= len / 4` → RunDecode, else GatherBulk) was calibrated with the +synthetic `fsst_view_compute` shapes. diff --git a/encodings/fsst/benches/fineweb_queries_extract.py b/encodings/fsst/benches/fineweb_queries_extract.py new file mode 100644 index 00000000000..8ca1bed1ca1 --- /dev/null +++ b/encodings/fsst/benches/fineweb_queries_extract.py @@ -0,0 +1,73 @@ +#!/usr/bin/env python3 +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright the Vortex contributors +# +# One-time extraction for the `fsst_view_fineweb_queries` benchmark. +# +# Materializes, from the real HuggingFace FineWeb 10BT sample (the same file `vortex-bench` uses), +# the `url` and `text` string columns plus a per-row selection mask for each real benchmark +# predicate. Writes them as the simple length-prefixed / byte-per-row formats the bench reads. +# +# pip install duckdb +# python3 fineweb_queries_extract.py # -> /tmp/fw_url.bin, fw_text.bin, fw_mask_*.bin +# FINEWEB_DIR=/tmp cargo bench -p vortex-fsst --bench fsst_view_fineweb_queries +# +# The sample is ~2 GB; DuckDB streams it over HTTP range reads, so only the first N rows are read. 
+ +import os +import struct + +import duckdb + +SRC = "https://huggingface.co/datasets/HuggingFaceFW/fineweb/resolve/v1.4.0/sample/10BT/001_00000.parquet" +N = 200_000 +OUT = os.environ.get("FINEWEB_DIR", "/tmp") + +# The row-selecting `WHERE` clauses of the `SELECT *` FineWeb queries in vortex-bench. +# (`file_path LIKE '%/CC-MAIN-2014-%'` matches zero rows in this sample, so it is omitted.) +QUERIES = { + "dump_eq": "dump = 'CC-MAIN-2016-30'", + "date_prefix": "date LIKE '2020-10-%'", + "google_and": "url LIKE '%google%' AND text LIKE '%Google%'", + "google_or": "url LIKE '%.google.%' OR text LIKE '% Google %'", + "vortex": "text LIKE '% vortex %'", + "espn_and": "url LIKE '%espn%' AND language = 'en' AND language_score > 0.92", + "espn_or": "url LIKE '%espn%' OR url LIKE '%www.espn.go.com%' OR url LIKE '%espn.go.com%'", +} + + +def main() -> None: + con = duckdb.connect() + con.execute("INSTALL httpfs; LOAD httpfs;") + con.execute( + f"""CREATE TABLE fw AS + SELECT row_number() OVER () AS rid, url, text, dump, date, file_path, + language, language_score + FROM read_parquet('{SRC}') LIMIT {N}""" + ) + + def dump_col(col: str) -> None: + rows = con.execute(f"SELECT {col} FROM fw ORDER BY rid").fetchall() + path = os.path.join(OUT, f"fw_{col}.bin") + with open(path, "wb") as f: + f.write(struct.pack(" {path}") + + dump_col("url") + dump_col("text") + + for name, pred in QUERIES.items(): + rids = {r[0] for r in con.execute(f"SELECT rid FROM fw WHERE {pred}").fetchall()} + path = os.path.join(OUT, f"fw_mask_{name}.bin") + with open(path, "wb") as f: + f.write(struct.pack(" usize { + match self { + Shape::ManyShort => 12, + Shape::FewLong => 256, + } + } +} + +const SHAPES: &[Shape] = &[Shape::ManyShort, Shape::FewLong]; + +/// Build a ~2 MiB input from a small alphabet so FSST finds good symbols, with shared substrings +/// to mimic real string columns. 
+fn generate(shape: Shape) -> VarBinArray { + let mut rng = StdRng::seed_from_u64(42); + let avg_len = shape.avg_len(); + let count = TARGET_UNCOMPRESSED / avg_len; + const WORDS: &[&str] = &[ + "https://", "example", "vortex", ".com/", "path", "query=", "value", "data", "alpha", + "bravo", "charlie", "delta", "_", "-", "/", "0123", + ]; + let mut strings: Vec<Box<[u8]>> = Vec::with_capacity(count); + for _ in 0..count { + let target = avg_len * rng.random_range(70..=130) / 100; + let mut s = String::with_capacity(target + 8); + while s.len() < target { + s.push_str(WORDS[rng.random_range(0..WORDS.len())]); + } + s.truncate(target.max(1)); + strings.push(s.into_bytes().into_boxed_slice()); + } + VarBinArray::from_iter( + strings.into_iter().map(Some), + DType::Utf8(Nullability::NonNullable), + ) +} + +fn compress(varbin: &VarBinArray, ctx: &mut ExecutionCtx) -> FSSTArray { + let compressor = fsst_train_compressor(varbin); + fsst_compress(varbin, varbin.len(), varbin.dtype(), &compressor, ctx) +} + +/// Clustered selection (32 bursts, ~`keep` fraction) — survivors form runs, the realistic shape. +fn clustered_mask(len: usize, keep: f64) -> Mask { + let mut rng = StdRng::seed_from_u64(9); + #[expect(clippy::cast_possible_truncation, clippy::cast_sign_loss)] + let total = (len as f64 * keep) as usize; + let burst_len = (total / 32).max(1); + let mut keep_set = vec![false; len]; + for _ in 0..32 { + let start = rng.random_range(0..len.saturating_sub(burst_len).max(1)); + for j in start..(start + burst_len).min(len) { + keep_set[j] = true; + } + } + Mask::from_iter(keep_set) +} + +/// Sorted-index take (~`keep` fraction) — an index lookup; preserves heap order. 
+fn sorted_take(len: usize, keep: f64) -> ArrayRef { + let mut rng = StdRng::seed_from_u64(13); + #[expect(clippy::cast_possible_truncation, clippy::cast_sign_loss)] + let n = (len as f64 * keep) as usize; + let mut idx: Vec<u64> = (0..n).map(|_| rng.random_range(0..len as u64)).collect(); + idx.sort_unstable(); + PrimitiveArray::from_iter(idx).into_array() +} + +fn fsst_filter(array: &FSSTArray, mask: &Mask, ctx: &mut ExecutionCtx) -> FSSTArray { + <FSST as FilterKernel>::filter(array.as_view(), mask, ctx) + .unwrap() + .unwrap() + .try_downcast::<FSST>() + .ok() + .unwrap() +} + +fn fsst_take(array: &FSSTArray, indices: &ArrayRef, ctx: &mut ExecutionCtx) -> FSSTArray { + <FSST as TakeExecute>::take(array.as_view(), indices, ctx) + .unwrap() + .unwrap() + .try_downcast::<FSST>() + .ok() + .unwrap() +} + +fn fsst_to_vbv(array: &FSSTArray, ctx: &mut ExecutionCtx) -> ArrayRef { + array + .clone() + .into_array() + .execute::<VarBinViewArray>(ctx) + .unwrap() + .into_array() +} + +// =============================== SINGLE FILTER -> VarBinView =================================== + +#[divan::bench(args = SHAPES)] +fn single_filter_fsst(bencher: Bencher, shape: Shape) { + let fsst = compress(&generate(shape), &mut LEGACY_SESSION.create_execution_ctx()); + let mask = clustered_mask(fsst.len(), 0.10); + bencher + .with_inputs(|| (&fsst, &mask, LEGACY_SESSION.create_execution_ctx())) + .bench_refs(|(fsst, mask, ctx)| { + let filtered = fsst_filter(fsst, mask, ctx); + black_box(fsst_to_vbv(&filtered, ctx)) + }); +} + +#[divan::bench(args = SHAPES)] +fn single_filter_view(bencher: Bencher, shape: Shape) { + let fsst = compress(&generate(shape), &mut LEGACY_SESSION.create_execution_ctx()); + let mask = clustered_mask(fsst.len(), 0.10); + bencher + .with_inputs(|| (&fsst, &mask, LEGACY_SESSION.create_execution_ctx())) + .bench_refs(|(fsst, mask, ctx)| { + let view = fsstview_from_fsst(fsst, ctx).unwrap(); + let filtered = <FSSTView as FilterKernel>::filter(view.as_view(), mask, ctx) + .unwrap() + .unwrap() + .try_downcast::<FSSTView>() + .ok() + .unwrap(); + black_box( + 
canonicalize_fsstview_with(filtered.as_view(), FsstViewCompaction::Auto, ctx) + .unwrap(), + ) + }); +} + +// =============================== CHAIN (convert once, N ops, export once) ====================== + +const CHAIN_LEN: usize = 5; + +#[divan::bench(args = SHAPES)] +fn chain_fsst(bencher: Bencher, shape: Shape) { + let fsst = compress(&generate(shape), &mut LEGACY_SESSION.create_execution_ctx()); + bencher + .with_inputs(|| (&fsst, LEGACY_SESSION.create_execution_ctx())) + .bench_refs(|(fsst, ctx)| { + let mut cur = (*fsst).clone(); + for op in 0..CHAIN_LEN { + if op % 2 == 0 { + let mask = clustered_mask(cur.len(), 0.80); + cur = fsst_filter(&cur, &mask, ctx); + } else { + let indices = sorted_take(cur.len(), 0.80); + cur = fsst_take(&cur, &indices, ctx); + } + } + black_box(fsst_to_vbv(&cur, ctx)) + }); +} + +#[divan::bench(args = SHAPES)] +fn chain_view(bencher: Bencher, shape: Shape) { + let fsst = compress(&generate(shape), &mut LEGACY_SESSION.create_execution_ctx()); + bencher + .with_inputs(|| (&fsst, LEGACY_SESSION.create_execution_ctx())) + .bench_refs(|(fsst, ctx)| { + // Convert once, then chain metadata-only ops, canonicalize once. 
+ let mut cur = fsstview_from_fsst(fsst, ctx).unwrap(); + for op in 0..CHAIN_LEN { + let next = if op % 2 == 0 { + let mask = clustered_mask(cur.len(), 0.80); + <FSSTView as FilterKernel>::filter(cur.as_view(), &mask, ctx) + .unwrap() + .unwrap() + } else { + let indices = sorted_take(cur.len(), 0.80); + <FSSTView as TakeExecute>::take(cur.as_view(), &indices, ctx) + .unwrap() + .unwrap() + }; + cur = next.try_downcast::<FSSTView>().ok().unwrap(); + } + black_box( + canonicalize_fsstview_with(cur.as_view(), FsstViewCompaction::Auto, ctx).unwrap(), + ) + }); +} diff --git a/encodings/fsst/benches/fsst_view_fineweb_queries.rs b/encodings/fsst/benches/fsst_view_fineweb_queries.rs new file mode 100644 index 00000000000..096ea30b763 --- /dev/null +++ b/encodings/fsst/benches/fsst_view_fineweb_queries.rs @@ -0,0 +1,183 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! FSST vs FSSTView materializing a string column under the **real FineWeb benchmark predicates**. +//! +//! The FineWeb queries in `vortex-bench` are `SELECT * FROM fineweb WHERE `: each one +//! evaluates a predicate to a row selection, then materializes the surviving rows. This bench does +//! exactly the materialization half — apply a real predicate's selection mask to an FSST-compressed +//! string column and decode it to a `VarBinViewArray` — comparing fsst (rewrites the code heap) +//! vs fsstview (metadata-only filter, decode once). +//! +//! The predicate masks and the string columns are produced once with DuckDB against the real +//! HuggingFace FineWeb 10BT sample (the same file `vortex-bench` uses). The ~2 GB sample is not +//! downloaded by the bench; the recipe is in `fineweb_queries_extract.py` next to this file, and +//! the resulting files are pointed at via env vars: +//! +//! ```text +//! FINEWEB_DIR=/tmp cargo bench -p vortex-fsst --bench fsst_view_fineweb_queries +//! ``` +//! +//! `FINEWEB_DIR` must contain `fw_url.bin`, `fw_text.bin` (length-prefixed: `u64` count, then per +//! 
row `u32` len + bytes) and `fw_mask_.bin` (`u64` count, then one byte per row, 1 = kept). +//! If `FINEWEB_DIR` is unset or files are missing, every bench no-ops so CI stays green. +//! +//! The real predicates span the spectrum the view's `Auto` export was built for: clustered +//! selections (`dump = ...`, `date LIKE '2020-10-%'`) where survivors form long runs, and scattered +//! `LIKE '%...%'` containment filters where they don't. + +#![expect(clippy::unwrap_used)] + +use std::path::PathBuf; + +use divan::Bencher; +use divan::black_box; +use vortex_array::ArrayRef; +use vortex_array::ExecutionCtx; +use vortex_array::IntoArray; +use vortex_array::LEGACY_SESSION; +use vortex_array::VortexSessionExecute; +use vortex_array::arrays::VarBinArray; +use vortex_array::arrays::VarBinViewArray; +use vortex_array::arrays::filter::FilterKernel; +use vortex_array::dtype::DType; +use vortex_array::dtype::Nullability; +use vortex_fsst::FSST; +use vortex_fsst::FSSTArray; +use vortex_fsst::FSSTView; +use vortex_fsst::FsstViewCompaction; +use vortex_fsst::canonicalize_fsstview_with; +use vortex_fsst::fsst_compress; +use vortex_fsst::fsst_train_compressor; +use vortex_fsst::fsstview_from_fsst; +use vortex_mask::Mask; + +fn main() { + divan::main(); +} + +/// Real FineWeb benchmark predicates that select rows (the `WHERE` clauses of the `SELECT *` +/// queries in `vortex-bench/src/fineweb`). `filepath` matches zero rows so it is omitted. +const QUERIES: &[&str] = &[ + "dump_eq", // dump = 'CC-MAIN-2016-30' — clustered, ~7% + "date_prefix", // date LIKE '2020-10-%' — clustered, ~12% + "google_and", // url LIKE '%google%' AND text LIKE — very selective, scattered + "google_or", // url/text LIKE '%google%' — scattered, ~2% + "vortex", // text LIKE '% vortex %' — tiny + "espn_and", // url LIKE '%espn%' AND lang/score — tiny + "espn_or", // url LIKE '%espn%' OR ... — tiny +]; + +/// The materialized string column. `url` is short (~72 B), `text` is long (~3 KB). 
+const COLUMNS: &[&str] = &["url", "text"]; + +fn dir() -> Option<PathBuf> { + std::env::var_os("FINEWEB_DIR").map(PathBuf::from) +} + +/// Read a length-prefixed column dump into a `VarBinArray`. +fn load_column(name: &str) -> Option<VarBinArray> { + let path = dir()?.join(format!("fw_{name}.bin")); + if !path.exists() { + return None; + } + let bytes = std::fs::read(path).unwrap(); + let mut pos = 0usize; + #[expect(clippy::cast_possible_truncation)] + let rows = u64::from_le_bytes(bytes[0..8].try_into().unwrap()) as usize; + pos += 8; + let mut values: Vec<Option<Vec<u8>>> = Vec::with_capacity(rows); + for _ in 0..rows { + let len = u32::from_le_bytes(bytes[pos..pos + 4].try_into().unwrap()) as usize; + pos += 4; + values.push(Some(bytes[pos..pos + len].to_vec())); + pos += len; + } + Some(VarBinArray::from_iter( + values.into_iter().map(|v| v.map(Vec::into_boxed_slice)), + DType::Utf8(Nullability::NonNullable), + )) +} + +/// Read a one-byte-per-row predicate mask. +fn load_mask(query: &str) -> Option<Mask> { + let path = dir()?.join(format!("fw_mask_{query}.bin")); + if !path.exists() { + return None; + } + let bytes = std::fs::read(path).unwrap(); + #[expect(clippy::cast_possible_truncation)] + let rows = u64::from_le_bytes(bytes[0..8].try_into().unwrap()) as usize; + Some(Mask::from_iter((0..rows).map(|i| bytes[8 + i] != 0))) +} + +fn compress(varbin: &VarBinArray, ctx: &mut ExecutionCtx) -> FSSTArray { + let compressor = fsst_train_compressor(varbin); + fsst_compress(varbin, varbin.len(), varbin.dtype(), &compressor, ctx) +} + +#[derive(Clone, Copy)] +struct Case { + column: &'static str, + query: &'static str, +} + +impl std::fmt::Display for Case { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}/{}", self.column, self.query) + } +} + +fn cases() -> Vec<Case> { + let mut v = Vec::new(); + for &column in COLUMNS { + for &query in QUERIES { + v.push(Case { column, query }); + } + } + v +} + +#[divan::bench(args = cases())] +fn fsst(bencher: Bencher, case: Case) { + 
let (Some(varbin), Some(mask)) = (load_column(case.column), load_mask(case.query)) else { + return; + }; + let fsst = compress(&varbin, &mut LEGACY_SESSION.create_execution_ctx()); + bencher + .with_inputs(|| (&fsst, &mask, LEGACY_SESSION.create_execution_ctx())) + .bench_refs(|(fsst, mask, ctx)| { + let filtered: ArrayRef = <FSST as FilterKernel>::filter(fsst.as_view(), mask, ctx) + .unwrap() + .unwrap(); + black_box( + filtered + .execute::<VarBinViewArray>(ctx) + .unwrap() + .into_array(), + ) + }); +} + +#[divan::bench(args = cases())] +fn view(bencher: Bencher, case: Case) { + let (Some(varbin), Some(mask)) = (load_column(case.column), load_mask(case.query)) else { + return; + }; + let fsst = compress(&varbin, &mut LEGACY_SESSION.create_execution_ctx()); + bencher + .with_inputs(|| (&fsst, &mask, LEGACY_SESSION.create_execution_ctx())) + .bench_refs(|(fsst, mask, ctx)| { + let view = fsstview_from_fsst(fsst, ctx).unwrap(); + let filtered = <FSSTView as FilterKernel>::filter(view.as_view(), mask, ctx) + .unwrap() + .unwrap() + .try_downcast::<FSSTView>() + .ok() + .unwrap(); + black_box( + canonicalize_fsstview_with(filtered.as_view(), FsstViewCompaction::Auto, ctx) + .unwrap(), + ) + }); +} diff --git a/encodings/fsst/src/fsstview/array.rs b/encodings/fsst/src/fsstview/array.rs new file mode 100644 index 00000000000..47d7baa9cad --- /dev/null +++ b/encodings/fsst/src/fsstview/array.rs @@ -0,0 +1,459 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use fsst::Symbol; +use prost::Message as _; +use vortex_array::Array; +use vortex_array::ArrayId; +use vortex_array::ArrayParts; +use vortex_array::ArrayRef; +use vortex_array::ArraySlots; +use vortex_array::ArrayView; +use vortex_array::ExecutionCtx; +use vortex_array::ExecutionResult; +use vortex_array::IntoArray; +use vortex_array::TypedArrayRef; +use vortex_array::array_slots; +use vortex_array::arrays::PrimitiveArray; +use vortex_array::arrays::varbin::VarBinArrayExt; +use vortex_array::buffer::BufferHandle; +use 
vortex_array::dtype::DType; +use vortex_array::dtype::Nullability; +use vortex_array::dtype::PType; +use vortex_array::serde::ArrayChildren; +use vortex_array::smallvec::smallvec; +use vortex_array::validity::Validity; +use vortex_array::vtable::VTable; +use vortex_array::vtable::ValidityVTable; +use vortex_array::vtable::child_to_validity; +use vortex_array::vtable::validity_to_child; +use vortex_buffer::Buffer; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_ensure; +use vortex_error::vortex_panic; +use vortex_session::VortexSession; +use vortex_session::registry::CachedId; + +use crate::FSSTArray; +use crate::FSSTArrayExt; +// `FSSTView` reuses the exact same inner data representation as `FSST`: the symbol table plus +// the raw compressed byte heap. Only the *addressing* of that heap differs (offsets + ends +// instead of monotonic offsets), and that addressing lives entirely in the array's slots. +use crate::array::FSSTData; +use crate::fsstview::canonical::canonicalize_fsstview; +use crate::fsstview::kernel::PARENT_KERNELS; +use crate::fsstview::rules::RULES; + +/// An [`FSSTView`]-encoded Vortex array. +pub type FSSTViewArray = Array<FSSTView>; + +/// The [`FSSTView`] encoding: a ListView-style FSST array. +#[derive(Clone, Debug)] +pub struct FSSTView; + +/// The child slots of an [`FSSTView`] array. +/// +/// Declared with the [`array_slots`] proc macro, which generates the slot-index constants +/// (`FSSTViewSlots::CODES_OFFSETS`, ...), the borrowed [`FSSTViewSlotsView`] struct, and the +/// typed accessor trait [`FSSTViewArraySlotsExt`] (`.uncompressed_lengths()`, +/// `.codes_offsets()`, `.codes_ends()`, `.codes_validity()`). +#[array_slots(FSSTView)] +pub struct FSSTViewSlots { + /// Length of each original (uncompressed) value. Non-nullable integer. + pub uncompressed_lengths: ArrayRef, + /// Start offset of each element's compressed bytecodes within the code heap. Non-nullable + /// integer. 
Unlike `FSST`, these are **not** required to be monotonic or contiguous. + pub codes_offsets: ArrayRef, + /// End offset of each element's compressed bytecodes within the code heap, i.e. + /// `offset + size`. Non-nullable integer. Element `i`'s bytecodes are + /// `codes_bytes[codes_offsets[i] .. codes_ends[i]]`. + /// + /// Storing the end offset (rather than the size) keeps the [`FSSTArray`] → [`FSSTView`] + /// conversion allocation-free: for a freshly converted array the heap is contiguous, so + /// `codes_ends` is a zero-copy slice of the monotonic offsets (`offsets[1..len + 1]`), exactly + /// as `codes_offsets` is `offsets[0..len]`. The per-element size is derived as + /// `codes_ends[i] - codes_offsets[i]` only where it is needed (canonicalize / `scalar_at`), + /// never materialized for rows a selective `filter`/`take` discards. + pub codes_ends: ArrayRef, + /// Optional validity bitmap for the codes. Absent when the array is non-nullable. + pub codes_validity: Option<ArrayRef>, +} + +#[derive(Clone, prost::Message)] +pub struct FSSTViewMetadata { + #[prost(enumeration = "PType", tag = "1")] + uncompressed_lengths_ptype: i32, + #[prost(enumeration = "PType", tag = "2")] + codes_offsets_ptype: i32, + #[prost(enumeration = "PType", tag = "3")] + codes_ends_ptype: i32, +} + +impl FSSTView { + /// Build an [`FSSTViewArray`] from its decomposed components. + /// + /// `codes_offsets[i]` and `codes_ends[i]` address element `i`'s compressed bytecodes inside + /// `codes_bytes` as the range `codes_offsets[i]..codes_ends[i]`. The offsets do not need to be + /// sorted, contiguous, or non-overlapping. 
+ #[allow(clippy::too_many_arguments)] + pub fn try_new( + dtype: DType, + symbols: Buffer<Symbol>, + symbol_lengths: Buffer<u8>, + codes_bytes: BufferHandle, + codes_offsets: ArrayRef, + codes_ends: ArrayRef, + uncompressed_lengths: ArrayRef, + validity: Validity, + ) -> VortexResult<FSSTViewArray> { + let len = codes_offsets.len(); + validate_fsstview( + &symbols, + &symbol_lengths, + &codes_offsets, + &codes_ends, + &uncompressed_lengths, + &validity, + &dtype, + len, + )?; + let data = FSSTData::try_new(symbols, symbol_lengths, codes_bytes, len)?; + let slots = make_slots( + uncompressed_lengths, + codes_offsets, + codes_ends, + &validity, + len, + ); + Ok(unsafe { + Array::from_parts_unchecked( + ArrayParts::new(FSSTView, dtype, len, data).with_slots(slots), + ) + }) + } + + /// Build an [`FSSTViewArray`] without validation. + /// + /// # Safety + /// + /// The caller must uphold the same invariants validated by [`FSSTView::try_new`]. + #[allow(clippy::too_many_arguments)] + pub(crate) unsafe fn new_unchecked( + dtype: DType, + symbols: Buffer<Symbol>, + symbol_lengths: Buffer<u8>, + codes_bytes: BufferHandle, + codes_offsets: ArrayRef, + codes_ends: ArrayRef, + uncompressed_lengths: ArrayRef, + validity: Validity, + ) -> FSSTViewArray { + let len = codes_offsets.len(); + let data = unsafe { FSSTData::new_unchecked(symbols, symbol_lengths, codes_bytes, len) }; + let slots = make_slots( + uncompressed_lengths, + codes_offsets, + codes_ends, + &validity, + len, + ); + unsafe { + Array::from_parts_unchecked( + ArrayParts::new(FSSTView, dtype, len, data).with_slots(slots), + ) + } + } +} + +/// Convert a plain [`FSSTArray`] into an [`FSSTViewArray`], sharing the symbol table and the +/// compressed byte heap (zero-copy) and addressing the codes with the FSST's existing monotonic +/// offsets. +/// +/// A freshly converted view's heap is contiguous, so element `i` occupies `offsets[i]..offsets[i + +/// 1]`. 
Both addressing arrays are therefore **zero-copy slices of the same `offsets` buffer**: +/// `codes_offsets = offsets[0..len]` and `codes_ends = offsets[1..len + 1]`. Nothing is allocated +/// or copied; in particular the per-element size (`codes_ends[i] - codes_offsets[i]`) is never +/// materialized here, so a subsequent selective `filter`/`take` does not pay to derive sizes for +/// the rows it discards. This removes the conversion floor a very selective predicate used to hit. +pub fn fsstview_from_fsst(fsst: &FSSTArray, ctx: &mut ExecutionCtx) -> VortexResult<FSSTViewArray> { + let codes = fsst.codes(); + let validity = codes.validity()?; + let offsets = codes.offsets().clone().execute::<PrimitiveArray>(ctx)?; + let len = offsets.len().saturating_sub(1); + + // Both addressing arrays are zero-copy slices of the `len + 1` monotonic offsets: element `i`'s + // codes are `offsets[i]..offsets[i + 1]`, so `codes_ends` is simply the offsets shifted by one. + let offsets = offsets.into_array(); + let codes_offsets = offsets.slice(0..len)?; + let codes_ends = offsets.slice(1..len + 1)?; + + // SAFETY: every `FSSTView` invariant is already guaranteed by the source `FSSTArray`: the dtype + // is Binary/Utf8, the offsets are non-nullable integers (so are the two zero-copy slices of + // them, which share `len`), the uncompressed lengths are non-nullable integers of the same + // length, and the validity nullability matches the dtype. Re-validating here would only repeat + // those checks on the hot conversion path. 
+ Ok(unsafe { + FSSTView::new_unchecked( + fsst.dtype().clone(), + fsst.symbols().clone(), + fsst.symbol_lengths().clone(), + fsst.codes_bytes_handle().clone(), + codes_offsets, + codes_ends, + fsst.uncompressed_lengths().clone(), + validity, + ) + }) +} + +fn make_slots( + uncompressed_lengths: ArrayRef, + codes_offsets: ArrayRef, + codes_ends: ArrayRef, + validity: &Validity, + len: usize, +) -> ArraySlots { + smallvec![ + Some(uncompressed_lengths), + Some(codes_offsets), + Some(codes_ends), + validity_to_child(validity, len), + ] +} + +#[allow(clippy::too_many_arguments)] +fn validate_fsstview( + symbols: &Buffer<Symbol>, + symbol_lengths: &Buffer<u8>, + codes_offsets: &ArrayRef, + codes_ends: &ArrayRef, + uncompressed_lengths: &ArrayRef, + validity: &Validity, + dtype: &DType, + len: usize, +) -> VortexResult<()> { + vortex_ensure!( + matches!(dtype, DType::Binary(_) | DType::Utf8(_)), + "FSSTView arrays must be Binary or Utf8, found {dtype}" + ); + if symbols.len() > 255 { + vortex_bail!(InvalidArgument: "symbols array must have length <= 255"); + } + if symbols.len() != symbol_lengths.len() { + vortex_bail!(InvalidArgument: "symbols and symbol_lengths arrays must have same length"); + } + if codes_offsets.len() != len { + vortex_bail!(InvalidArgument: "codes_offsets must have same len as outer array"); + } + if codes_ends.len() != len { + vortex_bail!(InvalidArgument: "codes_ends must have same len as outer array"); + } + if uncompressed_lengths.len() != len { + vortex_bail!(InvalidArgument: "uncompressed_lengths must have same len as outer array"); + } + if !codes_offsets.dtype().is_int() || codes_offsets.dtype().is_nullable() { + vortex_bail!(InvalidArgument: "codes_offsets must be non-nullable integer, found {}", codes_offsets.dtype()); + } + if !codes_ends.dtype().is_int() || codes_ends.dtype().is_nullable() { + vortex_bail!(InvalidArgument: "codes_ends must be non-nullable integer, found {}", codes_ends.dtype()); + } + if !uncompressed_lengths.dtype().is_int() || 
uncompressed_lengths.dtype().is_nullable() { + vortex_bail!(InvalidArgument: "uncompressed_lengths must be non-nullable integer, found {}", uncompressed_lengths.dtype()); + } + if validity.nullability() != dtype.nullability() { + vortex_bail!(InvalidArgument: "validity nullability must match outer dtype nullability"); + } + Ok(()) +} + +/// Typed accessors for [`FSSTViewArray`] that aren't covered by the [`array_slots`] macro. +pub trait FSSTViewArrayExt: TypedArrayRef<FSSTView> { + /// The validity of the array, derived from the `codes_validity` slot. + fn fsstview_validity(&self) -> Validity { + child_to_validity( + self.as_ref().slots()[FSSTViewSlots::CODES_VALIDITY].as_ref(), + self.as_ref().dtype().nullability(), + ) + } +} + +impl<T: TypedArrayRef<FSSTView>> FSSTViewArrayExt for T {} + +impl VTable for FSSTView { + type TypedArrayData = FSSTData; + type OperationsVTable = Self; + type ValidityVTable = Self; + + fn id(&self) -> ArrayId { + static ID: CachedId = CachedId::new("vortex.fsstview"); + *ID + } + + fn validate( + &self, + data: &Self::TypedArrayData, + dtype: &DType, + len: usize, + slots: &[Option<ArrayRef>], + ) -> VortexResult<()> { + let view = FSSTViewSlotsView::from_slots(slots); + let validity = child_to_validity(view.codes_validity, dtype.nullability()); + validate_fsstview( + data.symbols(), + data.symbol_lengths(), + view.codes_offsets, + view.codes_ends, + view.uncompressed_lengths, + &validity, + dtype, + len, + ) + } + + fn nbuffers(_array: ArrayView<'_, Self>) -> usize { + 3 + } + + fn buffer(array: ArrayView<'_, Self>, idx: usize) -> BufferHandle { + match idx { + 0 => BufferHandle::new_host(array.symbols().clone().into_byte_buffer()), + 1 => BufferHandle::new_host(array.symbol_lengths().clone().into_byte_buffer()), + 2 => array.codes_bytes_handle().clone(), + _ => vortex_panic!("FSSTViewArray buffer index {idx} out of bounds"), + } + } + + fn buffer_name(_array: ArrayView<'_, Self>, idx: usize) -> Option<String> { + match idx { + 0 => Some("symbols".to_string()), + 1 => 
Some("symbol_lengths".to_string()), + 2 => Some("compressed_codes".to_string()), + _ => vortex_panic!("FSSTViewArray buffer_name index {idx} out of bounds"), + } + } + + fn serialize( + array: ArrayView<'_, Self>, + _session: &VortexSession, + ) -> VortexResult<Option<Vec<u8>>> { + Ok(Some( + FSSTViewMetadata { + uncompressed_lengths_ptype: PType::try_from(array.uncompressed_lengths().dtype())? + as i32, + codes_offsets_ptype: PType::try_from(array.codes_offsets().dtype())? as i32, + codes_ends_ptype: PType::try_from(array.codes_ends().dtype())? as i32, + } + .encode_to_vec(), + )) + } + + fn deserialize( + &self, + dtype: &DType, + len: usize, + metadata: &[u8], + buffers: &[BufferHandle], + children: &dyn ArrayChildren, + _session: &VortexSession, + ) -> VortexResult<ArrayParts<Self>> { + let metadata = FSSTViewMetadata::decode(metadata)?; + if buffers.len() != 3 { + vortex_bail!( + InvalidArgument: "Expected 3 buffers for fsstview, got {}", + buffers.len() + ); + } + let symbols = Buffer::<Symbol>::from_byte_buffer(buffers[0].clone().try_to_host_sync()?); + let symbol_lengths = Buffer::<u8>::from_byte_buffer(buffers[1].clone().try_to_host_sync()?); + let codes_bytes = buffers[2].clone(); + + let uncompressed_lengths = children.get( + 0, + &DType::Primitive( + PType::try_from(metadata.uncompressed_lengths_ptype)?, + Nullability::NonNullable, + ), + len, + )?; + let codes_offsets = children.get( + 1, + &DType::Primitive( + PType::try_from(metadata.codes_offsets_ptype)?, + Nullability::NonNullable, + ), + len, + )?; + let codes_ends = children.get( + 2, + &DType::Primitive( + PType::try_from(metadata.codes_ends_ptype)?, + Nullability::NonNullable, + ), + len, + )?; + + let validity = if children.len() == 3 { + Validity::from(dtype.nullability()) + } else if children.len() == 4 { + Validity::Array(children.get(3, &Validity::DTYPE, len)?) 
+ } else { + vortex_bail!("Expected 3 or 4 children, got {}", children.len()); + }; + + validate_fsstview( + &symbols, + &symbol_lengths, + &codes_offsets, + &codes_ends, + &uncompressed_lengths, + &validity, + dtype, + len, + )?; + + let data = FSSTData::try_new(symbols, symbol_lengths, codes_bytes, len)?; + let slots = make_slots( + uncompressed_lengths, + codes_offsets, + codes_ends, + &validity, + len, + ); + Ok(ArrayParts::new(self.clone(), dtype.clone(), len, data).with_slots(slots)) + } + + fn slot_name(_array: ArrayView<'_, Self>, idx: usize) -> String { + FSSTViewSlots::NAMES[idx].to_string() + } + + fn execute(array: Array<Self>, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> { + canonicalize_fsstview(array.as_view(), ctx).map(ExecutionResult::done) + } + + fn execute_parent( + array: ArrayView<'_, Self>, + parent: &ArrayRef, + child_idx: usize, + ctx: &mut ExecutionCtx, + ) -> VortexResult<Option<ArrayRef>> { + PARENT_KERNELS.execute(array, parent, child_idx, ctx) + } + + fn reduce_parent( + array: ArrayView<'_, Self>, + parent: &ArrayRef, + child_idx: usize, + ) -> VortexResult<Option<ArrayRef>> { + RULES.evaluate(array, parent, child_idx) + } +} + +impl ValidityVTable for FSSTView { + fn validity(array: ArrayView<'_, FSSTView>) -> VortexResult<Validity> { + Ok(child_to_validity( + array.slots()[FSSTViewSlots::CODES_VALIDITY].as_ref(), + array.dtype().nullability(), + )) + } +} diff --git a/encodings/fsst/src/fsstview/canonical.rs b/encodings/fsst/src/fsstview/canonical.rs new file mode 100644 index 00000000000..6d5b2b5165f --- /dev/null +++ b/encodings/fsst/src/fsstview/canonical.rs @@ -0,0 +1,394 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Canonicalization of [`FSSTView`] into a [`VarBinViewArray`] (or [`VarBinArray`]). +//! +//! After metadata-only `filter`/`take`/`slice`, an [`FSSTView`]'s byte heap is the *original* heap +//! and the live codes are scattered (gaps after a filter, reordering/duplication after a take). To +//! 
canonicalize we must decode the survivors into one element-ordered buffer. [`FsstViewCompaction`] +//! captures how: +//! +//! - [`Direct`][FsstViewCompaction::Direct]: the live codes are still one contiguous in-order run +//! (an untouched or sliced view). Decode that single range in one call, no copy. +//! - [`RunDecode`][FsstViewCompaction::RunDecode] ("export all in place"): the offsets are still +//! monotonic (after any `filter`, sorted-index `take`, or `slice`) but gapped. Decode each +//! maximal contiguous heap run *directly* into the element-ordered output, with **no gather +//! copy** — one decode call per run. Wins while survivors form few runs (clustered / range +//! selections). +//! - [`GatherBulk`][FsstViewCompaction::GatherBulk] ("compact codes"): for scattered survivors (a +//! shuffle take) or heavily fragmented ones (a uniform-random filter), compact the live codes +//! into one contiguous buffer, then a single bulk decode. The one bulk call amortizes FSST's slow +//! decode tail across all elements, which beats run-decode once the runs get small. +//! +//! [`FsstViewCompaction::Auto`] picks `Direct` when contiguous, `RunDecode` when the offsets are +//! monotonic and the survivors form few runs (`runs <= len / RUN_DECODE_MAX_RUN_FRACTION`), and +//! `GatherBulk` otherwise. The choice lives entirely in the export: the conversion and the +//! metadata-only `filter`/`take` stay separate so a *chain* of them composes; only the final +//! canonicalize compacts (or not). 
+ +use std::sync::Arc; + +use fsst::Decompressor; +use vortex_array::ArrayRef; +use vortex_array::ArrayView; +use vortex_array::ExecutionCtx; +use vortex_array::IntoArray; +use vortex_array::arrays::PrimitiveArray; +use vortex_array::arrays::VarBinArray; +use vortex_array::arrays::VarBinViewArray; +use vortex_array::arrays::varbinview::build_views::MAX_BUFFER_LEN; +use vortex_array::arrays::varbinview::build_views::build_views; +use vortex_array::buffer::BufferHandle; +use vortex_array::match_each_integer_ptype; +use vortex_buffer::BufferMut; +use vortex_buffer::ByteBufferMut; +use vortex_error::VortexResult; + +use super::array::FSSTView; +use super::array::FSSTViewArrayExt; +use super::array::FSSTViewArraySlotsExt; + +/// Strategy for materializing the decompressed bytes when canonicalizing an [`FSSTView`]. +/// +/// See the [module docs][self] for the full trade-off analysis. Every strategy produces an +/// element-ordered decoded buffer; they differ only in how the survivor codes are fed to the +/// decoder. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum FsstViewCompaction { + /// Pick automatically: `Direct` when contiguous, `RunDecode` when monotonic with few runs, + /// else `GatherBulk`. + Auto, + /// Bulk-decode the single contiguous live range, no copy. Falls back to `GatherBulk` if the + /// codes are not contiguous and in order. + Direct, + /// Compact the scattered live codes into a contiguous buffer, then a single bulk decode. + GatherBulk, + /// Decode each contiguous heap run directly into the element-ordered output, no gather copy. + /// Requires monotonic offsets; falls back to `GatherBulk` otherwise (e.g. a shuffle take). + RunDecode, +} + +/// `Auto` prefers `RunDecode` (export all in place) over `GatherBulk` (compact codes) while the +/// number of contiguous runs is at most `len / RUN_DECODE_MAX_RUN_FRACTION`, i.e. while survivors +/// average at least this many elements per run. 
Calibrated by the `fsst_view_compute` benches: +/// clustered and range selections sit well under this, uniform-random filters well over it. +const RUN_DECODE_MAX_RUN_FRACTION: usize = 4; + +pub(super) fn canonicalize_fsstview( + array: ArrayView<'_, FSSTView>, + ctx: &mut ExecutionCtx, +) -> VortexResult<ArrayRef> { + canonicalize_fsstview_with(array, FsstViewCompaction::Auto, ctx) +} + +/// Canonicalize an [`FSSTView`] to a [`VarBinViewArray`] using an explicit compaction strategy. +/// +/// Exposed (rather than only the dispatch-driven [`canonicalize_fsstview`]) so callers and +/// benchmarks can force a strategy. Production code goes through [`FsstViewCompaction::Auto`]. +pub fn canonicalize_fsstview_with( + array: ArrayView<'_, FSSTView>, + strategy: FsstViewCompaction, + ctx: &mut ExecutionCtx, +) -> VortexResult<ArrayRef> { + let decoded = decode_element_ordered(array, strategy, ctx)?; + let (buffers, views) = match_each_integer_ptype!(decoded.ulen_prim.ptype(), |P| { + build_views( + 0, + MAX_BUFFER_LEN, + decoded.uncompressed, + decoded.ulen_prim.as_slice::<P>(), + ) + }); + // SAFETY: FSST validates the bytes for binary/UTF-8; the views point at valid ranges. + Ok(unsafe { + VarBinViewArray::new_unchecked( + views, + Arc::from(buffers), + array.dtype().clone(), + array.fsstview_validity(), + ) + .into_array() + }) +} + +/// Canonicalize an [`FSSTView`] to a [`VarBinArray`] (offsets + contiguous bytes) instead of a +/// [`VarBinViewArray`]. +/// +/// Shares the element-ordered decode path with [`canonicalize_fsstview_with`]; the only difference +/// is the finisher, which builds `len + 1` cumulative offsets from the uncompressed lengths rather +/// than per-element views. Cheaper than a `VarBinViewArray` when the consumer wants offsets+bytes +/// (no per-element 16-byte view construction). +pub fn canonicalize_fsstview_to_varbin( + array: ArrayView<'_, FSSTView>, + strategy: FsstViewCompaction, + ctx: &mut ExecutionCtx, +) -> VortexResult<ArrayRef> { + let decoded = decode_element_ordered(array, strategy, ctx)?; + + let varbin_offsets = cumulative_offsets(&decoded.ulen_prim); + let bytes = BufferHandle::new_host(decoded.uncompressed.freeze()); + // SAFETY: offsets are monotonic and end at the byte length; bytes are valid binary/UTF-8. + Ok(unsafe { + VarBinArray::new_unchecked_from_handle( + varbin_offsets.into_array(), + bytes, + array.dtype().clone(), + array.fsstview_validity(), + ) + .into_array() + }) +} + +/// The element-ordered decoded bytes plus the uncompressed-lengths array the finishers need. +struct Decoded { + uncompressed: ByteBufferMut, + ulen_prim: PrimitiveArray, +} + +/// Decode an [`FSSTView`]'s survivors into one element-ordered buffer using the chosen (or `Auto`) +/// strategy. Shared by the `VarBinView` and `VarBin` finishers. 
+fn decode_element_ordered( + array: ArrayView<'_, FSSTView>, + strategy: FsstViewCompaction, + ctx: &mut ExecutionCtx, +) -> VortexResult<Decoded> { + let offsets = load_usize(array.codes_offsets(), ctx)?; + // Derive each survivor's size in place from its end offset (`codes_ends[i] - codes_offsets[i]`), + // reusing the widened `ends` buffer as `sizes` so we don't allocate a third index array. + // Downstream layout analysis and decode work on `sizes` exactly as before. + let mut sizes = load_usize(array.codes_ends(), ctx)?; + for (size, &offset) in sizes.iter_mut().zip(&offsets) { + *size -= offset; + } + + let ulen_prim = array + .uncompressed_lengths() + .clone() + .execute::<PrimitiveArray>(ctx)?; + // Total decoded length, used by every path to size the output buffer. Summed straight from the + // typed slice, with no need to widen the uncompressed lengths into a `Vec<usize>`. + #[expect(clippy::cast_possible_truncation)] + let total_size: usize = match_each_integer_ptype!(ulen_prim.ptype(), |P| { + ulen_prim.as_slice::<P>().iter().map(|x| *x as usize).sum() + }); + + let heap_buffer = array.codes_bytes(); + let heap = heap_buffer.as_slice(); + let decompressor = array.decompressor(); + + let layout = analyze_layout(&offsets, &sizes); + let chosen = match strategy { + FsstViewCompaction::Auto => match layout { + Layout::Contiguous => FsstViewCompaction::Direct, + Layout::Monotonic { runs } if runs <= offsets.len() / RUN_DECODE_MAX_RUN_FRACTION => { + FsstViewCompaction::RunDecode + } + _ => FsstViewCompaction::GatherBulk, + }, + // `Direct`/`RunDecode` require a (contiguous / monotonic) layout; fall back to gather. + FsstViewCompaction::Direct if !matches!(layout, Layout::Contiguous) => { + FsstViewCompaction::GatherBulk + } + FsstViewCompaction::RunDecode if matches!(layout, Layout::Scattered) => { + FsstViewCompaction::GatherBulk + } + other => other, + }; + + let uncompressed = match chosen { + FsstViewCompaction::Direct => { + let start = offsets.first().copied().unwrap_or(0); + // `live` (total compressed bytes) is only needed by the bulk-decode paths, not by + // `RunDecode`, so it is summed here rather than unconditionally up front. + let live: usize = sizes.iter().sum(); + decompress_direct(&decompressor, heap, start, live, total_size) + } + FsstViewCompaction::RunDecode => { + decompress_run_decode(&decompressor, heap, &offsets, &sizes, total_size) + } + // `Auto` is resolved above; `GatherBulk` is the catch-all. + _ => { + let live: usize = sizes.iter().sum(); + decompress_gather(&decompressor, heap, &offsets, &sizes, live, total_size) + } + }; + + Ok(Decoded { + uncompressed, + ulen_prim, + }) +} + +fn load_usize(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Vec<usize>> { + let prim = array.clone().execute::<PrimitiveArray>(ctx)?; + #[expect(clippy::cast_possible_truncation)] + let out: Vec<usize> = match_each_integer_ptype!(prim.ptype(), |P| { + prim.as_slice::<P>().iter().map(|x| *x as usize).collect() + }); + Ok(out) +} + +/// Build `len + 1` cumulative offsets over the uncompressed lengths (the `VarBin` offsets array), +/// directly from the typed slice. `push_unchecked` (capacity reserved) keeps this vectorized. +fn cumulative_offsets(ulen_prim: &PrimitiveArray) -> ArrayRef { + let len = ulen_prim.len(); + let mut offsets = BufferMut::<i64>::with_capacity(len + 1); + #[expect(clippy::cast_possible_truncation, clippy::cast_possible_wrap)] + let _: () = match_each_integer_ptype!(ulen_prim.ptype(), |P| { + let mut acc: usize = 0; + // SAFETY: `len + 1` slots reserved; we push exactly that many. + unsafe { offsets.push_unchecked(0) }; + for &ulen in ulen_prim.as_slice::<P>() { + acc += ulen as usize; + unsafe { offsets.push_unchecked(acc as i64) }; + } + }); + offsets.into_array() +} + +/// The survivor layout in the heap, used to pick an export strategy. +enum Layout { + /// Survivors are one contiguous in-order run (untouched / sliced view): `Direct`. + Contiguous, + /// Offsets are strictly increasing but gapped: survivors form `runs` contiguous blocks. + /// Eligible for `RunDecode`. + Monotonic { runs: usize }, + /// Offsets are out of heap order (e.g. a shuffle take): must gather. + Scattered, +} + +/// Classify the survivor layout in a single O(n) pass: are offsets monotonic, and how many maximal +/// contiguous runs do the (non-empty) survivors form? +fn analyze_layout(offsets: &[usize], sizes: &[usize]) -> Layout { + let mut runs = 0usize; + let mut gapped = false; + let mut prev_end: Option<usize> = None; + for (&offset, &size) in offsets.iter().zip(sizes) { + if size == 0 { + continue; // empty/null elements don't affect run structure + } + match prev_end { + None => runs = 1, + Some(end) if offset == end => {} // continues the current run + Some(end) if offset > end => { + runs += 1; + gapped = true; + } + Some(_) => return Layout::Scattered, // offset < end: out of order + } + prev_end = Some(offset + size); + } + if gapped { + Layout::Monotonic { runs } + } else { + Layout::Contiguous + } +} + +/// "Export all in place": decode each maximal contiguous heap run directly into the element-ordered +/// output, with no gather copy. Requires monotonic offsets (the caller guarantees this). +fn decompress_run_decode( + decompressor: &Decompressor<'_>, + heap: &[u8], + offsets: &[usize], + sizes: &[usize], + total_size: usize, +) -> ByteBufferMut { + let mut out = ByteBufferMut::with_capacity(total_size + 7); + { + let spare = out.spare_capacity_mut(); + // Walk elements in order, batching heap-adjacent survivors into one decode call. `out_pos` + // tracks where the current run's decoded bytes begin in the (element-ordered) output. 
+ let mut out_pos = 0usize; + let mut i = 0usize; + while i < offsets.len() { + if sizes[i] == 0 { + i += 1; + continue; + } + let run_heap_start = offsets[i]; + let mut run_heap_end = run_heap_start; + let mut j = i; + while j < offsets.len() { + if sizes[j] == 0 { + j += 1; + continue; + } + if offsets[j] != run_heap_end { + break; + } + run_heap_end += sizes[j]; + j += 1; + } + // `decompress_into` returns the exact decoded byte count for this run, which equals the + // run's total uncompressed length; advance by it instead of precomputing per-element + // uncompressed lengths. + let written = decompressor + .decompress_into(&heap[run_heap_start..run_heap_end], &mut spare[out_pos..]); + out_pos += written; + i = j; + } + debug_assert_eq!( + out_pos, total_size, + "run-decode must fill exactly total_size bytes" + ); + } + unsafe { out.set_len(total_size) }; + out +} + +/// Decompress a single contiguous run of the heap in one bulk call (no copy). +fn decompress_direct( + decompressor: &Decompressor<'_>, + heap: &[u8], + start: usize, + live: usize, + total_size: usize, +) -> ByteBufferMut { + let mut out = ByteBufferMut::with_capacity(total_size + 7); + let written = + decompressor.decompress_into(&heap[start..start + live], out.spare_capacity_mut()); + unsafe { out.set_len(written) }; + out +} + +/// Compact the scattered live codes into a contiguous buffer, then a single bulk decompress. +/// +/// The gather coalesces consecutive heap-adjacent spans into a single `extend_from_slice`: for an +/// order-preserving `filter`, surviving neighbours are still contiguous in the heap, so a run of +/// `k` survivors is copied in one memcpy instead of `k`. This collapses the per-span copy overhead +/// (which dominates for short codes) to per-run, while a shuffle (no adjacency) is unaffected. 
+fn decompress_gather( + decompressor: &Decompressor<'_>, + heap: &[u8], + offsets: &[usize], + sizes: &[usize], + live: usize, + total_size: usize, +) -> ByteBufferMut { + let mut compressed = ByteBufferMut::with_capacity(live); + // Accumulate a contiguous `[run_start, run_end)` heap range and flush it as one copy. + let mut run_start = 0usize; + let mut run_end = 0usize; + for (&offset, &size) in offsets.iter().zip(sizes) { + if size == 0 { + continue; + } + if offset == run_end && run_end != run_start { + run_end += size; // extend the current run (heap-adjacent) + } else { + if run_end != run_start { + compressed.extend_from_slice(&heap[run_start..run_end]); + } + run_start = offset; + run_end = offset + size; + } + } + if run_end != run_start { + compressed.extend_from_slice(&heap[run_start..run_end]); + } + let mut out = ByteBufferMut::with_capacity(total_size + 7); + let written = decompressor.decompress_into(compressed.as_slice(), out.spare_capacity_mut()); + unsafe { out.set_len(written) }; + out +} diff --git a/encodings/fsst/src/fsstview/compute.rs b/encodings/fsst/src/fsstview/compute.rs new file mode 100644 index 00000000000..21b11c7b3d0 --- /dev/null +++ b/encodings/fsst/src/fsstview/compute.rs @@ -0,0 +1,108 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Metadata-only `filter` and `take` for [`FSSTView`]. +//! +//! Both operations rewrite only the small `offsets`/`ends`/`uncompressed_lengths`/`validity` +//! arrays and reuse the compressed byte heap (and symbol table) untouched. This is the core +//! "ListView speed" win over plain [`FSST`][crate::FSST], whose `filter`/`take` delegate to +//! `VarBin` and therefore rewrite the entire compressed heap. 
+ +use vortex_array::ArrayRef; +use vortex_array::ArrayView; +use vortex_array::ExecutionCtx; +use vortex_array::IntoArray; +use vortex_array::arrays::dict::TakeExecute; +use vortex_array::arrays::filter::FilterKernel; +use vortex_array::builtins::ArrayBuiltins; +use vortex_array::scalar::Scalar; +use vortex_error::VortexResult; +use vortex_mask::Mask; + +use super::array::FSSTView; +use super::array::FSSTViewArrayExt; +use super::array::FSSTViewArraySlotsExt; + +impl FilterKernel for FSSTView { + fn filter( + array: ArrayView<'_, Self>, + mask: &Mask, + _ctx: &mut ExecutionCtx, + ) -> VortexResult> { + // Filter only the addressing arrays; the byte heap and symbol table are reused as-is. + let validity = array.fsstview_validity().filter(mask)?; + let codes_offsets = array.codes_offsets().filter(mask.clone())?; + let codes_ends = array.codes_ends().filter(mask.clone())?; + let uncompressed_lengths = array.uncompressed_lengths().filter(mask.clone())?; + + // SAFETY: filter preserves all `FSSTView` invariants — offsets/ends/lengths stay + // non-nullable and equal-length, and validity tracks nullness separately. + Ok(Some( + unsafe { + FSSTView::new_unchecked( + array.dtype().clone(), + array.symbols().clone(), + array.symbol_lengths().clone(), + array.codes_bytes_handle().clone(), + codes_offsets, + codes_ends, + uncompressed_lengths, + validity, + ) + } + .into_array(), + )) + } +} + +impl TakeExecute for FSSTView { + fn take( + array: ArrayView<'_, Self>, + indices: &ArrayRef, + _ctx: &mut ExecutionCtx, + ) -> VortexResult> { + let dtype = array + .dtype() + .clone() + .union_nullability(indices.dtype().nullability()); + + let validity = array.fsstview_validity().take(indices)?; + + // `take` of a non-nullable child with non-nullable indices stays non-nullable, so the + // `fill_null` (and the `cast`/`optimize` it pulls in) is pure overhead in the common case. 
+ // Only when the indices are nullable can a null index introduce a null we must fill with + // zero — nullness itself is tracked separately by `validity`. + let fill = indices.dtype().is_nullable(); + let codes_offsets = take_child(array.codes_offsets(), indices, fill)?; + let codes_ends = take_child(array.codes_ends(), indices, fill)?; + let uncompressed_lengths = take_child(array.uncompressed_lengths(), indices, fill)?; + + // SAFETY: take preserves all `FSSTView` invariants (see `filter`). + Ok(Some( + unsafe { + FSSTView::new_unchecked( + dtype, + array.symbols().clone(), + array.symbol_lengths().clone(), + array.codes_bytes_handle().clone(), + codes_offsets, + codes_ends, + uncompressed_lengths, + validity, + ) + } + .into_array(), + )) + } +} + +/// Take a non-nullable integer child by `indices`, only filling nulls with zero when the indices +/// are nullable (and so could have introduced nulls). The child is always non-nullable on input. +fn take_child(child: &ArrayRef, indices: &ArrayRef, fill: bool) -> VortexResult { + let taken = child.take(indices.clone())?; + if fill { + taken.fill_null(Scalar::zero_value(child.dtype())) + } else { + Ok(taken) + } +} diff --git a/encodings/fsst/src/fsstview/from_fsst.rs b/encodings/fsst/src/fsstview/from_fsst.rs new file mode 100644 index 00000000000..10ed6add2d0 --- /dev/null +++ b/encodings/fsst/src/fsstview/from_fsst.rs @@ -0,0 +1,51 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Metadata-only `filter`/`take` that go straight from an [`FSSTArray`] to an [`FSSTViewArray`]. +//! +//! These are the "first hop" of the view pipeline. They never touch the compressed byte heap: +//! the [`FSSTArray`] is reinterpreted as an [`FSSTViewArray`] (sharing symbols + codes bytes, and +//! addressing the codes with zero-copy slices of the existing offsets) and then the selection is +//! applied to the small `offsets`/`ends`/`lengths`/`validity` arrays only. 
+ +use vortex_array::ArrayRef; +use vortex_array::ExecutionCtx; +use vortex_array::arrays::dict::TakeExecute; +use vortex_array::arrays::filter::FilterKernel; +use vortex_error::VortexExpect; +use vortex_error::VortexResult; +use vortex_error::vortex_err; +use vortex_mask::Mask; + +use super::array::FSSTView; +use super::array::FSSTViewArray; +use super::array::fsstview_from_fsst; +use crate::FSSTArray; + +/// Filter an [`FSSTArray`], producing an [`FSSTViewArray`] without touching the codes. +pub fn fsst_filter_to_view( + array: &FSSTArray, + mask: &Mask, + ctx: &mut ExecutionCtx, +) -> VortexResult<FSSTViewArray> { + let view = fsstview_from_fsst(array, ctx)?; + let filtered: ArrayRef = <FSSTView as FilterKernel>::filter(view.as_view(), mask, ctx)? + .vortex_expect("FSSTView filter always returns Some"); + filtered + .try_downcast::<FSSTView>() + .map_err(|_| vortex_err!("FSSTView filter must return an FSSTView")) +} + +/// Take from an [`FSSTArray`], producing an [`FSSTViewArray`] without touching the codes. +pub fn fsst_take_to_view( + array: &FSSTArray, + indices: &ArrayRef, + ctx: &mut ExecutionCtx, +) -> VortexResult<FSSTViewArray> { + let view = fsstview_from_fsst(array, ctx)?; + let taken: ArrayRef = <FSSTView as TakeExecute>::take(view.as_view(), indices, ctx)? 
+ .vortex_expect("FSSTView take always returns Some"); + taken + .try_downcast::<FSSTView>() + .map_err(|_| vortex_err!("FSSTView take must return an FSSTView")) +} diff --git a/encodings/fsst/src/fsstview/kernel.rs b/encodings/fsst/src/fsstview/kernel.rs new file mode 100644 index 00000000000..4cbd32bf565 --- /dev/null +++ b/encodings/fsst/src/fsstview/kernel.rs @@ -0,0 +1,13 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_array::arrays::dict::TakeExecuteAdaptor; +use vortex_array::arrays::filter::FilterExecuteAdaptor; +use vortex_array::kernel::ParentKernelSet; + +use super::array::FSSTView; + +pub(super) const PARENT_KERNELS: ParentKernelSet<FSSTView> = ParentKernelSet::new(&[ + ParentKernelSet::lift(&FilterExecuteAdaptor(FSSTView)), + ParentKernelSet::lift(&TakeExecuteAdaptor(FSSTView)), +]); diff --git a/encodings/fsst/src/fsstview/mod.rs b/encodings/fsst/src/fsstview/mod.rs new file mode 100644 index 00000000000..d2c3a574ad9 --- /dev/null +++ b/encodings/fsst/src/fsstview/mod.rs @@ -0,0 +1,42 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! A "ListView"-style variant of the [`FSST`][crate::FSST] encoding. +//! +//! Where [`FSST`][crate::FSST] addresses its compressed codes with a single monotonic +//! offsets array (`len + 1` offsets, exactly like `VarBin`/`List`), [`FSSTView`] addresses +//! them with a pair of per-element `offsets` **and** `ends` arrays (the ListView idea, storing +//! the end offset rather than the size — see [`ListView`][vortex_array::arrays::ListView]). +//! Element `i`'s compressed bytecodes live in `codes_bytes[offsets[i] .. ends[i]]`, and its size +//! is the derived `ends[i] - offsets[i]`. +//! +//! Decoupling the start (`offset`) from the end means the offsets are no longer required to be +//! monotonic or contiguous, so `filter`, `take`, and `slice` become metadata-only operations: +//! 
they rewrite only the (small) `offsets`/`ends`/lengths/validity arrays and **reuse the +//! compressed byte heap untouched**. The plain [`FSST`][crate::FSST] encoding has to rewrite the +//! entire compressed heap for `filter`/`take` because it delegates to `VarBin`. This is the same +//! trade-off `ListView` makes over `List`. +//! +//! Storing the *end* offset (instead of the size) additionally makes the [`FSSTArray`] → +//! [`FSSTViewArray`] conversion allocation-free: a freshly converted heap is contiguous, so both +//! `offsets` and `ends` are zero-copy slices of the FSST's monotonic offsets buffer +//! (`offsets[0..len]` and `offsets[1..len + 1]`). A selective `filter`/`take` therefore never +//! pays to derive sizes for the rows it discards. + +mod array; +mod canonical; +mod compute; +mod from_fsst; +mod kernel; +mod ops; +mod rules; +mod slice; +#[cfg(test)] +mod tests; + +pub use array::*; +pub use canonical::FsstViewCompaction; +pub use canonical::canonicalize_fsstview_to_varbin; +pub use canonical::canonicalize_fsstview_with; +pub use from_fsst::fsst_filter_to_view; +pub use from_fsst::fsst_take_to_view; diff --git a/encodings/fsst/src/fsstview/ops.rs b/encodings/fsst/src/fsstview/ops.rs new file mode 100644 index 00000000000..10f31ef1aa9 --- /dev/null +++ b/encodings/fsst/src/fsstview/ops.rs @@ -0,0 +1,29 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_array::ArrayView; +use vortex_array::ExecutionCtx; +use vortex_array::arrays::varbin::varbin_scalar; +use vortex_array::scalar::Scalar; +use vortex_array::vtable::OperationsVTable; +use vortex_buffer::ByteBuffer; +use vortex_error::VortexResult; + +use super::array::FSSTView; +use super::array::FSSTViewArraySlotsExt; + +impl OperationsVTable for FSSTView { + fn scalar_at( + array: ArrayView<'_, FSSTView>, + index: usize, + ctx: &mut ExecutionCtx, + ) -> VortexResult { + // Preconditions (see `OperationsVTable`): `index` is in bounds 
and non-null. + let offset: usize = (&array.codes_offsets().execute_scalar(index, ctx)?).try_into()?; + let end: usize = (&array.codes_ends().execute_scalar(index, ctx)?).try_into()?; + + let compressed = &array.codes_bytes()[offset..end]; + let decoded = ByteBuffer::from(array.decompressor().decompress(compressed)); + Ok(varbin_scalar(decoded, array.dtype())) + } +} diff --git a/encodings/fsst/src/fsstview/rules.rs b/encodings/fsst/src/fsstview/rules.rs new file mode 100644 index 00000000000..a3a5c891be9 --- /dev/null +++ b/encodings/fsst/src/fsstview/rules.rs @@ -0,0 +1,10 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_array::arrays::slice::SliceReduceAdaptor; +use vortex_array::optimizer::rules::ParentRuleSet; + +use super::array::FSSTView; + +pub(crate) static RULES: ParentRuleSet<FSSTView> = + ParentRuleSet::new(&[ParentRuleSet::lift(&SliceReduceAdaptor(FSSTView))]); diff --git a/encodings/fsst/src/fsstview/slice.rs b/encodings/fsst/src/fsstview/slice.rs new file mode 100644 index 00000000000..f3100032237 --- /dev/null +++ b/encodings/fsst/src/fsstview/slice.rs @@ -0,0 +1,36 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::ops::Range; + +use vortex_array::ArrayRef; +use vortex_array::ArrayView; +use vortex_array::IntoArray; +use vortex_array::arrays::slice::SliceReduce; +use vortex_error::VortexResult; + +use super::array::FSSTView; +use super::array::FSSTViewArrayExt; +use super::array::FSSTViewArraySlotsExt; + +impl SliceReduce for FSSTView { + fn slice(array: ArrayView<'_, Self>, range: Range<usize>) -> VortexResult<Option<ArrayRef>> { + // Slicing leaves the symbol table and compressed byte heap intact; we only slice the + // addressing arrays. 
+ Ok(Some( + unsafe { + FSSTView::new_unchecked( + array.dtype().clone(), + array.symbols().clone(), + array.symbol_lengths().clone(), + array.codes_bytes_handle().clone(), + array.codes_offsets().slice(range.clone())?, + array.codes_ends().slice(range.clone())?, + array.uncompressed_lengths().slice(range.clone())?, + array.fsstview_validity().slice(range)?, + ) + } + .into_array(), + )) + } +} diff --git a/encodings/fsst/src/fsstview/tests.rs b/encodings/fsst/src/fsstview/tests.rs new file mode 100644 index 00000000000..fddbbc2ec5e --- /dev/null +++ b/encodings/fsst/src/fsstview/tests.rs @@ -0,0 +1,380 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use rstest::rstest; +use vortex_array::ExecutionCtx; +use vortex_array::IntoArray; +use vortex_array::LEGACY_SESSION; +use vortex_array::VortexSessionExecute; +use vortex_array::arrays::Primitive; +use vortex_array::arrays::PrimitiveArray; +use vortex_array::arrays::VarBinArray; +use vortex_array::arrays::VarBinViewArray; +use vortex_array::arrays::dict::TakeExecute; +use vortex_array::assert_arrays_eq; +use vortex_array::compute::conformance::consistency::test_array_consistency; +use vortex_array::compute::conformance::filter::test_filter_conformance; +use vortex_array::compute::conformance::take::test_take_conformance; +use vortex_array::dtype::DType; +use vortex_array::dtype::Nullability; +use vortex_array::match_each_integer_ptype; +use vortex_error::VortexResult; +use vortex_error::vortex_err; +use vortex_mask::Mask; + +use super::array::FSSTViewArraySlotsExt; +use crate::FSSTArray; +use crate::FSSTView; +use crate::FSSTViewArray; +use crate::FsstViewCompaction; +use crate::canonicalize_fsstview_to_varbin; +use crate::canonicalize_fsstview_with; +use crate::fsst_compress; +use crate::fsst_filter_to_view; +use crate::fsst_take_to_view; +use crate::fsst_train_compressor; +use crate::fsstview_from_fsst; + +fn make_fsstview( + strings: &[Option<&str>], + 
nullability: Nullability, + ctx: &mut ExecutionCtx, +) -> FSSTViewArray { + let varbin = VarBinArray::from_iter(strings.iter().copied(), DType::Utf8(nullability)); + let compressor = fsst_train_compressor(&varbin); + let fsst = fsst_compress(&varbin, varbin.len(), varbin.dtype(), &compressor, ctx); + fsstview_from_fsst(&fsst, ctx).expect("fsstview_from_fsst") +} + +const SAMPLE: [Option<&str>; 6] = [ + Some("hello world"), + Some("testing fsst compression"), + Some("hello world"), + Some("another string here"), + Some("the quick brown fox"), + Some("hello world"), +]; + +const SAMPLE_NULLABLE: [Option<&str>; 6] = [ + Some("hello world"), + None, + Some("testing fsst compression"), + Some("another string here"), + None, + Some("the quick brown fox"), +]; + +#[test] +fn canonicalizes_to_same_values() -> VortexResult<()> { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let view = make_fsstview(&SAMPLE, Nullability::NonNullable, &mut ctx); + let array = view.into_array(); + assert!(array.is::()); + + let canonical = array.execute::(&mut ctx)?; + let expected = VarBinArray::from_iter( + SAMPLE.iter().copied(), + DType::Utf8(Nullability::NonNullable), + ) + .into_array() + .execute::(&mut ctx)?; + assert_arrays_eq!(canonical.into_array(), expected.into_array()); + Ok(()) +} + +/// The conversion-floor fix depends on `codes_offsets`/`codes_ends` being **zero-copy** slices of +/// the FSST's single monotonic offsets buffer (`offsets[0..len]` and `offsets[1..len + 1]`) — +/// nothing copied, no per-element `sizes` array materialized. Verify it structurally: a freshly +/// converted view's `codes_ends` must begin exactly one element past `codes_offsets` *in the same +/// allocation*. A regression to a size-materializing conversion would break this even though the +/// decoded values would still agree (so the value/agreement tests would not catch it). The bench +/// that measures the resulting floor (`fsst_view_fineweb_queries`) is gated out of CI. 
+#[test] +fn conversion_shares_offsets_buffer_zero_copy() -> VortexResult<()> { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let fsst = make_fsst(&SAMPLE, Nullability::NonNullable, &mut ctx); + + let view = fsstview_from_fsst(&fsst, &mut ctx)?; + let offsets = view + .codes_offsets() + .clone() + .try_downcast::() + .map_err(|_| vortex_err!("codes_offsets should be a primitive slice"))?; + let ends = view + .codes_ends() + .clone() + .try_downcast::() + .map_err(|_| vortex_err!("codes_ends should be a primitive slice"))?; + + assert_eq!(offsets.ptype(), ends.ptype()); + match_each_integer_ptype!(offsets.ptype(), |P| { + let off = offsets.as_slice::

<P>(); + let end = ends.as_slice::<P>

(); + assert_eq!( + end.as_ptr(), + off.as_ptr().wrapping_add(1), + "codes_ends must be codes_offsets shifted by one within the same buffer (zero-copy)" + ); + }); + Ok(()) +} + +#[test] +fn scalar_at_decodes_each_element() -> VortexResult<()> { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let view = make_fsstview(&SAMPLE, Nullability::NonNullable, &mut ctx); + let array = view.into_array(); + + for (i, expected) in SAMPLE.iter().enumerate() { + let scalar = array.execute_scalar(i, &mut ctx)?; + let value = scalar.as_utf8().value().expect("non-null"); + assert_eq!(value.as_str(), expected.unwrap()); + } + Ok(()) +} + +#[rstest] +#[case(&SAMPLE, Nullability::NonNullable)] +#[case(&SAMPLE_NULLABLE, Nullability::Nullable)] +fn filter_conformance(#[case] strings: &[Option<&str>], #[case] nullability: Nullability) { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let view = make_fsstview(strings, nullability, &mut ctx); + test_filter_conformance(&view.into_array()); +} + +#[rstest] +#[case(&SAMPLE, Nullability::NonNullable)] +#[case(&SAMPLE_NULLABLE, Nullability::Nullable)] +fn take_conformance(#[case] strings: &[Option<&str>], #[case] nullability: Nullability) { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let view = make_fsstview(strings, nullability, &mut ctx); + test_take_conformance(&view.into_array()); +} + +#[rstest] +#[case(&SAMPLE, Nullability::NonNullable)] +#[case(&SAMPLE_NULLABLE, Nullability::Nullable)] +fn consistency(#[case] strings: &[Option<&str>], #[case] nullability: Nullability) { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let view = make_fsstview(strings, nullability, &mut ctx); + test_array_consistency(&view.into_array()); +} + +fn make_fsst( + strings: &[Option<&str>], + nullability: Nullability, + ctx: &mut ExecutionCtx, +) -> FSSTArray { + let varbin = VarBinArray::from_iter(strings.iter().copied(), DType::Utf8(nullability)); + let compressor = fsst_train_compressor(&varbin); + 
fsst_compress(&varbin, varbin.len(), varbin.dtype(), &compressor, ctx) +} + +/// `fsst_filter_to_view` must agree with filtering the canonical VarBin, and must not touch the +/// codes bytes (the produced view shares the original heap). +#[test] +fn fsst_filter_to_view_matches_canonical() -> VortexResult<()> { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let fsst = make_fsst(&SAMPLE_NULLABLE, Nullability::Nullable, &mut ctx); + let mask = Mask::from_iter([true, false, true, false, true, true]); + + let view = fsst_filter_to_view(&fsst, &mask, &mut ctx)?; + let result = view.into_array().execute::(&mut ctx)?; + + let expected = VarBinArray::from_iter( + SAMPLE_NULLABLE.iter().copied(), + DType::Utf8(Nullability::Nullable), + ) + .into_array() + .filter(mask)? + .execute::(&mut ctx)?; + assert_arrays_eq!(result.into_array(), expected.into_array()); + Ok(()) +} + +#[test] +fn fsst_take_to_view_matches_canonical() -> VortexResult<()> { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let fsst = make_fsst(&SAMPLE, Nullability::NonNullable, &mut ctx); + let indices = PrimitiveArray::from_iter([5u64, 0, 0, 3, 1]).into_array(); + + let view = fsst_take_to_view(&fsst, &indices, &mut ctx)?; + let result = view.into_array().execute::(&mut ctx)?; + + let expected = VarBinArray::from_iter( + SAMPLE.iter().copied(), + DType::Utf8(Nullability::NonNullable), + ) + .into_array() + .take(indices)? + .execute::(&mut ctx)?; + assert_arrays_eq!(result.into_array(), expected.into_array()); + Ok(()) +} + +/// All explicit compaction strategies must produce identical canonical output, both for a +/// contiguous (sliced) view and a scattered (taken) one. 
+#[rstest] +#[case(FsstViewCompaction::Auto)] +#[case(FsstViewCompaction::Direct)] +#[case(FsstViewCompaction::GatherBulk)] +#[case(FsstViewCompaction::RunDecode)] +fn compaction_strategies_agree(#[case] strategy: FsstViewCompaction) -> VortexResult<()> { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let fsst = make_fsst(&SAMPLE, Nullability::NonNullable, &mut ctx); + + // Scattered view via a take (reorders + duplicates -> non-contiguous codes). + let indices = PrimitiveArray::from_iter([5u64, 0, 0, 3, 1, 2]).into_array(); + let scattered = fsst_take_to_view(&fsst, &indices, &mut ctx)?; + let got = canonicalize_fsstview_with(scattered.as_view(), strategy, &mut ctx)?; + let expected = VarBinArray::from_iter( + SAMPLE.iter().copied(), + DType::Utf8(Nullability::NonNullable), + ) + .into_array() + .take(indices)? + .execute::(&mut ctx)?; + assert_arrays_eq!(got, expected.into_array()); + + // Contiguous view (untouched) — exercises the Direct fast path. + let contiguous = fsstview_from_fsst(&fsst, &mut ctx)?; + let got = canonicalize_fsstview_with(contiguous.as_view(), strategy, &mut ctx)?; + let expected = VarBinArray::from_iter( + SAMPLE.iter().copied(), + DType::Utf8(Nullability::NonNullable), + ) + .into_array() + .execute::(&mut ctx)?; + assert_arrays_eq!(got, expected.into_array()); + Ok(()) +} + +/// Adversarial coverage: a filter that punches gaps into the heap (so survivors form multiple +/// runs), then a shuffle take (reorders runs, forcing `GatherBulk`), over nullable data. Every +/// strategy must still agree with the canonical result. +#[rstest] +#[case(FsstViewCompaction::Auto)] +#[case(FsstViewCompaction::GatherBulk)] +fn gaps_and_shuffle_agree(#[case] strategy: FsstViewCompaction) -> VortexResult<()> { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + // 12 distinct-ish strings, nullable. 
+ let strings: Vec> = vec![ + Some("alpha"), + None, + Some("bravo bravo"), + Some("charlie"), + Some("delta delta delta"), + None, + Some("echo"), + Some("foxtrot foxtrot"), + Some("golf"), + Some("hotel hotel hotel"), + None, + Some("india"), + ]; + let fsst = make_fsst(&strings, Nullability::Nullable, &mut ctx); + + // Filter to keep a gapped subset (drops 1,2,5,8,10 -> remaining survivors aren't all adjacent). + let keep = [ + true, false, false, true, true, false, true, true, false, true, false, true, + ]; + let mask = Mask::from_iter(keep); + let filtered = fsst_filter_to_view(&fsst, &mask, &mut ctx)?; + + // Then a shuffle+dup take over the filtered length (7 survivors). + let indices = PrimitiveArray::from_iter([6u64, 0, 3, 3, 5, 1, 2, 4]).into_array(); + let view = ::take(filtered.as_view(), &indices, &mut ctx)? + .unwrap() + .try_downcast::() + .ok() + .unwrap(); + + let got = canonicalize_fsstview_with(view.as_view(), strategy, &mut ctx)?; + + let expected = + VarBinArray::from_iter(strings.iter().copied(), DType::Utf8(Nullability::Nullable)) + .into_array() + .filter(mask)? + .take(indices)? + .execute::(&mut ctx)?; + assert_arrays_eq!(got, expected.into_array()); + Ok(()) +} + +/// `RunDecode` ("export all in place") must agree with the canonical result on a *monotonic* +/// gapped view (a filter, which keeps offsets increasing). Covers nulls, empty strings, and a +/// trailing run, across the strategies that accept monotonic input. 
+#[rstest] +#[case(FsstViewCompaction::Auto)] +#[case(FsstViewCompaction::RunDecode)] +#[case(FsstViewCompaction::GatherBulk)] +#[case(FsstViewCompaction::Direct)] +fn run_decode_monotonic_filter(#[case] strategy: FsstViewCompaction) -> VortexResult<()> { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let strings: Vec> = vec![ + Some("alpha"), + Some(""), + None, + Some("bravo bravo"), + Some("charlie"), + None, + Some("delta delta delta"), + Some("echo"), + Some(""), + Some("foxtrot foxtrot"), + Some("golf golf"), + ]; + let fsst = make_fsst(&strings, Nullability::Nullable, &mut ctx); + // Keep a gapped-but-ordered subset (multiple runs, including an adjacent pair and a trailing + // run) so RunDecode exercises >1 run and the GatherBulk fallback is also valid. + let keep = [ + true, true, false, true, false, false, true, true, true, false, true, + ]; + let mask = Mask::from_iter(keep); + let view = fsst_filter_to_view(&fsst, &mask, &mut ctx)?; + + let got = canonicalize_fsstview_with(view.as_view(), strategy, &mut ctx)?; + let expected = + VarBinArray::from_iter(strings.iter().copied(), DType::Utf8(Nullability::Nullable)) + .into_array() + .filter(mask)? + .execute::(&mut ctx)?; + assert_arrays_eq!(got, expected.into_array()); + Ok(()) +} + +/// The VarBin exporter must agree with the canonical VarBin filter, across the export strategies, +/// for a gapped filter over nullable data. 
+#[rstest] +#[case(FsstViewCompaction::Auto)] +#[case(FsstViewCompaction::GatherBulk)] +#[case(FsstViewCompaction::RunDecode)] +fn varbin_export_matches_canonical(#[case] strategy: FsstViewCompaction) -> VortexResult<()> { + let mut ctx = LEGACY_SESSION.create_execution_ctx(); + let strings: Vec> = vec![ + Some("alpha"), + None, + Some("bravo bravo"), + Some("charlie"), + Some("delta delta delta"), + None, + Some("echo"), + Some("foxtrot foxtrot"), + ]; + let fsst = make_fsst(&strings, Nullability::Nullable, &mut ctx); + let keep = [true, false, true, true, false, false, true, true]; + let mask = Mask::from_iter(keep); + let view = fsst_filter_to_view(&fsst, &mask, &mut ctx)?; + + let got = canonicalize_fsstview_to_varbin(view.as_view(), strategy, &mut ctx)?; + // Compare as VarBinView so the offsets-vs-views layout difference doesn't matter. + let got_view = got.execute::(&mut ctx)?; + + let expected = + VarBinArray::from_iter(strings.iter().copied(), DType::Utf8(Nullability::Nullable)) + .into_array() + .filter(mask)? 
+ .execute::(&mut ctx)?; + assert_arrays_eq!(got_view.into_array(), expected.into_array()); + Ok(()) +} diff --git a/encodings/fsst/src/lib.rs b/encodings/fsst/src/lib.rs index 3305c0e66fc..47fbfae9547 100644 --- a/encodings/fsst/src/lib.rs +++ b/encodings/fsst/src/lib.rs @@ -16,6 +16,7 @@ mod canonical; mod compress; mod compute; mod dfa; +mod fsstview; mod kernel; mod ops; mod rules; @@ -27,3 +28,4 @@ mod tests; pub use array::*; pub use compress::*; +pub use fsstview::*; diff --git a/vortex-file/src/lib.rs b/vortex-file/src/lib.rs index e69b5848de2..58179486b76 100644 --- a/vortex-file/src/lib.rs +++ b/vortex-file/src/lib.rs @@ -115,6 +115,7 @@ use vortex_array::arrays::patched::use_experimental_patches; use vortex_array::session::ArraySessionExt; use vortex_bytebool::ByteBool; use vortex_fsst::FSST; +use vortex_fsst::FSSTView; use vortex_pco::Pco; use vortex_session::VortexSession; use vortex_zigzag::ZigZag; @@ -162,6 +163,7 @@ pub fn register_default_encodings(session: &VortexSession) { arrays.register(ByteBool); arrays.register(Dict); arrays.register(FSST); + arrays.register(FSSTView); arrays.register(Pco); arrays.register(ZigZag); #[cfg(feature = "zstd")]