-
Notifications
You must be signed in to change notification settings - Fork 1.3k
[globalindex] Support multi-column GlobalIndex framework #7933
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
2179db0
9091d67
77bbc77
92692a0
2229970
5582952
7e6d5b0
230c97d
8a3bd94
df64ef1
6e79d86
62e6ac5
db9d30c
27548b1
df2d15b
a27c630
6a0c496
cf6f843
fde5010
dbaea6a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,38 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.paimon.globalindex; | ||
|
|
||
| import org.apache.paimon.data.InternalRow; | ||
|
|
||
| import javax.annotation.Nullable; | ||
|
|
||
| /** Index writer for global index that accepts multiple column values per row. */ | ||
| public interface GlobalIndexMultiColumnWriter extends GlobalIndexWriter { | ||
|
|
||
| /** | ||
| * Write one record's indexed columns at the given relative row id. | ||
| * | ||
| * @param rowId the record's row id relative to the current shard (0 to rowCnt - 1); a null row | ||
| * still advances the row id without indexing a value | ||
| * @param row a projected row containing only the indexed columns, whose layout matches the | ||
| * fields order passed to {@link GlobalIndexerFactory#create(java.util.List, | ||
| * org.apache.paimon.options.Options)} | ||
| */ | ||
| void write(long rowId, @Nullable InternalRow row); | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,18 +24,29 @@ | |
| import org.apache.paimon.index.GlobalIndexMeta; | ||
| import org.apache.paimon.index.IndexFileMeta; | ||
| import org.apache.paimon.index.IndexPathFactory; | ||
| import org.apache.paimon.manifest.ManifestEntry; | ||
| import org.apache.paimon.options.Options; | ||
| import org.apache.paimon.schema.SchemaManager; | ||
| import org.apache.paimon.table.FileStoreTable; | ||
| import org.apache.paimon.types.DataField; | ||
| import org.apache.paimon.utils.Range; | ||
|
|
||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import javax.annotation.Nullable; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.ArrayList; | ||
| import java.util.HashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Risk —
retainedMeta.indexFieldId() != addedMeta.indexFieldId()With Suggestion: add
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for catching this! Fixed in IndexManifestFileHandler.validateRetainedIndexFiles().
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Minor —
Suggestion: when
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks! Fixed in TableIndexesTable.toRow(). When indexFieldId == MULTI_COLUMN_INDEX_FIELD_ID, resolve field names from extraFieldIds() and join with commas (e.g. "title,vec"). Single-column path unchanged.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Suggestion — add The sentinel check Consider adding a convenience method to public boolean isMultiColumn() {
return indexFieldId == MULTI_COLUMN_INDEX_FIELD_ID;
}Then all call sites replace
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. Added GlobalIndexMeta.isMultiColumn() and replaced all sentinel checks across the following classes:
|
||
|
|
||
| /** Utils for global index build. */ | ||
| public class GlobalIndexBuilderUtils { | ||
|
|
||
| private static final Logger LOG = LoggerFactory.getLogger(GlobalIndexBuilderUtils.class); | ||
|
|
||
| public static List<IndexFileMeta> toIndexFileMetas( | ||
| FileIO fileIO, | ||
| IndexPathFactory indexPathFactory, | ||
|
|
@@ -45,12 +56,62 @@ public static List<IndexFileMeta> toIndexFileMetas( | |
| String indexType, | ||
| List<ResultEntry> entries) | ||
| throws IOException { | ||
| return toIndexFileMetas( | ||
| fileIO, indexPathFactory, options, range, indexFieldId, null, indexType, entries); | ||
| } | ||
|
|
||
| /** | ||
| * Builds the index file metas. The first column in {@code fields} is treated as the primary | ||
| * index column (e.g. the first column in {@code CREATE ... INDEX ON (a, b, c)}) and is stored | ||
| * as {@code indexFieldId}; the remaining columns go into {@code extraFieldIds}. Callers must | ||
| * pass {@code fields} in the intended column order. | ||
| */ | ||
| public static List<IndexFileMeta> toIndexFileMetas( | ||
| FileIO fileIO, | ||
| IndexPathFactory indexPathFactory, | ||
| CoreOptions options, | ||
| Range range, | ||
| List<DataField> fields, | ||
| String indexType, | ||
| List<ResultEntry> entries) | ||
| throws IOException { | ||
| // The first column is the primary index column and is stored as indexFieldId; the | ||
| // remaining columns (if any) go into extraFieldIds. | ||
| int indexFieldId = fields.get(0).id(); | ||
| int[] extraFieldIds = | ||
| fields.size() > 1 | ||
| ? fields.subList(1, fields.size()).stream() | ||
| .mapToInt(DataField::id) | ||
| .toArray() | ||
| : null; | ||
| return toIndexFileMetas( | ||
| fileIO, | ||
| indexPathFactory, | ||
| options, | ||
| range, | ||
| indexFieldId, | ||
| extraFieldIds, | ||
| indexType, | ||
| entries); | ||
| } | ||
|
|
||
| private static List<IndexFileMeta> toIndexFileMetas( | ||
| FileIO fileIO, | ||
| IndexPathFactory indexPathFactory, | ||
| CoreOptions options, | ||
| Range range, | ||
| int indexFieldId, | ||
| @Nullable int[] extraFieldIds, | ||
| String indexType, | ||
| List<ResultEntry> entries) | ||
| throws IOException { | ||
| List<IndexFileMeta> results = new ArrayList<>(); | ||
| for (ResultEntry entry : entries) { | ||
| String fileName = entry.fileName(); | ||
| long fileSize = fileIO.getFileSize(indexPathFactory.toPath(fileName)); | ||
| GlobalIndexMeta globalIndexMeta = | ||
| new GlobalIndexMeta(range.from, range.to, indexFieldId, null, entry.meta()); | ||
| new GlobalIndexMeta( | ||
| range.from, range.to, indexFieldId, extraFieldIds, entry.meta()); | ||
|
|
||
| Path externalPathDir = options.globalIndexExternalPath(); | ||
| String externalPathString = null; | ||
|
|
@@ -78,6 +139,77 @@ public static GlobalIndexWriter createIndexWriter( | |
| return globalIndexer.createWriter(createGlobalIndexFileReadWrite(table)); | ||
| } | ||
|
|
||
| public static GlobalIndexWriter createIndexWriter( | ||
| FileStoreTable table, String indexType, List<DataField> fields, Options options) | ||
| throws IOException { | ||
| GlobalIndexer globalIndexer = GlobalIndexer.create(indexType, fields, options); | ||
| return globalIndexer.createWriter(createGlobalIndexFileReadWrite(table)); | ||
| } | ||
|
|
||
| /** | ||
| * Find the minimum firstRowId among files whose schema does not contain all index columns. | ||
| * Files at or beyond this rowId cannot be indexed because the column was added later via ALTER | ||
| * TABLE. | ||
| * | ||
| * @return the boundary rowId, or {@link Long#MAX_VALUE} if all files contain the columns | ||
| */ | ||
| public static long findMinNonIndexableRowId( | ||
| SchemaManager schemaManager, List<ManifestEntry> entries, List<String> indexColumns) { | ||
| Map<Long, Boolean> schemaContainsColumns = new HashMap<>(); | ||
| long minRowId = Long.MAX_VALUE; | ||
| long minSchemaId = -1; | ||
| for (ManifestEntry entry : entries) { | ||
| long sid = entry.file().schemaId(); | ||
| boolean contains = | ||
| schemaContainsColumns.computeIfAbsent( | ||
| sid, | ||
| id -> schemaManager.schema(id).fieldNames().containsAll(indexColumns)); | ||
| if (!contains && entry.file().firstRowId() != null) { | ||
| long rowId = entry.file().nonNullFirstRowId(); | ||
| if (rowId < minRowId) { | ||
| minRowId = rowId; | ||
| minSchemaId = sid; | ||
| } | ||
| } | ||
| } | ||
| if (minRowId != Long.MAX_VALUE) { | ||
| List<String> schemaFields = schemaManager.schema(minSchemaId).fieldNames(); | ||
| List<String> missingColumns = new ArrayList<>(); | ||
| for (String col : indexColumns) { | ||
| if (!schemaFields.contains(col)) { | ||
| missingColumns.add(col); | ||
| } | ||
| } | ||
| LOG.info( | ||
| "Found non-indexable files: schemaId={} missing columns {}, boundaryRowId={}.", | ||
| minSchemaId, | ||
| missingColumns, | ||
| minRowId); | ||
| } | ||
| return minRowId; | ||
| } | ||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Minor: the old
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. add log done |
||
| /** Keep only entries whose firstRowId is strictly less than the given boundary. */ | ||
| public static List<ManifestEntry> filterEntriesBefore( | ||
| List<ManifestEntry> entries, long boundaryRowId) { | ||
| if (boundaryRowId == Long.MAX_VALUE) { | ||
| return entries; | ||
| } | ||
| List<ManifestEntry> result = new ArrayList<>(); | ||
| for (ManifestEntry entry : entries) { | ||
| if (entry.file().firstRowId() != null | ||
| && entry.file().nonNullFirstRowId() < boundaryRowId) { | ||
| result.add(entry); | ||
| } | ||
| } | ||
| LOG.info( | ||
| "Filtered {} files to {} indexable files (boundaryRowId={}).", | ||
| entries.size(), | ||
| result.size(), | ||
| boundaryRowId); | ||
| return result; | ||
| } | ||
|
|
||
| private static GlobalIndexFileReadWrite createGlobalIndexFileReadWrite(FileStoreTable table) { | ||
| IndexPathFactory indexPathFactory = table.store().pathFactory().globalIndexFileFactory(); | ||
| return new GlobalIndexFileReadWrite(table.fileIO(), indexPathFactory); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
High risk — MERGE path crash:
MULTI_COLUMN_INDEX_FIELD_ID = -1breaks existing code that callsrowType.getField(globalIndexMeta.indexFieldId())without guarding against -1:MergeIntoUpdateChecker.java:104(Flink): scans index manifest entries and doesrowType.getField(globalIndexMeta.indexFieldId())— will throw when encountering a multi-column index.MergeIntoPaimonDataEvolutionTable.scala:514(Spark): same pattern —rowType.getField(globalIndexMeta.indexFieldId()).name().Once a table has a multi-column global index, any
MERGE INTOthat touches indexed columns will crash with "Cannot find field by field id: -1".Fix: these callers need to handle
MULTI_COLUMN_INDEX_FIELD_IDby readingextraFieldIds()to get the actual column list.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix:
Added getIndexedFieldNames helper in both Flink and Spark paths:
Both the index filter (which entries are affected) and the error reporting (conflicted column names) now correctly handle multi-column indexes.
Affected files: