Skip to content
Open
10 changes: 4 additions & 6 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1756,12 +1756,10 @@ DEFINE_mBool(enable_prefill_all_dbm_agg_cache_after_compaction, "true");
DEFINE_String(ann_index_ivf_list_cache_limit, "70%");
// Stale sweep time for ANN index IVF list cache in seconds. 3600s is 1 hour.
DEFINE_mInt32(ann_index_ivf_list_cache_stale_sweep_time_sec, "3600");

// Chunk size for ANN/vector index building per training/adding batch
// 1M By default.
DEFINE_mInt64(ann_index_build_chunk_size, "1000000");
DEFINE_Validator(ann_index_build_chunk_size,
[](const int64_t config) -> bool { return config > 0; });
// Minimum segment rows required to persist an ANN index. 0 keeps the default behavior.
DEFINE_mInt64(ann_index_build_min_segment_rows, "0");
DEFINE_Validator(ann_index_build_min_segment_rows,
[](const int64_t config) -> bool { return config >= 0; });

DEFINE_mBool(enable_wal_tde, "false");

Expand Down
4 changes: 2 additions & 2 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1809,8 +1809,8 @@ DECLARE_mInt32(max_segment_partial_column_cache_size);
DECLARE_String(ann_index_ivf_list_cache_limit);
// Stale sweep time for ANN index IVF list cache in seconds.
DECLARE_mInt32(ann_index_ivf_list_cache_stale_sweep_time_sec);
// Chunk size for ANN/vector index building per training/adding batch
DECLARE_mInt64(ann_index_build_chunk_size);
// Minimum segment rows required to persist an ANN index.
DECLARE_mInt64(ann_index_build_min_segment_rows);

DECLARE_mBool(enable_prefill_output_dbm_agg_cache_after_compaction);
DECLARE_mBool(enable_prefill_all_dbm_agg_cache_after_compaction);
Expand Down
119 changes: 46 additions & 73 deletions be/src/storage/index/ann/ann_index_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

#include "storage/index/ann/ann_index_writer.h"

#include <algorithm>
#include <cstddef>
#include <memory>
#include <string>

#include "common/cast_set.h"
#include "common/config.h"
#include "storage/index/ann/faiss_ann_index.h"
#include "storage/index/inverted/inverted_index_fs_directory.h"

Expand All @@ -39,7 +41,7 @@ AnnIndexColumnWriter::AnnIndexColumnWriter(IndexFileWriter* index_file_writer,
const TabletIndex* index_meta)
: _index_file_writer(index_file_writer), _index_meta(index_meta) {}

AnnIndexColumnWriter::~AnnIndexColumnWriter() {}
AnnIndexColumnWriter::~AnnIndexColumnWriter() = default;

Status AnnIndexColumnWriter::init() {
Result<std::shared_ptr<DorisFSDirectory>> compound_dir = _index_file_writer->open(_index_meta);
Expand Down Expand Up @@ -77,17 +79,17 @@ Status AnnIndexColumnWriter::init() {
index_type, build_parameter.dim, metric_type, build_parameter.max_degree,
build_parameter.ef_construction, quantizer);

size_t block_size = AnnIndexColumnWriter::chunk_size() * build_parameter.dim;
_float_array.reserve(block_size);

return Status::OK();
}

Status AnnIndexColumnWriter::add_values(const std::string fn, const void* values, size_t count) {
return Status::OK();
}

void AnnIndexColumnWriter::close_on_error() {}
void AnnIndexColumnWriter::close_on_error() {
PODArray<float> empty_buffered_vectors;
_buffered_vectors.swap(empty_buffered_vectors);
}

Status AnnIndexColumnWriter::add_array_values(size_t field_size, const void* value_ptr,
const uint8_t* null_map, const uint8_t* offsets_ptr,
Expand All @@ -109,26 +111,10 @@ Status AnnIndexColumnWriter::add_array_values(size_t field_size, const void* val

const float* p = reinterpret_cast<const float*>(value_ptr);

const size_t full_elements = AnnIndexColumnWriter::chunk_size() * dim;
size_t remaining_elements = num_rows * dim;
size_t src_offset = 0;
while (remaining_elements > 0) {
size_t available_space = full_elements - _float_array.size();
size_t elements_to_add = std::min(remaining_elements, available_space);

_float_array.insert(_float_array.end(), p + src_offset, p + src_offset + elements_to_add);
src_offset += elements_to_add;
remaining_elements -= elements_to_add;

if (_float_array.size() == full_elements) {
RETURN_IF_ERROR(
_vector_index->train(AnnIndexColumnWriter::chunk_size(), _float_array.data()));
RETURN_IF_ERROR(
_vector_index->add(AnnIndexColumnWriter::chunk_size(), _float_array.data()));
_float_array.clear();
_need_save_index = true;
}
}
// The offsets check above guarantees every array row matches the ANN index dimension.
DCHECK(p != nullptr);
_buffered_vectors.insert(_buffered_vectors.end(), p, p + num_rows * dim);
_total_rows += cast_set<int64_t>(num_rows);

return Status::OK();
}
Expand All @@ -146,54 +132,41 @@ int64_t AnnIndexColumnWriter::size() const {
}

Status AnnIndexColumnWriter::finish() {
Int64 min_train_rows = _vector_index->get_min_train_rows();

// Check if we have enough rows to train the index
// train/add the remaining data
if (_float_array.empty()) {
if (_need_save_index) {
return _vector_index->save(_dir.get());
} else {
// No data was added at all. This can happen if the segment has 0 rows
// or all rows were filtered out. We need to delete the directory entry
// to avoid writing an empty/invalid index file.
LOG_INFO("No data to train/add for ANN index. Skipping index building.");
return _index_file_writer->delete_index(_index_meta);
}
} else {
DCHECK(_float_array.size() % _vector_index->get_dimension() == 0);

Int64 num_rows = _float_array.size() / _vector_index->get_dimension();

if (num_rows >= min_train_rows) {
RETURN_IF_ERROR(_vector_index->train(num_rows, _float_array.data()));
RETURN_IF_ERROR(_vector_index->add(num_rows, _float_array.data()));
_float_array.clear();
return _vector_index->save(_dir.get());
} else {
// It happens to have not enough data to train.
// If we have data to add before, we still need to save the index.
if (_need_save_index) {
// For IVF indexes, adding remaining vectors without training is acceptable
// because the quantizer was already trained on previous batches. These vectors
// are simply added to the nearest clusters without retraining.
RETURN_IF_ERROR(_vector_index->add(num_rows, _float_array.data()));
_float_array.clear();
return _vector_index->save(_dir.get());
} else {
// Not enough data to train and no data added before.
// Means this is a very small segment, we can skip the index building.
// We need to delete the directory entry from index_file_writer to avoid
// writing an empty/invalid index file which causes "IndexInput read past EOF" error.
LOG_INFO(
"Remaining data size {} is less than minimum {} rows required for ANN "
"index "
"training. Skipping index building for this segment.",
num_rows, min_train_rows);
_float_array.clear();
return _index_file_writer->delete_index(_index_meta);
}
}
if (_total_rows == 0) {
LOG_INFO("No data to train/add for ANN index. Skipping index building.");
return _index_file_writer->delete_index(_index_meta);
}

const Int64 min_train_rows = _vector_index->get_min_train_rows();
const Int64 effective_min_rows =
std::max(min_train_rows, cast_set<Int64>(config::ann_index_build_min_segment_rows));
if (_total_rows < effective_min_rows) {
LOG_INFO(
"Total data size {} is less than minimum {} rows required for ANN index build. "
"Skipping index building for this segment.",
_total_rows, effective_min_rows);
PODArray<float> empty_buffered_vectors;
_buffered_vectors.swap(empty_buffered_vectors);
return _index_file_writer->delete_index(_index_meta);
}

return _build_and_save(min_train_rows, effective_min_rows);
}

Status AnnIndexColumnWriter::_build_and_save(Int64 min_train_rows, Int64 effective_min_rows) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个函数,为什么要有min_train_rows 这个参数?

const size_t dim = _vector_index->get_dimension();
DCHECK(_buffered_vectors.size() % dim == 0);
const Int64 train_rows = cast_set<Int64>(_buffered_vectors.size() / dim);
DORIS_CHECK(train_rows == _total_rows);
DORIS_CHECK(train_rows >= effective_min_rows);
if (min_train_rows > 0) {
RETURN_IF_ERROR(_vector_index->train(train_rows, _buffered_vectors.data()));
}
RETURN_IF_ERROR(_vector_index->add(train_rows, _buffered_vectors.data()));
// PODArray::clear() keeps the allocated capacity. Swap with an empty array so the
// full-segment build buffer is released before saving the index.
PODArray<float> empty_buffered_vectors;
_buffered_vectors.swap(empty_buffered_vectors);
return _vector_index->save(_dir.get());
}
} // namespace doris::segment_v2
20 changes: 8 additions & 12 deletions be/src/storage/index/ann/ann_index_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
#include <roaring/roaring.hh>
#include <string>

#include "common/config.h"
#include "core/pod_array.h"
#include "storage/index/ann/ann_index.h"
#include "storage/index/index_file_writer.h"
Expand All @@ -38,13 +37,6 @@
namespace doris::segment_v2 {
class AnnIndexColumnWriter : public IndexColumnWriter {
public:
static inline int64_t chunk_size() {
#ifdef BE_TEST
return 10;
#else
return config::ann_index_build_chunk_size;
#endif
}
static constexpr const char* INDEX_TYPE = "index_type";
static constexpr const char* METRIC_TYPE = "metric_type";
static constexpr const char* DIM = "dim";
Expand All @@ -71,16 +63,20 @@ class AnnIndexColumnWriter : public IndexColumnWriter {
Status finish() override;

private:
Status _build_and_save(Int64 min_train_rows, Int64 effective_min_rows);

#ifdef BE_TEST
friend class TestAnnIndexColumnWriter;
#endif

// VectorIndex shoule be managed by some cache.
// VectorIndex should be weak shared by AnnIndexWriter and VectorIndexReader
// This should be a weak_ptr
std::shared_ptr<VectorIndex> _vector_index;
// _float_array is used to buffer the float data before training/adding to vector index
// if we dont do this, the performance(recall) will be very poor when adding small number of vectors one by one
PODArray<float> _float_array;
PODArray<float> _buffered_vectors;
int64_t _total_rows = 0;
IndexFileWriter* _index_file_writer;
const TabletIndex* _index_meta;
std::shared_ptr<DorisFSDirectory> _dir;
bool _need_save_index = false;
};
} // namespace doris::segment_v2
3 changes: 2 additions & 1 deletion be/src/storage/index/ann/faiss_ann_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,8 @@ Int64 FaissVectorIndex::get_min_train_rows() const {
// For IVF indexes, the minimum number of training points should be at least
// equal to the number of clusters (nlist). FAISS requires this for k-means clustering.
Int64 ivf_min = 0;
if (_params.index_type == FaissBuildParameter::IndexType::IVF) {
if (_params.index_type == FaissBuildParameter::IndexType::IVF ||
_params.index_type == FaissBuildParameter::IndexType::IVF_ON_DISK) {
ivf_min = _params.ivf_nlist;
}

Expand Down
Loading
Loading