Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions quickwit/quickwit-query/src/query_ast/cache_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,11 +212,16 @@ pub struct HitSet {
const INCOMPLETE_BLOCK_MARKER: u8 = 0x80;

impl HitSet {
#[cfg(test)]
fn empty() -> Self {
/// An empty hit set, matching no document.
pub fn empty() -> Self {
Self::from_buffer(OwnedBytes::new(vec![0, 0, 0, 0]))
}

/// Returns true if this hit set matches no document.
pub fn is_empty(&self) -> bool {
self.size_hint() == 0
}

/// Build a HitSet from its serialized form.
///
/// The provided buffer must come from `HitSet::into_buffer`
Expand Down
177 changes: 142 additions & 35 deletions quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ use quickwit_proto::search::{
SortOrder, SortValue, SplitIdAndFooterOffsets, SplitResourceStats, SplitSearchError,
};
use quickwit_query::query_ast::{
BoolQuery, CacheNode, QueryAst, QueryAstTransformer, RangeQuery, TermQuery,
BoolQuery, CacheNode, HitSet, PredicateCache, QueryAst, QueryAstTransformer, RangeQuery,
TermQuery,
};
use quickwit_query::tokenizers::TokenizerManager;
use quickwit_storage::{
Expand All @@ -50,6 +51,7 @@ use tantivy::aggregation::agg_req::{AggregationVariants, Aggregations};
use tantivy::collector::Collector;
use tantivy::directory::FileSlice;
use tantivy::fastfield::FastFieldReaders;
use tantivy::index::SegmentId;
use tantivy::schema::Field;
use tantivy::{DateTime, Index, ReloadPolicy, Searcher, TantivyError, Term};
use tokio::task::{JoinError, JoinSet};
Expand Down Expand Up @@ -256,16 +258,6 @@ pub(crate) async fn open_index_with_caches(
Ok((index, hot_directory))
}

/// Outcome of [`warmup`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum WarmupOutcome {
/// The warmup completed; the split must be searched.
Completed,
/// A required term has an empty posting list, so the query provably matches
/// nothing in this split. The remaining warmup downloads were cancelled.
ProvablyEmpty,
}

/// Runs `fut`, racing it against `cancel`. If cancellation fires first, the
/// (possibly in-flight) future is dropped — aborting its downloads — and
/// `Ok(())` is returned. With no token, `fut` simply runs to completion.
Expand Down Expand Up @@ -298,11 +290,21 @@ async fn run_cancellable(
/// * `term_dict_field_names` - A list of fields, where the whole dictionary needs to be loaded.
/// This is e.g. required for term aggregation, since we don't know in advance which terms are
/// going to be hit.
///
/// `on_absent` is invoked once for every required term found to have an empty posting list,
/// with the segment it was missing from. Such a term proves the query empty in this split,
/// so the remaining warmup downloads are then cancelled; the callback lets the caller record
/// the (immutable, query-independent) absence — see [`term_absence_cache_key`]. It only ever
/// fires for a single-segment split, where "absent in the split" is sound.
///
/// Returns whether the query is provably empty in this split (i.e. `on_absent` fired and
/// warmup was short-circuited).
#[instrument(skip_all)]
pub(crate) async fn warmup(
searcher: &Searcher,
warmup_info: &WarmupInfo,
) -> anyhow::Result<WarmupOutcome> {
on_absent: &(dyn Fn(&Term, SegmentId) + Sync),
) -> anyhow::Result<bool> {
debug!(warmup_info=?warmup_info);

// Early-abort optimization: the split's downloads can be cancelled as soon as
Expand All @@ -323,6 +325,7 @@ pub(crate) async fn warmup(
&warmup_info.terms_grouped_by_field,
&warmup_info.required_terms,
abort_token.as_ref(),
on_absent,
)
.instrument(debug_span!("warm_up_terms"));
let warm_up_term_ranges_future = run_cancellable(
Expand Down Expand Up @@ -371,11 +374,7 @@ pub(crate) async fn warmup(
Some(abort_token) => abort_token.is_cancelled(),
None => false,
};
if provably_empty {
Ok(WarmupOutcome::ProvablyEmpty)
} else {
Ok(WarmupOutcome::Completed)
}
Ok(provably_empty)
}

async fn warm_up_term_dict_fields(
Expand Down Expand Up @@ -453,11 +452,13 @@ async fn warm_up_terms(
terms_grouped_by_field: &HashMap<Field, HashMap<Term, bool>>,
required_terms: &HashSet<Term>,
abort_token: Option<&CancellationToken>,
on_absent: &(dyn Fn(&Term, SegmentId) + Sync),
) -> anyhow::Result<()> {
let mut warm_up_futures = Vec::new();
for (field, terms) in terms_grouped_by_field {
for segment_reader in searcher.segment_readers() {
let inv_idx = segment_reader.inverted_index(*field)?;
let segment_id = segment_reader.segment_id();
for (term, position_needed) in terms.iter() {
let inv_idx_clone = inv_idx.clone();
// Only a required term can prove the query empty. When such a
Expand All @@ -470,6 +471,9 @@ async fn warm_up_terms(
warm_up_futures.push(async move {
let found = inv_idx_clone.warm_postings(term, *position_needed).await?;
if !found && let Some(abort_token) = cancel_on_empty {
// Report the absence and fire the abort token. Both are synchronous, so
// they run before any cancellation can drop us.
on_absent(term, segment_id);
abort_token.cancel();
}
anyhow::Ok(())
Expand Down Expand Up @@ -621,6 +625,33 @@ fn compute_index_size(hot_directory: &HotDirectory) -> ByteSize {
ByteSize(size_bytes)
}

/// Cache key under which "this term has no posting list in this split" is recorded in
/// the shared predicate cache.
///
/// Absence is a property of the term and the split alone: it is independent of the rest
/// of the query and of any time window, and — because splits are immutable — it never
/// changes once observed. So a single entry per `(split, term)` lets *every* query that
/// carries this term short-circuit before warmup, no matter what other filters or time
/// range ride along, and adding more required terms can only make a query emptier.
///
/// The key is the field id followed by the hex of the term's serialized value bytes
/// (which for a JSON field already encode the path and type) — together a unique
/// identifier of the term. It never collides with the whole-query keys the [`CacheNode`]
/// positive cache stores in the same instance: those are serialized query ASTs that
/// start with `{`, never a hex field id.
pub(crate) fn term_absence_cache_key(term: &Term) -> String {
use std::fmt::Write;

let value_bytes = term.serialized_value_bytes();
let mut key = String::with_capacity(9 + value_bytes.len() * 2);
let _ = write!(key, "{:08x}:", term.field().field_id());
for byte in value_bytes {
// Hex-encode so the binary value bytes form a valid (printable) String key.
let _ = write!(key, "{byte:02x}");
}
key
}

/// Apply a leaf search on a single split.
async fn leaf_search_single_split(
search_request: SearchRequest,
Expand Down Expand Up @@ -712,7 +743,40 @@ async fn leaf_search_single_split(

let warmup_start = Instant::now();
leaf_search_state_guard.set_state(SplitSearchState::WarmUp);
let warmup_outcome = warmup(&searcher, &warmup_info).await?;
// Negative cache: a split is provably empty for this query if any required term has
// previously been proven absent here. Absence is an immutable, query- and
// time-window-independent property of the split (see `term_absence_cache_key`), so the
// split short-circuits before warmup no matter which earlier query first proved the
// term absent, nor what other filters or time range this query carries — extra
// required terms can only make it emptier. An empty result is segment- and
// scoring-agnostic, so this holds even for scored queries (which the `CacheNode`
// machinery itself does not support).
let cached_known_empty = warmup_info.required_terms.iter().any(|term| {
match ctx
.searcher_context
.predicate_cache
.get(split_id.clone(), term_absence_cache_key(term))
{
Some((_segment_id, hits)) => hits.is_empty(),
None => false,
}
});
let provably_empty = if cached_known_empty {
true
} else {
// Record every required term proven absent during warmup, so any future query
// carrying one of them prunes before warmup. Absence is per `(split, term)`,
// immutable, and independent of the rest of the query.
let record_absence = |term: &Term, segment_id: SegmentId| {
ctx.searcher_context.predicate_cache.put(
split_id.clone(),
term_absence_cache_key(term),
segment_id,
HitSet::empty(),
);
};
warmup(&searcher, &warmup_info, &record_absence).await?
};
let warmup_end = Instant::now();
let warmup_duration: Duration = warmup_end.duration_since(warmup_start);
let warmup_size = ByteSize(byte_range_cache.get_num_bytes());
Expand All @@ -731,7 +795,11 @@ async fn leaf_search_single_split(
search_permit.update_memory_usage(warmup_size);
search_permit.free_warmup_slot();

if warmup_outcome == WarmupOutcome::ProvablyEmpty {
if provably_empty {
// The absences discovered this run were already recorded per-term above (or the
// short-circuit came from a prior run's per-term entry), so there is nothing to
// write here.
//
// A required term's posting list was empty, so the query matches no
// document in this split. The remaining warmup downloads were aborted;
// skip the search and report an empty (but counted) result. This is the
Expand Down Expand Up @@ -2748,6 +2816,8 @@ mod tests {
(searcher, field)
}

/// Builds a `WarmupInfo` warming `terms`, with `required` as the set of required
/// terms (each must be present for the query to match).
fn warmup_info_with_required(terms: &[&Term], required: &[&Term]) -> WarmupInfo {
let mut terms_grouped_by_field: HashMap<Field, HashMap<Term, bool>> = HashMap::new();
for term in terms {
Expand All @@ -2764,33 +2834,70 @@ mod tests {
}

#[tokio::test]
async fn test_warmup_aborts_when_required_term_is_missing() {
async fn test_warmup_reports_absent_required_terms() {
let (searcher, body) = ram_searcher_with_text("body", &["hello world"]);
// Single segment: the early-abort optimization is armed.
// Single segment: the early-abort optimization is armed, so absence is recorded.
assert_eq!(searcher.segment_readers().len(), 1);

let present = Term::from_field_text(body, "hello");
let missing = Term::from_field_text(body, "missing");

// A required term with an empty posting list proves the query empty.
let warmup_info = warmup_info_with_required(&[&present, &missing], &[&missing]);
assert_eq!(
warmup(&searcher, &warmup_info).await.unwrap(),
WarmupOutcome::ProvablyEmpty
);
// Runs warmup, returning whether the split is provably empty and the terms that
// `on_absent` was invoked with.
async fn run(searcher: &Searcher, warmup_info: &WarmupInfo) -> (bool, Vec<Term>) {
let reported = std::sync::Mutex::new(Vec::new());
let provably_empty = warmup(searcher, warmup_info, &|term: &Term, _segment_id| {
reported.lock().unwrap().push(term.clone());
})
.await
.unwrap();
(provably_empty, reported.into_inner().unwrap())
}

// An absent required term is reported (so the caller can cache it) and proves the
// split empty; the present required term is not reported.
let warmup_info = warmup_info_with_required(&[&present, &missing], &[&present, &missing]);
let (provably_empty, reported) = run(&searcher, &warmup_info).await;
assert!(provably_empty);
assert_eq!(reported, vec![missing.clone()]);

// The required term is present: warmup completes normally.
// All required terms present: nothing reported, so the split must be searched.
let warmup_info = warmup_info_with_required(&[&present], &[&present]);
assert_eq!(
warmup(&searcher, &warmup_info).await.unwrap(),
WarmupOutcome::Completed
);
let (provably_empty, reported) = run(&searcher, &warmup_info).await;
assert!(!provably_empty);
assert!(reported.is_empty());

// A missing term that is not required must not abort.
// A missing term that is not required is never reported (recording would be unsound
// without a required-term proof), so the split must be searched.
let warmup_info = warmup_info_with_required(&[&present, &missing], &[]);
let (provably_empty, reported) = run(&searcher, &warmup_info).await;
assert!(!provably_empty);
assert!(reported.is_empty());
}

#[test]
fn test_term_absence_cache_key() {
let mut schema_builder = Schema::builder();
let body = schema_builder.add_text_field("body", tantivy::schema::TEXT);
let title = schema_builder.add_text_field("title", tantivy::schema::TEXT);
let _schema = schema_builder.build();

let key = term_absence_cache_key(&Term::from_field_text(body, "hello"));
// Stable for the same term, and disjoint from the JSON query keys the `CacheNode`
// positive cache stores (which start with '{', never a hex field id).
assert!(!key.starts_with('{'));
assert_eq!(
warmup(&searcher, &warmup_info).await.unwrap(),
WarmupOutcome::Completed
key,
term_absence_cache_key(&Term::from_field_text(body, "hello"))
);
// A different value or a different field yields a different key.
assert_ne!(
key,
term_absence_cache_key(&Term::from_field_text(body, "world"))
);
assert_ne!(
key,
term_absence_cache_key(&Term::from_field_text(title, "hello"))
);
}

Expand Down
Loading
Loading