Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,14 @@ resources/*.xml
*.o
.vscode
cpp/pixels-retina/third_party/

# AI tools
.codex
.claude/
.cursor/
.continue/
.aider*
.ai/
.notes/
CLAUDE.local.md
AGENTS.md.local
3 changes: 2 additions & 1 deletion cpp/pixels-retina/include/RGVisibility.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ class RGVisibility : public pixels::RetinaBase<RGVisibility<CAPACITY>> {
const std::vector<uint64_t>* initialBitmap = nullptr);
~RGVisibility() override;

void deleteRGRecord(uint32_t rowId, uint64_t timestamp);
void deleteRGRecord(uint32_t rowId, uint64_t timestamp,
ReplayMode replayMode = ReplayMode::NORMAL);
uint64_t* getRGVisibilityBitmap(uint64_t timestamp);

std::vector<uint64_t> collectRGGarbage(uint64_t timestamp);
Expand Down
4 changes: 2 additions & 2 deletions cpp/pixels-retina/include/RGVisibilityJni.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 25 additions & 1 deletion cpp/pixels-retina/include/TileVisibility.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,22 @@ inline uint64_t extractTimestamp(uint64_t raw) {
return (raw & 0x0000FFFFFFFFFFFFULL);
}

/**
* Controls how DELETE replay interacts with the compacted base bitmap.
*
* NORMAL is the live append path: the caller provides a current delete
* timestamp and the record is appended to the chain. VERSIONED is used when
* replay may race with READY readers; historical deletes publish a new
* VersionedData with a folded baseBitmap. EXCLUSIVE is used only while recovery
* blocks readers and GC; historical deletes may update baseBitmap in place, but
* concurrent recovery writers still need tile-level synchronization.
*/
enum class ReplayMode : uint8_t {
NORMAL = 0,
VERSIONED = 1,
EXCLUSIVE = 2
};

struct DeleteIndexBlock : public pixels::RetinaBase<DeleteIndexBlock> {
static constexpr size_t BLOCK_CAPACITY = 8;
uint64_t items[BLOCK_CAPACITY] = {0};
Expand Down Expand Up @@ -96,7 +112,7 @@ class TileVisibility : public pixels::RetinaBase<TileVisibility<CAPACITY>> {
// timestamp defaults to 0; bitmap defaults to all-zeros.
explicit TileVisibility(uint64_t timestamp = 0, const uint64_t* bitmap = nullptr);
~TileVisibility() override;
void deleteTileRecord(uint16_t rowId, uint64_t ts);
void deleteTileRecord(uint16_t rowId, uint64_t ts, ReplayMode replayMode = ReplayMode::NORMAL);
void getTileVisibilityBitmap(uint64_t ts, uint64_t* outBitmap) const;
void collectTileGarbage(uint64_t ts, uint64_t* gcSnapshotBitmap);
void exportChainItemsAfter(uint32_t tileId, uint64_t safeGcTs,
Expand All @@ -109,6 +125,14 @@ class TileVisibility : public pixels::RetinaBase<TileVisibility<CAPACITY>> {

void reclaimRetiredVersions();

void appendDeleteChain(uint16_t rowId, uint64_t ts);

// VERSIONED: replay with possible readers; historical deletes use COW fold.
void deleteTileRecordVersioned(uint16_t rowId, uint64_t ts);

// EXCLUSIVE: recovery replay without readers; historical deletes fold in place.
void deleteTileRecordExclusive(uint16_t rowId, uint64_t ts);

std::atomic<VersionedData<CAPACITY>*> currentVersion;
std::atomic<DeleteIndexBlock *> tail;
std::atomic<size_t> tailUsed;
Expand Down
5 changes: 3 additions & 2 deletions cpp/pixels-retina/lib/RGVisibility.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,10 @@ TileVisibility<CAPACITY>* RGVisibility<CAPACITY>::getTileVisibility(uint32_t row
}

template<size_t CAPACITY>
void RGVisibility<CAPACITY>::deleteRGRecord(uint32_t rowId, uint64_t timestamp) {
void RGVisibility<CAPACITY>::deleteRGRecord(uint32_t rowId, uint64_t timestamp,
ReplayMode replayMode) {
TileVisibility<CAPACITY>* tileVisibility = getTileVisibility(rowId);
tileVisibility->deleteTileRecord(rowId % VISIBILITY_RECORD_CAPACITY, timestamp);
tileVisibility->deleteTileRecord(rowId % VISIBILITY_RECORD_CAPACITY, timestamp, replayMode);
}

template<size_t CAPACITY>
Expand Down
17 changes: 14 additions & 3 deletions cpp/pixels-retina/lib/RGVisibilityJni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,17 @@
#include "RGVisibility.h"
#include <stdexcept>

namespace {
ReplayMode toReplayMode(jint mode) {
switch (mode) {
case 0: return ReplayMode::NORMAL;
case 1: return ReplayMode::VERSIONED;
case 2: return ReplayMode::EXCLUSIVE;
default: throw std::invalid_argument("unknown ReplayMode");
}
}
}

/*
* Class: io_pixelsdb_pixels_retina_RGVisibility
* Method: createNativeObject
Expand Down Expand Up @@ -72,13 +83,13 @@ JNIEXPORT void JNICALL Java_io_pixelsdb_pixels_retina_RGVisibility_destroyNative
/*
* Class: io_pixelsdb_pixels_retina_RGVisibility
* Method: deleteRecord
* Signature: (JJJ)V
* Signature: (IJJI)V
*/
JNIEXPORT void JNICALL Java_io_pixelsdb_pixels_retina_RGVisibility_deleteRecord
(JNIEnv* env, jobject, jint rowId, jlong timestamp, jlong handle) {
(JNIEnv* env, jobject, jint rowId, jlong timestamp, jlong handle, jint replayMode) {
try {
auto* rgVisibility = reinterpret_cast<RGVisibilityInstance*>(handle);
rgVisibility->deleteRGRecord(rowId, timestamp);
rgVisibility->deleteRGRecord(rowId, timestamp, toReplayMode(replayMode));
} catch (const std::exception& e) {
env->ThrowNew(env->FindClass("java/lang/RuntimeException"), e.what());
}
Expand Down
66 changes: 65 additions & 1 deletion cpp/pixels-retina/lib/TileVisibility.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,71 @@ TileVisibility<CAPACITY>::~TileVisibility() {
}

template<size_t CAPACITY>
void TileVisibility<CAPACITY>::deleteTileRecord(uint16_t rowId, uint64_t ts) {
void TileVisibility<CAPACITY>::deleteTileRecord(uint16_t rowId, uint64_t ts,
ReplayMode replayMode) {
switch (replayMode) {
case ReplayMode::NORMAL:
appendDeleteChain(rowId, ts);
return;
case ReplayMode::VERSIONED:
deleteTileRecordVersioned(rowId, ts);
return;
case ReplayMode::EXCLUSIVE:
deleteTileRecordExclusive(rowId, ts);
return;
default:
throw std::invalid_argument("unknown ReplayMode");
}
}

template<size_t CAPACITY>
void TileVisibility<CAPACITY>::deleteTileRecordVersioned(uint16_t rowId, uint64_t ts) {
// READY backlog replay can race with getTileVisibilityBitmap readers. Fold
// historical deletes by publishing a new VersionedData instead of mutating
// baseBitmap observed by an existing reader.
// Keep ts=0 out of this path because item=0 is the chain-slot sentinel.
while (ts > 0) {
VersionedData<CAPACITY>* cur = currentVersion.load(std::memory_order_acquire);
if (ts > cur->baseTimestamp) {
break;
}
if ((cur->baseBitmap[rowId / 64] & (1ULL << (rowId % 64))) != 0) {
return;
}
uint64_t newBaseBitmap[NUM_WORDS];
std::memcpy(newBaseBitmap, cur->baseBitmap, NUM_WORDS * sizeof(uint64_t));
SET_BITMAP_BIT(newBaseBitmap, rowId);
VersionedData<CAPACITY>* newVer =
new VersionedData<CAPACITY>(cur->baseTimestamp, newBaseBitmap, cur->head);
if (currentVersion.compare_exchange_strong(cur, newVer, std::memory_order_acq_rel)) {
pendingRetire.store(cur, std::memory_order_release);
return;
}
delete newVer;
}

appendDeleteChain(rowId, ts);
}

template<size_t CAPACITY>
void TileVisibility<CAPACITY>::deleteTileRecordExclusive(uint16_t rowId, uint64_t ts) {
// RECOVERING replay blocks readers and GC, so historical deletes can fold
// into baseBitmap in place. Atomic OR prevents lost updates when concurrent
// recovery writers touch the same bitmap word.
VersionedData<CAPACITY>* cur = currentVersion.load(std::memory_order_acquire);
if (ts > 0 && ts <= cur->baseTimestamp) {
uint64_t mask = 1ULL << (rowId % 64);
__atomic_fetch_or(&cur->baseBitmap[rowId / 64], mask, __ATOMIC_RELAXED);
return;
}

appendDeleteChain(rowId, ts);
}

template<size_t CAPACITY>
void TileVisibility<CAPACITY>::appendDeleteChain(uint16_t rowId, uint64_t ts) {
// Normal live apply assumes a current timestamp and records the delete in
// the append-only chain, leaving baseBitmap untouched for the hot path.
uint64_t item = makeDeleteIndex(rowId, ts);
while (true) {
DeleteIndexBlock *curTail = tail.load(std::memory_order_acquire);
Expand Down
72 changes: 72 additions & 0 deletions cpp/pixels-retina/test/RGVisibilityTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,50 @@ class RGVisibilityTest : public ::testing::Test {
RGVisibilityInstance* rgVisibility;
};

static bool rgBitSet(const uint64_t* bitmap, uint32_t rowId) {
return ((bitmap[rowId / 64] >> (rowId % 64)) & 1ULL) != 0;
}

static void runConcurrentRGDeletes(RGVisibilityInstance* visibility,
ReplayMode mode,
uint64_t ts,
int rowCount = 64,
int threadCount = 8) {
ASSERT_EQ(rowCount % threadCount, 0);
std::atomic<bool> start{false};
std::vector<std::thread> threads;
int rowsPerThread = rowCount / threadCount;

for (int t = 0; t < threadCount; t++) {
threads.emplace_back([&, t]() {
while (!start.load(std::memory_order_acquire)) {
std::this_thread::yield();
}
for (int i = 0; i < rowsPerThread; i++) {
uint32_t rowId = static_cast<uint32_t>(t * rowsPerThread + i);
visibility->deleteRGRecord(rowId, ts, mode);
}
});
}

start.store(true, std::memory_order_release);
for (auto& thread : threads) {
thread.join();
}
}

static void expectRGRows(RGVisibilityInstance* visibility,
uint64_t queryTs,
int rowCount,
bool expectedSet) {
uint64_t* bitmap = visibility->getRGVisibilityBitmap(queryTs);
for (int row = 0; row < rowCount; row++) {
EXPECT_EQ(expectedSet, rgBitSet(bitmap, static_cast<uint32_t>(row)))
<< "row=" << row << " queryTs=" << queryTs;
}
delete[] bitmap;
}

TEST_F(RGVisibilityTest, BasicDeleteAndVisibility) {
uint64_t timestamp1 = 100;
uint64_t timestamp2 = 200;
Expand All @@ -67,6 +111,34 @@ TEST_F(RGVisibilityTest, BasicDeleteAndVisibility) {
delete[] bitmap2;
}

TEST_F(RGVisibilityTest, ConcurrentNormalModeAppendsDeleteChain) {
constexpr uint64_t baseTs = 100;
RGVisibilityInstance visibility(ROW_COUNT, baseTs, nullptr);

runConcurrentRGDeletes(&visibility, ReplayMode::NORMAL, baseTs + 1);

expectRGRows(&visibility, baseTs, 64, false);
expectRGRows(&visibility, baseTs + 1, 64, true);
}

TEST_F(RGVisibilityTest, ConcurrentVersionedModeFoldsWithCow) {
constexpr uint64_t baseTs = 100;
RGVisibilityInstance visibility(ROW_COUNT, baseTs, nullptr);

runConcurrentRGDeletes(&visibility, ReplayMode::VERSIONED, baseTs - 1);

expectRGRows(&visibility, baseTs, 64, true);
}

TEST_F(RGVisibilityTest, ConcurrentExclusiveModeFoldsWithAtomicOr) {
constexpr uint64_t baseTs = 100;
RGVisibilityInstance visibility(ROW_COUNT, baseTs, nullptr);

runConcurrentRGDeletes(&visibility, ReplayMode::EXCLUSIVE, baseTs - 1);

expectRGRows(&visibility, baseTs, 64, true);
}

TEST_F(RGVisibilityTest, MultiThread) {
struct DeleteRecord {
uint64_t timestamp;
Expand Down
Loading
Loading