Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "4.1.18"
version = "4.1.19"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeStore"
Expand All @@ -26,6 +26,7 @@ class HomeObjectConan(ConanFile):
"coverage": ['True', 'False'],
"sanitize": ['True', 'False'],
}

default_options = {
'shared': False,
'fPIC': True,
Expand Down
11 changes: 5 additions & 6 deletions src/lib/homestore_backend/gc_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ folly::SemiFuture< bool > GCManager::pdev_gc_actor::add_gc_task(uint8_t priority
const auto pg_id = EXvchunk->m_pg_id.value();
m_hs_home_object->gc_manager()->incr_pg_pending_gc_task(pg_id);

if (!m_hs_home_object->can_chunks_in_pg_be_gc(pg_id)) {
if (!m_hs_home_object->is_pg_alive(pg_id)) {
LOGDEBUGMOD(gcmgr, "chunk_id={} belongs to pg {}, which is not eligible for gc at this moment!",
move_from_chunk, pg_id)
m_hs_home_object->gc_manager()->decr_pg_pending_gc_task(pg_id);
Expand Down Expand Up @@ -504,7 +504,7 @@ void GCManager::pdev_gc_actor::handle_recovered_gc_task(
}

// we have no gc_task_guard for recovered gc task, so we need to do this manually to make sure the gc task can be
// marked as completed and the pg can be marked as available for new gc task
// marked as completed
on_gc_task_completed(priority, pg_id, move_from_chunk, move_to_chunk, vchunk_id, true, 0);

GCLOGD(RECOVERD_GC_TASK_ID, pg_id, NO_SHARD_ID,
Expand Down Expand Up @@ -797,9 +797,8 @@ bool GCManager::pdev_gc_actor::copy_valid_data(
move_from_chunk);
}

// check if all the pbas in the valid_blob_indexes are in move_from_chunk, if not, it means the
// shard is being modified during gc, we can not guarantee the data consistency, so we fail this gc
// task and let it be retried later.
// check if all the pbas in the valid_blob_indexes are in move_from_chunk, if not, we cancel this task and retry
// later.
for (const auto& [blob, v] : valid_blob_indexes) {
auto pba = v.pbas();
if (pba.chunk_num() != move_from_chunk) {
Expand Down Expand Up @@ -1100,7 +1099,7 @@ bool GCManager::pdev_gc_actor::purge_reserved_chunk(chunk_id_t chunk, const uint
RELEASE_ASSERT(!vchunk->m_pg_id.has_value(),
"chunk_id={} is expected to be a reserved chunk, and not belong to a pg", chunk);
RELEASE_ASSERT(vchunk->m_state == ChunkState::GC,
"chunk_id={} is a reserved chunk, expected to have a GC state, but actuall state is {} ", chunk,
"chunk_id={} is a reserved chunk, expected to have a GC state, but the actual state is {} ", chunk,
vchunk->m_state);

// Clear all rreqs on the reserved chunk BEFORE reset() resets its allocator.
Expand Down
4 changes: 3 additions & 1 deletion src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -881,6 +881,8 @@ class HSHomeObject : public HomeObjectImpl {
*/
bool pg_destroy(pg_id_t pg_id, bool need_to_pause_pg_state_machine = false);

void destroy_pg_resource(pg_id_t pg_id);

bool pause_pg_state_machine(pg_id_t pg_id);

bool resume_pg_state_machine(pg_id_t pg_id);
Expand Down Expand Up @@ -977,7 +979,7 @@ class HSHomeObject : public HomeObjectImpl {
* @param pg_id The ID of the PG to check.
* @return True if the chunks in the PG can be garbage collected, false otherwise.
*/
bool can_chunks_in_pg_be_gc(pg_id_t pg_id) const;
bool is_pg_alive(pg_id_t pg_id) const;

bool pg_exists(pg_id_t pg_id) const;

Expand Down
42 changes: 19 additions & 23 deletions src/lib/homestore_backend/hs_pg_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -688,29 +688,32 @@ std::optional< pg_id_t > HSHomeObject::get_pg_id_with_group_id(group_id_t group_

void HSHomeObject::_destroy_pg(pg_id_t pg_id) { pg_destroy(pg_id); }

// Releases the resources held by the given pg: its shards, its homestore resources, its index table and its
// super blk, and then hands the pg's chunks back to the dev heap.
// Aborts via RELEASE_ASSERT if the chunks cannot be returned.
void HSHomeObject::destroy_pg_resource(pg_id_t pg_id) {
destroy_shards(pg_id);
destroy_hs_resources(pg_id);
destroy_pg_index_table(pg_id);
destroy_pg_superblk(pg_id);

// return pg chunks to dev heap
// which must be done after destroying pg super blk to avoid multiple pg use same chunks
bool res = chunk_selector_->return_pg_chunks_to_dev_heap(pg_id);
RELEASE_ASSERT(res, "Failed to return pg={} chunks to dev_heap", pg_id);
LOGI("resource of pg={} is destroyed", pg_id);
}

bool HSHomeObject::pg_destroy(pg_id_t pg_id, bool need_to_pause_pg_state_machine) {
if (need_to_pause_pg_state_machine && !pause_pg_state_machine(pg_id)) {
LOGI("Failed to pause pg state machine, pg_id={}", pg_id);
return false;
}

LOGI("Destroying pg={}", pg_id);
mark_pg_destroyed(pg_id);

// we have the assumption that after pg is marked as destroyed, it will not be marked as alive again.
// TODO:: if this assumption is broken, we need to handle it.
gc_mgr_->drain_pg_pending_gc_task(pg_id);

destroy_shards(pg_id);
destroy_hs_resources(pg_id);
destroy_pg_index_table(pg_id);
destroy_pg_superblk(pg_id);

// return pg chunks to dev heap
// which must be done after destroying pg super blk to avoid multiple pg use same chunks
bool res = chunk_selector_->return_pg_chunks_to_dev_heap(pg_id);
RELEASE_ASSERT(res, "Failed to return pg={} chunks to dev_heap", pg_id);

LOGI("pg={} is destroyed", pg_id);
destroy_pg_resource(pg_id);
return true;
}

Expand Down Expand Up @@ -800,7 +803,7 @@ void HSHomeObject::mark_pg_destroyed(pg_id_t pg_id) {
LOGD("pg={} is marked as destroyed", pg_id);
}

bool HSHomeObject::can_chunks_in_pg_be_gc(pg_id_t pg_id) const {
bool HSHomeObject::is_pg_alive(pg_id_t pg_id) const {
auto lg = std::scoped_lock(_pg_lock);
auto hs_pg = const_cast< HS_PG* >(_get_hs_pg_unlocked(pg_id));
if (hs_pg == nullptr) {
Expand Down Expand Up @@ -949,8 +952,7 @@ void HSHomeObject::on_pg_meta_blk_found(sisl::byte_view const& buf, void* meta_c
hs_pg->index_table_ = it->second.index_table;
it->second.pg_id = pg_id;
} else {
RELEASE_ASSERT(hs_pg->pg_sb_->state == PGState::DESTROYED, "IndexTable should be recovered before PG");
hs_pg->index_table_ = nullptr;
RELEASE_ASSERT(hs_pg->pg_sb_->state == PGState::DESTROYED, "IndexTable should be recovered for alive PG");
LOGI("Index table not found for destroyed pg={}, index_table_uuid={}", pg_id, uuid_str);
}

Expand Down Expand Up @@ -1262,17 +1264,11 @@ uint32_t HSHomeObject::get_pg_tombstone_blob_count(pg_id_t pg_id) const {
}

void HSHomeObject::refresh_pg_statistics(pg_id_t pg_id) {
RELEASE_ASSERT(is_pg_alive(pg_id), "pg={} should be alive", pg_id);
auto hs_pg = const_cast< HS_PG* >(_get_hs_pg_unlocked(pg_id));
RELEASE_ASSERT(hs_pg, "Failed to get pg={} for statistics refresh", pg_id);
auto pg_index_table = hs_pg->index_table_;
if (!pg_index_table) {
if (hs_pg->pg_sb_->state == PGState::DESTROYED) {
LOGI("pg={} is destroyed, skip statistics refresh", pg_id);
} else {
RELEASE_ASSERT(false, "index table is not found for pg={} and not in PGState::DESTROYED state", pg_id);
}
return;
}
RELEASE_ASSERT(pg_index_table, "pg is alive, index table should be found for pg={}", pg_id);

// Step 1: Scan index table to count active and tombstone blobs in one pass
uint64_t active_count = 0;
Expand Down
43 changes: 41 additions & 2 deletions src/lib/homestore_backend/replication_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,10 @@ void ReplicationStateMachine::write_snapshot_obj(std::shared_ptr< homestore::sna
set_snapshot_context(context); // Update the snapshot context in case apply_snapshot is not called
auto hs_pg = home_object_->get_hs_pg(m_snp_rcv_handler->get_context_pg_id());
hs_pg->pg_state_.clear_state(PGStateMask::BASELINE_RESYNC);
// we only reset this if destroying pg happens in BR case. for other cases (on_destroy and _exit_pg),
Comment thread
xiaoxichen marked this conversation as resolved.
// since this replica will leave the PG and no later logs will be received, no need to reset this.
reset_no_space_left_error_info();
repl_dev()->reset_latch_lsn();
return;
}

Expand Down Expand Up @@ -499,7 +503,7 @@ void ReplicationStateMachine::write_snapshot_obj(std::shared_ptr< homestore::sna
if (home_object_->pg_exists(pg_data->pg_id())) {
LOGI("pg already exists, clean pg resources before snapshot, pg={} {}", pg_data->pg_id(), log_suffix);
// Need to pause state machine before destroying the PG, if fail, let raft retry.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment is out of date; also, we don't have a branch that returns false as of now.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove these out-of-date comments after addressing the other comments on this PR.

if (!home_object_->pg_destroy(pg_data->pg_id(), true /* pause state machine */)) {
if (!home_object_->pg_destroy(pg_data->pg_id())) {
LOGE("failed to destroy existing pg, let raft retry, pg={} {}", pg_data->pg_id(), log_suffix);
return;
}
Expand Down Expand Up @@ -1030,7 +1034,42 @@ void ReplicationStateMachine::on_log_replay_done(const homestore::group_id_t& gr
const auto pg_id = pg_id_opt.value();
RELEASE_ASSERT(home_object_->pg_exists(pg_id), "pg={} should exist, but not! fatal error!", pg_id);

const auto& shards_in_pg = (const_cast< HSHomeObject::HS_PG* >(home_object_->_get_hs_pg_unlocked(pg_id)))->shards_;
const auto hs_pg = (const_cast< HSHomeObject::HS_PG* >(home_object_->get_hs_pg(pg_id)));
RELEASE_ASSERT(hs_pg, "Failed to get pg={} when log replay done", pg_id);
if (hs_pg->pg_sb_->state == PGState::DESTROYED) {
// if we find a pg with a state of destroyed in recovery path, we can be sure that pg_destroy was called and
// a crash occurred before it was completed.

// pg_destroy will be called in the following scenarios:
// 1 baseline resync: when the first snapshot message is received, if the pg already exists, we will call
// pg_destroy to clean up the stale pg resources before resync.

// 2 exit_pg: when processing exit_pg request, we will call pg_destroy to destroy the pg resources.

// 3 RaftReplDev::leave() is called and thus RaftReplDev::permanent_destroy() is called from nuraft_mesg. and
// RaftReplDev::leave() will be called in the following scenarios:
// a. destroy_repl_dev: commit log entry of journal_type_t::HS_CTRL_DESTROY
// b. removed from raft group: the move_out member in replace member.

// for baseline resync, no need to redo destroy pg here since the first snapshot message will be received again
// and trigger pg_destroy again if the pg already exists. but for other cases, we need to redo destroy pg to
// clean up the stale pg resources since no message will be received to trigger pg_destroy again. so, we call it
// here for all the above cases to make sure the stale pg resources are cleaned up.

// there is also a concern that in baseline resync case, if some resource is destroyed in pg_destroy but not all
// the resources are destroyed before crash, then when recovery, log replay will hit those destroyed resources
// and cause error. for example, we have a put_blob log after cp_lsn and before dc_lsn, and the pg_index_table
// is destroyed but, before pg_super_blk is destroyed, a crash happens. during recovery, log replay will hit the
// put_blob log and try to write to the index table, but since the index table is destroyed, it will cause
// error. but actually, this is not a problem. since before we start pg_destroy in baseline resync,
// m_rd_sb->last_snapshot_lsn will be persisted up to the snapshot.get_last_log_idx(). then all the logs less than
// or equal to m_rd_sb->last_snapshot_lsn will not be replayed or committed after recovery. so, the concern is
Comment on lines +1064 to +1066

@JacksonYao287 JacksonYao287 Jun 18, 2026

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xiaoxichen in the baseline resync case, before we start destroying the pg, we persist m_rd_sb->last_snapshot_lsn up to snapshot.get_last_log_idx(). Then raft_repl_dev#need_skip_processing helps us skip replaying all those logs in the recovery path (so that we will not hit the destroyed resources, like pg_index_table, etc.). So we don't need to wait for all the appended logs to be committed in pg_destroy for the BR case.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so basically you reverted your changes

@xiaoxichen xiaoxichen Jun 18, 2026

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get the point of this comment: you said that for BR there is no need to redo destroy PG, but we do it anyway.

The concern of log replay vs destroy is not valid here ... if we reach here the log replay had been done...If we want to record the thinking why waiting for log commit is not needed, better rephrase this paragraph and move it to destroy_pg

Similar for L1043-1052, those lines explains the situations that a PG can be destroy, better to move to destroy_pg rather than here, especially we use same action in recovery path , for all source

// not valid. pls refer to raft_repl_dev#need_skip_processing for more details.
home_object_->destroy_pg_resource(pg_id);
Comment thread
xiaoxichen marked this conversation as resolved.
return;
}

const auto& shards_in_pg = hs_pg->shards_;
auto chunk_selector = home_object_->chunk_selector();

for (const auto& shard_iter : shards_in_pg) {
Expand Down
2 changes: 1 addition & 1 deletion src/lib/homestore_backend/tests/homeobj_fixture.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class HomeObjectFixture : public ::testing::Test {

HSHomeObject::_hs_chunk_size = SISL_OPTIONS["chunk_size"].as< uint64_t >() * Mi;
_obj_inst = std::dynamic_pointer_cast< HSHomeObject >(g_helper->build_new_homeobject());

// Used to export metrics, it should be called after init_homeobject
if (SISL_OPTIONS["enable_http"].as< bool >()) { g_helper->app->start_http_server(); }
if (!g_helper->is_current_testcase_restarted()) {
Expand Down
38 changes: 38 additions & 0 deletions src/lib/homestore_backend/tests/hs_pg_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -517,4 +517,42 @@ TEST_F(HomeObjectFixture, PGRefreshStatisticsTest) {
EXPECT_EQ(pg_stats_restart.num_tombstone_objects, num_tombstones)
<< "Tombstone blob count should be preserved after restart";
EXPECT_EQ(pg_stats_restart.used_bytes, used_bytes_after) << "Used bytes should be preserved after restart";
}

// Checks is_pg_alive in three situations: right after a pg is created, for a pg id that this test never
// creates, and for the created pg after mark_pg_destroyed has been called on it.
TEST_F(HomeObjectFixture, IsPgAliveTest) {
LOGINFO("HomeObject replica={} setup completed", g_helper->replica_num());

pg_id_t pg_id{1};
create_pg(pg_id);

// the freshly created pg must be reported alive; pg 999 is never created here and must not be
EXPECT_TRUE(_obj_inst->is_pg_alive(pg_id));
EXPECT_FALSE(_obj_inst->is_pg_alive(999));

// once the pg is marked destroyed it must no longer be reported alive
_obj_inst->mark_pg_destroyed(pg_id);
EXPECT_FALSE(_obj_inst->is_pg_alive(pg_id));
}

// Crash-recovery scenario: the pg has been marked destroyed, but the process goes down before
// destroy_pg_resource finishes. After the restart, on_log_replay_done is expected to notice the DESTROYED
// state and invoke destroy_pg_resource so that the leftover resources get cleaned up.
TEST_F(HomeObjectFixture, DestroyedPgRecoveryTest) {
    LOGINFO("HomeObject replica={} setup completed", g_helper->replica_num());

    pg_id_t pg_id{1};
    create_pg(pg_id);
    auto shard_id = create_shard(pg_id, 64 * Mi, "shard meta").id;

    // Capture the index table uuid up front; it is passed to the verification step after the restart.
    auto hs_pg = _obj_inst->get_hs_pg(pg_id);
    ASSERT_NE(hs_pg, nullptr);
    auto index_table_uuid_str = boost::uuids::to_string(hs_pg->index_table_->uuid());

    // Emulate the crash window: the pg is only marked destroyed, none of its resources are cleaned up.
    _obj_inst->mark_pg_destroyed(pg_id);
    EXPECT_FALSE(_obj_inst->is_pg_alive(pg_id));

    // Recovery after the restart is expected to complete the interrupted destroy.
    restart();

    verify_pg_destroy(pg_id, index_table_uuid_str, {shard_id});
}
7 changes: 3 additions & 4 deletions src/lib/homestore_backend/tests/hs_shard_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ TEST_F(HomeObjectFixture, ShardManagerRecovery) {
create_pg(pg_id);

// create one shard;
auto shard_info = create_shard(pg_id, Mi, "shard meta");;
auto shard_info = create_shard(pg_id, Mi, "shard meta");
auto shard_id = shard_info.id;
EXPECT_EQ(ShardInfo::State::OPEN, shard_info.state);
EXPECT_EQ(Mi, shard_info.total_capacity_bytes);
Expand Down Expand Up @@ -162,7 +162,7 @@ TEST_F(HomeObjectFixture, ShardManagerRecovery) {
EXPECT_EQ(1, pg_result->shard_sequence_num_);

// re-create new shards on this pg works too even homeobject is restarted twice.
auto new_shard_info = create_shard(pg_id, Mi, "shard meta");;
auto new_shard_info = create_shard(pg_id, Mi, "shard meta");
EXPECT_NE(shard_id, new_shard_info.id);

EXPECT_EQ(ShardInfo::State::OPEN, new_shard_info.state);
Expand All @@ -177,7 +177,7 @@ TEST_F(HomeObjectFixture, SealedShardRecovery) {
create_pg(pg_id);

// create one shard and seal it.
auto shard_info = create_shard(pg_id, Mi, "shard meta");;
auto shard_info = create_shard(pg_id, Mi, "shard meta");
auto shard_id = shard_info.id;
shard_info = seal_shard(shard_id);
EXPECT_EQ(ShardInfo::State::SEALED, shard_info.state);
Expand Down Expand Up @@ -282,7 +282,6 @@ TEST_F(HomeObjectFixture, CreateShardOnDiskLostMemeber) {
ASSERT_TRUE(s.hasError()) << "degraded pg on error member should return seal shard fail, pg_id "
<< degrade_pg_id << "shard_id " << pg_shard_id_map[degrade_pg_id]
<< " replica number " << g_helper->replica_num();

tid = generateRandomTraceId();
s = _obj_inst->shard_manager()->create_shard(degrade_pg_id, 64 * Mi, "shard meta", tid).get();
ASSERT_TRUE(s.hasError()) << "degraded pg on error member should return create shard fail, pg_id "
Expand Down
Loading