diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 7b6db1636234aa..26d1a32b25b337 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1756,12 +1756,10 @@ DEFINE_mBool(enable_prefill_all_dbm_agg_cache_after_compaction, "true"); DEFINE_String(ann_index_ivf_list_cache_limit, "70%"); // Stale sweep time for ANN index IVF list cache in seconds. 3600s is 1 hour. DEFINE_mInt32(ann_index_ivf_list_cache_stale_sweep_time_sec, "3600"); - -// Chunk size for ANN/vector index building per training/adding batch -// 1M By default. -DEFINE_mInt64(ann_index_build_chunk_size, "1000000"); -DEFINE_Validator(ann_index_build_chunk_size, - [](const int64_t config) -> bool { return config > 0; }); +// Minimum segment rows required to persist an ANN index. 0 keeps the default behavior. +DEFINE_mInt64(ann_index_build_min_segment_rows, "0"); +DEFINE_Validator(ann_index_build_min_segment_rows, + [](const int64_t config) -> bool { return config >= 0; }); DEFINE_mBool(enable_wal_tde, "false"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 427282a4452bc4..f39dee69c4ffd8 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1809,8 +1809,8 @@ DECLARE_mInt32(max_segment_partial_column_cache_size); DECLARE_String(ann_index_ivf_list_cache_limit); // Stale sweep time for ANN index IVF list cache in seconds. DECLARE_mInt32(ann_index_ivf_list_cache_stale_sweep_time_sec); -// Chunk size for ANN/vector index building per training/adding batch -DECLARE_mInt64(ann_index_build_chunk_size); +// Minimum segment rows required to persist an ANN index. +DECLARE_mInt64(ann_index_build_min_segment_rows); DECLARE_mBool(enable_prefill_output_dbm_agg_cache_after_compaction); DECLARE_mBool(enable_prefill_all_dbm_agg_cache_after_compaction); diff --git a/be/src/storage/index/ann/ann_index_writer.cpp b/be/src/storage/index/ann/ann_index_writer.cpp index 28d348cc319a48..21911417c4f9a1 100644 --- a/be/src/storage/index/ann/ann_index_writer.cpp +++ b/be/src/storage/index/ann/ann_index_writer.cpp @@ -17,11 +17,13 @@ #include "storage/index/ann/ann_index_writer.h" +#include #include #include #include #include "common/cast_set.h" +#include "common/config.h" #include "storage/index/ann/faiss_ann_index.h" #include "storage/index/inverted/inverted_index_fs_directory.h" @@ -39,7 +41,7 @@ AnnIndexColumnWriter::AnnIndexColumnWriter(IndexFileWriter* index_file_writer, const TabletIndex* index_meta) : _index_file_writer(index_file_writer), _index_meta(index_meta) {} -AnnIndexColumnWriter::~AnnIndexColumnWriter() {} +AnnIndexColumnWriter::~AnnIndexColumnWriter() = default; Status AnnIndexColumnWriter::init() { Result> compound_dir = _index_file_writer->open(_index_meta); @@ -77,9 +79,6 @@ Status AnnIndexColumnWriter::init() { index_type, build_parameter.dim, metric_type, build_parameter.max_degree, build_parameter.ef_construction, quantizer); - size_t block_size = AnnIndexColumnWriter::chunk_size() * build_parameter.dim; - _float_array.reserve(block_size); - return Status::OK(); } @@ -87,7 +86,10 @@ Status AnnIndexColumnWriter::add_values(const std::string fn, const void* values return Status::OK(); } -void AnnIndexColumnWriter::close_on_error() {} +void AnnIndexColumnWriter::close_on_error() { + PODArray empty_buffered_vectors; + _buffered_vectors.swap(empty_buffered_vectors); +} Status AnnIndexColumnWriter::add_array_values(size_t field_size, const void* value_ptr, const uint8_t* null_map, const uint8_t* offsets_ptr, @@ -109,26 +111,10 @@ Status AnnIndexColumnWriter::add_array_values(size_t field_size, const void* val const float* p = reinterpret_cast(value_ptr); - const size_t full_elements = AnnIndexColumnWriter::chunk_size() * dim; - size_t remaining_elements = num_rows * dim; - size_t src_offset = 0; - while (remaining_elements > 0) { - size_t available_space = full_elements - _float_array.size(); - size_t elements_to_add = std::min(remaining_elements, available_space); - - _float_array.insert(_float_array.end(), p + src_offset, p + src_offset + elements_to_add); - src_offset += elements_to_add; - remaining_elements -= elements_to_add; - - if (_float_array.size() == full_elements) { - RETURN_IF_ERROR( - _vector_index->train(AnnIndexColumnWriter::chunk_size(), _float_array.data())); - RETURN_IF_ERROR( - _vector_index->add(AnnIndexColumnWriter::chunk_size(), _float_array.data())); - _float_array.clear(); - _need_save_index = true; - } - } + // The offsets check above guarantees every array row matches the ANN index dimension. + DCHECK(p != nullptr); + _buffered_vectors.insert(_buffered_vectors.end(), p, p + num_rows * dim); + _total_rows += cast_set(num_rows); return Status::OK(); } @@ -146,54 +132,41 @@ int64_t AnnIndexColumnWriter::size() const { } Status AnnIndexColumnWriter::finish() { - Int64 min_train_rows = _vector_index->get_min_train_rows(); - - // Check if we have enough rows to train the index - // train/add the remaining data - if (_float_array.empty()) { - if (_need_save_index) { - return _vector_index->save(_dir.get()); - } else { - // No data was added at all. This can happen if the segment has 0 rows - // or all rows were filtered out. We need to delete the directory entry - // to avoid writing an empty/invalid index file. - LOG_INFO("No data to train/add for ANN index. Skipping index building."); - return _index_file_writer->delete_index(_index_meta); - } - } else { - DCHECK(_float_array.size() % _vector_index->get_dimension() == 0); - - Int64 num_rows = _float_array.size() / _vector_index->get_dimension(); - - if (num_rows >= min_train_rows) { - RETURN_IF_ERROR(_vector_index->train(num_rows, _float_array.data())); - RETURN_IF_ERROR(_vector_index->add(num_rows, _float_array.data())); - _float_array.clear(); - return _vector_index->save(_dir.get()); - } else { - // It happens to have not enough data to train. - // If we have data to add before, we still need to save the index. - if (_need_save_index) { - // For IVF indexes, adding remaining vectors without training is acceptable - // because the quantizer was already trained on previous batches. These vectors - // are simply added to the nearest clusters without retraining. - RETURN_IF_ERROR(_vector_index->add(num_rows, _float_array.data())); - _float_array.clear(); - return _vector_index->save(_dir.get()); - } else { - // Not enough data to train and no data added before. - // Means this is a very small segment, we can skip the index building. - // We need to delete the directory entry from index_file_writer to avoid - // writing an empty/invalid index file which causes "IndexInput read past EOF" error. - LOG_INFO( - "Remaining data size {} is less than minimum {} rows required for ANN " - "index " - "training. Skipping index building for this segment.", - num_rows, min_train_rows); - _float_array.clear(); - return _index_file_writer->delete_index(_index_meta); - } - } + if (_total_rows == 0) { + LOG_INFO("No data to train/add for ANN index. Skipping index building."); + return _index_file_writer->delete_index(_index_meta); + } + + const Int64 min_train_rows = _vector_index->get_min_train_rows(); + const Int64 effective_min_rows = + std::max(min_train_rows, cast_set(config::ann_index_build_min_segment_rows)); + if (_total_rows < effective_min_rows) { + LOG_INFO( + "Total data size {} is less than minimum {} rows required for ANN index build. " + "Skipping index building for this segment.", + _total_rows, effective_min_rows); + PODArray empty_buffered_vectors; + _buffered_vectors.swap(empty_buffered_vectors); + return _index_file_writer->delete_index(_index_meta); + } + + return _build_and_save(min_train_rows, effective_min_rows); +} + +Status AnnIndexColumnWriter::_build_and_save(Int64 min_train_rows, Int64 effective_min_rows) { + const size_t dim = _vector_index->get_dimension(); + DCHECK(_buffered_vectors.size() % dim == 0); + const Int64 train_rows = cast_set(_buffered_vectors.size() / dim); + DORIS_CHECK(train_rows == _total_rows); + DORIS_CHECK(train_rows >= effective_min_rows); + if (min_train_rows > 0) { + RETURN_IF_ERROR(_vector_index->train(train_rows, _buffered_vectors.data())); } + RETURN_IF_ERROR(_vector_index->add(train_rows, _buffered_vectors.data())); + // PODArray::clear() keeps the allocated capacity. Swap with an empty array so the + // full-segment build buffer is released before saving the index. + PODArray empty_buffered_vectors; + _buffered_vectors.swap(empty_buffered_vectors); + return _vector_index->save(_dir.get()); } } // namespace doris::segment_v2 diff --git a/be/src/storage/index/ann/ann_index_writer.h b/be/src/storage/index/ann/ann_index_writer.h index 7b7e63f8574439..67061bef9219a8 100644 --- a/be/src/storage/index/ann/ann_index_writer.h +++ b/be/src/storage/index/ann/ann_index_writer.h @@ -27,7 +27,6 @@ #include #include -#include "common/config.h" #include "core/pod_array.h" #include "storage/index/ann/ann_index.h" #include "storage/index/index_file_writer.h" @@ -38,13 +37,6 @@ namespace doris::segment_v2 { class AnnIndexColumnWriter : public IndexColumnWriter { public: - static inline int64_t chunk_size() { -#ifdef BE_TEST - return 10; -#else - return config::ann_index_build_chunk_size; -#endif - } static constexpr const char* INDEX_TYPE = "index_type"; static constexpr const char* METRIC_TYPE = "metric_type"; static constexpr const char* DIM = "dim"; @@ -71,16 +63,20 @@ class AnnIndexColumnWriter : public IndexColumnWriter { Status finish() override; private: + Status _build_and_save(Int64 min_train_rows, Int64 effective_min_rows); + +#ifdef BE_TEST + friend class TestAnnIndexColumnWriter; +#endif + // VectorIndex shoule be managed by some cache. // VectorIndex should be weak shared by AnnIndexWriter and VectorIndexReader // This should be a weak_ptr std::shared_ptr _vector_index; - // _float_array is used to buffer the float data before training/adding to vector index - // if we dont do this, the performance(recall) will be very poor when adding small number of vectors one by one - PODArray _float_array; + PODArray _buffered_vectors; + int64_t _total_rows = 0; IndexFileWriter* _index_file_writer; const TabletIndex* _index_meta; std::shared_ptr _dir; - bool _need_save_index = false; }; } // namespace doris::segment_v2 diff --git a/be/src/storage/index/ann/faiss_ann_index.cpp b/be/src/storage/index/ann/faiss_ann_index.cpp index f933f3c683f940..68b06db2b9061a 100644 --- a/be/src/storage/index/ann/faiss_ann_index.cpp +++ b/be/src/storage/index/ann/faiss_ann_index.cpp @@ -501,7 +501,8 @@ Int64 FaissVectorIndex::get_min_train_rows() const { // For IVF indexes, the minimum number of training points should be at least // equal to the number of clusters (nlist). FAISS requires this for k-means clustering. Int64 ivf_min = 0; - if (_params.index_type == FaissBuildParameter::IndexType::IVF) { + if (_params.index_type == FaissBuildParameter::IndexType::IVF || + _params.index_type == FaissBuildParameter::IndexType::IVF_ON_DISK) { ivf_min = _params.ivf_nlist; } diff --git a/be/test/storage/index/ann/ann_index_writer_test.cpp b/be/test/storage/index/ann/ann_index_writer_test.cpp index bb30f9e19794af..20107c90779501 100644 --- a/be/test/storage/index/ann/ann_index_writer_test.cpp +++ b/be/test/storage/index/ann/ann_index_writer_test.cpp @@ -26,10 +26,13 @@ #include #include +#include "common/config.h" +#include "storage/index/ann/faiss_ann_index.h" #include "storage/index/ann/vector_search_utils.h" #include "storage/index/index_file_writer.h" #include "storage/index/inverted/inverted_index_fs_directory.h" #include "storage/tablet/tablet_schema.h" +#include "util/defer_op.h" using namespace doris::vector_search_utils; @@ -60,7 +63,8 @@ class TestAnnIndexColumnWriter : public AnnIndexColumnWriter { : AnnIndexColumnWriter(index_file_writer, index_meta) {} void set_vector_index(std::shared_ptr index) { _vector_index = index; } - void set_need_save_index(bool value) { _need_save_index = value; } + size_t buffered_vector_capacity() const { return _buffered_vectors.capacity(); } + size_t buffered_vector_rows(size_t dim) const { return _buffered_vectors.size() / dim; } }; class AnnIndexWriterTest : public ::testing::Test { @@ -165,6 +169,18 @@ TEST_F(AnnIndexWriterTest, TestInitWithDifferentProperties) { } } +TEST_F(AnnIndexWriterTest, TestInitDoesNotPreallocateBuildBuffer) { + auto writer = std::make_unique(_index_file_writer.get(), + _tablet_index.get()); + + auto fs_dir = std::make_shared(); + fs_dir->init(doris::io::global_local_filesystem(), "./ut_dir/tmp_vector_search", nullptr); + EXPECT_CALL(*_index_file_writer, open(testing::_)).WillOnce(testing::Return(fs_dir)); + + ASSERT_TRUE(writer->init().ok()); + EXPECT_EQ(writer->buffered_vector_capacity(), 0); +} + TEST_F(AnnIndexWriterTest, TestAddArrayValuesSuccess) { auto writer = std::make_unique(_index_file_writer.get(), _tablet_index.get()); @@ -415,7 +431,7 @@ TEST_F(AnnIndexWriterTest, TestInvalidMetricType) { EXPECT_THROW(writer->init(), doris::Exception); } -TEST_F(AnnIndexWriterTest, TestAddMoreThanChunkSize) { +TEST_F(AnnIndexWriterTest, TestNoTrainIndexAddsAtFinish) { auto mock_index = std::make_shared(); auto writer = std::make_unique(_index_file_writer.get(), _tablet_index.get()); @@ -427,55 +443,133 @@ TEST_F(AnnIndexWriterTest, TestAddMoreThanChunkSize) { ASSERT_TRUE(writer->init().ok()); writer->set_vector_index(mock_index); - EXPECT_CALL(*mock_index, train(10, testing::_)) - .Times(1) - .WillOnce(testing::Return(Status::OK())); - EXPECT_CALL(*mock_index, add(10, testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); - EXPECT_CALL(*mock_index, train(2, testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); - EXPECT_CALL(*mock_index, add(2, testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); - EXPECT_CALL(*mock_index, save(testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); + EXPECT_CALL(*mock_index, get_min_train_rows()).WillRepeatedly(testing::Return(0)); + EXPECT_CALL(*mock_index, train(testing::_, testing::_)).Times(0); + EXPECT_CALL(*mock_index, add(testing::_, testing::_)).Times(0); + EXPECT_CALL(*mock_index, save(testing::_)).Times(0); - // CHUNK_SIZE = 10 const size_t dim = 4; - - { - const size_t num_rows = 6; - std::vector vectors = { - 1.0f, 2.0f, 3.0f, 4.0f, // Row 0 - 5.0f, 6.0f, 7.0f, 8.0f, // Row 1 - 9.0f, 10.0f, 11.0f, 12.0f, // Row 2 - 13.0f, 14.0f, 15.0f, 16.0f, // Row 3 - 17.0f, 18.0f, 19.0f, 20.0f, // Row 4 - 21.0f, 22.0f, 23.0f, 24.0f // Row 5 - }; - std::vector offsets = {0, 4, 8, 12, 16, 20, 24}; + constexpr size_t batch_rows = 6; + for (int batch = 0; batch < 2; ++batch) { + std::vector vectors(batch_rows * dim); + for (size_t i = 0; i < vectors.size(); ++i) { + vectors[i] = static_cast(batch * vectors.size() + i); + } + std::vector offsets; + for (size_t row = 0; row <= batch_rows; ++row) { + offsets.push_back(row * dim); + } Status status = writer->add_array_values(sizeof(float), vectors.data(), nullptr, reinterpret_cast(offsets.data()), - num_rows); + batch_rows); EXPECT_TRUE(status.ok()); } + EXPECT_EQ(writer->buffered_vector_rows(dim), 2 * batch_rows); - { - const size_t num_rows = 6; - std::vector vectors = { - 25.0f, 26.0f, 27.0f, 28.0f, // Row 6 - 29.0f, 30.0f, 31.0f, 32.0f, // Row 7 - 33.0f, 34.0f, 35.0f, 36.0f, // Row 8 - 37.0f, 38.0f, 39.0f, 40.0f, // Row 9 - 41.0f, 42.0f, 43.0f, 44.0f, // Row 10 - 45.0f, 46.0f, 47.0f, 48.0f // Row 11 - }; - std::vector offsets = {0, 4, 8, 12, 16, 20, 24}; + EXPECT_TRUE(testing::Mock::VerifyAndClearExpectations(mock_index.get())); - Status status = writer->add_array_values(sizeof(float), vectors.data(), nullptr, - reinterpret_cast(offsets.data()), - num_rows); - EXPECT_TRUE(status.ok()); + EXPECT_CALL(*mock_index, get_min_train_rows()).WillRepeatedly(testing::Return(0)); + EXPECT_CALL(*mock_index, train(testing::_, testing::_)).Times(0); + { + testing::InSequence sequence; + EXPECT_CALL(*mock_index, add(12, testing::_)) + .Times(1) + .WillOnce(testing::Return(Status::OK())); + EXPECT_CALL(*mock_index, save(testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); } Status status = writer->finish(); EXPECT_TRUE(status.ok()); + EXPECT_EQ(writer->buffered_vector_rows(dim), 0); +} + +TEST_F(AnnIndexWriterTest, TestNoTrainIndexSkipsWhenRowsLessThanMinSegmentRows) { + const int64_t old_min_segment_rows = config::ann_index_build_min_segment_rows; + config::ann_index_build_min_segment_rows = 5; + doris::Defer restore_config { + [&] { config::ann_index_build_min_segment_rows = old_min_segment_rows; }}; + + auto mock_index = std::make_shared(); + auto writer = std::make_unique(_index_file_writer.get(), + _tablet_index.get()); + + auto fs_dir = std::make_shared(); + fs_dir->init(doris::io::global_local_filesystem(), "./ut_dir/tmp_vector_search", nullptr); + EXPECT_CALL(*_index_file_writer, open(testing::_)).WillOnce(testing::Return(fs_dir)); + + ASSERT_TRUE(writer->init().ok()); + writer->set_vector_index(mock_index); + + EXPECT_CALL(*mock_index, get_min_train_rows()).WillRepeatedly(testing::Return(0)); + EXPECT_CALL(*mock_index, train(testing::_, testing::_)).Times(0); + EXPECT_CALL(*mock_index, add(testing::_, testing::_)).Times(0); + EXPECT_CALL(*mock_index, save(testing::_)).Times(0); + + const size_t dim = 4; + const size_t num_rows = 3; + std::vector vectors(num_rows * dim); + for (size_t i = 0; i < vectors.size(); ++i) { + vectors[i] = static_cast(i); + } + std::vector offsets; + for (size_t row = 0; row <= num_rows; ++row) { + offsets.push_back(row * dim); + } + + Status status = + writer->add_array_values(sizeof(float), vectors.data(), nullptr, + reinterpret_cast(offsets.data()), num_rows); + EXPECT_TRUE(status.ok()); + EXPECT_EQ(writer->buffered_vector_rows(dim), num_rows); + + status = writer->finish(); + EXPECT_TRUE(status.ok()); + EXPECT_EQ(writer->buffered_vector_rows(dim), 0); +} + +TEST_F(AnnIndexWriterTest, TestTrainRequiredIndexUsesEffectiveMinSegmentRows) { + const int64_t old_min_segment_rows = config::ann_index_build_min_segment_rows; + config::ann_index_build_min_segment_rows = 10; + doris::Defer restore_config { + [&] { config::ann_index_build_min_segment_rows = old_min_segment_rows; }}; + + auto mock_index = std::make_shared(); + auto writer = std::make_unique(_index_file_writer.get(), + _tablet_index.get()); + + auto fs_dir = std::make_shared(); + fs_dir->init(doris::io::global_local_filesystem(), "./ut_dir/tmp_vector_search", nullptr); + EXPECT_CALL(*_index_file_writer, open(testing::_)).WillOnce(testing::Return(fs_dir)); + + ASSERT_TRUE(writer->init().ok()); + writer->set_vector_index(mock_index); + + EXPECT_CALL(*mock_index, get_min_train_rows()).WillRepeatedly(testing::Return(2)); + EXPECT_CALL(*mock_index, train(testing::_, testing::_)).Times(0); + EXPECT_CALL(*mock_index, add(testing::_, testing::_)).Times(0); + EXPECT_CALL(*mock_index, save(testing::_)).Times(0); + + const size_t dim = 4; + const size_t num_rows = 6; + std::vector vectors(num_rows * dim); + for (size_t i = 0; i < vectors.size(); ++i) { + vectors[i] = static_cast(i); + } + std::vector offsets; + for (size_t row = 0; row <= num_rows; ++row) { + offsets.push_back(row * dim); + } + + Status status = + writer->add_array_values(sizeof(float), vectors.data(), nullptr, + reinterpret_cast(offsets.data()), num_rows); + EXPECT_TRUE(status.ok()); + EXPECT_EQ(writer->buffered_vector_rows(dim), num_rows); + + status = writer->finish(); + EXPECT_TRUE(status.ok()); + EXPECT_EQ(writer->buffered_vector_rows(dim), 0); } TEST_F(AnnIndexWriterTest, TestCreateFromIndexColumnWriter) { @@ -566,7 +660,7 @@ TEST_F(AnnIndexWriterTest, TestAddArrayValuesIVF) { EXPECT_TRUE(status.ok()); } -TEST_F(AnnIndexWriterTest, TestAddMoreThanChunkSizeIVF) { +TEST_F(AnnIndexWriterTest, TestSmallTrainRequiredIndexUsesMemoryBuffer) { auto mock_index = std::make_shared(); auto properties = _properties; properties["index_type"] = "ivf"; @@ -587,133 +681,51 @@ TEST_F(AnnIndexWriterTest, TestAddMoreThanChunkSizeIVF) { ASSERT_TRUE(writer->init().ok()); writer->set_vector_index(mock_index); - EXPECT_CALL(*mock_index, train(10, testing::_)) - .Times(1) - .WillOnce(testing::Return(Status::OK())); - EXPECT_CALL(*mock_index, add(10, testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); - EXPECT_CALL(*mock_index, train(2, testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); - EXPECT_CALL(*mock_index, add(2, testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); - EXPECT_CALL(*mock_index, save(testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); + EXPECT_CALL(*mock_index, get_min_train_rows()).WillRepeatedly(testing::Return(2)); + EXPECT_CALL(*mock_index, train(testing::_, testing::_)).Times(0); + EXPECT_CALL(*mock_index, add(testing::_, testing::_)).Times(0); + EXPECT_CALL(*mock_index, save(testing::_)).Times(0); - // CHUNK_SIZE = 10 const size_t dim = 4; - - { - const size_t num_rows = 6; - std::vector vectors = { - 1.0f, 2.0f, 3.0f, 4.0f, // Row 0 - 5.0f, 6.0f, 7.0f, 8.0f, // Row 1 - 9.0f, 10.0f, 11.0f, 12.0f, // Row 2 - 13.0f, 14.0f, 15.0f, 16.0f, // Row 3 - 17.0f, 18.0f, 19.0f, 20.0f, // Row 4 - 21.0f, 22.0f, 23.0f, 24.0f // Row 5 - }; - std::vector offsets = {0, 4, 8, 12, 16, 20, 24}; - - Status status = writer->add_array_values(sizeof(float), vectors.data(), nullptr, - reinterpret_cast(offsets.data()), - num_rows); - EXPECT_TRUE(status.ok()); + const size_t num_rows = 4; + std::vector vectors(num_rows * dim); + for (size_t i = 0; i < vectors.size(); ++i) { + vectors[i] = static_cast(i); } - - { - const size_t num_rows = 6; - std::vector vectors = { - 25.0f, 26.0f, 27.0f, 28.0f, // Row 6 - 29.0f, 30.0f, 31.0f, 32.0f, // Row 7 - 33.0f, 34.0f, 35.0f, 36.0f, // Row 8 - 37.0f, 38.0f, 39.0f, 40.0f, // Row 9 - 41.0f, 42.0f, 43.0f, 44.0f, // Row 10 - 45.0f, 46.0f, 47.0f, 48.0f // Row 11 - }; - std::vector offsets = {0, 4, 8, 12, 16, 20, 24}; - - Status status = writer->add_array_values(sizeof(float), vectors.data(), nullptr, - reinterpret_cast(offsets.data()), - num_rows); - EXPECT_TRUE(status.ok()); + std::vector offsets; + for (size_t row = 0; row <= num_rows; ++row) { + offsets.push_back(row * dim); } - Status status = writer->finish(); + Status status = + writer->add_array_values(sizeof(float), vectors.data(), nullptr, + reinterpret_cast(offsets.data()), num_rows); EXPECT_TRUE(status.ok()); -} - -TEST_F(AnnIndexWriterTest, TestSkipTrainWhenRemainderLessThanNlist) { - auto mock_index = std::make_shared(); - auto properties = _properties; - properties["index_type"] = "ivf"; - properties["nlist"] = "5"; // Set nlist to 5 - properties["quantizer"] = "flat"; - - auto tablet_index = std::make_unique(); - tablet_index->_properties = properties; - tablet_index->_index_id = 1; - - auto writer = std::make_unique(_index_file_writer.get(), - tablet_index.get()); - - auto fs_dir = std::make_shared(); - fs_dir->init(doris::io::global_local_filesystem(), "./ut_dir/tmp_vector_search", nullptr); - EXPECT_CALL(*_index_file_writer, open(testing::_)).WillOnce(testing::Return(fs_dir)); + EXPECT_EQ(writer->buffered_vector_rows(dim), num_rows); + EXPECT_TRUE(testing::Mock::VerifyAndClearExpectations(mock_index.get())); - ASSERT_TRUE(writer->init().ok()); - writer->set_vector_index(mock_index); - - // CHUNK_SIZE = 10, nlist = 5 - // Add 12 rows: first 10 will be trained/added in one batch, remaining 2 < 5 - // Since we have trained data before (_need_save_index = true), we should add the remaining 2 rows and save - EXPECT_CALL(*mock_index, get_min_train_rows()).WillRepeatedly(testing::Return(5)); - EXPECT_CALL(*mock_index, train(10, testing::_)) - .Times(1) - .WillOnce(testing::Return(Status::OK())); - EXPECT_CALL(*mock_index, add(10, testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); - EXPECT_CALL(*mock_index, add(2, testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); - EXPECT_CALL(*mock_index, save(testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); - - const size_t dim = 4; - - // Add 12 rows total + EXPECT_CALL(*mock_index, get_min_train_rows()).WillRepeatedly(testing::Return(2)); { - const size_t num_rows = 10; - std::vector vectors(10 * 4); - for (size_t i = 0; i < 10 * 4; ++i) { - vectors[i] = static_cast(i); - } - std::vector offsets; - for (size_t i = 0; i <= num_rows; ++i) { - offsets.push_back(i * 4); - } - - Status status = writer->add_array_values(sizeof(float), vectors.data(), nullptr, - reinterpret_cast(offsets.data()), - num_rows); - EXPECT_TRUE(status.ok()); - } - - // Add 2 more rows - { - const size_t num_rows = 2; - std::vector vectors = { - 40.0f, 41.0f, 42.0f, 43.0f, // Row 10 - 44.0f, 45.0f, 46.0f, 47.0f // Row 11 - }; - std::vector offsets = {0, 4, 8}; - - Status status = writer->add_array_values(sizeof(float), vectors.data(), nullptr, - reinterpret_cast(offsets.data()), - num_rows); - EXPECT_TRUE(status.ok()); + testing::InSequence sequence; + EXPECT_CALL(*mock_index, train(4, testing::_)) + .Times(1) + .WillOnce(testing::Return(Status::OK())); + EXPECT_CALL(*mock_index, add(4, testing::_)) + .Times(1) + .WillOnce(testing::Return(Status::OK())); + EXPECT_CALL(*mock_index, save(testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); } - Status status = writer->finish(); + status = writer->finish(); EXPECT_TRUE(status.ok()); + EXPECT_EQ(writer->buffered_vector_rows(dim), 0); } -TEST_F(AnnIndexWriterTest, TestLargeDataVolumeWithRemainderSkip) { +TEST_F(AnnIndexWriterTest, TestTrainRequiredIndexTrainsOnceAndAddsAllRows) { auto mock_index = std::make_shared(); auto properties = _properties; properties["index_type"] = "ivf"; - properties["nlist"] = "3"; // Set nlist to 3 + properties["nlist"] = "2"; properties["quantizer"] = "flat"; auto tablet_index = std::make_unique(); @@ -730,32 +742,24 @@ TEST_F(AnnIndexWriterTest, TestLargeDataVolumeWithRemainderSkip) { ASSERT_TRUE(writer->init().ok()); writer->set_vector_index(mock_index); - // CHUNK_SIZE = 10, nlist = 3 - // Add 23 rows: 2 full chunks of 10, remaining 3 == nlist, so train remaining - EXPECT_CALL(*mock_index, get_min_train_rows()).WillRepeatedly(testing::Return(3)); - EXPECT_CALL(*mock_index, train(10, testing::_)) - .Times(2) - .WillRepeatedly(testing::Return(Status::OK())); - EXPECT_CALL(*mock_index, add(10, testing::_)) - .Times(2) - .WillRepeatedly(testing::Return(Status::OK())); - EXPECT_CALL(*mock_index, train(3, testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); - EXPECT_CALL(*mock_index, add(3, testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); - EXPECT_CALL(*mock_index, save(testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); + EXPECT_CALL(*mock_index, get_min_train_rows()).WillRepeatedly(testing::Return(2)); + EXPECT_CALL(*mock_index, train(testing::_, testing::_)).Times(0); + EXPECT_CALL(*mock_index, add(testing::_, testing::_)).Times(0); + EXPECT_CALL(*mock_index, save(testing::_)).Times(0); const size_t dim = 4; - // Add 3 batches: 10 + 10 + 3 = 23 rows - for (int batch = 0; batch < 2; ++batch) { - const size_t num_rows = 10; - std::vector vectors(10 * 4); - for (size_t i = 0; i < 10 * 4; ++i) { - vectors[i] = static_cast(batch * 40 + i); - } - std::vector offsets; - for (size_t i = 0; i <= num_rows; ++i) { - offsets.push_back(i * 4); - } + { + const size_t num_rows = 6; + std::vector vectors = { + 1.0f, 2.0f, 3.0f, 4.0f, // Row 0 + 5.0f, 6.0f, 7.0f, 8.0f, // Row 1 + 9.0f, 10.0f, 11.0f, 12.0f, // Row 2 + 13.0f, 14.0f, 15.0f, 16.0f, // Row 3 + 17.0f, 18.0f, 19.0f, 20.0f, // Row 4 + 21.0f, 22.0f, 23.0f, 24.0f // Row 5 + }; + std::vector offsets = {0, 4, 8, 12, 16, 20, 24}; Status status = writer->add_array_values(sizeof(float), vectors.data(), nullptr, reinterpret_cast(offsets.data()), @@ -763,15 +767,17 @@ TEST_F(AnnIndexWriterTest, TestLargeDataVolumeWithRemainderSkip) { EXPECT_TRUE(status.ok()); } - // Add remaining 3 rows { - const size_t num_rows = 3; + const size_t num_rows = 6; std::vector vectors = { - 80.0f, 81.0f, 82.0f, 83.0f, // Row 20 - 84.0f, 85.0f, 86.0f, 87.0f, // Row 21 - 88.0f, 89.0f, 90.0f, 91.0f // Row 22 + 25.0f, 26.0f, 27.0f, 28.0f, // Row 6 + 29.0f, 30.0f, 31.0f, 32.0f, // Row 7 + 33.0f, 34.0f, 35.0f, 36.0f, // Row 8 + 37.0f, 38.0f, 39.0f, 40.0f, // Row 9 + 41.0f, 42.0f, 43.0f, 44.0f, // Row 10 + 45.0f, 46.0f, 47.0f, 48.0f // Row 11 }; - std::vector offsets = {0, 4, 8, 12}; + std::vector offsets = {0, 4, 8, 12, 16, 20, 24}; Status status = writer->add_array_values(sizeof(float), vectors.data(), nullptr, reinterpret_cast(offsets.data()), @@ -779,96 +785,29 @@ TEST_F(AnnIndexWriterTest, TestLargeDataVolumeWithRemainderSkip) { EXPECT_TRUE(status.ok()); } - Status status = writer->finish(); - EXPECT_TRUE(status.ok()); -} - -TEST_F(AnnIndexWriterTest, TestLargeDataVolumeSkipRemainder) { - auto mock_index = std::make_shared(); - auto properties = _properties; - properties["index_type"] = "ivf"; - properties["nlist"] = "4"; // Set nlist to 4 - properties["quantizer"] = "flat"; + EXPECT_EQ(writer->buffered_vector_rows(dim), 12); + EXPECT_TRUE(testing::Mock::VerifyAndClearExpectations(mock_index.get())); - auto tablet_index = std::make_unique(); - tablet_index->_properties = properties; - tablet_index->_index_id = 1; - - auto writer = std::make_unique(_index_file_writer.get(), - tablet_index.get()); - - auto fs_dir = std::make_shared(); - fs_dir->init(doris::io::global_local_filesystem(), "./ut_dir/tmp_vector_search", nullptr); - EXPECT_CALL(*_index_file_writer, open(testing::_)).WillOnce(testing::Return(fs_dir)); - - ASSERT_TRUE(writer->init().ok()); - writer->set_vector_index(mock_index); - - // CHUNK_SIZE = 10, nlist = 4 - // Add 22 rows: 2 full chunks of 10, remaining 2 < 4 - // Since we have trained data before (_need_save_index = true), we should add the remaining 2 rows and save - EXPECT_CALL(*mock_index, get_min_train_rows()).WillRepeatedly(testing::Return(4)); - EXPECT_CALL(*mock_index, train(10, testing::_)) - .Times(2) - .WillRepeatedly(testing::Return(Status::OK())); - EXPECT_CALL(*mock_index, add(10, testing::_)) - .Times(2) - .WillRepeatedly(testing::Return(Status::OK())); - EXPECT_CALL(*mock_index, add(2, testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); - EXPECT_CALL(*mock_index, save(testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); - - const size_t dim = 4; - - // Add 2 batches of 10 rows - for (int batch = 0; batch < 2; ++batch) { - const size_t num_rows = 10; - std::vector vectors(10 * 4); - for (size_t i = 0; i < 10 * 4; ++i) { - vectors[i] = static_cast(batch * 40 + i); - } - std::vector offsets; - for (size_t i = 0; i <= num_rows; ++i) { - offsets.push_back(i * 4); - } - - Status status = writer->add_array_values(sizeof(float), vectors.data(), nullptr, - reinterpret_cast(offsets.data()), - num_rows); - EXPECT_TRUE(status.ok()); - } - - // Add remaining 2 rows + EXPECT_CALL(*mock_index, get_min_train_rows()).WillRepeatedly(testing::Return(2)); { - const size_t num_rows = 2; - std::vector vectors = { - 80.0f, 81.0f, 82.0f, 83.0f, // Row 20 - 84.0f, 85.0f, 86.0f, 87.0f // Row 21 - }; - std::vector offsets = {0, 4, 8}; - - Status status = writer->add_array_values(sizeof(float), vectors.data(), nullptr, - reinterpret_cast(offsets.data()), - num_rows); - EXPECT_TRUE(status.ok()); + testing::InSequence sequence; + EXPECT_CALL(*mock_index, train(12, testing::_)) + .Times(1) + .WillOnce(testing::Return(Status::OK())); + EXPECT_CALL(*mock_index, add(12, testing::_)) + .Times(1) + .WillOnce(testing::Return(Status::OK())); + EXPECT_CALL(*mock_index, save(testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); } Status status = writer->finish(); EXPECT_TRUE(status.ok()); } -TEST_F(AnnIndexWriterTest, TestSkipIndexWhenTotalRowsLessThanNlist) { +TEST_F(AnnIndexWriterTest, TestTrainRequiredIndexTrainsWithAllBufferedRows) { auto mock_index = std::make_shared(); - auto properties = _properties; - properties["index_type"] = "ivf"; - properties["nlist"] = "5"; // Set nlist to 5 - properties["quantizer"] = "flat"; - - auto tablet_index = std::make_unique(); - tablet_index->_properties = properties; - tablet_index->_index_id = 1; - auto writer = std::make_unique(_index_file_writer.get(), - tablet_index.get()); + _tablet_index.get()); auto fs_dir = std::make_shared(); fs_dir->init(doris::io::global_local_filesystem(), "./ut_dir/tmp_vector_search", nullptr); @@ -876,95 +815,56 @@ TEST_F(AnnIndexWriterTest, TestSkipIndexWhenTotalRowsLessThanNlist) { ASSERT_TRUE(writer->init().ok()); writer->set_vector_index(mock_index); - writer->set_need_save_index(false); // No previous training, so should skip entirely - // Add only 3 rows, which is less than nlist (5) - // Since no data was trained before (_need_save_index = false), we should skip index building entirely - // No train, add, or save should be called - EXPECT_CALL(*mock_index, get_min_train_rows()).WillRepeatedly(testing::Return(5)); + EXPECT_CALL(*mock_index, get_min_train_rows()).WillRepeatedly(testing::Return(2)); EXPECT_CALL(*mock_index, train(testing::_, testing::_)).Times(0); EXPECT_CALL(*mock_index, add(testing::_, testing::_)).Times(0); EXPECT_CALL(*mock_index, save(testing::_)).Times(0); const size_t dim = 4; - - // Add 3 rows - { - const size_t num_rows = 3; - std::vector vectors = { - 1.0f, 2.0f, 3.0f, 4.0f, // Row 0 - 5.0f, 6.0f, 7.0f, 8.0f, // Row 1 - 9.0f, 10.0f, 11.0f, 12.0f // Row 2 - }; - std::vector offsets = {0, 4, 8, 12}; - - Status status = writer->add_array_values(sizeof(float), vectors.data(), nullptr, - reinterpret_cast(offsets.data()), - num_rows); - EXPECT_TRUE(status.ok()); + const size_t num_rows = 20; + std::vector vectors(num_rows * dim); + for (size_t row = 0; row < num_rows; ++row) { + for (size_t col = 0; col < dim; ++col) { + vectors[row * dim + col] = static_cast(row); + } + } + std::vector offsets; + for (size_t row = 0; row <= num_rows; ++row) { + offsets.push_back(row * dim); } - Status status = writer->finish(); + Status status = + writer->add_array_values(sizeof(float), vectors.data(), nullptr, + reinterpret_cast(offsets.data()), num_rows); EXPECT_TRUE(status.ok()); -} - -TEST_F(AnnIndexWriterTest, TestPQMinTrainRows) { - // Test writer behavior under a large mocked min_train_rows threshold. - - auto mock_index = std::make_shared(); - auto writer = std::make_unique(_index_file_writer.get(), - _tablet_index.get()); - - auto fs_dir = std::make_shared(); - fs_dir->init(doris::io::global_local_filesystem(), "./ut_dir/tmp_vector_search", nullptr); - EXPECT_CALL(*_index_file_writer, open(testing::_)).WillOnce(testing::Return(fs_dir)); - - ASSERT_TRUE(writer->init().ok()); - writer->set_vector_index(mock_index); - - // Set up expectations: mock a very large min_train_rows threshold. - // Since we only provide 1000 vectors, which is less than 131072, training will happen in batches - // but finish() will skip saving since remaining data is insufficient - EXPECT_CALL(*mock_index, get_min_train_rows()).WillRepeatedly(testing::Return(131072)); - // 1000 vectors will be processed in 100 batches of 10 vectors each - EXPECT_CALL(*mock_index, train(10, testing::_)) - .Times(100) - .WillRepeatedly(testing::Return(Status::OK())); - EXPECT_CALL(*mock_index, add(10, testing::_)) - .Times(100) - .WillRepeatedly(testing::Return(Status::OK())); - // Since we have trained data in batches, the index will be saved even though total data is insufficient - EXPECT_CALL(*mock_index, save(testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); - - const size_t dim = 4; + EXPECT_EQ(writer->buffered_vector_rows(dim), num_rows); + EXPECT_TRUE(testing::Mock::VerifyAndClearExpectations(mock_index.get())); - // Add only 1000 rows, which is less than the required 131072 + EXPECT_CALL(*mock_index, get_min_train_rows()).WillRepeatedly(testing::Return(2)); { - const size_t num_rows = 1000; - std::vector vectors(num_rows * dim); - for (size_t i = 0; i < num_rows * dim; ++i) { - vectors[i] = static_cast(i % 100); - } - std::vector offsets; - for (size_t i = 0; i <= num_rows; ++i) { - offsets.push_back(i * dim); - } - - Status status = writer->add_array_values(sizeof(float), vectors.data(), nullptr, - reinterpret_cast(offsets.data()), - num_rows); - EXPECT_TRUE(status.ok()); + testing::InSequence sequence; + EXPECT_CALL(*mock_index, train(20, testing::_)) + .Times(1) + .WillOnce(testing::Invoke([&](Int64 n, const float* vec) { + EXPECT_EQ(n, num_rows); + for (size_t row = 0; row < static_cast(n); ++row) { + const auto row_id = static_cast(vec[row * dim]); + EXPECT_EQ(row_id, row); + } + return Status::OK(); + })); + EXPECT_CALL(*mock_index, add(20, testing::_)) + .Times(1) + .WillOnce(testing::Return(Status::OK())); + EXPECT_CALL(*mock_index, save(testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); } - // Finish should skip index building due to insufficient training data - Status status = writer->finish(); + status = writer->finish(); EXPECT_TRUE(status.ok()); } -TEST_F(AnnIndexWriterTest, TestSQMinTrainRows) { - // Test that SQ quantizer requires sufficient training data - // SQ requires at least nlist * 2 = 10 * 2 = 20 training vectors - +TEST_F(AnnIndexWriterTest, TestSkipIndexWhenTotalRowsLessThanMinTrainRows) { auto mock_index = std::make_shared(); auto writer = std::make_unique(_index_file_writer.get(), _tablet_index.get()); @@ -976,94 +876,49 @@ TEST_F(AnnIndexWriterTest, TestSQMinTrainRows) { ASSERT_TRUE(writer->init().ok()); writer->set_vector_index(mock_index); - // Set up expectations: SQ should require at least 20 training vectors - // Since we only provide 15 vectors, training will happen in batches but finish() will skip saving - EXPECT_CALL(*mock_index, get_min_train_rows()).WillRepeatedly(testing::Return(20)); - // 15 vectors will be processed in 1 batch of 10 vectors and remaining 5 vectors - EXPECT_CALL(*mock_index, train(10, testing::_)) - .Times(1) - .WillOnce(testing::Return(Status::OK())); - EXPECT_CALL(*mock_index, add(10, testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); - EXPECT_CALL(*mock_index, add(5, testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); - // Since we have trained data, the index will be saved even though total data is insufficient - EXPECT_CALL(*mock_index, save(testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); + EXPECT_CALL(*mock_index, get_min_train_rows()).WillRepeatedly(testing::Return(5)); + EXPECT_CALL(*mock_index, train(testing::_, testing::_)).Times(0); + EXPECT_CALL(*mock_index, add(testing::_, testing::_)).Times(0); + EXPECT_CALL(*mock_index, save(testing::_)).Times(0); const size_t dim = 4; - - // Add only 15 rows, which is less than the required 20 - { - const size_t num_rows = 15; - std::vector vectors(num_rows * dim); - for (size_t i = 0; i < num_rows * dim; ++i) { - vectors[i] = static_cast(i % 100); - } - std::vector offsets; - for (size_t i = 0; i <= num_rows; ++i) { - offsets.push_back(i * dim); - } - - Status status = writer->add_array_values(sizeof(float), vectors.data(), nullptr, - reinterpret_cast(offsets.data()), - num_rows); - EXPECT_TRUE(status.ok()); + const size_t num_rows = 3; + std::vector vectors(num_rows * dim); + for (size_t i = 0; i < vectors.size(); ++i) { + vectors[i] = static_cast(i); + } + std::vector offsets; + for (size_t row = 0; row <= num_rows; ++row) { + offsets.push_back(row * dim); } - // Finish should skip index building due to insufficient training data - Status status = writer->finish(); + Status status = + writer->add_array_values(sizeof(float), vectors.data(), nullptr, + reinterpret_cast(offsets.data()), num_rows); EXPECT_TRUE(status.ok()); -} - -TEST_F(AnnIndexWriterTest, TestPQWithSufficientData) { - // Test that PQ works when sufficient training data is provided - - auto mock_index = std::make_shared(); - auto writer = std::make_unique(_index_file_writer.get(), - _tablet_index.get()); - - auto fs_dir = std::make_shared(); - fs_dir->init(doris::io::global_local_filesystem(), "./ut_dir/tmp_vector_search", nullptr); - EXPECT_CALL(*_index_file_writer, open(testing::_)).WillOnce(testing::Return(fs_dir)); - - ASSERT_TRUE(writer->init().ok()); - writer->set_vector_index(mock_index); - - // Mock min_train_rows to 131072 and provide exactly that amount. - EXPECT_CALL(*mock_index, get_min_train_rows()).WillRepeatedly(testing::Return(131072)); - // Since we provide exactly 131072 vectors, they will be trained and added in chunks - // Each chunk is 10 vectors, so we expect 13107 train calls and 13107 add calls for full chunks - EXPECT_CALL(*mock_index, train(10, testing::_)) - .Times(13107) - .WillRepeatedly(testing::Return(Status::OK())); - EXPECT_CALL(*mock_index, add(10, testing::_)) - .Times(13107) - .WillRepeatedly(testing::Return(Status::OK())); - // The remaining 2 vectors will be added without training since min_train_rows > 2 - EXPECT_CALL(*mock_index, add(2, testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); - EXPECT_CALL(*mock_index, save(testing::_)).Times(1).WillOnce(testing::Return(Status::OK())); - const size_t dim = 4; + EXPECT_EQ(writer->buffered_vector_rows(dim), num_rows); + EXPECT_TRUE(testing::Mock::VerifyAndClearExpectations(mock_index.get())); + EXPECT_CALL(*mock_index, get_min_train_rows()).WillRepeatedly(testing::Return(5)); + EXPECT_CALL(*mock_index, train(testing::_, testing::_)).Times(0); + EXPECT_CALL(*mock_index, add(testing::_, testing::_)).Times(0); + EXPECT_CALL(*mock_index, save(testing::_)).Times(0); - // Add exactly 131072 rows - { - const size_t num_rows = 131072; - std::vector vectors(num_rows * dim); - for (size_t i = 0; i < num_rows * dim; ++i) { - vectors[i] = static_cast(i % 100); - } - std::vector offsets; - for (size_t i = 0; i <= num_rows; ++i) { - offsets.push_back(i * dim); - } + status = writer->finish(); + EXPECT_TRUE(status.ok()); + EXPECT_EQ(writer->buffered_vector_rows(dim), 0); +} - Status status = writer->add_array_values(sizeof(float), vectors.data(), nullptr, - reinterpret_cast(offsets.data()), - num_rows); - EXPECT_TRUE(status.ok()); - } +TEST_F(AnnIndexWriterTest, TestIVFOnDiskMinTrainRows) { + FaissVectorIndex index; + FaissBuildParameter params; + params.index_type = FaissBuildParameter::IndexType::IVF_ON_DISK; + params.quantizer = FaissBuildParameter::Quantizer::FLAT; + params.dim = 4; + params.ivf_nlist = 7; - // Finish should successfully build the index - Status status = writer->finish(); - EXPECT_TRUE(status.ok()); + index.build(params); + EXPECT_EQ(index.get_min_train_rows(), 7); } } // namespace doris::segment_v2 diff --git a/regression-test/data/ann_index_p0/ivf_on_disk_index_test.out b/regression-test/data/ann_index_p0/ivf_on_disk_index_test.out index bcd94f4ac52341..239c104321c9a5 100644 --- a/regression-test/data/ann_index_p0/ivf_on_disk_index_test.out +++ b/regression-test/data/ann_index_p0/ivf_on_disk_index_test.out @@ -11,6 +11,10 @@ 1 2 +-- !sql_l2_insufficient_train_rows -- +1 +2 + -- !sql -- 1 [1, 2, 3] 2 [0.5, 2.1, 2.9] diff --git a/regression-test/data/ann_index_p0/ivf_pq_full_buffer_train_recall.out b/regression-test/data/ann_index_p0/ivf_pq_full_buffer_train_recall.out new file mode 100644 index 00000000000000..6c3458e27355a5 --- /dev/null +++ b/regression-test/data/ann_index_p0/ivf_pq_full_buffer_train_recall.out @@ -0,0 +1,4 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !target_in_top20 -- +1 + diff --git a/regression-test/data/ann_index_p0/ivf_pq_recall.out b/regression-test/data/ann_index_p0/ivf_pq_recall.out new file mode 100644 index 00000000000000..14aab16eedc5fa --- /dev/null +++ b/regression-test/data/ann_index_p0/ivf_pq_recall.out @@ -0,0 +1,9 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !row_count -- +800 + +-- !first_cluster_recall -- +20 + +-- !second_cluster_recall -- +20 diff --git a/regression-test/suites/ann_index_p0/ann_index_build_min_segment_rows.groovy b/regression-test/suites/ann_index_p0/ann_index_build_min_segment_rows.groovy new file mode 100644 index 00000000000000..01393444830471 --- /dev/null +++ b/regression-test/suites/ann_index_p0/ann_index_build_min_segment_rows.groovy @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("ann_index_build_min_segment_rows", "nonConcurrent") { + sql "unset variable all;" + sql "set enable_common_expr_pushdown=true;" + sql "set experimental_enable_virtual_slot_for_cse=true;" + sql "set enable_no_need_read_data_opt=true;" + sql "set parallel_pipeline_task_num=1;" + sql "set enable_sql_cache=false;" + sql "set enable_condition_cache=false;" + + setBeConfigTemporary([ann_index_build_min_segment_rows: 100]) { + sql "drop table if exists ann_index_build_min_segment_rows" + sql """ + create table ann_index_build_min_segment_rows ( + id int not null, + embedding array not null, + index idx_embedding(`embedding`) using ann properties( + "index_type"="hnsw", + "metric_type"="l2_distance", + "dim"="3" + ) + ) duplicate key(id) + distributed by hash(id) buckets 1 + properties("replication_num"="1"); + """ + + sql """ + insert into ann_index_build_min_segment_rows values + (1, [0.0, 0.0, 0.0]), + (2, [0.1, 0.0, 0.0]), + (3, [0.2, 0.0, 0.0]); + """ + + try { + GetDebugPoint().enableDebugPointForAllBEs( + "segment_iterator._read_columns_by_index", [column_name: "embedding"]) + test { + sql """ + select id + from ann_index_build_min_segment_rows + where l2_distance_approximate(embedding, [0.0, 0.0, 0.0]) < 1.0 + order by id; + """ + exception "does not need to read data" + } + } finally { + GetDebugPoint().disableDebugPointForAllBEs("segment_iterator._read_columns_by_index") + } + } +} diff --git a/regression-test/suites/ann_index_p0/ivf_on_disk_index_test.groovy b/regression-test/suites/ann_index_p0/ivf_on_disk_index_test.groovy index a9eed51d7a4125..63fab34d072d3a 100644 --- a/regression-test/suites/ann_index_p0/ivf_on_disk_index_test.groovy +++ b/regression-test/suites/ann_index_p0/ivf_on_disk_index_test.groovy @@ -68,7 +68,7 @@ suite ("ivf_on_disk_index_test") { exception """nlist of ann index must be specified for ivf/ivf_on_disk type""" } - // ========== Error: not enough training points ========== + // Not enough training points: should not throw exception anymore, just skip index building. sql """ CREATE TABLE tbl_ivf_on_disk_l2 ( id INT NOT NULL, @@ -84,14 +84,12 @@ suite ("ivf_on_disk_index_test") { DISTRIBUTED BY HASH(id) BUCKETS 1 PROPERTIES ("replication_num" = "1"); """ - test { - sql """ - INSERT INTO tbl_ivf_on_disk_l2 VALUES - (1, [1.0, 2.0, 3.0]), - (2, [0.5, 2.1, 2.9]); - """ - exception """exception occurred during training""" - } + sql """ + INSERT INTO tbl_ivf_on_disk_l2 VALUES + (1, [1.0, 2.0, 3.0]), + (2, [0.5, 2.1, 2.9]); + """ + qt_sql_l2_insufficient_train_rows "select id from tbl_ivf_on_disk_l2 order by l2_distance_approximate(embedding, [1.0,2.0,3.0]) limit 2;" // ========== IVF_ON_DISK with inner product ========== sql "drop table if exists tbl_ivf_on_disk_ip" diff --git a/regression-test/suites/ann_index_p0/ivf_pq_full_buffer_train_recall.groovy b/regression-test/suites/ann_index_p0/ivf_pq_full_buffer_train_recall.groovy new file mode 100644 index 00000000000000..20cccfb28b6e25 --- /dev/null +++ b/regression-test/suites/ann_index_p0/ivf_pq_full_buffer_train_recall.groovy @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("ivf_pq_full_buffer_train_recall", "nonConcurrent") { + sql "set enable_common_expr_pushdown=true;" + sql "set enable_ann_index_result_cache=false;" + sql "set ivf_nprobe=8;" + + sql "drop table if exists tbl_ivf_pq_full_buffer_train_recall" + sql """ + CREATE TABLE tbl_ivf_pq_full_buffer_train_recall ( + id INT NOT NULL, + embedding ARRAY NOT NULL, + INDEX idx_emb (`embedding`) USING ANN PROPERTIES( + "index_type"="ivf", + "metric_type"="l2_distance", + "nlist"="8", + "dim"="4", + "quantizer"="pq", + "pq_m"="2", + "pq_nbits"="1" + ) + ) ENGINE=OLAP + DUPLICATE KEY(id) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ("replication_num" = "1"); + """ + + def insertData = [] + for (int i = 1; i <= 400; i++) { + if (i == 250) { + insertData.add("(${i}, [0.0, 0.0, 0.0, 0.0])") + } else if (i <= 200) { + insertData.add("(${i}, [1000.0, ${i}.0, ${(i % 17)}.0, ${(i % 19)}.0])") + } else { + insertData.add( + "(${i}, [${(i - 250) / 50.0}, ${(250 - i) / 50.0}, " + + "${(i % 7 - 3) / 10.0}, ${(i % 5 - 2) / 10.0}])") + } + } + sql "INSERT INTO tbl_ivf_pq_full_buffer_train_recall VALUES ${insertData.join(', ')};" + sql "sync" + + qt_target_in_top20 """ + select count(*) + from ( + select id + from tbl_ivf_pq_full_buffer_train_recall + order by l2_distance_approximate(embedding, [0.0, 0.0, 0.0, 0.0]), id + limit 20 + ) t + where id = 250; + """ +} diff --git a/regression-test/suites/ann_index_p0/ivf_pq_recall.groovy b/regression-test/suites/ann_index_p0/ivf_pq_recall.groovy new file mode 100644 index 00000000000000..c1c6a7b7651d03 --- /dev/null +++ b/regression-test/suites/ann_index_p0/ivf_pq_recall.groovy @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("ivf_pq_recall", "nonConcurrent") { + sql "set enable_common_expr_pushdown=true;" + sql "set enable_ann_index_result_cache=false;" + sql "set ivf_nprobe=8;" + + sql "drop table if exists ivf_pq_recall" + sql """ + create table ivf_pq_recall ( + id int not null, + embedding array not null, + index idx_embedding (`embedding`) using ann properties( + "index_type" = "ivf", + "metric_type" = "l2_distance", + "nlist" = "8", + "dim" = "4", + "quantizer" = "pq", + "pq_m" = "2", + "pq_nbits" = "2" + ) + ) engine=olap + duplicate key(id) + distributed by hash(id) buckets 1 + properties( + "replication_num" = "1", + "disable_auto_compaction" = "true" + ); + """ + + def formatFloat = { double value -> + String.format(java.util.Locale.ROOT, "%.3f", value) + } + def vector = { double x -> + "[${formatFloat(x)}, ${formatFloat(x * 2)}, ${formatFloat(x * 3)}, ${formatFloat(x * 4)}]" + } + def rows = [] + for (int i = 1; i <= 400; i++) { + double x = (i - 1) / 1000.0 + rows.add("(${i}, ${vector(x)})") + } + for (int i = 401; i <= 800; i++) { + double x = 1000.0 + (i - 401) / 1000.0 + rows.add("(${i}, ${vector(x)})") + } + sql "insert into ivf_pq_recall values ${rows.join(',')};" + sql "sync" + + qt_row_count "select count(*) from ivf_pq_recall;" + + qt_first_cluster_recall """ + select count(*) from ( + select id + from ivf_pq_recall + order by l2_distance_approximate(embedding, [0.0, 0.0, 0.0, 0.0]) + limit 20 + ) t + where id between 1 and 400; + """ + + qt_second_cluster_recall """ + select count(*) from ( + select id + from ivf_pq_recall + order by l2_distance_approximate(embedding, [1000.0, 2000.0, 3000.0, 4000.0]) + limit 20 + ) t + where id between 401 and 800; + """ +}