diff --git a/examples/splinterdb_custom_ipv4_addr_sortcmp_example.c b/examples/splinterdb_custom_ipv4_addr_sortcmp_example.c index 6a5113f0..933fb35c 100644 --- a/examples/splinterdb_custom_ipv4_addr_sortcmp_example.c +++ b/examples/splinterdb_custom_ipv4_addr_sortcmp_example.c @@ -317,7 +317,7 @@ do_inserts(splinterdb *spl_handle, kv_pair *kv_pairs, int num_kv_pairs) slice_create(strlen(kv_pairs[ictr].kv_key), kv_pairs[ictr].kv_key); slice value = slice_create(WWW_PING_SIZE(&kv_pairs[ictr].kv_val), (const char *)&kv_pairs[ictr].kv_val); - int rc = splinterdb_insert(spl_handle, key, value); + int rc = splinterdb_insert(spl_handle, key, value, NULL); if (rc) { printf( "Insert of key '%s' failed; rc=%d\n", kv_pairs[ictr].kv_key, rc); diff --git a/examples/splinterdb_intro_example.c b/examples/splinterdb_intro_example.c index c75cdec8..0fe611cc 100644 --- a/examples/splinterdb_intro_example.c +++ b/examples/splinterdb_intro_example.c @@ -53,21 +53,21 @@ main() slice key = slice_create((size_t)strlen(fruit), fruit); slice value = slice_create((size_t)strlen(descr), descr); - rc = splinterdb_insert(spl_handle, key, value); + rc = splinterdb_insert(spl_handle, key, value, NULL); printf("Inserted key '%s'\n", fruit); fruit = "Orange"; descr = "Is a good source of vitamin-C."; key = slice_create((size_t)strlen(fruit), fruit); value = slice_create((size_t)strlen(descr), descr); - rc = splinterdb_insert(spl_handle, key, value); + rc = splinterdb_insert(spl_handle, key, value, NULL); printf("Inserted key '%s'\n", fruit); fruit = "Mango"; descr = "Mango is the king of fruits."; key = slice_create((size_t)strlen(fruit), fruit); value = slice_create((size_t)strlen(descr), descr); - rc = splinterdb_insert(spl_handle, key, value); + rc = splinterdb_insert(spl_handle, key, value, NULL); printf("Inserted key '%s'\n", fruit); // Retrieve a key-value pair. diff --git a/examples/splinterdb_iterators_example.c b/examples/splinterdb_iterators_example.c index 1462c510..630d2e24 100644 --- a/examples/splinterdb_iterators_example.c +++ b/examples/splinterdb_iterators_example.c @@ -115,7 +115,7 @@ do_inserts(splinterdb *spl_handle, kv_pair *kv_pairs, int num_kv_pairs) slice_create(strlen(kv_pairs[ictr].kv_key), kv_pairs[ictr].kv_key); slice value = slice_create(strlen(kv_pairs[ictr].kv_val), kv_pairs[ictr].kv_val); - int rc = splinterdb_insert(spl_handle, key, value); + int rc = splinterdb_insert(spl_handle, key, value, NULL); if (rc) { printf( "Insert for key='%s' failed, rc=%d\n", kv_pairs[ictr].kv_key, rc); diff --git a/examples/splinterdb_wide_values_example.c b/examples/splinterdb_wide_values_example.c index fa96e76d..765aca95 100644 --- a/examples/splinterdb_wide_values_example.c +++ b/examples/splinterdb_wide_values_example.c @@ -62,7 +62,7 @@ main() slice key = slice_create(strlen(key_buf), key_buf); slice value = slice_create(val_len, val_buf); - rc = splinterdb_insert(spl_handle, key, value); + rc = splinterdb_insert(spl_handle, key, value, NULL); if (rc) { break; } diff --git a/include/splinterdb/splinterdb.h b/include/splinterdb/splinterdb.h index f6fcb830..2fd1f18c 100644 --- a/include/splinterdb/splinterdb.h +++ b/include/splinterdb/splinterdb.h @@ -143,6 +143,10 @@ typedef struct splinterdb_config { uint64 queue_scale_percent; } splinterdb_config; +/////////////////////////////////////// +// Lifecycle +/////////////////////////////////////// + // Opaque handle to an opened instance of SplinterDB typedef struct splinterdb splinterdb; @@ -174,20 +178,10 @@ splinterdb_open(const splinterdb_config *cfg, splinterdb **kvs); void splinterdb_close(splinterdb **kvs); -// Insert a key and value. Overwrites any previous value associated with the -// key. -int -splinterdb_insert(splinterdb *kvsb, slice key, slice value); - -// Delete a given key and any associated value / messages -int -splinterdb_delete(splinterdb *kvsb, slice key); - -// Update the value associated with key. -int -splinterdb_update(splinterdb *kvsb, slice key, slice delta); +//////////////////////////////////// // Lookups +//////////////////////////////////// typedef uint64 splinterdb_lookup_flags; @@ -201,7 +195,7 @@ typedef uint64 splinterdb_lookup_flags; // // Once initialized, a splinterdb_lookup_result may be used for multiple // lookups. It is not safe to use from multiple threads. -typedef struct { +typedef struct splinterdb_lookup_result { char opaque[SPLINTERDB_LOOKUP_BUFSIZE]; } __attribute__((__aligned__(8))) splinterdb_lookup_result; @@ -267,6 +261,36 @@ splinterdb_lookup(splinterdb *kvs, // IN ); +///////////////////////////////// +// Updates +///////////////////////////////// + + +// Insert a key and value. Overwrites any previous value associated with the +// key. +// +// If old_result is non-NULL, it must have been initialized using +// splinterdb_lookup_result_init() and will receive the value previously +// associated with key. +int +splinterdb_insert(splinterdb *kvsb, + slice key, + slice value, + splinterdb_lookup_result *old_result); + +// Delete a given key and any associated value / messages +int +splinterdb_delete(splinterdb *kvsb, + slice key, + splinterdb_lookup_result *old_result); + +// Update the value associated with key. +int +splinterdb_update(splinterdb *kvsb, + slice key, + slice delta, + splinterdb_lookup_result *old_result); + /* Iterator API (range query) diff --git a/src/btree.c b/src/btree.c index 9cbefa74..2031de43 100644 --- a/src/btree.c +++ b/src/btree.c @@ -572,6 +572,21 @@ spec_message(const leaf_incorporate_spec *spec) } } +static inline platform_status +btree_record_old_result(const btree_config *cfg, + const btree_hdr *hdr, + const leaf_incorporate_spec *spec, + lookup_result *old_result) +{ + if (old_result == NULL || spec->old_entry_state != ENTRY_STILL_EXISTS) { + return STATUS_OK; + } + + leaf_entry *entry = btree_get_leaf_entry(cfg, hdr, spec->idx); + return lookup_result_update( + old_result, leaf_entry_key(entry), leaf_entry_message(entry)); +} + platform_status btree_create_leaf_incorporate_spec(const btree_config *cfg, platform_heap_id heap_id, @@ -1276,7 +1291,7 @@ btree_dec_ref(cache *cc, * * This function violates our locking rules. See comment at top of file. */ -static inline int +static inline platform_status btree_split_child_leaf(cache *cc, const btree_config *cfg, mini_allocator *mini, @@ -1285,6 +1300,7 @@ btree_split_child_leaf(cache *cc, uint64 index_of_child_in_parent, btree_node *child, leaf_incorporate_spec *spec, + lookup_result *old_result, uint64 *generation) // OUT { btree_node right_child; @@ -1329,7 +1345,7 @@ btree_split_child_leaf(cache *cc, btree_node_full_unlock(cc, cfg, parent); btree_node_unclaim(cc, cfg, child); btree_node_unget(cc, cfg, child); - return -1; + return STATUS_BUSY; } platform_sleep_ns(child_next_wait); child_next_wait = @@ -1343,6 +1359,18 @@ btree_split_child_leaf(cache *cc, btree_node_lock(cc, cfg, child); /* p: write, c: write, rc: write, cn: write if exists */ + platform_status rc = + btree_record_old_result(cfg, child->hdr, spec, old_result); + if (!SUCCESS(rc)) { + if (child_next.addr != 0) { + btree_node_full_unlock(cc, cfg, &child_next); + } + btree_node_full_unlock(cc, cfg, &right_child); + btree_node_full_unlock(cc, cfg, parent); + btree_node_full_unlock(cc, cfg, child); + return rc; + } + { /* limit the scope of pivot_key, since subsequent mutations of the nodes * may invalidate the memory it points to. @@ -1381,7 +1409,7 @@ btree_split_child_leaf(cache *cc, btree_node_full_unlock(cc, cfg, child); /* p: unlocked, c: unlocked, rc: unlocked, cn: unlocked */ - return 0; + return STATUS_OK; } /* @@ -1393,7 +1421,7 @@ btree_split_child_leaf(cache *cc, * - all nodes fully unlocked * - insertion is complete */ -static inline int +static inline platform_status btree_defragment_or_split_child_leaf(cache *cc, const btree_config *cfg, mini_allocator *mini, @@ -1402,6 +1430,7 @@ btree_defragment_or_split_child_leaf(cache *cc, uint64 index_of_child_in_parent, btree_node *child, leaf_incorporate_spec *spec, + lookup_result *old_result, uint64 *generation) // OUT { uint64 nentries = btree_num_entries(child->hdr); @@ -1425,6 +1454,12 @@ btree_defragment_or_split_child_leaf(cache *cc, btree_node_unclaim(cc, cfg, parent); btree_node_unget(cc, cfg, parent); btree_node_lock(cc, cfg, child); + platform_status rc = + btree_record_old_result(cfg, child->hdr, spec, old_result); + if (!SUCCESS(rc)) { + btree_node_full_unlock(cc, cfg, child); + return rc; + } btree_defragment_leaf(cfg, scratch, child->hdr, spec); bool32 incorporated = btree_try_perform_leaf_incorporate_spec( cfg, child->hdr, spec, generation); @@ -1439,10 +1474,11 @@ btree_defragment_or_split_child_leaf(cache *cc, index_of_child_in_parent, child, spec, + old_result, generation); } - return 0; + return STATUS_OK; } /* @@ -1698,8 +1734,8 @@ btree_insert(cache *cc, // IN mini_allocator *mini, // IN key tuple_key, // IN message msg, // IN - uint64 *generation, // OUT - bool32 *was_unique) // OUT + lookup_result *old_result, // IN/OUT + uint64 *generation) // OUT { platform_status rc; leaf_incorporate_spec spec; @@ -1721,6 +1757,10 @@ btree_insert(cache *cc, // IN log_trace_key(tuple_key, "btree_insert"); + if (old_result != NULL) { + lookup_result_reset(old_result); + } + start_over: btree_node_get(cc, cfg, &root_node, PAGE_TYPE_MEMTABLE); uint64 leaf_wait = 1; @@ -1738,10 +1778,16 @@ btree_insert(cache *cc, // IN goto start_over; } btree_node_lock(cc, cfg, &root_node); - if (btree_try_perform_leaf_incorporate_spec( - cfg, root_node.hdr, &spec, generation)) - { - *was_unique = spec.old_entry_state == ENTRY_DID_NOT_EXIST; + if (btree_can_perform_leaf_incorporate_spec(cfg, root_node.hdr, &spec)) { + rc = btree_record_old_result(cfg, root_node.hdr, &spec, old_result); + if (!SUCCESS(rc)) { + btree_node_full_unlock(cc, cfg, &root_node); + destroy_leaf_incorporate_spec(&spec); + return rc; + } + bool32 incorporated = btree_try_perform_leaf_incorporate_spec( + cfg, root_node.hdr, &spec, generation); + platform_assert(incorporated); btree_node_full_unlock(cc, cfg, &root_node); destroy_leaf_incorporate_spec(&spec); return STATUS_OK; @@ -1946,12 +1992,17 @@ btree_insert(cache *cc, // IN goto start_over; } btree_node_lock(cc, cfg, &child_node); + rc = btree_record_old_result(cfg, child_node.hdr, &spec, old_result); + if (!SUCCESS(rc)) { + btree_node_full_unlock(cc, cfg, &child_node); + destroy_leaf_incorporate_spec(&spec); + return rc; + } bool32 incorporated = btree_try_perform_leaf_incorporate_spec( cfg, child_node.hdr, &spec, generation); platform_assert(incorporated); btree_node_full_unlock(cc, cfg, &child_node); destroy_leaf_incorporate_spec(&spec); - *was_unique = spec.old_entry_state == ENTRY_DID_NOT_EXIST; return STATUS_OK; } @@ -1984,20 +2035,23 @@ btree_insert(cache *cc, // IN return rc; } } - int result = btree_defragment_or_split_child_leaf(cc, - cfg, - mini, - scratch, - &parent_node, - child_idx, - &child_node, - &spec, - generation); + rc = btree_defragment_or_split_child_leaf(cc, + cfg, + mini, + scratch, + &parent_node, + child_idx, + &child_node, + &spec, + old_result, + generation); destroy_leaf_incorporate_spec(&spec); - if (result < 0) { + if (STATUS_IS_EQ(rc, STATUS_BUSY)) { goto start_over; } - *was_unique = spec.old_entry_state == ENTRY_DID_NOT_EXIST; + if (!SUCCESS(rc)) { + return rc; + } return STATUS_OK; } diff --git a/src/btree.h b/src/btree.h index 2879cc58..35d8ce93 100644 --- a/src/btree.h +++ b/src/btree.h @@ -183,8 +183,8 @@ btree_insert(cache *cc, // IN mini_allocator *mini, // IN key tuple_key, // IN message data, // IN - uint64 *generation, // OUT - bool32 *was_unique); // OUT + lookup_result *old_result, // IN/OUT + uint64 *generation); // OUT uint64 btree_create(cache *cc, diff --git a/src/core.c b/src/core.c index 543e2602..4f6edd72 100644 --- a/src/core.c +++ b/src/core.c @@ -355,47 +355,57 @@ core_memtable_iterator_deinit(core_handle *spl, * * Returns: * success if succeeded - * locked if the current memtable is full - * lock_acquired if the current memtable is full and this thread is - * responsible for flushing it. + * locked if successful */ static platform_status -core_memtable_insert(core_handle *spl, key tuple_key, message msg) +core_memtable_insert(core_handle *spl, + key tuple_key, + message msg, + lookup_result *old_result, + uint64 *generation) { - uint64 generation; - platform_status rc = - memtable_maybe_rotate_and_begin_insert(&spl->mt_ctxt, &generation); + memtable_maybe_rotate_and_begin_insert(&spl->mt_ctxt, generation); while (STATUS_IS_EQ(rc, STATUS_BUSY)) { // Memtable isn't ready, do a task if available; may be required to // incorporate memtable that we're waiting on task_perform_one_if_needed(spl->ts, 0); - rc = memtable_maybe_rotate_and_begin_insert(&spl->mt_ctxt, &generation); + rc = memtable_maybe_rotate_and_begin_insert(&spl->mt_ctxt, generation); } if (!SUCCESS(rc)) { goto out; } // this call is safe because we hold the insert lock - memtable *mt = core_get_memtable(spl, generation); + memtable *mt = core_get_memtable(spl, *generation); uint64 leaf_generation; // used for ordering the log rc = memtable_insert(&spl->mt_ctxt, mt, PROCESS_PRIVATE_HEAP_ID, tuple_key, msg, + old_result, &leaf_generation); if (!SUCCESS(rc)) { goto unlock_insert_lock; } + /* TODO: FIXME: One way we could get stuck in a fetch-and-update is if the + * insert succeeds but the lookup fails (e.g. due to an I/O error while + * traversing the trunk). I think the promise we should make in that case is + * that we will preserve enough information in the log to enable the user + * to recover the old value. One way to do this might be to insert a + * reference to the trunk into the log. */ if (spl->cfg.use_log) { int crappy_rc = log_write(spl->log, tuple_key, msg, leaf_generation); if (crappy_rc != 0) { + rc = (platform_status){.r = crappy_rc}; goto unlock_insert_lock; } } + return STATUS_OK; + unlock_insert_lock: memtable_end_insert(&spl->mt_ctxt); out: @@ -716,6 +726,67 @@ core_memtable_lookup(core_handle *spl, cc, cfg, root_addr, type, target, result, NULL); } +static platform_status +core_lookup_memtables_locked(core_handle *spl, + uint64 mt_gen_start, + key target, + lookup_result *result, + bool32 *found_final) +{ + uint64 mt_gen_end = memtable_generation_retired(&spl->mt_ctxt); + platform_assert(mt_gen_start - mt_gen_end <= CORE_NUM_MEMTABLES); + + for (uint64 mt_gen = mt_gen_start; mt_gen != mt_gen_end; mt_gen--) { + platform_status rc = core_memtable_lookup(spl, mt_gen, target, result); + platform_assert_status_ok(rc); + if (!lookup_result_should_continue(result)) { + *found_final = TRUE; + return STATUS_OK; + } + } + + *found_final = FALSE; + return STATUS_OK; +} + +static platform_status +core_lookup_from_memtable_generation_locked(core_handle *spl, + uint64 mt_gen_start, + key target, + lookup_result *result) +{ + bool32 found_final = FALSE; + trunk_ondisk_node_handle root_handle; + + platform_status rc; + + if (mt_gen_start != (uint64)-1) { + rc = core_lookup_memtables_locked( + spl, mt_gen_start, target, result, &found_final); + if (found_final) { + memtable_end_lookup(&spl->mt_ctxt); + lookup_result_finalize(result, target); + return STATUS_OK; + } + } + + rc = trunk_init_root_handle(&spl->trunk_context, &root_handle); + memtable_end_lookup(&spl->mt_ctxt); + if (!SUCCESS(rc)) { + return rc; + } + + rc = trunk_merge_lookup( + &spl->trunk_context, &root_handle, target, result, NULL); + trunk_ondisk_node_handle_deinit(&root_handle); + if (!SUCCESS(rc)) { + return rc; + } + + lookup_result_finalize(result, target); + return STATUS_OK; +} + /* * Branch iterator wrapper functions */ @@ -1225,7 +1296,10 @@ core_range_iterator_deinit(core_range_iterator *range_itor) */ platform_status -core_insert(core_handle *spl, key tuple_key, message data) +core_insert(core_handle *spl, + key tuple_key, + message data, + lookup_result *old_result) { timestamp ts; const threadid tid = platform_get_tid(); @@ -1241,11 +1315,37 @@ core_insert(core_handle *spl, key tuple_key, message data) data = DELETE_MESSAGE; } - platform_status rc = core_memtable_insert(spl, tuple_key, data); + if (old_result != NULL) { + lookup_result_reset(old_result); + } + + uint64 generation; + platform_status rc = + core_memtable_insert(spl, tuple_key, data, old_result, &generation); if (!SUCCESS(rc)) { goto out; } + if (old_result != NULL) { + if (lookup_result_should_continue(old_result)) { + memtable_begin_lookup(&spl->mt_ctxt); + memtable_end_insert(&spl->mt_ctxt); + // Passing generation - 1 is allowed here + rc = core_lookup_from_memtable_generation_locked( + spl, generation - 1, tuple_key, old_result); + if (!SUCCESS(rc)) { + goto lookup_failed; + } + } else { + memtable_end_insert(&spl->mt_ctxt); + lookup_result_finalize(old_result, tuple_key); + } + } else { + memtable_end_insert(&spl->mt_ctxt); + } + +lookup_failed: + task_perform_one_if_needed(spl->ts, spl->cfg.queue_scale_percent); if (spl->cfg.use_stats) { @@ -1279,50 +1379,16 @@ core_insert(core_handle *spl, key tuple_key, message data) platform_status core_lookup(core_handle *spl, key target, lookup_result *result) { - // look in memtables - - // 1. get read lock on lookup lock - // --- 2. for [mt_no = mt->generation..mt->gen_to_incorp] - // 2. for gen = mt->generation; mt[gen % ...].gen == gen; gen --; - // also handles switch to READY ^^^^^ - lookup_result_reset(result); memtable_begin_lookup(&spl->mt_ctxt); - uint64 mt_gen_start = memtable_generation(&spl->mt_ctxt); - uint64 mt_gen_end = memtable_generation_retired(&spl->mt_ctxt); - platform_assert(mt_gen_start - mt_gen_end <= CORE_NUM_MEMTABLES); - - for (uint64 mt_gen = mt_gen_start; mt_gen != mt_gen_end; mt_gen--) { - platform_status rc = core_memtable_lookup(spl, mt_gen, target, result); - platform_assert_status_ok(rc); - if (!lookup_result_should_continue(result)) { - memtable_end_lookup(&spl->mt_ctxt); - goto found_final_answer; - } - } - - trunk_ondisk_node_handle root_handle; - platform_status rc; - rc = trunk_init_root_handle(&spl->trunk_context, &root_handle); - // release memtable lookup lock before we handle any errors - memtable_end_lookup(&spl->mt_ctxt); + uint64 mt_gen_start = memtable_generation(&spl->mt_ctxt); + platform_status rc = core_lookup_from_memtable_generation_locked( + spl, mt_gen_start, target, result); if (!SUCCESS(rc)) { return rc; } - - rc = trunk_merge_lookup( - &spl->trunk_context, &root_handle, target, result, NULL); - trunk_ondisk_node_handle_deinit(&root_handle); - if (!SUCCESS(rc)) { - return rc; - } - -found_final_answer: - - lookup_result_finalize(result, target); - if (spl->cfg.use_stats) { threadid tid = platform_get_tid(); if (lookup_result_found(result)) { @@ -1350,23 +1416,16 @@ core_lookup_async(core_lookup_async_state *state) lookup_result_reset(state->result); memtable_begin_lookup(&state->spl->mt_ctxt); - uint64 mt_gen_start = memtable_generation(&state->spl->mt_ctxt); - uint64 mt_gen_end = memtable_generation_retired(&state->spl->mt_ctxt); - platform_assert(mt_gen_start - mt_gen_end <= CORE_NUM_MEMTABLES); - - for (uint64 mt_gen = mt_gen_start; mt_gen != mt_gen_end; mt_gen--) { - platform_status rc; - - rc = - core_memtable_lookup(state->spl, mt_gen, state->target, state->result); - platform_assert_status_ok(rc); - if (!lookup_result_should_continue(state->result)) { - memtable_end_lookup(&state->spl->mt_ctxt); - goto found_final_answer; - } + uint64 mt_gen_start = memtable_generation(&state->spl->mt_ctxt); + bool32 found_final; + platform_status rc = core_lookup_memtables_locked( + state->spl, mt_gen_start, state->target, state->result, &found_final); + platform_assert_status_ok(rc); + if (found_final) { + memtable_end_lookup(&state->spl->mt_ctxt); + goto found_final_answer; } - platform_status rc; rc = trunk_init_root_handle(&state->spl->trunk_context, &state->root_handle); // release memtable lookup lock before we handle any errors memtable_end_lookup(&state->spl->mt_ctxt); diff --git a/src/core.h b/src/core.h index 32ae31ae..9e74f4ed 100644 --- a/src/core.h +++ b/src/core.h @@ -147,7 +147,10 @@ typedef struct core_range_iterator { */ platform_status -core_insert(core_handle *spl, key tuple_key, message data); +core_insert(core_handle *spl, + key tuple_key, + message data, + lookup_result *old_result); platform_status core_lookup(core_handle *spl, key target, lookup_result *result); diff --git a/src/memtable.c b/src/memtable.c index 581d0ab1..169960d4 100644 --- a/src/memtable.c +++ b/src/memtable.c @@ -14,8 +14,6 @@ #include "poison.h" -#define MEMTABLE_COUNT_GRANULARITY 128 - #define MEMTABLE_INSERT_LOCK_IDX 0 #define MEMTABLE_LOOKUP_LOCK_IDX 1 @@ -183,24 +181,6 @@ memtable_maybe_rotate_and_begin_insert(memtable_context *ctxt, } } -/* - *----------------------------------------------------------------------------- - * Increments the distributed tuple counter. Must hold a read lock on - * insert_lock. - * - * Add to local num_tuple counter. If at granularity, then increment the - * global counter. - * - * There is no race because thread_num_tuples is never accessed by another - * thread, it is only used by the current thread to determine when to change - * the global counter. - * - * each thread pads num_tuples by MEMTABLE_COUNT_GRANULARITY whenever it - * adds a tuple which takes its local count to - * k * MEMTABLE_COUNT_GRANULARITY + 1. Therefore, num_tuples is an upper - * bound except that each thread may still add 1 more tuple. - *----------------------------------------------------------------------------- - */ static inline void memtable_add_tuple(memtable_context *ctxt) { @@ -220,10 +200,10 @@ memtable_insert(memtable_context *ctxt, platform_heap_id heap_id, key tuple_key, message msg, + lookup_result *old_result, uint64 *leaf_generation) { const threadid tid = platform_get_tid(); - bool32 was_unique; btree_scratch *scratch = get_btree_scratch(ctxt, tid); platform_status rc = btree_insert(ctxt->cc, @@ -234,15 +214,13 @@ memtable_insert(memtable_context *ctxt, &mt->mini, tuple_key, msg, - leaf_generation, - &was_unique); + old_result, + leaf_generation); if (!SUCCESS(rc)) { return rc; } - if (was_unique) { - memtable_add_tuple(ctxt); - } + memtable_add_tuple(ctxt); return rc; } diff --git a/src/memtable.h b/src/memtable.h index 57721a28..568c2212 100644 --- a/src/memtable.h +++ b/src/memtable.h @@ -168,6 +168,7 @@ memtable_insert(memtable_context *ctxt, platform_heap_id heap_id, key tuple_key, message msg, + lookup_result *old_result, uint64 *generation); bool32 diff --git a/src/splinterdb.c b/src/splinterdb.c index 8b1c6ac9..0164ac54 100644 --- a/src/splinterdb.c +++ b/src/splinterdb.c @@ -474,53 +474,6 @@ splinterdb_close(splinterdb **kvs_in) // IN *kvs_in = (splinterdb *)NULL; } - -/* - *----------------------------------------------------------------------------- - * splinterdb_insert_raw_message -- - * - * Insert a key and a raw message into splinter - * - * Results: - * 0 on success, otherwise an errno - * - * Side effects: - * None. - *----------------------------------------------------------------------------- - */ -static int -splinterdb_insert_message(splinterdb *kvs, // IN - slice user_key, // IN - message msg // IN -) -{ - key tuple_key = key_create_from_slice(FALSE, user_key); - platform_assert(kvs != NULL); - platform_status status = core_insert(&kvs->spl, tuple_key, msg); - return platform_status_to_int(status); -} - -int -splinterdb_insert(splinterdb *kvsb, slice user_key, slice value) -{ - message msg = message_create(MESSAGE_TYPE_INSERT, value); - return splinterdb_insert_message(kvsb, user_key, msg); -} - -int -splinterdb_delete(splinterdb *kvsb, slice user_key) -{ - return splinterdb_insert_message(kvsb, user_key, DELETE_MESSAGE); -} - -int -splinterdb_update(splinterdb *kvsb, slice user_key, slice update) -{ - message msg = message_create(MESSAGE_TYPE_UPDATE, update); - platform_assert(kvsb->data_cfg->merge_tuples); - return splinterdb_insert_message(kvsb, user_key, msg); -} - void splinterdb_lookup_result_init(const splinterdb *kvs, // IN splinterdb_lookup_result *result, // IN/OUT @@ -602,6 +555,68 @@ splinterdb_lookup(splinterdb *kvs, // IN } +/* + *----------------------------------------------------------------------------- + * splinterdb_insert_raw_message -- + * + * Insert a key and a raw message into splinter + * + * Results: + * 0 on success, otherwise an errno + * + * Side effects: + * None. + *----------------------------------------------------------------------------- + */ +static int +splinterdb_insert_message(splinterdb *kvs, // IN + slice user_key, // IN + message msg, // IN + lookup_result *old_result // IN/OUT +) +{ + key tuple_key = key_create_from_slice(FALSE, user_key); + platform_assert(kvs != NULL); + platform_status status = core_insert(&kvs->spl, tuple_key, msg, old_result); + return platform_status_to_int(status); +} + +int +splinterdb_insert(splinterdb *kvsb, + slice user_key, + slice value, + splinterdb_lookup_result *old_result) +{ + message msg = message_create(MESSAGE_TYPE_INSERT, value); + lookup_result *_old_result = + old_result == NULL ? NULL : lookup_result_from_splinterdb(old_result); + return splinterdb_insert_message(kvsb, user_key, msg, _old_result); +} + +int +splinterdb_delete(splinterdb *kvsb, + slice user_key, + splinterdb_lookup_result *old_result) +{ + lookup_result *_old_result = + old_result == NULL ? NULL : lookup_result_from_splinterdb(old_result); + return splinterdb_insert_message( + kvsb, user_key, DELETE_MESSAGE, _old_result); +} + +int +splinterdb_update(splinterdb *kvsb, + slice user_key, + slice update, + splinterdb_lookup_result *old_result) +{ + message msg = message_create(MESSAGE_TYPE_UPDATE, update); + lookup_result *_old_result = + old_result == NULL ? NULL : lookup_result_from_splinterdb(old_result); + platform_assert(kvsb->data_cfg->merge_tuples); + return splinterdb_insert_message(kvsb, user_key, msg, _old_result); +} + struct splinterdb_iterator { core_range_iterator sri; platform_status last_rc; diff --git a/tests/functional/btree_test.c b/tests/functional/btree_test.c index 798435d6..ecbe5634 100644 --- a/tests/functional/btree_test.c +++ b/tests/functional/btree_test.c @@ -102,6 +102,7 @@ test_btree_insert(test_memtable_context *ctxt, key tuple_key, message data) ctxt->heap_id, tuple_key, data, + NULL, &dummy_leaf_generation); out: diff --git a/tests/functional/splinter_test.c b/tests/functional/splinter_test.c index ea97c63f..aac0568d 100644 --- a/tests/functional/splinter_test.c +++ b/tests/functional/splinter_test.c @@ -180,10 +180,10 @@ test_trunk_insert_thread(void *arg) core_max_key_size(spl), test_cfg[spl_idx].period); generate_test_message(test_cfg->gen, insert_num, &msg); - platform_status rc = - core_insert(spl, - key_buffer_key(&keybuf), - merge_accumulator_to_message(&msg)); + platform_status rc = core_insert(spl, + key_buffer_key(&keybuf), + merge_accumulator_to_message(&msg), + NULL); platform_assert_status_ok(rc); if (spl->cfg.use_stats) { ts = platform_timestamp_elapsed(ts); @@ -600,11 +600,11 @@ do_operation(test_splinter_thread_params *params, core_max_key_size(spl), test_cfg[spl_idx].period); generate_test_message(test_cfg->gen, op_num, &msg); - ts = platform_get_timestamp(); - platform_status rc = - core_insert(spl, - key_buffer_key(&keybuf), - merge_accumulator_to_message(&msg)); + ts = platform_get_timestamp(); + platform_status rc = core_insert(spl, + key_buffer_key(&keybuf), + merge_accumulator_to_message(&msg), + NULL); platform_assert_status_ok(rc); ts = platform_timestamp_elapsed(ts); params->insert_stats.duration += ts; diff --git a/tests/functional/test_functionality.c b/tests/functional/test_functionality.c index e3da0f52..2a823378 100644 --- a/tests/functional/test_functionality.c +++ b/tests/functional/test_functionality.c @@ -581,7 +581,8 @@ insert_random_messages(core_handle *spl, } test_data_generate_message(spl->cfg.data_cfg, op, ref_count, &msg); - rc = core_insert(spl, tuple_key, merge_accumulator_to_message(&msg)); + rc = + core_insert(spl, tuple_key, merge_accumulator_to_message(&msg), NULL); if (!SUCCESS(rc)) { goto cleanup; } diff --git a/tests/functional/ycsb_test.c b/tests/functional/ycsb_test.c index ec93e959..9b5596f4 100644 --- a/tests/functional/ycsb_test.c +++ b/tests/functional/ycsb_test.c @@ -360,7 +360,7 @@ ycsb_thread(void *arg) message_create(MESSAGE_TYPE_INSERT, slice_create(YCSB_DATA_SIZE, ops->value)); rc = core_insert( - spl, key_create(FALSE, YCSB_KEY_SIZE, ops->key), val); + spl, key_create(FALSE, YCSB_KEY_SIZE, ops->key), val, NULL); platform_assert_status_ok(rc); break; } diff --git a/tests/unit/btree_stress_test.c b/tests/unit/btree_stress_test.c index 62788b71..6b439c89 100644 --- a/tests/unit/btree_stress_test.c +++ b/tests/unit/btree_stress_test.c @@ -202,7 +202,6 @@ CTEST2(btree_stress, iterator_basics) for (int i = 0; i < 1000; i++) { uint64 generation; - bool32 was_unique; iterator_tests((cache *)&data->cc, &data->dbtree_cfg, root_addr, @@ -227,8 +226,8 @@ CTEST2(btree_stress, iterator_basics) &mini, gen_key(&data->dbtree_cfg, i, keybuf, sizeof(keybuf)), gen_msg(&data->dbtree_cfg, i, msgbuf, sizeof(msgbuf)), - &generation, - &was_unique))) + NULL, + &generation))) { ASSERT_TRUE(FALSE, "Failed to insert 4-byte %d\n", i); } @@ -358,6 +357,133 @@ CTEST2(btree_stress, test_random_inserts_concurrent) platform_free(hid, threads); } +CTEST2(btree_stress, overwrite_returns_old_value_after_tree_growth) +{ + btree_scratch *scratch = TYPED_MANUAL_ZALLOC( + data->hid, scratch, btree_scratch_size(&data->dbtree_cfg)); + ASSERT_NOT_NULL(scratch); + + mini_allocator mini; + uint64 root_addr = btree_create( + (cache *)&data->cc, &data->dbtree_cfg, &mini, PAGE_TYPE_MEMTABLE); + + uint64 bt_page_size = btree_page_size(&data->dbtree_cfg); + uint8 *keybuf = TYPED_MANUAL_MALLOC(data->hid, keybuf, bt_page_size); + uint8 *msgbuf = TYPED_MANUAL_MALLOC(data->hid, msgbuf, bt_page_size); + ASSERT_NOT_NULL(keybuf); + ASSERT_NOT_NULL(msgbuf); + + lookup_result uniqueness_result; + lookup_result_init(&uniqueness_result, + data->dbtree_cfg.data_cfg, + SPLINTERDB_LOOKUP_MIGHT_EXIST, + 0, + NULL); + + for (uint64 i = 0; i < 2048; i++) { + uint64 generation; + + platform_status rc = + btree_insert((cache *)&data->cc, + &data->dbtree_cfg, + data->hid, + scratch, + root_addr, + &mini, + gen_key(&data->dbtree_cfg, i, keybuf, bt_page_size), + gen_msg(&data->dbtree_cfg, i, msgbuf, bt_page_size), + &uniqueness_result, + &generation); + ASSERT_TRUE(SUCCESS(rc)); + ASSERT_FALSE(lookup_result_found(&uniqueness_result)); + } + + lookup_result old_result; + lookup_result verify_result; + lookup_result_init( + &old_result, data->dbtree_cfg.data_cfg, SPLINTERDB_LOOKUP_VALUE, 0, NULL); + lookup_result_init(&verify_result, + data->dbtree_cfg.data_cfg, + SPLINTERDB_LOOKUP_VALUE, + 0, + NULL); + + merge_accumulator update_msg; + merge_accumulator expected_new_msg; + merge_accumulator probe_msg; + merge_accumulator_init(&update_msg, PROCESS_PRIVATE_HEAP_ID); + merge_accumulator_init(&expected_new_msg, PROCESS_PRIVATE_HEAP_ID); + merge_accumulator_init(&probe_msg, PROCESS_PRIVATE_HEAP_ID); + + uint8 merged_ref_count = 2; + uint64 merged_msg_len = 0; + for (uint8 ref_count = 2; ref_count < 127; ref_count++) { + test_data_generate_message( + data->dbtree_cfg.data_cfg, MESSAGE_TYPE_INSERT, ref_count, &probe_msg); + if (merged_msg_len < merge_accumulator_length(&probe_msg)) { + merged_msg_len = merge_accumulator_length(&probe_msg); + merged_ref_count = ref_count; + } + } + ASSERT_TRUE(merged_msg_len > sizeof(data_handle)); + + test_data_generate_message(data->dbtree_cfg.data_cfg, + MESSAGE_TYPE_UPDATE, + merged_ref_count - 1, + &update_msg); + test_data_generate_message(data->dbtree_cfg.data_cfg, + MESSAGE_TYPE_INSERT, + merged_ref_count, + &expected_new_msg); + + for (uint64 i = 0; i < 2048; i += 257) { + uint64 generation; + + platform_status rc = + btree_insert((cache *)&data->cc, + &data->dbtree_cfg, + data->hid, + scratch, + root_addr, + &mini, + gen_key(&data->dbtree_cfg, i, keybuf, bt_page_size), + merge_accumulator_to_message(&update_msg), + &old_result, + &generation); + ASSERT_TRUE(SUCCESS(rc)); + ASSERT_TRUE(lookup_result_found(&old_result)); + ASSERT_EQUAL( + 0, + message_lex_cmp(merge_accumulator_to_message( + lookup_result_accumulator(&old_result)), + gen_msg(&data->dbtree_cfg, i, msgbuf, bt_page_size))); + + rc = btree_lookup((cache *)&data->cc, + &data->dbtree_cfg, + root_addr, + PAGE_TYPE_MEMTABLE, + gen_key(&data->dbtree_cfg, i, keybuf, bt_page_size), + &verify_result); + ASSERT_TRUE(SUCCESS(rc)); + ASSERT_TRUE(lookup_result_found(&verify_result)); + ASSERT_EQUAL( + 0, + message_lex_cmp(merge_accumulator_to_message( + lookup_result_accumulator(&verify_result)), + merge_accumulator_to_message(&expected_new_msg))); + } + + merge_accumulator_deinit(&probe_msg); + merge_accumulator_deinit(&expected_new_msg); + merge_accumulator_deinit(&update_msg); + lookup_result_deinit(&verify_result); + lookup_result_deinit(&old_result); + lookup_result_deinit(&uniqueness_result); + platform_free(data->hid, msgbuf); + platform_free(data->hid, keybuf); + platform_free(data->hid, scratch); +} + /* * ******************************************************************************** * Define minions and helper functions used by this test suite. @@ -388,7 +514,6 @@ insert_tests(cache *cc, int end) { uint64 generation; - bool32 was_unique; uint64 bt_page_size = btree_page_size(cfg); int keybuf_size = bt_page_size; @@ -405,8 +530,8 @@ insert_tests(cache *cc, mini, gen_key(cfg, i, keybuf, keybuf_size), gen_msg(cfg, i, msgbuf, msgbuf_size), - &generation, - &was_unique))) + NULL, + &generation))) { ASSERT_TRUE(FALSE, "Failed to insert 4-byte %ld\n", i); } diff --git a/tests/unit/large_inserts_stress_test.c b/tests/unit/large_inserts_stress_test.c index 0b9c1069..a1aba1a5 100644 --- a/tests/unit/large_inserts_stress_test.c +++ b/tests/unit/large_inserts_stress_test.c @@ -194,7 +194,7 @@ CTEST2_SKIP(large_inserts_stress, slice key = slice_create(strlen(key_data), key_data); slice val = slice_create(strlen(val_data), val_data); - int rc = splinterdb_insert(data->kvsb, key, val); + int rc = splinterdb_insert(data->kvsb, key, val, NULL); ASSERT_EQUAL(0, rc); } uint64 elapsed_ns = platform_timestamp_elapsed(start_time); @@ -707,7 +707,7 @@ exec_worker_thread(void *w) slice key = slice_create(key_len, key_data); slice val = slice_create(val_len, val_data); - int rc = splinterdb_insert(kvsb, key, val); + int rc = splinterdb_insert(kvsb, key, val, NULL); ASSERT_EQUAL(0, rc); } if (verbose_progress) { diff --git a/tests/unit/splinter_test.c b/tests/unit/splinter_test.c index 3372a604..2a9f9db9 100644 --- a/tests/unit/splinter_test.c +++ b/tests/unit/splinter_test.c @@ -729,8 +729,10 @@ splinter_do_inserts(void *datap, test_key(&keybuf, TEST_RANDOM, insert_num, 0, 0, key_size, 0); generate_test_message(&data->gen, insert_num, &msg); - rc = core_insert( - spl, key_buffer_key(&keybuf), merge_accumulator_to_message(&msg)); + rc = core_insert(spl, + key_buffer_key(&keybuf), + merge_accumulator_to_message(&msg), + NULL); ASSERT_TRUE(SUCCESS(rc), "trunk_insert() FAILURE: %s\n", platform_status_to_string(rc)); diff --git a/tests/unit/splinterdb_forked_child_test.c b/tests/unit/splinterdb_forked_child_test.c index 6f6b0c6d..42d181fe 100644 --- a/tests/unit/splinterdb_forked_child_test.c +++ b/tests/unit/splinterdb_forked_child_test.c @@ -213,12 +213,12 @@ CTEST2(splinterdb_forked_child, test_one_insert_then_close_bug) size_t to_insert_len = strlen(to_insert_data); slice to_insert = slice_create(to_insert_len, to_insert_data); - rc = splinterdb_insert(spl_handle, key, to_insert); + rc = splinterdb_insert(spl_handle, key, to_insert, NULL); ASSERT_EQUAL(0, rc); key_len = snprintf(key_data, sizeof(key_data), "%d", 2); key = slice_create(key_len, key_data); - rc = splinterdb_insert(spl_handle, key, to_insert); + rc = splinterdb_insert(spl_handle, key, to_insert, NULL); ASSERT_EQUAL(0, rc); } @@ -559,7 +559,7 @@ do_many_inserts(splinterdb *kvsb, uint64 num_inserts) slice key = slice_create(key_len, key_data); slice val = slice_create(val_len, val_data); - int rc = splinterdb_insert(kvsb, key, val); + int rc = splinterdb_insert(kvsb, key, val, NULL); ASSERT_EQUAL(0, rc); } if (verbose_progress) { diff --git a/tests/unit/splinterdb_quick_test.c b/tests/unit/splinterdb_quick_test.c index ba44eb41..0a4d7919 100644 --- a/tests/unit/splinterdb_quick_test.c +++ b/tests/unit/splinterdb_quick_test.c @@ -81,6 +81,13 @@ custom_key_comparator(const data_config *cfg, user_key key1, user_key key2); static uint64 sum_branch_lookups(const splinterdb *kvsb); +static void +assert_lookup_result_not_found(const splinterdb_lookup_result *result); + +static void +assert_lookup_result_matches_slice(const splinterdb_lookup_result *result, + slice expected); + typedef struct { data_config super; uint64 num_comparisons; @@ -174,7 +181,7 @@ CTEST2(splinterdb_quick, test_basic_flow) slice to_insert = slice_create(to_insert_len, to_insert_data); // Basic insert of new key should succeed. - rc = splinterdb_insert(data->kvsb, user_key, to_insert); + rc = splinterdb_insert(data->kvsb, user_key, to_insert, NULL); ASSERT_EQUAL(0, rc); // Lookup of inserted key should succeed. @@ -189,7 +196,7 @@ CTEST2(splinterdb_quick, test_basic_flow) ASSERT_STREQN(to_insert_data, slice_data(value), slice_length(value)); // Delete key - rc = splinterdb_delete(data->kvsb, user_key); + rc = splinterdb_delete(data->kvsb, user_key, NULL); ASSERT_EQUAL(0, rc); // Deleted key should not be found @@ -216,7 +223,7 @@ CTEST2(splinterdb_quick, test_apis_for_max_key_length) slice to_insert = slice_create(to_insert_len, to_insert_data); // **** Insert of a max-size key should succeed. - int rc = splinterdb_insert(data->kvsb, large_key, to_insert); + int rc = splinterdb_insert(data->kvsb, large_key, to_insert, NULL); ASSERT_EQUAL(0, rc); splinterdb_lookup_result result; @@ -238,7 +245,7 @@ CTEST2(splinterdb_quick, test_apis_for_max_key_length) slice_length(value), "Large key-value did not match as expected."); - rc = splinterdb_delete(data->kvsb, large_key); + rc = splinterdb_delete(data->kvsb, large_key, NULL); ASSERT_EQUAL(0, rc); // **** Should not find this large-key once it's deleted @@ -260,14 +267,14 @@ CTEST2(splinterdb_quick, test_key_size_gt_max_key_size) slice too_large_key = slice_create(too_large_key_len, too_large_key_data); int rc = splinterdb_insert( - data->kvsb, too_large_key, slice_create(sizeof("foo"), "foo")); + data->kvsb, too_large_key, slice_create(sizeof("foo"), "foo"), NULL); ASSERT_EQUAL(EINVAL, rc); splinterdb_lookup_result result; splinterdb_lookup_result_init( data->kvsb, &result, SPLINTERDB_LOOKUP_VALUE, 0, NULL); - rc = splinterdb_delete(data->kvsb, too_large_key); + rc = splinterdb_delete(data->kvsb, too_large_key, NULL); ASSERT_EQUAL(EINVAL, rc); splinterdb_lookup_result_deinit(&result); @@ -291,7 +298,7 @@ CTEST2(splinterdb_quick, test_value_size_gt_max_value_size) slice_create(too_large_value_len, too_large_value_data); int rc = splinterdb_insert( - data->kvsb, slice_create(sizeof("foo"), "foo"), too_large_value); + data->kvsb, slice_create(sizeof("foo"), "foo"), too_large_value, NULL); ASSERT_EQUAL(EINVAL, rc); platform_free(data->cfg.heap_id, too_large_value_data); @@ -320,24 +327,30 @@ CTEST2(splinterdb_quick, test_variable_length_values) memset(max_length_string, 'b', TEST_MAX_VALUE_SIZE); // Insert keys with different value (lengths) - int rc = splinterdb_insert( - data->kvsb, key_empty, slice_create(sizeof(empty_string), empty_string)); + int rc = splinterdb_insert(data->kvsb, + key_empty, + slice_create(sizeof(empty_string), empty_string), + NULL); ASSERT_EQUAL(0, rc); - rc = splinterdb_insert( - data->kvsb, key_short, slice_create(sizeof(short_string), short_string)); + rc = splinterdb_insert(data->kvsb, + key_short, + slice_create(sizeof(short_string), short_string), + NULL); ASSERT_EQUAL(0, rc); rc = splinterdb_insert( data->kvsb, key_long, - slice_create(sizeof(almost_max_length_string), almost_max_length_string)); + slice_create(sizeof(almost_max_length_string), almost_max_length_string), + NULL); ASSERT_EQUAL(0, rc); rc = splinterdb_insert( data->kvsb, key_max, - slice_create(sizeof(max_length_string), max_length_string)); + slice_create(sizeof(max_length_string), max_length_string), + NULL); ASSERT_EQUAL(0, rc); @@ -725,7 +738,8 @@ CTEST2(splinterdb_quick, test_close_and_reopen) const char *val = "some-value"; const size_t val_len = strlen(val); - int rc = splinterdb_insert(data->kvsb, user_key, slice_create(val_len, val)); + int rc = + splinterdb_insert(data->kvsb, user_key, slice_create(val_len, val), NULL); ASSERT_EQUAL(0, rc); // Close and re-open the database @@ -770,7 +784,8 @@ CTEST2(splinterdb_quick, test_repeated_insert_close_reopen) for (int i = 0; i < 20; i++) { int rc = splinterdb_insert(data->kvsb, slice_create(key_len, keystring), - slice_create(val_len, val)); + slice_create(val_len, val), + NULL); ASSERT_EQUAL(0, rc, "Insert is expected to pass, iter=%d.", i); splinterdb_close(&data->kvsb); @@ -799,7 +814,7 @@ CTEST2(splinterdb_quick, test_custom_data_config) slice msg_slice = slice_create(sizeof(msg), &msg); ASSERT_EQUAL(0, rc); - rc = splinterdb_insert(data->kvsb, user_key, msg_slice); + rc = splinterdb_insert(data->kvsb, user_key, msg_slice, NULL); // confirm its there splinterdb_lookup_result result; @@ -816,7 +831,7 @@ CTEST2(splinterdb_quick, test_custom_data_config) // insert a message that adds to the refcount msg.ref_count = 5; - rc = splinterdb_update(data->kvsb, user_key, msg_slice); + rc = splinterdb_update(data->kvsb, user_key, msg_slice, NULL); ASSERT_EQUAL(0, rc); // check still found @@ -826,7 +841,7 @@ CTEST2(splinterdb_quick, test_custom_data_config) // insert a message that drops the refcount to zero msg.ref_count = -6; - rc = splinterdb_update(data->kvsb, user_key, msg_slice); + rc = splinterdb_update(data->kvsb, user_key, msg_slice, NULL); ASSERT_EQUAL(0, rc); // on lookup, merge will decide the tuple is deleted @@ -836,11 +851,11 @@ CTEST2(splinterdb_quick, test_custom_data_config) // add it back as a value msg.ref_count = 12; - rc = splinterdb_insert(data->kvsb, user_key, msg_slice); + rc = splinterdb_insert(data->kvsb, user_key, msg_slice, NULL); ASSERT_EQUAL(0, rc); // delete it using a raw message - rc = splinterdb_delete(data->kvsb, user_key); + rc = splinterdb_delete(data->kvsb, user_key, NULL); ASSERT_EQUAL(0, rc); // on lookup, it should not be found @@ -864,7 +879,7 @@ CTEST2(splinterdb_quick, test_existence_only_memtable_lookup) ASSERT_EQUAL(0, rc); ASSERT_FALSE(splinterdb_lookup_found(&result)); - rc = splinterdb_insert(data->kvsb, user_key, value); + rc = splinterdb_insert(data->kvsb, user_key, value, NULL); ASSERT_EQUAL(0, rc); rc = splinterdb_lookup(data->kvsb, user_key, &result); @@ -875,7 +890,7 @@ CTEST2(splinterdb_quick, test_existence_only_memtable_lookup) rc = splinterdb_lookup_result_value(&result, &looked_up_value); ASSERT_EQUAL(EINVAL, rc); - rc = splinterdb_delete(data->kvsb, user_key); + rc = splinterdb_delete(data->kvsb, user_key, NULL); ASSERT_EQUAL(0, rc); rc = splinterdb_lookup(data->kvsb, user_key, &result); @@ -897,7 +912,7 @@ CTEST2(splinterdb_quick, test_existence_only_trunk_lookup_skips_branches) slice user_key = slice_create(strlen("trunk-key"), "trunk-key"); slice value = slice_create(strlen("trunk-value"), "trunk-value"); - rc = splinterdb_insert(data->kvsb, user_key, value); + rc = splinterdb_insert(data->kvsb, user_key, value, NULL); ASSERT_EQUAL(0, rc); splinterdb_close(&data->kvsb); @@ -935,6 +950,189 @@ CTEST2(splinterdb_quick, test_existence_only_trunk_lookup_skips_branches) splinterdb_lookup_result_deinit(&normal_result); } +CTEST2(splinterdb_quick, test_write_api_returns_old_value) +{ + slice user_key = slice_create(strlen("old-value-key"), "old-value-key"); + slice value0 = slice_create(strlen("old-value-0"), "old-value-0"); + slice value1 = slice_create(strlen("old-value-1"), "old-value-1"); + + splinterdb_lookup_result old_result; + splinterdb_lookup_result_init( + data->kvsb, &old_result, SPLINTERDB_LOOKUP_VALUE, 0, NULL); + + int rc = splinterdb_insert(data->kvsb, user_key, value0, &old_result); + ASSERT_EQUAL(0, rc); + assert_lookup_result_not_found(&old_result); + + rc = splinterdb_insert(data->kvsb, user_key, value1, &old_result); + ASSERT_EQUAL(0, rc); + assert_lookup_result_matches_slice(&old_result, value0); + + rc = splinterdb_delete(data->kvsb, user_key, &old_result); + ASSERT_EQUAL(0, rc); + assert_lookup_result_matches_slice(&old_result, value1); + + rc = splinterdb_insert(data->kvsb, user_key, value0, &old_result); + ASSERT_EQUAL(0, rc); + assert_lookup_result_not_found(&old_result); + + splinterdb_lookup_result_deinit(&old_result); +} + +CTEST2(splinterdb_quick, test_write_api_old_result_existence_only) +{ + slice user_key = slice_create(strlen("exist-old"), "exist-old"); + slice value0 = slice_create(strlen("existence-old-0"), "existence-old-0"); + slice value1 = slice_create(strlen("existence-old-1"), "existence-old-1"); + + splinterdb_lookup_result old_result; + splinterdb_lookup_result_init( + data->kvsb, &old_result, SPLINTERDB_LOOKUP_MIGHT_EXIST, 0, NULL); + + int rc = splinterdb_insert(data->kvsb, user_key, value0, &old_result); + ASSERT_EQUAL(0, rc); + ASSERT_FALSE(splinterdb_lookup_found(&old_result)); + + rc = splinterdb_insert(data->kvsb, user_key, value1, &old_result); + ASSERT_EQUAL(0, rc); + ASSERT_TRUE(splinterdb_lookup_found(&old_result)); + + slice looked_up_value; + rc = splinterdb_lookup_result_value(&old_result, &looked_up_value); + ASSERT_EQUAL(EINVAL, rc); + + rc = splinterdb_delete(data->kvsb, user_key, &old_result); + ASSERT_EQUAL(0, rc); + ASSERT_TRUE(splinterdb_lookup_found(&old_result)); + + rc = splinterdb_insert(data->kvsb, user_key, value0, &old_result); + ASSERT_EQUAL(0, rc); + ASSERT_FALSE(splinterdb_lookup_found(&old_result)); + + splinterdb_lookup_result_deinit(&old_result); +} + +CTEST2(splinterdb_quick, test_write_api_old_result_custom_merge_semantics) +{ + splinterdb_close(&data->kvsb); + data->cfg.data_cfg = test_data_config; + data->cfg.data_cfg->max_key_size = 20; + int rc = splinterdb_create(&data->cfg, &data->kvsb); + ASSERT_EQUAL(0, rc); + + slice user_key = slice_create(strlen("merge-key"), "merge-key"); + data_handle msg = {.ref_count = 1}; + slice msg_slice = slice_create(sizeof(msg), &msg); + + splinterdb_lookup_result old_result; + splinterdb_lookup_result_init( + data->kvsb, &old_result, SPLINTERDB_LOOKUP_VALUE, 0, NULL); + + rc = splinterdb_insert(data->kvsb, user_key, msg_slice, &old_result); + ASSERT_EQUAL(0, rc); + assert_lookup_result_not_found(&old_result); + + msg.ref_count = 5; + msg_slice = slice_create(sizeof(msg), &msg); + rc = splinterdb_update(data->kvsb, user_key, msg_slice, &old_result); + ASSERT_EQUAL(0, rc); + assert_lookup_result_matches_slice( + &old_result, slice_create(sizeof(msg), &(data_handle){.ref_count = 1})); + + merge_accumulator expected; + merge_accumulator_init(&expected, PROCESS_PRIVATE_HEAP_ID); + test_data_generate_message( + test_data_config, MESSAGE_TYPE_INSERT, 6, &expected); + + msg.ref_count = -6; + msg_slice = slice_create(sizeof(msg), &msg); + rc = splinterdb_update(data->kvsb, user_key, msg_slice, &old_result); + ASSERT_EQUAL(0, rc); + assert_lookup_result_matches_slice(&old_result, + merge_accumulator_to_value(&expected)); + + merge_accumulator_deinit(&expected); + + msg.ref_count = 12; + msg_slice = slice_create(sizeof(msg), &msg); + rc = splinterdb_insert(data->kvsb, user_key, msg_slice, &old_result); + ASSERT_EQUAL(0, rc); + assert_lookup_result_not_found(&old_result); + + splinterdb_lookup_result_deinit(&old_result); +} + +CTEST2(splinterdb_quick, test_write_api_old_result_merges_memtable_and_trunk) +{ + splinterdb_close(&data->kvsb); + data->cfg.data_cfg = test_data_config; + data->cfg.data_cfg->max_key_size = 20; + int rc = splinterdb_create(&data->cfg, &data->kvsb); + ASSERT_EQUAL(0, rc); + + slice user_key = slice_create(strlen("trunk-merge-key"), "trunk-merge-key"); + data_handle msg = {.ref_count = 3}; + slice msg_slice = slice_create(sizeof(msg), &msg); + + rc = splinterdb_insert(data->kvsb, user_key, msg_slice, NULL); + ASSERT_EQUAL(0, rc); + + splinterdb_close(&data->kvsb); + rc = splinterdb_open(&data->cfg, &data->kvsb); + ASSERT_EQUAL(0, rc); + + msg.ref_count = 4; + msg_slice = slice_create(sizeof(msg), &msg); + rc = splinterdb_update(data->kvsb, user_key, msg_slice, NULL); + ASSERT_EQUAL(0, rc); + + splinterdb_lookup_result old_result; + splinterdb_lookup_result_init( + data->kvsb, &old_result, SPLINTERDB_LOOKUP_VALUE, 0, NULL); + + merge_accumulator expected; + merge_accumulator_init(&expected, PROCESS_PRIVATE_HEAP_ID); + test_data_generate_message( + test_data_config, MESSAGE_TYPE_INSERT, 7, &expected); + + msg.ref_count = 2; + msg_slice = slice_create(sizeof(msg), &msg); + rc = splinterdb_update(data->kvsb, user_key, msg_slice, &old_result); + ASSERT_EQUAL(0, rc); + assert_lookup_result_matches_slice(&old_result, + merge_accumulator_to_value(&expected)); + + merge_accumulator_deinit(&expected); + splinterdb_lookup_result_deinit(&old_result); +} + +CTEST2(splinterdb_quick, test_write_api_old_result_respects_trunk_delete_shadow) +{ + slice user_key = slice_create(strlen("shadow-key"), "shadow-key"); + slice value0 = slice_create(strlen("shadow-old"), "shadow-old"); + slice value1 = slice_create(strlen("shadow-new"), "shadow-new"); + + int rc = splinterdb_insert(data->kvsb, user_key, value0, NULL); + ASSERT_EQUAL(0, rc); + + splinterdb_close(&data->kvsb); + rc = splinterdb_open(&data->cfg, &data->kvsb); + ASSERT_EQUAL(0, rc); + + rc = splinterdb_delete(data->kvsb, user_key, NULL); + ASSERT_EQUAL(0, rc); + + splinterdb_lookup_result old_result; + splinterdb_lookup_result_init( + data->kvsb, &old_result, SPLINTERDB_LOOKUP_VALUE, 0, NULL); + + rc = splinterdb_insert(data->kvsb, user_key, value1, &old_result); + ASSERT_EQUAL(0, rc); + assert_lookup_result_not_found(&old_result); + + splinterdb_lookup_result_deinit(&old_result); +} + CTEST2(splinterdb_quick, test_iterator_custom_comparator) { // We need to reconfigure Splinter with user-specified key comparator fn. @@ -1100,6 +1298,24 @@ sum_branch_lookups(const splinterdb *kvsb) return total; } +static void +assert_lookup_result_not_found(const splinterdb_lookup_result *result) +{ + ASSERT_FALSE(splinterdb_lookup_found(result)); +} + +static void +assert_lookup_result_matches_slice(const splinterdb_lookup_result *result, + slice expected) +{ + slice value; + + ASSERT_TRUE(splinterdb_lookup_found(result)); + ASSERT_EQUAL(0, splinterdb_lookup_result_value(result, &value)); + ASSERT_EQUAL(slice_length(expected), slice_length(value)); + ASSERT_EQUAL(0, slice_lex_cmp(value, expected)); +} + /* * Helper function to insert n-keys (num_inserts), using pre-formatted * key and value strings. @@ -1118,8 +1334,10 @@ insert_some_keys(const int num_inserts, splinterdb *kvsb) ASSERT_EQUAL(KEY_FMT_LENGTH, snprintf(key, sizeof(key), key_fmt, i)); ASSERT_EQUAL(VAL_FMT_LENGTH, snprintf(val, sizeof(val), val_fmt, i)); - rc = splinterdb_insert( - kvsb, slice_create(sizeof(key), key), slice_create(sizeof(val), val)); + rc = splinterdb_insert(kvsb, + slice_create(sizeof(key), key), + slice_create(sizeof(val), val), + NULL); ASSERT_EQUAL(0, rc); } @@ -1157,8 +1375,10 @@ insert_keys(splinterdb *kvsb, const int minkey, int numkeys, const int incr) snprintf(key, sizeof(key), key_fmt, kctr); snprintf(val, sizeof(val), val_fmt, kctr); - rc = splinterdb_insert( - kvsb, slice_create(sizeof(key), key), slice_create(sizeof(val), val)); + rc = splinterdb_insert(kvsb, + slice_create(sizeof(key), key), + slice_create(sizeof(val), val), + NULL); ASSERT_EQUAL(0, rc); } return rc; diff --git a/tests/unit/splinterdb_stress_test.c b/tests/unit/splinterdb_stress_test.c index 89248bd3..41b01223 100644 --- a/tests/unit/splinterdb_stress_test.c +++ b/tests/unit/splinterdb_stress_test.c @@ -123,7 +123,8 @@ CTEST2(splinterdb_stress, test_naive_range_delete) random_bytes(&rand_state, value_buffer, TEST_VALUE_SIZE); int rc = splinterdb_insert(data->kvsb, slice_create(TEST_KEY_SIZE, key_buffer), - slice_create(TEST_VALUE_SIZE, value_buffer)); + slice_create(TEST_VALUE_SIZE, value_buffer), + NULL); ASSERT_EQUAL(0, rc); } @@ -156,7 +157,7 @@ CTEST2(splinterdb_stress, test_iterator_over_many_kvs) snprintf(key_str, sizeof(key_str), "key-%08x", i); slice key = slice_create(sizeof(key_str), key_str); slice val = slice_create(sizeof(value_str), value_str); - ASSERT_EQUAL(0, splinterdb_insert(data->kvsb, key, val)); + ASSERT_EQUAL(0, splinterdb_insert(data->kvsb, key, val, NULL)); } // create an iterator at end of keys @@ -247,7 +248,7 @@ CTEST2_SKIP(splinterdb_stress, test_issue_458_mini_destroy_unused_debug_assert) slice key = slice_create(strlen(key_data), key_data); slice val = slice_create(strlen(val_data), val_data); - int rc = splinterdb_insert(data->kvsb, key, val); + int rc = splinterdb_insert(data->kvsb, key, val, NULL); ASSERT_EQUAL(0, rc); } uint64 elapsed_ns = platform_timestamp_elapsed(start_time); @@ -299,7 +300,8 @@ exec_worker_thread(void *w) rc = splinterdb_insert(kvsb, slice_create(TEST_KEY_SIZE, key_buf), - slice_create(TEST_VALUE_SIZE, value_buf)); + slice_create(TEST_VALUE_SIZE, value_buf), + NULL); ASSERT_EQUAL(0, rc); if (i && (i % 100000 == 0)) { @@ -344,7 +346,7 @@ naive_range_delete(splinterdb *kvsb, slice start_key, uint32 count) for (uint32 i = 0; i < num_found; i++) { slice key_to_delete = slice_create(TEST_KEY_SIZE, keys_to_delete + i * TEST_KEY_SIZE); - splinterdb_delete(kvsb, key_to_delete); + splinterdb_delete(kvsb, key_to_delete, NULL); } free(keys_to_delete);