diff --git a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexMultiColumnWriter.java b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexMultiColumnWriter.java new file mode 100644 index 000000000000..58a847b64ca8 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexMultiColumnWriter.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.globalindex; + +import org.apache.paimon.data.InternalRow; + +import javax.annotation.Nullable; + +/** Index writer for global index that accepts multiple column values per row. */ +public interface GlobalIndexMultiColumnWriter extends GlobalIndexWriter { + + /** + * Write one record's indexed columns at the given relative row id. + * + * @param rowId the record's row id relative to the current shard (0 to rowCnt - 1); a null row + * still advances the row id without indexing a value + * @param row a projected row containing only the indexed columns, whose layout matches the + * fields order passed to {@link GlobalIndexerFactory#create(java.util.List, + * org.apache.paimon.options.Options)} + */ + void write(long rowId, @Nullable InternalRow row); +} diff --git a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexer.java b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexer.java index 74d223a60467..6c46415cfeee 100644 --- a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexer.java +++ b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexer.java @@ -41,4 +41,9 @@ static GlobalIndexer create(String type, DataField dataField, Options options) { GlobalIndexerFactory globalIndexerFactory = GlobalIndexerFactoryUtils.load(type); return globalIndexerFactory.create(dataField, options); } + + static GlobalIndexer create(String type, List fields, Options options) { + GlobalIndexerFactory globalIndexerFactory = GlobalIndexerFactoryUtils.load(type); + return globalIndexerFactory.create(fields, options); + } } diff --git a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexerFactory.java b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexerFactory.java index 6eabb6d25360..b028ba4470cb 100644 --- a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexerFactory.java +++ b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexerFactory.java @@ -22,10 +22,30 @@ import org.apache.paimon.options.Options; import org.apache.paimon.types.DataField; +import java.util.List; + /** File index factory to construct {@link FileIndexer}. */ public interface GlobalIndexerFactory { String identifier(); GlobalIndexer create(DataField dataField, Options options); + + /** + * Whether this index type supports multi-column indexes. A factory that returns {@code true} + * must override {@link #create(List, Options)} to handle more than one column. + */ + default boolean supportsMultiColumn() { + return false; + } + + default GlobalIndexer create(List fields, Options options) { + if (fields.size() > 1) { + throw new UnsupportedOperationException( + String.format( + "Index type '%s' does not support multi-column index, got columns: %s", + identifier(), fields)); + } + return create(fields.get(0), options); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtils.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtils.java index 085423efa851..62b13833b393 100644 --- a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtils.java @@ -24,18 +24,29 @@ import org.apache.paimon.index.GlobalIndexMeta; import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.index.IndexPathFactory; +import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.options.Options; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.types.DataField; import org.apache.paimon.utils.Range; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** Utils for global index build. */ public class GlobalIndexBuilderUtils { + private static final Logger LOG = LoggerFactory.getLogger(GlobalIndexBuilderUtils.class); + public static List toIndexFileMetas( FileIO fileIO, IndexPathFactory indexPathFactory, @@ -45,12 +56,62 @@ public static List toIndexFileMetas( String indexType, List entries) throws IOException { + return toIndexFileMetas( + fileIO, indexPathFactory, options, range, indexFieldId, null, indexType, entries); + } + + /** + * Builds the index file metas. The first column in {@code fields} is treated as the primary + * index column (e.g. the first column in {@code CREATE ... INDEX ON (a, b, c)}) and is stored + * as {@code indexFieldId}; the remaining columns go into {@code extraFieldIds}. Callers must + * pass {@code fields} in the intended column order. + */ + public static List toIndexFileMetas( + FileIO fileIO, + IndexPathFactory indexPathFactory, + CoreOptions options, + Range range, + List fields, + String indexType, + List entries) + throws IOException { + // The first column is the primary index column and is stored as indexFieldId; the + // remaining columns (if any) go into extraFieldIds. + int indexFieldId = fields.get(0).id(); + int[] extraFieldIds = + fields.size() > 1 + ? fields.subList(1, fields.size()).stream() + .mapToInt(DataField::id) + .toArray() + : null; + return toIndexFileMetas( + fileIO, + indexPathFactory, + options, + range, + indexFieldId, + extraFieldIds, + indexType, + entries); + } + + private static List toIndexFileMetas( + FileIO fileIO, + IndexPathFactory indexPathFactory, + CoreOptions options, + Range range, + int indexFieldId, + @Nullable int[] extraFieldIds, + String indexType, + List entries) + throws IOException { List results = new ArrayList<>(); for (ResultEntry entry : entries) { String fileName = entry.fileName(); long fileSize = fileIO.getFileSize(indexPathFactory.toPath(fileName)); GlobalIndexMeta globalIndexMeta = - new GlobalIndexMeta(range.from, range.to, indexFieldId, null, entry.meta()); + new GlobalIndexMeta( + range.from, range.to, indexFieldId, extraFieldIds, entry.meta()); Path externalPathDir = options.globalIndexExternalPath(); String externalPathString = null; @@ -78,6 +139,77 @@ public static GlobalIndexWriter createIndexWriter( return globalIndexer.createWriter(createGlobalIndexFileReadWrite(table)); } + public static GlobalIndexWriter createIndexWriter( + FileStoreTable table, String indexType, List fields, Options options) + throws IOException { + GlobalIndexer globalIndexer = GlobalIndexer.create(indexType, fields, options); + return globalIndexer.createWriter(createGlobalIndexFileReadWrite(table)); + } + + /** + * Find the minimum firstRowId among files whose schema does not contain all index columns. + * Files at or beyond this rowId cannot be indexed because the column was added later via ALTER + * TABLE. + * + * @return the boundary rowId, or {@link Long#MAX_VALUE} if all files contain the columns + */ + public static long findMinNonIndexableRowId( + SchemaManager schemaManager, List entries, List indexColumns) { + Map schemaContainsColumns = new HashMap<>(); + long minRowId = Long.MAX_VALUE; + long minSchemaId = -1; + for (ManifestEntry entry : entries) { + long sid = entry.file().schemaId(); + boolean contains = + schemaContainsColumns.computeIfAbsent( + sid, + id -> schemaManager.schema(id).fieldNames().containsAll(indexColumns)); + if (!contains && entry.file().firstRowId() != null) { + long rowId = entry.file().nonNullFirstRowId(); + if (rowId < minRowId) { + minRowId = rowId; + minSchemaId = sid; + } + } + } + if (minRowId != Long.MAX_VALUE) { + List schemaFields = schemaManager.schema(minSchemaId).fieldNames(); + List missingColumns = new ArrayList<>(); + for (String col : indexColumns) { + if (!schemaFields.contains(col)) { + missingColumns.add(col); + } + } + LOG.info( + "Found non-indexable files: schemaId={} missing columns {}, boundaryRowId={}.", + minSchemaId, + missingColumns, + minRowId); + } + return minRowId; + } + + /** Keep only entries whose firstRowId is strictly less than the given boundary. */ + public static List filterEntriesBefore( + List entries, long boundaryRowId) { + if (boundaryRowId == Long.MAX_VALUE) { + return entries; + } + List result = new ArrayList<>(); + for (ManifestEntry entry : entries) { + if (entry.file().firstRowId() != null + && entry.file().nonNullFirstRowId() < boundaryRowId) { + result.add(entry); + } + } + LOG.info( + "Filtered {} files to {} indexable files (boundaryRowId={}).", + entries.size(), + result.size(), + boundaryRowId); + return result; + } + private static GlobalIndexFileReadWrite createGlobalIndexFileReadWrite(FileStoreTable table) { IndexPathFactory indexPathFactory = table.store().pathFactory().globalIndexFileFactory(); return new GlobalIndexFileReadWrite(table.fileIO(), indexPathFactory); diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanner.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanner.java index 975b28183331..f7264b1eb4dc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanner.java +++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanner.java @@ -53,6 +53,7 @@ import static org.apache.paimon.CoreOptions.GLOBAL_INDEX_THREAD_NUM; import static org.apache.paimon.predicate.PredicateVisitor.collectFieldNames; import static org.apache.paimon.table.source.snapshot.TimeTravelUtil.tryTravelOrLatest; +import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.paimon.utils.Preconditions.checkNotNull; /** Scanner for shard-based global indexes. */ @@ -74,29 +75,73 @@ public GlobalIndexScanner( GlobalIndexReadThreadPool.getExecutorService(options.get(GLOBAL_INDEX_THREAD_NUM)); this.indexPathFactory = indexPathFactory; GlobalIndexFileReader indexFileReader = meta -> fileIO.newInputStream(meta.filePath()); - Map>>> indexMetas = new HashMap<>(); + Map indexMetas = new HashMap<>(); + Map> extraIndexMetas = new HashMap<>(); for (IndexFileMeta indexFile : indexFiles) { GlobalIndexMeta meta = checkNotNull(indexFile.globalIndexMeta()); - int fieldId = meta.indexFieldId(); String indexType = indexFile.indexType(); - indexMetas - .computeIfAbsent(fieldId, k -> new HashMap<>()) - .computeIfAbsent(indexType, k -> new HashMap<>()) - .computeIfAbsent( - new Range(meta.rowRangeStart(), meta.rowRangeEnd()), - k -> new ArrayList<>()) - .add(indexFile); + Range range = new Range(meta.rowRangeStart(), meta.rowRangeEnd()); + int indexFieldId = meta.indexFieldId(); + List fieldIds = meta.getIndexedFieldIds(); + IndexMetaFileGroup group = indexMetas.get(indexFieldId); + if (group == null) { + group = new IndexMetaFileGroup(indexFieldId, fieldIds); + indexMetas.put(indexFieldId, group); + if (meta.extraFieldIds() != null) { + for (int extra : meta.extraFieldIds()) { + extraIndexMetas.computeIfAbsent(extra, k -> new ArrayList<>()).add(group); + } + } + } else { + checkArgument( + group.fieldIds.equals(fieldIds), + "Primary field %s owns multiple indexes with different columns %s and %s; " + + "a primary column can own at most one index.", + indexFieldId, + group.fieldIds, + fieldIds); + } + group.addFile(indexType, range, indexFile); } IntFunction> readersFunction = - fieldId -> - createReaders( - indexFileReader, - indexMetas.get(fieldId), - rowType.getField(fieldId)); + fId -> { + IndexMetaFileGroup group = indexMetas.get(fId); + if (group == null) { + List extraGroups = extraIndexMetas.get(fId); + if (extraGroups == null || extraGroups.isEmpty()) { + return Collections.emptyList(); + } + group = extraGroups.get(0); + } + List fields = + group.fieldIds.stream() + .map(rowType::getField) + .collect(Collectors.toList()); + return createReaders(indexFileReader, group.metas, fields); + }; this.globalIndexEvaluator = new GlobalIndexEvaluator(rowType, readersFunction); } + /** All index files of one global index (single- or multi-column), grouped for reading. */ + private static class IndexMetaFileGroup { + + private final int indexFieldId; + private final List fieldIds; + private final Map>> metas = new HashMap<>(); + + IndexMetaFileGroup(int indexFieldId, List fieldIds) { + this.indexFieldId = indexFieldId; + this.fieldIds = fieldIds; + } + + void addFile(String indexType, Range range, IndexFileMeta indexFile) { + metas.computeIfAbsent(indexType, k -> new HashMap<>()) + .computeIfAbsent(range, k -> new ArrayList<>()) + .add(indexFile); + } + } + public static Optional create( FileStoreTable table, Collection indexFiles) { if (indexFiles.isEmpty()) { @@ -127,7 +172,19 @@ public static Optional create( if (globalIndex == null) { return false; } - return filterFieldIds.contains(globalIndex.indexFieldId()); + // Collect indexes whose primary column is filtered, and also multi-column + // indexes that have a filtered column as an extra (used as a fallback). + if (filterFieldIds.contains(globalIndex.indexFieldId())) { + return true; + } + if (globalIndex.extraFieldIds() != null) { + for (int id : globalIndex.extraFieldIds()) { + if (filterFieldIds.contains(id)) { + return true; + } + } + } + return false; }; List indexFiles = @@ -145,7 +202,7 @@ public Optional scan(Predicate predicate) { private Collection createReaders( GlobalIndexFileReader indexFileReadWrite, Map>> indexMetas, - DataField dataField) { + List fields) { if (indexMetas == null) { return Collections.emptyList(); } @@ -155,7 +212,7 @@ private Collection createReaders( String indexType = entry.getKey(); Map> metas = entry.getValue(); GlobalIndexerFactory globalIndexerFactory = GlobalIndexerFactoryUtils.load(indexType); - GlobalIndexer globalIndexer = globalIndexerFactory.create(dataField, options); + GlobalIndexer globalIndexer = globalIndexerFactory.create(fields, options); List> futures = new ArrayList<>(metas.size()); for (Map.Entry> rangeMetas : metas.entrySet()) { diff --git a/paimon-core/src/main/java/org/apache/paimon/index/GlobalIndexMeta.java b/paimon-core/src/main/java/org/apache/paimon/index/GlobalIndexMeta.java index c468bbffb3aa..a987e994f9ea 100644 --- a/paimon-core/src/main/java/org/apache/paimon/index/GlobalIndexMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/index/GlobalIndexMeta.java @@ -27,7 +27,9 @@ import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; /** Schema for global index. */ public class GlobalIndexMeta { @@ -78,6 +80,15 @@ public int indexFieldId() { return indexFieldId; } + /** + * Whether this index covers more than one column. {@link #indexFieldId} is always the primary + * column; {@link #extraFieldIds} holds the remaining columns and is null/empty for a + * single-column index. + */ + public boolean isMultiColumn() { + return extraFieldIds != null && extraFieldIds.length > 0; + } + @Nullable public int[] extraFieldIds() { return extraFieldIds; @@ -87,4 +98,32 @@ public int[] extraFieldIds() { public byte[] indexMeta() { return indexMeta; } + + /** All indexed field ids in order: the primary {@link #indexFieldId} followed by the rest. */ + public List getIndexedFieldIds() { + List ids = new ArrayList<>(); + ids.add(indexFieldId); + if (extraFieldIds != null) { + for (int id : extraFieldIds) { + ids.add(id); + } + } + return ids; + } + + public List getIndexedFields(RowType rowType) { + List fields = new ArrayList<>(); + for (int id : getIndexedFieldIds()) { + fields.add(rowType.getField(id)); + } + return fields; + } + + public List getIndexedFieldNames(RowType rowType) { + List names = new ArrayList<>(); + for (int id : getIndexedFieldIds()) { + names.add(rowType.getField(id).name()); + } + return names; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java index 3621483197f7..f99278085550 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java @@ -28,6 +28,7 @@ import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -241,11 +242,13 @@ private void validateRetainedIndexFiles( GlobalIndexMeta addedMeta = added.indexFile().globalIndexMeta(); if (addedMeta == null || retainedMeta.indexFieldId() != addedMeta.indexFieldId() - || !Range.intersect( - retainedMeta.rowRangeStart(), - retainedMeta.rowRangeEnd(), - addedMeta.rowRangeStart(), - addedMeta.rowRangeEnd())) { + || (Arrays.equals( + retainedMeta.extraFieldIds(), addedMeta.extraFieldIds()) + && !Range.intersect( + retainedMeta.rowRangeStart(), + retainedMeta.rowRangeEnd(), + addedMeta.rowRangeStart(), + addedMeta.rowRangeEnd()))) { continue; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextReadImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextReadImpl.java index 37b5e4553713..7831ebb67a92 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextReadImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextReadImpl.java @@ -67,10 +67,20 @@ public GlobalIndexResult read(List splits) { return GlobalIndexResult.createEmpty(); } - String indexType = splits.get(0).fullTextIndexFiles().get(0).indexType(); - GlobalIndexer globalIndexer = - GlobalIndexerFactoryUtils.load(indexType) - .create(textColumn, table.coreOptions().toConfiguration()); + IndexFileMeta firstFile = splits.get(0).fullTextIndexFiles().get(0); + String indexType = firstFile.indexType(); + GlobalIndexMeta firstMeta = checkNotNull(firstFile.globalIndexMeta()); + GlobalIndexer globalIndexer; + if (firstMeta.isMultiColumn()) { + List fields = firstMeta.getIndexedFields(table.rowType()); + globalIndexer = + GlobalIndexerFactoryUtils.load(indexType) + .create(fields, table.coreOptions().toConfiguration()); + } else { + globalIndexer = + GlobalIndexerFactoryUtils.load(indexType) + .create(textColumn, table.coreOptions().toConfiguration()); + } IndexPathFactory indexPathFactory = table.store().pathFactory().globalIndexFileFactory(); int parallelism = table.coreOptions().toConfiguration().get(GLOBAL_INDEX_THREAD_NUM); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/VectorReadImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorReadImpl.java index a3402c3f1d66..e3210e95c144 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/VectorReadImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorReadImpl.java @@ -84,10 +84,20 @@ public GlobalIndexResult read(List splits) { RoaringNavigableMap64 preFilter = preFilter(splits).orElse(null); - String indexType = splits.get(0).vectorIndexFiles().get(0).indexType(); - GlobalIndexer globalIndexer = - GlobalIndexerFactoryUtils.load(indexType) - .create(vectorColumn, table.coreOptions().toConfiguration()); + IndexFileMeta firstFile = splits.get(0).vectorIndexFiles().get(0); + String indexType = firstFile.indexType(); + GlobalIndexMeta firstMeta = checkNotNull(firstFile.globalIndexMeta()); + GlobalIndexer globalIndexer; + if (firstMeta.isMultiColumn()) { + List fields = firstMeta.getIndexedFields(table.rowType()); + globalIndexer = + GlobalIndexerFactoryUtils.load(indexType) + .create(fields, table.coreOptions().toConfiguration()); + } else { + globalIndexer = + GlobalIndexerFactoryUtils.load(indexType) + .create(vectorColumn, table.coreOptions().toConfiguration()); + } IndexPathFactory indexPathFactory = table.store().pathFactory().globalIndexFileFactory(); int parallelism = table.coreOptions().toConfiguration().get(GLOBAL_INDEX_THREAD_NUM); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/VectorScanImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorScanImpl.java index d3db6dd13d37..5098cc959129 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/VectorScanImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorScanImpl.java @@ -94,7 +94,7 @@ public Plan scan() { Map> vectorByRange = new HashMap<>(); for (IndexFileMeta indexFile : allIndexFiles) { GlobalIndexMeta meta = checkNotNull(indexFile.globalIndexMeta()); - if (meta.indexFieldId() == vectorColumn.id()) { + if (isPrimaryColumn(meta, vectorColumn.id())) { Range range = new Range(meta.rowRangeStart(), meta.rowRangeEnd()); vectorByRange.computeIfAbsent(range, k -> new ArrayList<>()).add(indexFile); } @@ -111,7 +111,7 @@ public Plan scan() { f -> { GlobalIndexMeta globalIndex = checkNotNull(f.globalIndexMeta()); - if (globalIndex.indexFieldId() == vectorColumn.id()) { + if (isPrimaryColumn(globalIndex, vectorColumn.id())) { return false; } return range.hasIntersection(globalIndex.rowRange()); @@ -122,4 +122,8 @@ public Plan scan() { return () -> splits; } + + private static boolean isPrimaryColumn(GlobalIndexMeta meta, int fieldId) { + return meta.indexFieldId() == fieldId; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java index 320257ce1057..9ad88e977b3d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java @@ -235,8 +235,16 @@ private InternalRow toRow( String indexFieldName = null; if (globalMeta != null) { try { - indexFieldName = logicalRowType.getField(globalMeta.indexFieldId()).name(); - } catch (RuntimeException ignored) { + indexFieldName = + String.join(",", globalMeta.getIndexedFieldNames(logicalRowType)); + } catch (RuntimeException e) { + // Indexed columns may no longer exist in the current schema (e.g. dropped via + // ALTER TABLE); leave the name empty instead of failing the listing. + LOG.debug( + "Failed to resolve indexed field names for index file {} (primary field {}).", + indexManifestEntry.indexFile().fileName(), + globalMeta.indexFieldId(), + e); } } return GenericRow.of( diff --git a/paimon-core/src/test/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtilsTest.java b/paimon-core/src/test/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtilsTest.java new file mode 100644 index 000000000000..67852ae925ff --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtilsTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.globalindex; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.index.IndexFileMeta; +import org.apache.paimon.index.IndexPathFactory; +import org.apache.paimon.options.Options; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.FloatType; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.VarCharType; +import org.apache.paimon.utils.Range; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link GlobalIndexBuilderUtils}. */ +class GlobalIndexBuilderUtilsTest { + + @TempDir java.nio.file.Path tempDir; + + private FileIO fileIO; + private IndexPathFactory indexPathFactory; + private CoreOptions coreOptions; + + @BeforeEach + void setUp() { + fileIO = new LocalFileIO(); + Path dir = new Path(tempDir.toString()); + indexPathFactory = + new IndexPathFactory() { + @Override + public Path toPath(String fileName) { + return new Path(dir, fileName); + } + + @Override + public Path newPath() { + return new Path(dir, UUID.randomUUID().toString()); + } + + @Override + public boolean isExternalPath() { + return false; + } + }; + coreOptions = new CoreOptions(new Options().toMap()); + } + + // Test: 2 columns (title + vec), primary column title is indexFieldId, rest in extraFieldIds + @Test + void testToIndexFileMetasMultiColumn() throws IOException { + DataField titleField = new DataField(1, "title", new VarCharType(Integer.MAX_VALUE)); + DataField vecField = new DataField(2, "vec", new ArrayType(new FloatType())); + List fields = Arrays.asList(titleField, vecField); + + List entries = createDummyResultEntries(); + Range range = new Range(0, 99); + + List metas = + GlobalIndexBuilderUtils.toIndexFileMetas( + fileIO, indexPathFactory, coreOptions, range, fields, "test-type", entries); + + assertThat(metas).hasSize(1); + assertThat(metas.get(0).globalIndexMeta().indexFieldId()).isEqualTo(1); + assertThat(metas.get(0).globalIndexMeta().extraFieldIds()).isEqualTo(new int[] {2}); + assertThat(metas.get(0).globalIndexMeta().rowRangeStart()).isEqualTo(0); + assertThat(metas.get(0).globalIndexMeta().rowRangeEnd()).isEqualTo(99); + } + + // Test: single column, extraFieldIds should be null (backward compatible with single-column + // path) + @Test + void testToIndexFileMetasSingleColumn() throws IOException { + DataField titleField = new DataField(1, "title", new VarCharType(Integer.MAX_VALUE)); + List fields = Collections.singletonList(titleField); + + List entries = createDummyResultEntries(); + Range range = new Range(0, 49); + + List metas = + GlobalIndexBuilderUtils.toIndexFileMetas( + fileIO, indexPathFactory, coreOptions, range, fields, "test-type", entries); + + assertThat(metas).hasSize(1); + assertThat(metas.get(0).globalIndexMeta().indexFieldId()).isEqualTo(1); + assertThat(metas.get(0).globalIndexMeta().extraFieldIds()).isNull(); + } + + // Test: 3 columns (title + vec + id), primary column title is indexFieldId, rest in + // extraFieldIds + @Test + void testToIndexFileMetasThreeColumns() throws IOException { + DataField titleField = new DataField(1, "title", new VarCharType(Integer.MAX_VALUE)); + DataField vecField = new DataField(2, "vec", new ArrayType(new FloatType())); + DataField idField = new DataField(3, "id", new IntType()); + List fields = Arrays.asList(titleField, vecField, idField); + + List entries = createDummyResultEntries(); + Range range = new Range(0, 199); + + List metas = + GlobalIndexBuilderUtils.toIndexFileMetas( + fileIO, indexPathFactory, coreOptions, range, fields, "test-type", entries); + + assertThat(metas).hasSize(1); + assertThat(metas.get(0).globalIndexMeta().indexFieldId()).isEqualTo(1); + assertThat(metas.get(0).globalIndexMeta().extraFieldIds()).isEqualTo(new int[] {2, 3}); + } + + private List createDummyResultEntries() throws IOException { + String fileName = "test-index-" + UUID.randomUUID(); + Path filePath = indexPathFactory.toPath(fileName); + fileIO.newOutputStream(filePath, false).close(); + return Collections.singletonList(new ResultEntry(fileName, 100, null)); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/MergeIntoUpdateChecker.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/MergeIntoUpdateChecker.java index 8b1122382aae..bdd0c0d49194 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/MergeIntoUpdateChecker.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/MergeIntoUpdateChecker.java @@ -100,10 +100,12 @@ private void checkUpdatedColumns() { GlobalIndexMeta globalIndexMeta = entry.indexFile().globalIndexMeta(); if (globalIndexMeta != null) { - String fieldName = - rowType.getField(globalIndexMeta.indexFieldId()) - .name(); - return updatedColumns.contains(fieldName) + List indexedNames = + globalIndexMeta.getIndexedFieldNames(rowType); + boolean overlaps = + indexedNames.stream() + .anyMatch(updatedColumns::contains); + return overlaps && affectedPartitions.contains(entry.partition()); } return false; @@ -116,8 +118,8 @@ private void checkUpdatedColumns() { case THROW_ERROR: Set conflictedColumns = affectedEntries.stream() - .map(file -> file.indexFile().globalIndexMeta().indexFieldId()) - .map(id -> rowType.getField(id).name()) + .map(file -> file.indexFile().globalIndexMeta()) + .flatMap(meta -> meta.getIndexedFieldNames(rowType).stream()) .collect(Collectors.toSet()); throw new RuntimeException( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java index 5896503ce09d..c94bc2deda65 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java @@ -29,7 +29,9 @@ import org.apache.paimon.flink.utils.BoundedOneInputOperator; import org.apache.paimon.flink.utils.JavaTypeInfo; import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils; +import org.apache.paimon.globalindex.GlobalIndexMultiColumnWriter; import org.apache.paimon.globalindex.GlobalIndexSingletonWriter; +import org.apache.paimon.globalindex.GlobalIndexWriter; import org.apache.paimon.globalindex.ResultEntry; import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.io.DataFileMeta; @@ -38,7 +40,6 @@ import org.apache.paimon.options.Options; import org.apache.paimon.partition.PartitionPredicate; import org.apache.paimon.reader.RecordReader; -import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.SpecialFields; import org.apache.paimon.table.sink.BatchWriteBuilder; @@ -50,6 +51,7 @@ import org.apache.paimon.types.DataField; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.CloseableIterator; +import org.apache.paimon.utils.ProjectedRow; import org.apache.paimon.utils.Range; import org.apache.flink.streaming.api.datastream.DataStream; @@ -65,7 +67,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -74,6 +75,8 @@ import java.util.stream.Collectors; import static org.apache.paimon.globalindex.GlobalIndexBuilderUtils.createIndexWriter; +import static org.apache.paimon.globalindex.GlobalIndexBuilderUtils.filterEntriesBefore; +import static org.apache.paimon.globalindex.GlobalIndexBuilderUtils.findMinNonIndexableRowId; import static org.apache.paimon.globalindex.GlobalIndexBuilderUtils.toIndexFileMetas; import static org.apache.paimon.io.CompactIncrement.emptyIncrement; import static org.apache.paimon.io.DataIncrement.deleteIndexIncrement; @@ -103,7 +106,7 @@ public static void buildIndexAndExecute( buildIndexAndExecute( env, table, - indexColumn, + Collections.singletonList(indexColumn), indexType, partitionPredicate, userOptions, @@ -119,12 +122,49 @@ public static void buildIndexAndExecute( Options userOptions, long maxIndexedRowId) throws Exception { + buildIndexAndExecute( + env, + table, + Collections.singletonList(indexColumn), + indexType, + partitionPredicate, + userOptions, + maxIndexedRowId); + } + + public static void buildIndexAndExecute( + StreamExecutionEnvironment env, + FileStoreTable table, + List indexColumns, + String indexType, + PartitionPredicate partitionPredicate, + Options userOptions) + throws Exception { + buildIndexAndExecute( + env, + table, + indexColumns, + indexType, + partitionPredicate, + userOptions, + NO_MAX_INDEXED_ROW_ID); + } + + public static void buildIndexAndExecute( + StreamExecutionEnvironment env, + FileStoreTable table, + List indexColumns, + String indexType, + PartitionPredicate partitionPredicate, + Options userOptions, + long maxIndexedRowId) + throws Exception { boolean hasIndexToBuild = buildIndex( env, () -> new GenericGlobalIndexBuilder(table), table, - indexColumn, + indexColumns, indexType, partitionPredicate, userOptions, @@ -149,13 +189,34 @@ public static boolean buildIndex( env, indexBuilderSupplier, table, - indexColumn, + Collections.singletonList(indexColumn), indexType, partitionPredicate, userOptions, NO_MAX_INDEXED_ROW_ID); } + public static boolean buildIndex( + StreamExecutionEnvironment env, + Supplier indexBuilderSupplier, + FileStoreTable table, + String indexColumn, + String indexType, + PartitionPredicate partitionPredicate, + Options userOptions, + long maxIndexedRowId) + throws Exception { + return buildIndex( + env, + indexBuilderSupplier, + table, + Collections.singletonList(indexColumn), + indexType, + partitionPredicate, + userOptions, + maxIndexedRowId); + } + /** * Builds a generic global index topology using a {@link GenericGlobalIndexBuilder} supplier. * @@ -166,7 +227,7 @@ public static boolean buildIndex( StreamExecutionEnvironment env, Supplier indexBuilderSupplier, FileStoreTable table, - String indexColumn, + List indexColumns, String indexType, PartitionPredicate partitionPredicate, Options userOptions, @@ -183,7 +244,7 @@ public static boolean buildIndex( return buildTopology( env, table, - indexColumn, + indexColumns, indexType, userOptions, entries, @@ -203,7 +264,7 @@ public static boolean buildIndex( private static boolean buildTopology( StreamExecutionEnvironment env, FileStoreTable table, - String indexColumn, + List indexColumns, String indexType, Options userOptions, List entries, @@ -212,24 +273,24 @@ private static boolean buildTopology( throws Exception { long totalRowCount = entries.stream().mapToLong(e -> e.file().rowCount()).sum(); LOG.info( - "Scanned {} files ({} rows) across {} partitions for {} index on column '{}'" + "Scanned {} files ({} rows) across {} partitions for {} index on columns '{}'" + (maxIndexedRowId >= 0 ? ", maxIndexedRowId={}." : "."), entries.size(), totalRowCount, entries.stream().map(ManifestEntry::partition).distinct().count(), indexType, - indexColumn, + indexColumns, maxIndexedRowId); long minNonIndexableRowId = - findMinNonIndexableRowId(table.schemaManager(), entries, indexColumn); + findMinNonIndexableRowId(table.schemaManager(), entries, indexColumns); entries = filterEntriesBefore(entries, minNonIndexableRowId); RowType rowType = table.rowType(); - DataField indexField = rowType.getField(indexColumn); - // Project indexColumn + _ROW_ID so we can read the actual row ID from data - List readColumns = new ArrayList<>(); - readColumns.add(indexColumn); + List indexFields = + indexColumns.stream().map(rowType::getField).collect(Collectors.toList()); + // Project indexColumns + _ROW_ID so we can read the actual row ID from data + List readColumns = new ArrayList<>(indexColumns); readColumns.add(SpecialFields.ROW_ID.name()); RowType projectedRowType = SpecialFields.rowTypeWithRowId(rowType).project(readColumns); @@ -277,7 +338,7 @@ private static boolean buildTopology( readBuilder, table, indexType, - indexField, + indexFields, projectedRowType, mergedOptions)) .setParallelism(parallelism); @@ -298,49 +359,6 @@ private static boolean buildTopology( return true; } - /** - * Find the minimum firstRowId among files whose schema does not contain the index column. Files - * at or beyond this rowId cannot be indexed because the column was added later via ALTER TABLE. - * - * @return the boundary rowId, or {@link Long#MAX_VALUE} if all files contain the column - */ - static long findMinNonIndexableRowId( - SchemaManager schemaManager, List entries, String indexColumn) { - Map schemaContainsColumn = new HashMap<>(); - long minRowId = Long.MAX_VALUE; - for (ManifestEntry entry : entries) { - long sid = entry.file().schemaId(); - boolean contains = - schemaContainsColumn.computeIfAbsent( - sid, id -> schemaManager.schema(id).fieldNames().contains(indexColumn)); - if (!contains && entry.file().firstRowId() != null) { - minRowId = Math.min(minRowId, entry.file().nonNullFirstRowId()); - } - } - return minRowId; - } - - /** Keep only entries whose firstRowId is strictly less than the given boundary. */ - static List filterEntriesBefore( - List entries, long boundaryRowId) { - if (boundaryRowId == Long.MAX_VALUE) { - return entries; - } - List result = new ArrayList<>(); - for (ManifestEntry entry : entries) { - if (entry.file().firstRowId() != null - && entry.file().nonNullFirstRowId() < boundaryRowId) { - result.add(entry); - } - } - LOG.info( - "Filtered {} files at or beyond rowId {}, {} files remain.", - entries.size() - result.size(), - boundaryRowId, - result.size()); - return result; - } - /** * Compute shard tasks for a full build (no rows to skip). * @@ -548,25 +566,27 @@ private static class BuildIndexOperator private final ReadBuilder readBuilder; private final FileStoreTable table; private final String indexType; - private final DataField indexField; + private final List indexFields; private final RowType projectedRowType; private final Options mergedOptions; private transient TableRead tableRead; - private transient InternalRow.FieldGetter indexFieldGetter; + private transient InternalRow.FieldGetter[] indexFieldGetters; private transient int rowIdFieldIndex; + private transient boolean multiColumn; + private transient ProjectedRow writerProjection; BuildIndexOperator( ReadBuilder readBuilder, FileStoreTable table, String indexType, - DataField indexField, + List indexFields, RowType projectedRowType, Options mergedOptions) { this.readBuilder = readBuilder; this.table = table; this.indexType = indexType; - this.indexField = indexField; + this.indexFields = indexFields; this.projectedRowType = projectedRowType; this.mergedOptions = mergedOptions; } @@ -575,10 +595,22 @@ private static class BuildIndexOperator public void open() throws Exception { super.open(); this.tableRead = readBuilder.newRead(); - this.indexFieldGetter = - InternalRow.createFieldGetter( - indexField.type(), projectedRowType.getFieldIndex(indexField.name())); + this.indexFieldGetters = new InternalRow.FieldGetter[indexFields.size()]; + for (int i = 0; i < indexFields.size(); i++) { + DataField field = indexFields.get(i); + indexFieldGetters[i] = + InternalRow.createFieldGetter( + field.type(), projectedRowType.getFieldIndex(field.name())); + } this.rowIdFieldIndex = projectedRowType.getFieldIndex(SpecialFields.ROW_ID.name()); + this.multiColumn = indexFields.size() > 1; + if (multiColumn) { + int[] projection = new int[indexFields.size()]; + for (int i = 0; i < indexFields.size(); i++) { + projection[i] = projectedRowType.getFieldIndex(indexFields.get(i).name()); + } + this.writerProjection = ProjectedRow.from(projection); + } } @Override @@ -595,9 +627,8 @@ public void processElement(StreamRecord element) throws Exception { task.split.dataFiles().size()); long startTime = System.currentTimeMillis(); - GlobalIndexSingletonWriter indexWriter = - (GlobalIndexSingletonWriter) - createIndexWriter(table, indexType, indexField, mergedOptions); + GlobalIndexWriter indexWriter = + createIndexWriter(table, indexType, indexFields, mergedOptions); try { long rowsSeen = 0; @@ -626,8 +657,14 @@ public void processElement(StreamRecord element) throws Exception { } // Only write rows within this shard's range if (currentRowId >= task.shardRange.from) { - Object fieldData = indexFieldGetter.getFieldOrNull(row); - indexWriter.write(fieldData); + if (multiColumn) { + long rowId = currentRowId - task.shardRange.from; + ((GlobalIndexMultiColumnWriter) indexWriter) + .write(rowId, writerProjection.replaceRow(row)); + } else { + Object fieldData = indexFieldGetters[0].getFieldOrNull(row); + ((GlobalIndexSingletonWriter) indexWriter).write(fieldData); + } rowsSeen++; } } @@ -664,7 +701,7 @@ public void processElement(StreamRecord element) throws Exception { table, partition, task.shardRange, - indexField, + indexFields, indexType, resultEntries); output.collect( @@ -688,7 +725,7 @@ private static CommitMessage flushIndex( FileStoreTable table, BinaryRow partition, Range rowRange, - DataField indexField, + List indexFields, String indexType, List resultEntries) throws IOException { @@ -698,14 +735,14 @@ private static CommitMessage flushIndex( table.store().pathFactory().globalIndexFileFactory(), table.coreOptions(), rowRange, - indexField.id(), + indexFields, indexType, resultEntries); return new CommitMessageImpl( partition, 0, null, indexIncrement(indexFileMetas), emptyIncrement()); } - private static void closeWriterQuietly(GlobalIndexSingletonWriter writer) { + private static void closeWriterQuietly(GlobalIndexWriter writer) { if (writer instanceof Closeable) { try { ((Closeable) writer).close(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateGlobalIndexProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateGlobalIndexProcedure.java index 5f4855567047..dc3eccb513bf 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateGlobalIndexProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateGlobalIndexProcedure.java @@ -20,6 +20,7 @@ import org.apache.paimon.flink.btree.BTreeIndexTopoBuilder; import org.apache.paimon.flink.globalindex.GenericIndexTopoBuilder; +import org.apache.paimon.globalindex.GlobalIndexerFactoryUtils; import org.apache.paimon.options.Options; import org.apache.paimon.partition.PartitionPredicate; import org.apache.paimon.predicate.Predicate; @@ -32,8 +33,11 @@ import org.apache.flink.table.annotation.ProcedureHint; import org.apache.flink.table.procedure.ProcedureContext; +import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static org.apache.paimon.utils.ParameterUtils.getPartitions; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -77,11 +81,23 @@ public String[] call( tableId); RowType rowType = table.rowType(); + List indexColumns = + Arrays.stream(indexColumn.split(",")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .collect(Collectors.toList()); + checkArgument(!indexColumns.isEmpty(), "At least one column required."); checkArgument( - rowType.containsField(indexColumn), - "Column '%s' does not exist in table '%s'.", - indexColumn, - tableId); + indexColumns.size() == new HashSet<>(indexColumns).size(), + "Duplicate index columns are not allowed: %s", + indexColumns); + for (String col : indexColumns) { + checkArgument( + rowType.containsField(col), + "Column '%s' does not exist in table '%s'.", + col, + tableId); + } // Parse partition predicate PartitionPredicate partitionPredicate = parsePartitionPredicate(table, partitions); @@ -92,12 +108,21 @@ public String[] call( // Build global index based on index type indexType = indexType.toLowerCase().trim(); + if (indexColumns.size() > 1) { + // Whether multi-column is supported is decided by each index type's factory; fail fast + // up front instead of failing later in the build job. + checkArgument( + GlobalIndexerFactoryUtils.load(indexType).supportsMultiColumn(), + "Index type '%s' does not support multi-column index, got columns: %s", + indexType, + indexColumns); + } try { if ("btree".equals(indexType)) { BTreeIndexTopoBuilder.buildIndexAndExecute( procedureContext.getExecutionEnvironment(), table, - indexColumn, + indexColumns.get(0), partitionPredicate, userOptions); return new String[] { @@ -107,7 +132,7 @@ public String[] call( GenericIndexTopoBuilder.buildIndexAndExecute( procedureContext.getExecutionEnvironment(), table, - indexColumn, + indexColumns, indexType, partitionPredicate, userOptions); @@ -115,8 +140,8 @@ public String[] call( } catch (Exception e) { throw new RuntimeException( String.format( - "Failed to create %s index for column '%s' on table '%s'.", - indexType, indexColumn, table.name()), + "Failed to create %s index for columns '%s' on table '%s'.", + indexType, indexColumns, table.name()), e); } return new String[] { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilderTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilderTest.java index 0de57077b295..c69b59ad6e3c 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilderTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilderTest.java @@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryRowWriter; import org.apache.paimon.data.BinaryString; import org.apache.paimon.fs.Path; +import org.apache.paimon.globalindex.GlobalIndexBuilderUtils; import org.apache.paimon.io.PojoDataFileMeta; import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.ManifestEntry; @@ -472,10 +473,10 @@ void testAppendFilterOldFilesBeforeNewFiles() { entries.add(createEntryWithSchemaId(BinaryRow.EMPTY_ROW, 200L, 100, 0L)); List result = - GenericIndexTopoBuilder.filterEntriesBefore( + GlobalIndexBuilderUtils.filterEntriesBefore( entries, - GenericIndexTopoBuilder.findMinNonIndexableRowId( - schemaManager, entries, "vec")); + GlobalIndexBuilderUtils.findMinNonIndexableRowId( + schemaManager, entries, Collections.singletonList("vec"))); assertThat(result).hasSize(2); assertThat(result.get(0).file().nonNullFirstRowId()).isEqualTo(0L); diff --git a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala index 492d64bbf5bf..e72efe6efcaa 100644 --- a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala +++ b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala @@ -21,6 +21,7 @@ package org.apache.paimon.spark.commands import org.apache.paimon.CoreOptions.GlobalIndexColumnUpdateAction import org.apache.paimon.data.BinaryRow import org.apache.paimon.format.blob.BlobFileFormat.isBlobFile +import org.apache.paimon.index.GlobalIndexMeta import org.apache.paimon.io.{CompactIncrement, DataIncrement} import org.apache.paimon.manifest.IndexManifestEntry import org.apache.paimon.spark.SparkTable @@ -508,9 +509,9 @@ case class MergeIntoPaimonDataEvolutionTable( if (globalIndexMeta == null) { false } else { - val fieldName = rowType.getField(globalIndexMeta.indexFieldId()).name() + val indexedNames = globalIndexMeta.getIndexedFieldNames(rowType).asScala affectedParts.contains(entry.partition()) && updateColumns.exists( - _.name.equals(fieldName)) + col => indexedNames.contains(col.name)) } } @@ -527,8 +528,7 @@ case class MergeIntoPaimonDataEvolutionTable( case GlobalIndexColumnUpdateAction.THROW_ERROR => val updatedColNames = updateColumns.map(_.name) val conflicted = affectedIndexEntries - .map(_.indexFile().globalIndexMeta().indexFieldId()) - .map(id => rowType.getField(id).name()) + .flatMap(e => e.indexFile().globalIndexMeta().getIndexedFieldNames(rowType).asScala) .toSet throw new RuntimeException( s"""MergeInto: update columns contain globally indexed columns, not supported now. diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java index 1485d14fac1c..bccf4899652c 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java @@ -20,23 +20,28 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.globalindex.GlobalIndexMultiColumnWriter; import org.apache.paimon.globalindex.GlobalIndexSingletonWriter; +import org.apache.paimon.globalindex.GlobalIndexWriter; import org.apache.paimon.globalindex.ResultEntry; import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataIncrement; import org.apache.paimon.options.Options; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.SpecialFields; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.CommitMessageImpl; import org.apache.paimon.types.DataField; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.CloseableIterator; import org.apache.paimon.utils.LongCounter; +import org.apache.paimon.utils.ProjectedRow; import org.apache.paimon.utils.Range; import java.io.IOException; import java.io.Serializable; +import java.util.Collections; import java.util.List; import static org.apache.paimon.globalindex.GlobalIndexBuilderUtils.createIndexWriter; @@ -50,7 +55,7 @@ public class DefaultGlobalIndexBuilder implements Serializable { private final FileStoreTable table; private final BinaryRow partition; private final RowType readType; - private final DataField indexField; + private final List indexFields; private final String indexType; private final Range rowRange; private final Options options; @@ -63,10 +68,28 @@ public DefaultGlobalIndexBuilder( String indexType, Range rowRange, Options options) { + this( + table, + partition, + readType, + Collections.singletonList(indexField), + indexType, + rowRange, + options); + } + + public DefaultGlobalIndexBuilder( + FileStoreTable table, + BinaryRow partition, + RowType readType, + List indexFields, + String indexType, + Range rowRange, + Options options) { this.table = table; this.partition = partition; this.readType = readType; - this.indexField = indexField; + this.indexFields = indexFields; this.indexType = indexType; this.rowRange = rowRange; this.options = options; @@ -89,7 +112,7 @@ public CommitMessage build(CloseableIterator data) throws IOExcepti table.store().pathFactory().globalIndexFileFactory(), table.coreOptions(), rowRange, - indexField.id(), + indexFields, indexType, resultEntries); DataIncrement dataIncrement = DataIncrement.indexIncrement(indexFileMetas); @@ -99,27 +122,49 @@ public CommitMessage build(CloseableIterator data) throws IOExcepti private List writePaimonRows( CloseableIterator rows, LongCounter rowCounter) throws IOException { - GlobalIndexSingletonWriter indexWriter = - (GlobalIndexSingletonWriter) - createIndexWriter(table, indexType, indexField, options); + GlobalIndexWriter indexWriter = createIndexWriter(table, indexType, indexFields, options); + boolean multiColumn = indexFields.size() > 1; try { - InternalRow.FieldGetter getter = - InternalRow.createFieldGetter( - indexField.type(), readType.getFieldIndex(indexField.name())); - rows.forEachRemaining( - row -> { - Object indexO = getter.getFieldOrNull(row); - indexWriter.write(indexO); - rowCounter.add(1); - }); + if (multiColumn) { + GlobalIndexMultiColumnWriter multiWriter = + (GlobalIndexMultiColumnWriter) indexWriter; + int[] projection = new int[indexFields.size()]; + for (int i = 0; i < indexFields.size(); i++) { + DataField field = indexFields.get(i); + projection[i] = readType.getFieldIndex(field.name()); + } + ProjectedRow projectedRow = ProjectedRow.from(projection); + int rowIdIndex = readType.getFieldIndex(SpecialFields.ROW_ID.name()); + while (rows.hasNext()) { + InternalRow row = rows.next(); + long absRowId = row.getLong(rowIdIndex); + if (absRowId < rowRange.from || absRowId > rowRange.to) { + continue; + } + multiWriter.write(absRowId - rowRange.from, projectedRow.replaceRow(row)); + rowCounter.add(1); + } + } else { + DataField indexField = indexFields.get(0); + GlobalIndexSingletonWriter singleWriter = (GlobalIndexSingletonWriter) indexWriter; + InternalRow.FieldGetter getter = + InternalRow.createFieldGetter( + indexField.type(), readType.getFieldIndex(indexField.name())); + rows.forEachRemaining( + row -> { + Object indexO = getter.getFieldOrNull(row); + singleWriter.write(indexO); + rowCounter.add(1); + }); + } return indexWriter.finish(); } finally { closeWriterQuietly(indexWriter); } } - private static void closeWriterQuietly(GlobalIndexSingletonWriter writer) { + private static void closeWriterQuietly(GlobalIndexWriter writer) { if (writer instanceof java.io.Closeable) { try { ((java.io.Closeable) writer).close(); diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexTopoBuilder.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexTopoBuilder.java index afd954c39a5d..ea2cda4a8b85 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexTopoBuilder.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexTopoBuilder.java @@ -21,12 +21,14 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.fs.Path; +import org.apache.paimon.globalindex.GlobalIndexBuilderUtils; import org.apache.paimon.globalindex.IndexedSplit; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.options.Options; import org.apache.paimon.partition.PartitionPredicate; import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.CommitMessageSerializer; @@ -77,6 +79,28 @@ public List buildIndex( DataField indexField, Options options) throws IOException { + return buildIndex( + spark, + relation, + partitionPredicate, + table, + indexType, + readType, + Collections.singletonList(indexField), + options); + } + + @Override + public List buildIndex( + SparkSession spark, + DataSourceV2Relation relation, + PartitionPredicate partitionPredicate, + FileStoreTable table, + String indexType, + RowType readType, + List indexFields, + Options options) + throws IOException { Options tableOptions = table.coreOptions().toConfiguration(); long rowsPerShard = tableOptions @@ -88,6 +112,13 @@ public List buildIndex( List entries = table.store().newScan().withPartitionFilter(partitionPredicate).plan().files(); + List indexColumns = + indexFields.stream().map(DataField::name).collect(Collectors.toList()); + SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location()); + long boundaryRowId = + GlobalIndexBuilderUtils.findMinNonIndexableRowId( + schemaManager, entries, indexColumns); + entries = GlobalIndexBuilderUtils.filterEntriesBefore(entries, boundaryRowId); // generate splits for each partition && shard Map> splits = split(table, entries, rowsPerShard); @@ -106,7 +137,7 @@ public List buildIndex( table, partition, readType, - indexField, + indexFields, indexType, indexedSplit.rowRanges().get(0), options); diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexTopologyBuilder.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexTopologyBuilder.java index 50c6ab34e153..3d751f4585ac 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexTopologyBuilder.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexTopologyBuilder.java @@ -46,4 +46,31 @@ List buildIndex( DataField indexField, Options options) throws IOException; + + default List buildIndex( + SparkSession spark, + DataSourceV2Relation relation, + PartitionPredicate partitionPredicate, + FileStoreTable table, + String indexType, + RowType readType, + List indexFields, + Options options) + throws IOException { + if (indexFields.size() > 1) { + throw new UnsupportedOperationException( + String.format( + "Topology builder '%s' does not support multi-column index, got columns: %s", + identifier(), indexFields)); + } + return buildIndex( + spark, + relation, + partitionPredicate, + table, + indexType, + readType, + indexFields.get(0), + options); + } } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java index e25464b173d7..9bdb5c254290 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java @@ -18,6 +18,7 @@ package org.apache.paimon.spark.procedure; +import org.apache.paimon.globalindex.GlobalIndexerFactoryUtils; import org.apache.paimon.options.Options; import org.apache.paimon.partition.PartitionPredicate; import org.apache.paimon.spark.globalindex.GlobalIndexTopologyBuilder; @@ -43,11 +44,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collections; +import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.UUID; +import java.util.stream.Collectors; import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.spark.sql.types.DataTypes.StringType; @@ -109,6 +112,11 @@ public InternalRow[] call(InternalRow args) { return modifySparkTable( tableIdent, sparkTable -> { + List indexColumns = + Arrays.stream(column.split(",")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .collect(Collectors.toList()); try { org.apache.paimon.table.Table t = sparkTable.getTable(); checkArgument( @@ -121,11 +129,24 @@ public InternalRow[] call(InternalRow args) { tableIdent); RowType rowType = table.rowType(); + checkArgument(!indexColumns.isEmpty(), "At least one column required."); checkArgument( - rowType.containsField(column), - "Column '%s' does not exist in table '%s'.", - column, - tableIdent); + indexColumns.size() == new HashSet<>(indexColumns).size(), + "Duplicate index columns are not allowed: %s", + indexColumns); + // No hard cap on the number of index columns: unlike row-store B-tree + // indexes (e.g. MySQL 16, PostgreSQL 32) whose limit comes from composing + // columns into a single key, the global index is built on per-type index + // frameworks. Whether multiple columns are supported, and any practical + // limit, is decided by each index type (single-column types reject + // multi-column via UnsupportedOperationException). + for (String col : indexColumns) { + checkArgument( + rowType.containsField(col), + "Column '%s' does not exist in table '%s'.", + col, + tableIdent); + } DataSourceV2Relation relation = createRelation(tableIdent, sparkTable); PartitionPredicate partitionPredicate = SparkProcedureUtils.convertToPartitionPredicate( @@ -134,15 +155,28 @@ public InternalRow[] call(InternalRow args) { spark(), relation); - DataField indexField = rowType.getField(column); - RowType projectedRowType = - rowType.project(Collections.singletonList(column)); + List indexFields = + indexColumns.stream() + .map(rowType::getField) + .collect(Collectors.toList()); + RowType projectedRowType = rowType.project(indexColumns); RowType readRowType = SpecialFields.rowTypeWithRowId(projectedRowType); HashMap parsedOptions = new HashMap<>(); ProcedureUtils.putAllOptions(parsedOptions, optionString); Options userOptions = Options.fromMap(parsedOptions); + if (indexColumns.size() > 1) { + // Whether multi-column is supported is decided by each index type's + // factory; fail fast up front instead of failing later in the build + // job. + checkArgument( + GlobalIndexerFactoryUtils.load(indexType).supportsMultiColumn(), + "Index type '%s' does not support multi-column index, got columns: %s", + indexType, + indexColumns); + } + GlobalIndexTopologyBuilder topoBuilder = GlobalIndexTopologyBuilderUtils.createTopoBuilder(indexType); @@ -154,7 +188,7 @@ public InternalRow[] call(InternalRow args) { table, indexType, readRowType, - indexField, + indexFields, userOptions); try (TableCommitImpl commit = @@ -170,8 +204,8 @@ public InternalRow[] call(InternalRow args) { } catch (Exception e) { throw new RuntimeException( String.format( - "Failed to create %s index for column '%s' on table '%s'.", - indexType, column, tableIdent), + "Failed to create %s index for columns '%s' on table '%s'.", + indexType, indexColumns, tableIdent), e); } }); diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala index 96f8c0c5cc9f..8c84eafd1a5a 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala @@ -21,6 +21,7 @@ package org.apache.paimon.spark.commands import org.apache.paimon.CoreOptions.GlobalIndexColumnUpdateAction import org.apache.paimon.data.BinaryRow import org.apache.paimon.format.blob.BlobFileFormat.isBlobFile +import org.apache.paimon.index.GlobalIndexMeta import org.apache.paimon.io.{CompactIncrement, DataIncrement} import org.apache.paimon.manifest.IndexManifestEntry import org.apache.paimon.spark.SparkTable @@ -511,9 +512,9 @@ case class MergeIntoPaimonDataEvolutionTable( if (globalIndexMeta == null) { false } else { - val fieldName = rowType.getField(globalIndexMeta.indexFieldId()).name() + val indexedNames = globalIndexMeta.getIndexedFieldNames(rowType).asScala affectedParts.contains(entry.partition()) && updateColumns.exists( - _.name.equals(fieldName)) + col => indexedNames.contains(col.name)) } } @@ -530,8 +531,7 @@ case class MergeIntoPaimonDataEvolutionTable( case GlobalIndexColumnUpdateAction.THROW_ERROR => val updatedColNames = updateColumns.map(_.name) val conflicted = affectedIndexEntries - .map(_.indexFile().globalIndexMeta().indexFieldId()) - .map(id => rowType.getField(id).name()) + .flatMap(e => e.indexFile().globalIndexMeta().getIndexedFieldNames(rowType).asScala) .toSet throw new RuntimeException( s"""MergeInto: update columns contain globally indexed columns, not supported now.