[spark] support distributed execution of vector search on spark #8108

Open
Stefanietry wants to merge 1 commit into apache:master from Stefanietry:opt_vector_search_on_spark

@Stefanietry
Contributor

Purpose
Currently, vector search runs on a single node inside the driver, which can become a performance bottleneck on large datasets. This PR adds the ability to execute vector search in a distributed fashion on Spark.
Linked issue: #8107

Tests
Add a distributed vector search test to org.apache.paimon.spark.SparkMultimodalITCase#testVector, enabled via the parameter vector-search.distribute.enabled.

Broadcast<RoaringNavigableMap64> preFilterBroadcast =
preFilter == null ? null : engineContext.broadcast(preFilter);

SerializableFunction<List<byte[]>, Optional<byte[]>> task =
Contributor

This distributed path returns java.util.Optional<byte[]> from the Spark task and then collects it back to the driver. java.util.Optional does not implement Serializable (in Java 8 or any later release), so Spark will fail to serialize the task result with a NotSerializableException once this branch actually runs. Could we return a serializable value instead, for example byte[] with null meaning empty, or a small serializable wrapper?
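A minimal, JDK-only sketch of the problem and of the first suggested alternative. All names here (`OptionalResultSketch`, `isJavaSerializable`, `runTask`, `toOptional`) are hypothetical and not part of Paimon or Spark, and plain `ObjectOutputStream` stands in for Java serialization of a task result, which is an assumption about how the result would be encoded.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.Optional;

public class OptionalResultSketch {

    /** Returns true if the value survives standard Java serialization. */
    static boolean isJavaSerializable(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            // Thrown for java.util.Optional, which does not implement Serializable.
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Hypothetical task body: returns null instead of Optional.empty(). */
    static byte[] runTask(boolean hasResult) {
        return hasResult ? new byte[] {1, 2, 3} : null;
    }

    /** Driver side: convert the nullable payload back to an Optional after collecting. */
    static Optional<byte[]> toOptional(byte[] taskResult) {
        return Optional.ofNullable(taskResult);
    }

    public static void main(String[] args) {
        System.out.println(isJavaSerializable(Optional.of(new byte[] {1})));
        System.out.println(isJavaSerializable(runTask(true)));
        System.out.println(toOptional(runTask(false)).isPresent());
    }
}
```

With this shape the `Optional` only ever exists on the driver, after the collect, so it never has to cross the serialization boundary.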

assertThat(df.columns()).hasSize(4);
rows = df.collectAsList();
assertThat(rows).hasSize(5);
spark.sql("set spark.paimon.vector-search.distribute.enabled = true;");
Contributor

This assertion does not seem to exercise the new Spark-distributed path: the table only has a small number of vector splits, while SparkVectorReadImpl falls back to super.read unless splits.size() >= global-index.thread-num * 2 (default 64 splits). Because of that, the serialization/distributed execution code can be broken and this test would still pass. Could we force the distributed branch in this test, for example by setting spark.paimon.global-index.thread-num=1 or by creating enough index shards/splits?
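To make the fallback concrete, here is a small sketch of the threshold as described in this comment. `DistributedPathSketch` and `usesDistributedPath` are hypothetical names, the real check lives in SparkVectorReadImpl and may differ in detail, the thread-num value of 32 is only inferred from the "64 splits" figure above, and the split count of 5 is an illustrative stand-in for "a small number of vector splits".

```java
public class DistributedPathSketch {

    /**
     * Models the condition quoted in the review: the distributed branch is taken only
     * when the feature is enabled and splits.size() >= global-index.thread-num * 2.
     */
    static boolean usesDistributedPath(boolean distributeEnabled, int splitCount, int threadNum) {
        return distributeEnabled && splitCount >= threadNum * 2;
    }

    public static void main(String[] args) {
        int splitCount = 5; // assumption: a small table, as in the test

        // thread-num 32 (inferred): threshold is 64, so the read falls back to super.read
        System.out.println(usesDistributedPath(true, splitCount, 32)); // false

        // thread-num 1: threshold is 2, so the distributed branch is exercised
        System.out.println(usesDistributedPath(true, splitCount, 1)); // true
    }
}
```

Under these assumptions, lowering spark.paimon.global-index.thread-num to 1 in the test is enough to reach the distributed branch as long as the table produces at least two splits.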

return dataOutputSerializer.getCopyOfBuffer();
}

public ScoredGlobalIndexResult deserialize(byte[] data) throws IOException {
Contributor

This helper cannot round-trip an empty ScoredGlobalIndexResult. serialize() writes only scoreSize=0 for scored results whose bitmap is empty, and the existing deserializer interprets scoreSize == 0 as a plain GlobalIndexResult; this deserialize(byte[]) method then fails the instanceof ScoredGlobalIndexResult check. In the distributed reader, a split group can legitimately produce an empty scored result when the scalar pre-filter excludes all rows in that group, so this can make filtered distributed searches fail even though the local path handles empty optionals. We probably need an explicit scored/non-scored marker in the serialization format, or avoid serializing empty scored results as successful task results.
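One way the explicit marker could look, as a self-contained sketch. This is not Paimon's serialization format: `ScoredMarkerSketch` and its `Result` class are hypothetical stand-ins, with a `long[]` of row ids in place of the bitmap and a `float[]` of scores. The point is only that a leading boolean, rather than the score count, decides whether the result is scored.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class ScoredMarkerSketch {

    /** Hypothetical stand-in for a (possibly scored) index result. */
    static final class Result {
        final boolean scored;
        final long[] rowIds;
        final float[] scores; // same length as rowIds when scored, empty otherwise

        Result(boolean scored, long[] rowIds, float[] scores) {
            this.scored = scored;
            this.rowIds = rowIds;
            this.scores = scores;
        }
    }

    static byte[] serialize(Result result) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeBoolean(result.scored); // explicit marker, independent of the sizes
            out.writeInt(result.rowIds.length);
            for (long rowId : result.rowIds) {
                out.writeLong(rowId);
            }
            if (result.scored) {
                for (float score : result.scores) {
                    out.writeFloat(score);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static Result deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            boolean scored = in.readBoolean();
            int size = in.readInt();
            long[] rowIds = new long[size];
            for (int i = 0; i < size; i++) {
                rowIds[i] = in.readLong();
            }
            float[] scores = new float[scored ? size : 0];
            for (int i = 0; i < scores.length; i++) {
                scores[i] = in.readFloat();
            }
            return new Result(scored, rowIds, scores);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With the marker in place, an empty scored result deserializes as scored with zero rows instead of being mistaken for a plain result, at the cost of one extra byte and a format change that existing readers would need to understand.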
