Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,40 @@
/** Allows a query to be cancelled */
public class CancellableCollector implements Collector {

/** Thrown when a query gets cancelled */
public static class QueryCancelledException extends RuntimeException {}
/**
* Thrown when a query gets cancelled. This is a control-flow exception only — it is caught by the
* searcher and never inspected, so we skip filling in the stack trace to keep cancellation cheap
* (matches Lucene {@code CollectionTerminatedException} and {@code
* TimeLimitingBulkScorer.TimeExceededException}).
*/
@SuppressWarnings("serial")
public static class QueryCancelledException extends RuntimeException {
/**
* Returns this exception without recording a stack trace.
*
* @return this exception instance
*/
@Override
public Throwable fillInStackTrace() {
// Deliberately does not call super.fillInStackTrace(): this exception is used purely for
// control flow, so the stack trace is not needed and we avoid the cost of capturing it.
return this;
}
}

private final Collector collector;
private final AtomicBoolean isQueryCancelled;

/**
* Creates a {@link CancellableCollector} with its own newly created cancellation flag, which is
* not shared with any other instance.
*
* @param collector the collector to wrap; must not be {@code null}
*/
public CancellableCollector(Collector collector) {
this(collector, new AtomicBoolean());
}

/**
* Creates a {@link CancellableCollector} that polls a caller-supplied cancellation flag. Multiple
* per-slice collectors can share the same {@link AtomicBoolean} so that a single {@link
* #cancel()} on any instance is observed by all of them — required for cancelling a query that is
* fanned out across parallel segment slices.
*
* @param collector the collector to wrap; must not be {@code null}
* @param cancellationFlag the flag polled to detect cancellation; must not be {@code null}
* @throws NullPointerException if {@code collector} or {@code cancellationFlag} is {@code null}
*/
public CancellableCollector(Collector collector, AtomicBoolean cancellationFlag) {
Objects.requireNonNull(
collector, "Internal collector not provided but wrapper collector accessed");

Objects.requireNonNull(cancellationFlag, "cancellationFlag must not be null");
this.collector = collector;
this.isQueryCancelled = new AtomicBoolean();
this.isQueryCancelled = cancellationFlag;
}

@Override
Expand All @@ -50,11 +72,18 @@ public LeafCollector getLeafCollector(LeafReaderContext context) throws IOExcept
}

return new FilterLeafCollector(collector.getLeafCollector(context)) {
int collectCount = 0;

@Override
public void collect(int doc) throws IOException {
if (isQueryCancelled.get()) {
throw new QueryCancelledException();
// To avoid polling the AtomicBoolean (volatile read) on every collected document,
// which acts as a memory barrier and prevents JIT optimizations,
// we check it only every 1024 docs. This keeps the hot loop fast while maintaining
// responsive cancellation.
if ((collectCount++ & 0x3FF) == 0) {
if (isQueryCancelled.get()) {
throw new QueryCancelledException();
}
}
in.collect(doc);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ public EarlyTerminatingCollector(
assert null != delegate;
this.maxDocsToCollect = maxDocsToCollect;
this.pendingDocsToCollect = docsToCollect;
this.chunkSize = Math.min(100, maxDocsToCollect / 10);
// chunkSize must be at least 1 (small maxDocsToCollect would otherwise divide-by-zero
// in the modulo check below).
this.chunkSize = Math.max(1, Math.min(100, maxDocsToCollect / 10));
}

@Override
Expand All @@ -74,9 +76,9 @@ public void collect(int doc) throws IOException {
numCollectedLocally++;
terminatedEarly = numCollectedLocally >= maxDocsToCollect;
if (pendingDocsToCollect != null) {
pendingDocsToCollect.increment();
if (numCollectedLocally % chunkSize == 0) {
final long overallCollectedDocCount = pendingDocsToCollect.intValue();
pendingDocsToCollect.add(chunkSize);
final long overallCollectedDocCount = pendingDocsToCollect.longValue();
terminatedEarly = overallCollectedDocCount >= maxDocsToCollect;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,15 @@
/**
* Thrown by {@link EarlyTerminatingCollector} to abort the scoring / collection process early once
* the specified maximum number of documents has been collected.
*
* <p>This is a control-flow exception only — it is caught at the search entry point and its
* formatted {@link #getMessage() message} plus {@link #getNumberCollected()}, {@link
* #getNumberScanned()}, and {@link #getApproximateTotalHits(int)} are inspected, but the stack
* trace itself is never used. We skip {@link #fillInStackTrace()} to keep early-termination cheap,
* matching the Lucene {@code CollectionTerminatedException} and {@code
* TimeLimitingBulkScorer.TimeExceededException} convention.
*/
@SuppressWarnings("serial")
public class EarlyTerminatingCollectorException extends RuntimeException {
private static final long serialVersionUID = 5939241340763428118L;
private final int numberScanned;
Expand All @@ -41,6 +49,12 @@ public EarlyTerminatingCollectorException(int numberCollected, int numberScanned
this.numberScanned = numberScanned;
}

/**
* Returns this exception without recording a stack trace.
*
* @return this exception instance
*/
@Override
public Throwable fillInStackTrace() {
// Deliberately does not call super.fillInStackTrace(): the stack trace of this control-flow
// exception is never used, so we avoid the cost of capturing it.
return this;
}

/**
* The total number of documents in the index that were "scanned" by the index when collecting the
* {@link #getNumberCollected()} documents that triggered this exception.
Expand Down
Loading
Loading