diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml index 5e4f9eda..c968927d 100644 --- a/.github/workflows/run-tests.yml +++ b/.github/workflows/run-tests.yml @@ -56,13 +56,16 @@ jobs: name: ${{ matrix.compiler_mode }} ${{ matrix.build_mode }} ${{ matrix.target }} ${{ matrix.tests_function }} runs-on: ubuntu-latest env: - CC: ${{ startsWith(matrix.compiler_mode, 'gcc') && 'gcc' || 'clang' }} - LD: ${{ startsWith(matrix.compiler_mode, 'gcc') && 'gcc' || 'clang' }} + CC: ${{ startsWith(matrix.compiler_mode, 'gcc') && 'gcc' || 'clang-19' }} + LD: ${{ startsWith(matrix.compiler_mode, 'gcc') && 'gcc' || 'clang-19' }} BUILD_MODE: ${{ matrix.build_mode }} BUILD_ASAN: ${{ endsWith(matrix.compiler_mode, 'asan') && 1 || 0 }} BUILD_MSAN: ${{ endsWith(matrix.compiler_mode, 'msan') && 1 || 0 }} TESTS_FUNCTION: ${{ matrix.tests_function }} steps: + # - name: Free Runner Space + # # NOTE: Use a specific tag or commit shasum for immutability + # uses: justinthelaw/maximize-github-runner-space@v0.9.0 - name: Maximize build space uses: rtjohnso/maximize-build-space@v1 with: @@ -73,7 +76,7 @@ jobs: remove-android: true remove-haskell: true remove-codeql: true - remove-docker-images: false + remove-docker-images: true - uses: actions/checkout@v4 - uses: awalsh128/cache-apt-pkgs-action@latest with: diff --git a/Makefile b/Makefile index dd839b10..0645dc62 100644 --- a/Makefile +++ b/Makefile @@ -133,6 +133,25 @@ endif help:: @echo ' BUILD_MODE: "release", "debug", or "optimized-debug" (Default: "release")' +# ************************************************************************ +# Memory allocation fault injection +ifndef BUILD_MEMORY_FAULT_INJECTION + BUILD_MEMORY_FAULT_INJECTION=0 +endif + +ifeq "$(BUILD_MEMORY_FAULT_INJECTION)" "1" + CFLAGS += -DPLATFORM_MEMORY_FAULT_INJECTION=1 +else ifeq "$(BUILD_MEMORY_FAULT_INJECTION)" "0" + CFLAGS += -DPLATFORM_MEMORY_FAULT_INJECTION=0 +else + $(error Unknown BUILD_MEMORY_FAULT_INJECTION mode "$(BUILD_MEMORY_FAULT_INJECTION)". Valid values are "0" or "1". Default is "0") +endif + +help:: + @echo ' BUILD_MEMORY_FAULT_INJECTION={0,1}: Disable/enable allocation fault injection (Default: disabled)' + +TEST_MEMORY_FAULT_DISABLED_CFLAGS = -DPLATFORM_MEMORY_FAULT_INJECTION_DISABLED_IN_THIS_FILE + # ************************************************************************ # Address sanitizer # - Ctests will be silently skipped with clang builds. (Known issue.) @@ -245,6 +264,11 @@ FUNCTIONAL_TESTOBJ= $(FUNCTIONAL_TESTSRC:%.c=$(OBJDIR)/%.o) # Resolves to a list: obj/tests/unit/a.o obj/tests/unit/b.o obj/tests/unit/c.o FAST_UNIT_TESTOBJS := $(FAST_UNIT_TESTSRC:%.c=$(OBJDIR)/%.o) +TESTOBJ := $(TESTSRC:%.c=$(OBJDIR)/%.o) + +$(filter-out $(OBJDIR)/tests/unit/platform_apis_test.o,$(TESTOBJ)): \ + EXTRA_CFLAGS += $(TEST_MEMORY_FAULT_DISABLED_CFLAGS) + # ---- # Binaries from unit-test sources in tests/unit/ sub-dir # Although the sources are in, say, tests/unit/splinterdb_quick_test.c, and so on @@ -275,7 +299,7 @@ all-examples: $(EXAMPLES_BINS) # any mismatched config from a prior build, so we can ensure we never # accidentially build using a mixture of configs -CONFIG_HASH = $(shell echo $(CC) $(DEPFLAGS) $(CFLAGS) $(INCLUDE) $(TARGET_ARCH) $(LD) $(LDFLAGS) $(LIBS) $(AR) | md5sum | cut -f1 -d" ") +CONFIG_HASH = $(shell echo $(CC) $(DEPFLAGS) $(CFLAGS) $(TEST_MEMORY_FAULT_DISABLED_CFLAGS) $(INCLUDE) $(TARGET_ARCH) $(LD) $(LDFLAGS) $(LIBS) $(AR) | md5sum | cut -f1 -d" ") CONFIG_FILE_PREFIX = $(BUILD_PATH)/build-config. CONFIG_FILE = $(CONFIG_FILE_PREFIX)$(CONFIG_HASH) @@ -292,6 +316,7 @@ $(CONFIG_FILE): | $(BUILD_PATH)/. mismatched_config_file_check $(COMMAND) echo CC = $(CC) >> $@ $(COMMAND) echo DEPFLAGS = $(DEPFLAGS) >> $@ $(COMMAND) echo CFLAGS = $(CFLAGS) >> $@ + $(COMMAND) echo TEST_MEMORY_FAULT_DISABLED_CFLAGS = $(TEST_MEMORY_FAULT_DISABLED_CFLAGS) >> $@ $(COMMAND) echo INCLUDE = $(INCLUDE) >> $@ $(COMMAND) echo TARGET_ARCH = $(TARGET_ARCH) >> $@ $(COMMAND) echo LD = $(LD) >> $@ @@ -322,7 +347,7 @@ $(BINDIR)/%/.: # RECIPES # -COMPILE.c = $(CC) $(DEPFLAGS) -MT $@ -MF $(OBJDIR)/$*.d $(CFLAGS) $(GIT_VERSION_CFLAGS) $(INCLUDE) $(TARGET_ARCH) -c +COMPILE.c = $(CC) $(DEPFLAGS) -MT $@ -MF $(OBJDIR)/$*.d $(CFLAGS) $(GIT_VERSION_CFLAGS) $(EXTRA_CFLAGS) $(INCLUDE) $(TARGET_ARCH) -c $(OBJDIR)/%.o: %.c | $$(@D)/. $(CONFIG_FILE) $(BRIEF_FORMATTED) "%-20s %-50s [%s]\n" Compiling $< $@ diff --git a/src/btree.c b/src/btree.c index d6e50ce0..ecb1555a 100644 --- a/src/btree.c +++ b/src/btree.c @@ -3701,7 +3701,7 @@ btree_pack_loop(btree_pack_req *req, // IN/OUT log_trace_key(tuple_key, "btree_pack_loop (bottom)"); - if (req->max_tuples > 0 && req->cfg->data_cfg->key_hash != NULL) { + if (req->fingerprint_arr != NULL) { platform_assert(req->num_tuples < req->max_tuples); req->fingerprint_arr[req->num_tuples] = data_key_hash(req->cfg->data_cfg, tuple_key, req->seed); diff --git a/src/btree.h b/src/btree.h index 7fca0cc4..e75a8b5e 100644 --- a/src/btree.h +++ b/src/btree.h @@ -10,6 +10,7 @@ #pragma once #include "platform_hash.h" +#include "platform_log.h" #include "platform_typed_alloc.h" #include "async.h" #include "blob_build.h" @@ -358,6 +359,7 @@ btree_pack_req_init(btree_pack_req *req, iterator *itor, uint64 max_tuples, unsigned int seed, + bool32 collect_fingerprints, platform_heap_id hid) { memset(req, 0, sizeof(*req)); @@ -367,16 +369,15 @@ btree_pack_req_init(btree_pack_req *req, req->max_tuples = max_tuples; req->seed = seed; merge_accumulator_init(&req->blob_buffer, hid); - if (cfg->data_cfg->key_hash != NULL && max_tuples > 0) { + if (collect_fingerprints && cfg->data_cfg->key_hash != NULL + && max_tuples > 0) + { req->fingerprint_arr = TYPED_ARRAY_ZALLOC(hid, req->fingerprint_arr, max_tuples); - - // When we run with shared-memory configured, we expect that it is sized - // big-enough to not get OOMs from here. Hence, only a debug_assert(). - debug_assert(req->fingerprint_arr, - "Unable to allocate memory for %lu tuples", - max_tuples); if (!req->fingerprint_arr) { + platform_error_log("btree_pack_req_init: failed to allocate " + "fingerprint array for %lu tuples\n", + max_tuples); return STATUS_NO_MEMORY; } } diff --git a/src/clockcache.c b/src/clockcache.c index 4f9c66e6..55d1294f 100644 --- a/src/clockcache.c +++ b/src/clockcache.c @@ -842,7 +842,12 @@ clockcache_write_callback(void *wbs) return; } - platform_assert_status_ok(io_async_state_get_result(state->iostate)); + platform_status rc = io_async_state_get_result(state->iostate); + if (!SUCCESS(rc)) { + platform_error_log("clockcache_write_callback: async write failed: %s\n", + platform_status_to_string(rc)); + } + platform_assert_status_ok(rc); const struct iovec *iovec; uint64 count; @@ -882,7 +887,24 @@ clockcache_write_callback(void *wbs) } io_async_state_deinit(state->iostate); - platform_free(cc->heap_id, state); + platform_free(PROCESS_PRIVATE_HEAP_ID, state); +} + +static void +clockcache_abort_writeback_range(clockcache *cc, + uint64 first_addr, + uint64 end_addr) +{ + uint64 page_size = clockcache_page_size(cc); + + for (uint64 addr = first_addr; addr < end_addr; addr += page_size) { + uint32 entry_number = clockcache_lookup(cc, addr); + platform_assert(entry_number != CC_UNMAPPED_ENTRY); + + debug_only uint32 was_writeback = + clockcache_clear_flag(cc, entry_number, CC_WRITEBACK); + debug_assert(was_writeback); + } } @@ -963,27 +985,35 @@ clockcache_batch_start_writeback(clockcache *cc, uint64 batch, bool32 is_urgent) async_io_state *state; - while ((state = TYPED_MALLOC(cc->heap_id, state)) == NULL) { - clockcache_wait(cc); + state = TYPED_MALLOC(PROCESS_PRIVATE_HEAP_ID, state); + if (state == NULL) { + platform_error_log( + "clockcache_batch_start_writeback: async_io_state allocation " + "failed\n"); + clockcache_abort_writeback_range(cc, first_addr, end_addr); + goto close_log; } state->cc = cc; state->outstanding_pages = NULL; - io_async_state_init(state->iostate, - cc->io, - io_async_pwritev, - first_addr, - clockcache_write_callback, - state); + platform_status rc = io_async_state_init(state->iostate, + cc->io, + io_async_pwritev, + first_addr, + clockcache_write_callback, + state); + if (!SUCCESS(rc)) { + platform_error_log("clockcache_batch_start_writeback: " + "io_async_state_init failed: %s\n", + platform_status_to_string(rc)); + clockcache_abort_writeback_range(cc, first_addr, end_addr); + platform_free(PROCESS_PRIVATE_HEAP_ID, state); + goto close_log; + } uint64 req_count = clockcache_divide_by_page_size(cc, end_addr - first_addr); - if (cc->cfg->use_stats) { - cc->stats[tid].page_writes[entry->type] += req_count; - cc->stats[tid].writes_issued++; - } - for (i = 0; i < req_count; i++) { addr = first_addr + clockcache_multiply_by_page_size(cc, i); next_entry = clockcache_lookup_entry(cc, addr); @@ -994,12 +1024,29 @@ clockcache_batch_start_writeback(clockcache *cc, uint64 batch, bool32 is_urgent) "flush: entry %u addr %lu\n", next_entry_no, addr); - io_async_state_append_page(state->iostate, next_entry->page.data); + rc = io_async_state_append_page(state->iostate, + next_entry->page.data); + if (!SUCCESS(rc)) { + platform_error_log("clockcache_batch_start_writeback: " + "io_async_state_append_page failed: %s\n", + platform_status_to_string(rc)); + io_async_state_deinit(state->iostate); + clockcache_abort_writeback_range(cc, first_addr, end_addr); + platform_free(PROCESS_PRIVATE_HEAP_ID, state); + goto close_log; + } + } + + if (cc->cfg->use_stats) { + cc->stats[tid].page_writes[entry->type] += req_count; + cc->stats[tid].writes_issued++; } io_async_run(state->iostate); } } + +close_log: clockcache_close_log_stream(); } @@ -1571,6 +1618,15 @@ clockcache_get_in_cache(clockcache *cc, // IN return FALSE; } +static void +clockcache_release_unpublished_entry(clockcache_entry *entry) +{ + entry->page.disk_addr = CC_UNMAPPED_ADDR; + entry->type = PAGE_TYPE_INVALID; + platform_assert(entry->waiters.head == NULL); + entry->status = CC_FREE_STATUS; +} + static uint64 clockcache_acquire_entry_for_load(clockcache *cc, // IN uint64 addr, @@ -1592,9 +1648,7 @@ clockcache_acquire_entry_for_load(clockcache *cc, // IN &cc->lookup[lookup_no], CC_UNMAPPED_ENTRY, entry_number)) { clockcache_dec_ref(cc, entry_number, tid); - platform_assert(entry->waiters.head == NULL); - entry->type = PAGE_TYPE_INVALID; - entry->status = CC_FREE_STATUS; + clockcache_release_unpublished_entry(entry); clockcache_log(addr, entry_number, "get abort: entry: %u addr: %lu\n", @@ -1649,6 +1703,13 @@ clockcache_get_from_disk(clockcache *cc, // IN } platform_status status = io_read(cc->io, entry->page.data, page_size, addr); + if (!SUCCESS(status)) { + platform_error_log("clockcache_get_from_disk: io_read failed for addr " + "%lu, type %u: %s\n", + addr, + type, + platform_status_to_string(status)); + } platform_assert_status_ok(status); if (cc->cfg->use_stats) { @@ -1856,7 +1917,15 @@ clockcache_get_from_disk_async_callback(void *arg) if (io_async_run(state->iostate) != ASYNC_STATUS_DONE) { return; } - platform_assert_status_ok(io_async_state_get_result(state->iostate)); + platform_status rc = io_async_state_get_result(state->iostate); + if (!SUCCESS(rc)) { + platform_error_log("clockcache_get_from_disk_async_callback: async read " + "failed for addr %lu, entry %lu: %s\n", + state->addr, + state->entry_number, + platform_status_to_string(rc)); + } + platform_assert_status_ok(rc); clockcache_finish_load(state->cc, state->addr, state->entry_number); state->callback(state->callback_arg); @@ -1888,10 +1957,26 @@ clockcache_get_from_disk_async(clockcache_get_async_state *state, uint64 depth) // we've acquired an entry, because other threads could now be waiting on the // load to finish, and there is no way for them to handle our failure to load // the page. + if (!SUCCESS(state->rc)) { + platform_error_log("clockcache_get_from_disk_async: " + "io_async_state_init failed for addr %lu, entry %lu: " + "%s\n", + state->addr, + state->entry_number, + platform_status_to_string(state->rc)); + } platform_assert_status_ok(state->rc); state->rc = io_async_state_append_page(state->iostate, state->entry->page.data); + if (!SUCCESS(state->rc)) { + platform_error_log("clockcache_get_from_disk_async: " + "io_async_state_append_page failed for addr %lu, " + "entry %lu: %s\n", + state->addr, + state->entry_number, + platform_status_to_string(state->rc)); + } platform_assert_status_ok(state->rc); if (state->cc->cfg->use_stats) { @@ -1901,7 +1986,15 @@ clockcache_get_from_disk_async(clockcache_get_async_state *state, uint64 depth) while (io_async_run(state->iostate) != ASYNC_STATUS_DONE) { async_yield(state); } - platform_assert_status_ok(io_async_state_get_result(state->iostate)); + state->rc = io_async_state_get_result(state->iostate); + if (!SUCCESS(state->rc)) { + platform_error_log("clockcache_get_from_disk_async: async read failed " + "for addr %lu, entry %lu: %s\n", + state->addr, + state->entry_number, + platform_status_to_string(state->rc)); + } + platform_assert_status_ok(state->rc); io_async_state_deinit(state->iostate); if (state->cc->cfg->use_stats) { @@ -2183,20 +2276,53 @@ clockcache_page_sync(clockcache *cc, } if (!is_blocking) { - state = TYPED_MALLOC(cc->heap_id, state); + state = TYPED_MALLOC(PROCESS_PRIVATE_HEAP_ID, state); + if (state == NULL) { + platform_error_log("clockcache_page_sync: async_io_state allocation " + "failed for addr %lu, entry %u, type %u\n", + addr, + entry_number, + type); + } platform_assert(state); state->cc = cc; state->outstanding_pages = NULL; - io_async_state_init(state->iostate, - cc->io, - io_async_pwritev, - addr, - clockcache_write_callback, - state); - io_async_state_append_page(state->iostate, page->data); + status = io_async_state_init(state->iostate, + cc->io, + io_async_pwritev, + addr, + clockcache_write_callback, + state); + if (!SUCCESS(status)) { + platform_error_log("clockcache_page_sync: io_async_state_init failed " + "for addr %lu, entry %u, type %u: %s\n", + addr, + entry_number, + type, + platform_status_to_string(status)); + } + platform_assert_status_ok(status); + status = io_async_state_append_page(state->iostate, page->data); + if (!SUCCESS(status)) { + platform_error_log("clockcache_page_sync: io_async_state_append_page " + "failed for addr %lu, entry %u, type %u: %s\n", + addr, + entry_number, + type, + platform_status_to_string(status)); + } + platform_assert_status_ok(status); io_async_run(state->iostate); } else { status = io_write(cc->io, page->data, clockcache_page_size(cc), addr); + if (!SUCCESS(status)) { + platform_error_log("clockcache_page_sync: io_write failed for addr " + "%lu, entry %u, type %u: %s\n", + addr, + entry_number, + type, + platform_status_to_string(status)); + } platform_assert_status_ok(status); clockcache_log(addr, entry_number, @@ -2243,19 +2369,47 @@ clockcache_extent_sync(clockcache *cc, uint64 addr, uint64 *pages_outstanding) { if (state == NULL) { req_addr = page_addr; - state = TYPED_MALLOC(cc->heap_id, state); + state = TYPED_MALLOC(PROCESS_PRIVATE_HEAP_ID, state); + if (state == NULL) { + platform_error_log("clockcache_extent_sync: async_io_state " + "allocation failed for extent addr %lu, " + "page addr %lu, entry %u\n", + addr, + page_addr, + entry_number); + } platform_assert(state); state->cc = cc; state->outstanding_pages = pages_outstanding; - io_async_state_init(state->iostate, - cc->io, - io_async_pwritev, - req_addr, - clockcache_write_callback, - state); + platform_status rc = io_async_state_init(state->iostate, + cc->io, + io_async_pwritev, + req_addr, + clockcache_write_callback, + state); + if (!SUCCESS(rc)) { + platform_error_log("clockcache_extent_sync: " + "io_async_state_init failed for extent addr " + "%lu, req addr %lu, entry %u: %s\n", + addr, + req_addr, + entry_number, + platform_status_to_string(rc)); + } + platform_assert_status_ok(rc); } - io_async_state_append_page( + platform_status rc = io_async_state_append_page( state->iostate, clockcache_get_entry(cc, entry_number)->page.data); + if (!SUCCESS(rc)) { + platform_error_log("clockcache_extent_sync: " + "io_async_state_append_page failed for extent " + "addr %lu, page addr %lu, entry %u: %s\n", + addr, + page_addr, + entry_number, + platform_status_to_string(rc)); + } + platform_assert_status_ok(rc); req_count++; } else { // ALEX: There is maybe a race with eviction with this assertion @@ -2316,7 +2470,13 @@ clockcache_prefetch_callback(void *pfs) return; } - platform_assert_status_ok(io_async_state_get_result(state->iostate)); + platform_status rc = io_async_state_get_result(state->iostate); + if (!SUCCESS(rc)) { + platform_error_log("clockcache_prefetch_callback: async read failed: " + "%s\n", + platform_status_to_string(rc)); + } + platform_assert_status_ok(rc); const struct iovec *iovec; uint64 count; @@ -2351,7 +2511,7 @@ clockcache_prefetch_callback(void *pfs) } io_async_state_deinit(state->iostate); - platform_free(cc->heap_id, state); + platform_free(PROCESS_PRIVATE_HEAP_ID, state); } /* @@ -2365,6 +2525,7 @@ void clockcache_prefetch(clockcache *cc, uint64 base_addr, page_type type) { async_io_state *state = NULL; + uint64 state_num_pages = 0; uint64 pages_per_extent = cc->cfg->pages_per_extent; threadid tid = platform_get_tid(); @@ -2385,17 +2546,17 @@ clockcache_prefetch(clockcache *cc, uint64 base_addr, page_type type) clockcache_dec_ref(cc, entry_no, tid); // fallthrough case GET_RC_CONFLICT: - // in cache, issue IO req if started + // in cache, issue IO req if pages have been queued if (state != NULL) { + platform_assert(state_num_pages > 0); if (cc->cfg->use_stats) { threadid tid = platform_get_tid(); - uint64 count; - io_async_state_get_iovec(state->iostate, &count); - cc->stats[tid].page_reads[type] += count; + cc->stats[tid].page_reads[type] += state_num_pages; cc->stats[tid].prefetches_issued[type]++; } io_async_run(state->iostate); - state = NULL; + state = NULL; + state_num_pages = 0; } clockcache_log(addr, entry_no, @@ -2412,24 +2573,59 @@ clockcache_prefetch(clockcache *cc, uint64 base_addr, page_type type) entry->page.disk_addr = addr; entry->type = type; uint64 lookup_no = clockcache_divide_by_page_size(cc, addr); - if (__sync_bool_compare_and_swap( - &cc->lookup[lookup_no], CC_UNMAPPED_ENTRY, free_entry_no)) - { + if (state == NULL) { + // start a new IO req before publishing the loading entry + state = TYPED_MALLOC(PROCESS_PRIVATE_HEAP_ID, state); if (state == NULL) { - // start a new IO req - state = TYPED_MALLOC(cc->heap_id, state); - platform_assert(state); - state->cc = cc; + platform_error_log("clockcache_prefetch: async_io_state " + "allocation failed for base addr %lu, " + "page addr %lu, type %u\n", + base_addr, + addr, + type); + clockcache_release_unpublished_entry(entry); + return; + } + state->cc = cc; + platform_status rc = io_async_state_init(state->iostate, cc->io, io_async_preadv, addr, clockcache_prefetch_callback, state); + if (!SUCCESS(rc)) { + platform_error_log("clockcache_prefetch: " + "io_async_state_init failed for base addr " + "%lu, page addr %lu, type %u: %s\n", + base_addr, + addr, + type, + platform_status_to_string(rc)); + clockcache_release_unpublished_entry(entry); + platform_free(PROCESS_PRIVATE_HEAP_ID, state); + state = NULL; + return; } + } + if (__sync_bool_compare_and_swap( + &cc->lookup[lookup_no], CC_UNMAPPED_ENTRY, free_entry_no)) + { platform_status rc = io_async_state_append_page(state->iostate, entry->page.data); + if (!SUCCESS(rc)) { + platform_error_log("clockcache_prefetch: " + "io_async_state_append_page failed for " + "base addr %lu, page addr %lu, entry %u, " + "type %u: %s\n", + base_addr, + addr, + free_entry_no, + type, + platform_status_to_string(rc)); + } platform_assert_status_ok(rc); + state_num_pages++; clockcache_log(addr, entry_no, "prefetch (load): entry %u addr %lu\n", @@ -2440,10 +2636,12 @@ clockcache_prefetch(clockcache *cc, uint64 base_addr, page_type type) * someone else is already loading this page, release the free * entry and retry */ - entry->page.disk_addr = CC_UNMAPPED_ADDR; - entry->type = PAGE_TYPE_INVALID; - platform_assert(entry->waiters.head == NULL); - entry->status = CC_FREE_STATUS; + clockcache_release_unpublished_entry(entry); + if (state_num_pages == 0) { + io_async_state_deinit(state->iostate); + platform_free(PROCESS_PRIVATE_HEAP_ID, state); + state = NULL; + } page_off--; } break; @@ -2452,17 +2650,17 @@ clockcache_prefetch(clockcache *cc, uint64 base_addr, page_type type) platform_assert(0); } } - // issue IO req if started + // issue IO req if pages have been queued if (state != NULL) { + platform_assert(state_num_pages > 0); if (cc->cfg->use_stats) { threadid tid = platform_get_tid(); - uint64 count; - io_async_state_get_iovec(state->iostate, &count); - cc->stats[tid].page_reads[type] += count; + cc->stats[tid].page_reads[type] += state_num_pages; cc->stats[tid].prefetches_issued[type]++; } io_async_run(state->iostate); - state = NULL; + state = NULL; + state_num_pages = 0; } } @@ -3146,6 +3344,10 @@ clockcache_init(clockcache *cc, // OUT platform_status rc = platform_buffer_init( &cc->lookup_bh, allocator_page_capacity * sizeof(cc->lookup[0])); if (!SUCCESS(rc)) { + platform_error_log("clockcache_init: failed to allocate lookup table " + "(%lu bytes): %s\n", + allocator_page_capacity * sizeof(cc->lookup[0]), + platform_status_to_string(rc)); goto alloc_error; } cc->lookup = platform_buffer_getaddr(&cc->lookup_bh); @@ -3156,6 +3358,10 @@ clockcache_init(clockcache *cc, // OUT rc = platform_buffer_init(&cc->entry_bh, cc->cfg->page_capacity * sizeof(cc->entry[0])); if (!SUCCESS(rc)) { + platform_error_log("clockcache_init: failed to allocate entries " + "(%lu bytes): %s\n", + cc->cfg->page_capacity * sizeof(cc->entry[0]), + platform_status_to_string(rc)); goto alloc_error; } cc->entry = platform_buffer_getaddr(&cc->entry_bh); @@ -3163,6 +3369,10 @@ clockcache_init(clockcache *cc, // OUT /* data must be aligned because of O_DIRECT */ rc = platform_buffer_init(&cc->data_bh, cc->cfg->capacity); if (!SUCCESS(rc)) { + platform_error_log("clockcache_init: failed to allocate page data " + "(%lu bytes): %s\n", + cc->cfg->capacity, + platform_status_to_string(rc)); goto alloc_error; } cc->data = platform_buffer_getaddr(&cc->data_bh); @@ -3183,6 +3393,10 @@ clockcache_init(clockcache *cc, // OUT rc = platform_buffer_init(&cc->rc_bh, refcount_size); if (!SUCCESS(rc)) { + platform_error_log("clockcache_init: failed to allocate refcounts " + "(%lu bytes): %s\n", + refcount_size, + platform_status_to_string(rc)); goto alloc_error; } cc->refcount = platform_buffer_getaddr(&cc->rc_bh); @@ -3191,6 +3405,10 @@ clockcache_init(clockcache *cc, // OUT rc = platform_buffer_init(&cc->pincount_bh, cc->cfg->page_capacity * sizeof(cc->pincount[0])); if (!SUCCESS(rc)) { + platform_error_log("clockcache_init: failed to allocate pincounts " + "(%lu bytes): %s\n", + cc->cfg->page_capacity * sizeof(cc->pincount[0]), + platform_status_to_string(rc)); goto alloc_error; } cc->pincount = platform_buffer_getaddr(&cc->pincount_bh); @@ -3207,6 +3425,11 @@ clockcache_init(clockcache *cc, // OUT cc->cfg->page_capacity / CC_ENTRIES_PER_BATCH * sizeof(cc->batch_busy[0])); if (!SUCCESS(rc)) { + platform_error_log("clockcache_init: failed to allocate batch state " + "(%lu bytes): %s\n", + cc->cfg->page_capacity / CC_ENTRIES_PER_BATCH + * sizeof(cc->batch_busy[0]), + platform_status_to_string(rc)); goto alloc_error; } cc->batch_busy = platform_buffer_getaddr(&cc->batch_bh); diff --git a/src/core.c b/src/core.c index e9cea7f6..0c5232dc 100644 --- a/src/core.c +++ b/src/core.c @@ -133,7 +133,7 @@ typedef struct ONDISK core_super_block { * Super block functions *----------------------------------------------------------------------------- */ -static void +static platform_status core_set_super_block(core_handle *spl, bool32 is_checkpoint, bool32 is_unmount, @@ -150,7 +150,14 @@ core_set_super_block(core_handle *spl, } else { rc = allocator_get_super_addr(spl->al, spl->id, &super_addr); } - platform_assert_status_ok(rc); + if (!SUCCESS(rc)) { + platform_error_log("core_set_super_block: failed to %s super block " + "address for root id %lu: %s\n", + is_create ? "allocate" : "get", + spl->id, + platform_status_to_string(rc)); + return rc; + } super_page = cache_get(spl->cc, super_addr, TRUE, PAGE_TYPE_SUPERBLOCK); while (!cache_try_claim(spl->cc, super_page)) { platform_sleep_ns(wait); @@ -166,13 +173,9 @@ core_set_super_block(core_handle *spl, trunk_init_root_handle(&spl->trunk_context, &root_handle); uint64 root_addr = trunk_ondisk_node_handle_addr(&root_handle); if (root_addr != 0) { - super->root_addr = root_addr; - rc = trunk_inc_ref(spl->al, super->root_addr); - platform_assert_status_ok(rc); - - } else { - super->root_addr = 0; + trunk_inc_ref(spl->al, root_addr); } + super->root_addr = root_addr; trunk_ondisk_node_handle_deinit(&root_handle); if (spl->cfg.use_log) { @@ -205,8 +208,16 @@ core_set_super_block(core_handle *spl, spl->al, spl->ts, old_root_addr); - platform_assert_status_ok(rc); + if (!SUCCESS(rc)) { + platform_error_log("core_set_super_block: trunk_dec_ref failed for " + "old root addr %lu: %s\n", + old_root_addr, + platform_status_to_string(rc)); + return rc; + } } + + return STATUS_OK; } static core_super_block * @@ -288,6 +299,57 @@ core_get_compacted_memtable(core_handle *spl, uint64 generation) return &spl->compacted_memtable[memtable_idx]; } +static bool32 +core_memtable_state_has_compacted_branch(memtable_state state) +{ + return state == MEMTABLE_STATE_COMPACTED + || state == MEMTABLE_STATE_INCORPORATION_ASSIGNED + || state == MEMTABLE_STATE_INCORPORATION_FAILED; +} + +static uint64 +core_memtable_compacted_branch_root(core_handle *spl, uint64 generation) +{ + memtable *mt = core_get_memtable(spl, generation); + if (!core_memtable_state_has_compacted_branch(mt->state)) { + return 0; + } + + core_compacted_memtable *cmt = core_get_compacted_memtable(spl, generation); + return cmt->branch.root_addr; +} + +static void +core_memtable_release_compacted_branch(core_handle *spl, uint64 generation) +{ + core_compacted_memtable *cmt = core_get_compacted_memtable(spl, generation); + if (cmt->branch.root_addr == 0) { + return; + } + + btree_dec_ref( + spl->cc, spl->cfg.btree_cfg, cmt->branch.root_addr, PAGE_TYPE_BRANCH); + cmt->branch.root_addr = 0; +} + +static void +core_memtable_mark_incorporation_failed(core_handle *spl, + uint64 generation, + platform_status status) +{ + memtable_block_lookups(&spl->mt_ctxt); + memtable *mt = core_get_memtable(spl, generation); + platform_error_log("Memtable incorporation failed: generation=%lu " + "state=%s status=%s memtable_root=%lu\n", + generation, + memtable_state_string(mt->state), + platform_status_to_string(status), + mt->root_addr); + memtable_mark_incorporation_failed(mt, status); + core_memtable_release_compacted_branch(spl, generation); + memtable_unblock_lookups(&spl->mt_ctxt); +} + static inline void core_memtable_inc_ref(core_handle *spl, uint64 root_addr) { @@ -424,6 +486,7 @@ core_memtable_compact(core_handle *spl, uint64 generation, const threadid tid) itor, rflimit, rfcfg->seed, + FALSE, spl->heap_id); uint64 pack_start; if (spl->cfg.use_stats) { @@ -514,14 +577,21 @@ core_try_continue_incorporate(core_handle *spl, uint64 next_generation) return should_continue; } -static void +static platform_status core_memtable_incorporate(core_handle *spl, uint64 generation, const threadid tid) { platform_stream_handle stream; platform_status rc = core_open_log_stream_if_enabled(spl, &stream); - platform_assert_status_ok(rc); + if (!SUCCESS(rc)) { + platform_error_log("core_memtable_incorporate: failed to open log " + "stream for generation %lu: %s\n", + generation, + platform_status_to_string(rc)); + core_memtable_mark_incorporation_failed(spl, generation, rc); + return rc; + } core_log_stream_if_enabled( spl, &stream, "incorporate memtable gen %lu\n", generation); core_log_stream_if_enabled( @@ -534,9 +604,15 @@ core_memtable_incorporate(core_handle *spl, flush_start = platform_get_timestamp(); } rc = trunk_incorporate_prepare(&spl->trunk_context, cmt->branch.root_addr); - platform_assert_status_ok(rc); + if (!SUCCESS(rc)) { + platform_error_log("trunk_incorporate_prepare failed: %s\n", + platform_status_to_string(rc)); + core_close_log_stream_if_enabled(spl, &stream); + core_memtable_mark_incorporation_failed(spl, generation, rc); + return rc; + } btree_dec_ref( - spl->cc, spl->cfg.btree_cfg, cmt->branch.root_addr, PAGE_TYPE_MEMTABLE); + spl->cc, spl->cfg.btree_cfg, cmt->branch.root_addr, PAGE_TYPE_BRANCH); if (spl->cfg.use_stats) { spl->stats[tid].memtable_flush_wait_time_ns += platform_timestamp_elapsed(cmt->wait_start); @@ -581,6 +657,8 @@ core_memtable_incorporate(core_handle *spl, spl->stats[tid].memtable_flush_time_max_ns = flush_start; } } + + return STATUS_OK; } /* @@ -590,7 +668,7 @@ core_memtable_incorporate(core_handle *spl, * context of the foreground thread. If background threads are enabled, this * function is called in the context of the memtable worker thread. */ -static void +static platform_status core_memtable_flush_internal(core_handle *spl, uint64 generation) { const threadid tid = platform_get_tid(); @@ -602,18 +680,26 @@ core_memtable_flush_internal(core_handle *spl, uint64 generation) goto out; } do { - core_memtable_incorporate(spl, generation, tid); + platform_status rc = core_memtable_incorporate(spl, generation, tid); + if (!SUCCESS(rc)) { + return rc; + } generation++; } while (core_try_continue_incorporate(spl, generation)); out: - return; + return STATUS_OK; } static void core_memtable_flush_internal_virtual(task *arg) { core_memtable_args *mt_args = container_of(arg, core_memtable_args, tsk); - core_memtable_flush_internal(mt_args->spl, mt_args->generation); + platform_status rc = + core_memtable_flush_internal(mt_args->spl, mt_args->generation); + if (!SUCCESS(rc)) { + platform_error_log("memtable flush failed: %s\n", + platform_status_to_string(rc)); + } } /* @@ -1607,6 +1693,13 @@ core_apply_to_range(core_handle *spl, { core_range_iterator *range_itor = TYPED_MALLOC(PROCESS_PRIVATE_HEAP_ID, range_itor); + if (range_itor == NULL) { + platform_error_log("core_apply_to_range: failed to allocate range " + "iterator for %lu tuples\n", + num_tuples); + return STATUS_NO_MEMORY; + } + platform_status rc = core_range_iterator_init(spl, range_itor, greater_than_or_equal, @@ -1617,6 +1710,9 @@ core_apply_to_range(core_handle *spl, start_key, num_tuples); if (!SUCCESS(rc)) { + platform_error_log("core_apply_to_range: range iterator init failed: " + "%s\n", + platform_status_to_string(rc)); goto destroy_range_itor; } @@ -1628,6 +1724,8 @@ core_apply_to_range(core_handle *spl, func(curr_key, data, arg); rc = iterator_next(&range_itor->super); if (!SUCCESS(rc)) { + platform_error_log("core_apply_to_range: iterator_next failed: %s\n", + platform_status_to_string(rc)); goto destroy_range_itor; } } @@ -1638,6 +1736,84 @@ core_apply_to_range(core_handle *spl, return rc; } +static void +core_stats_destroy(platform_heap_id heap_id, core_stats *stats) +{ + if (stats == NULL) { + return; + } + + for (uint64 i = 0; i < MAX_THREADS; i++) { + histogram_destroy(heap_id, stats[i].insert_latency_histo); + histogram_destroy(heap_id, stats[i].update_latency_histo); + histogram_destroy(heap_id, stats[i].delete_latency_histo); + } + platform_free(heap_id, stats); +} + +static core_stats * +core_stats_create(platform_heap_id heap_id) +{ + core_stats *stats = TYPED_ARRAY_ZALLOC(heap_id, stats, MAX_THREADS); + if (stats == NULL) { + platform_error_log("core_stats_create: failed to allocate stats array " + "for %u threads\n", + MAX_THREADS); + return NULL; + } + + for (uint64 i = 0; i < MAX_THREADS; i++) { + stats[i].insert_latency_histo = histogram_create( + heap_id, LATENCYHISTO_SIZE + 1, latency_histo_buckets); + if (stats[i].insert_latency_histo == NULL) { + platform_error_log("core_stats_create: failed to allocate insert " + "latency histogram for thread %lu\n", + i); + goto cleanup; + } + stats[i].update_latency_histo = histogram_create( + heap_id, LATENCYHISTO_SIZE + 1, latency_histo_buckets); + if (stats[i].update_latency_histo == NULL) { + platform_error_log("core_stats_create: failed to allocate update " + "latency histogram for thread %lu\n", + i); + goto cleanup; + } + stats[i].delete_latency_histo = histogram_create( + heap_id, LATENCYHISTO_SIZE + 1, latency_histo_buckets); + if (stats[i].delete_latency_histo == NULL) { + platform_error_log("core_stats_create: failed to allocate delete " + "latency histogram for thread %lu\n", + i); + goto cleanup; + } + } + + return stats; + +cleanup: + core_stats_destroy(heap_id, stats); + return NULL; +} + +static platform_status +core_create_stats(core_handle *spl) +{ + if (!spl->cfg.use_stats) { + return STATUS_OK; + } + + spl->stats = core_stats_create(spl->heap_id); + return spl->stats == NULL ? STATUS_NO_MEMORY : STATUS_OK; +} + +static void +core_destroy_stats(core_handle *spl) +{ + core_stats_destroy(spl->heap_id, spl->stats); + spl->stats = NULL; +} + /* Format the disk and mount the database */ platform_status @@ -1668,43 +1844,56 @@ core_mkfs(core_handle *spl, core_memtable_flush_virtual, spl); if (!SUCCESS(rc)) { + platform_error_log("core_mkfs: memtable_context_init failed: %s\n", + platform_status_to_string(rc)); return rc; } // set up the log if (spl->cfg.use_log) { spl->log = log_create(cc, spl->cfg.log_cfg, spl->heap_id); + if (spl->log == NULL) { + platform_error_log("core_mkfs: log_create failed\n"); + rc = STATUS_NO_MEMORY; + goto deinit_memtable_context; + } } - trunk_context_init( + rc = trunk_context_init( &spl->trunk_context, spl->cfg.trunk_node_cfg, hid, cc, al, ts, 0); + if (!SUCCESS(rc)) { + platform_error_log("core_mkfs: trunk_context_init failed: %s\n", + platform_status_to_string(rc)); + goto deinit_log; + } - core_set_super_block(spl, FALSE, FALSE, TRUE); - - if (spl->cfg.use_stats) { - spl->stats = TYPED_ARRAY_ZALLOC(spl->heap_id, spl->stats, MAX_THREADS); - platform_assert(spl->stats); - for (uint64 i = 0; i < MAX_THREADS; i++) { - platform_status rc; - rc = histogram_create(spl->heap_id, - LATENCYHISTO_SIZE + 1, - latency_histo_buckets, - &spl->stats[i].insert_latency_histo); - platform_assert_status_ok(rc); - rc = histogram_create(spl->heap_id, - LATENCYHISTO_SIZE + 1, - latency_histo_buckets, - &spl->stats[i].update_latency_histo); - platform_assert_status_ok(rc); - rc = histogram_create(spl->heap_id, - LATENCYHISTO_SIZE + 1, - latency_histo_buckets, - &spl->stats[i].delete_latency_histo); - platform_assert_status_ok(rc); - } + rc = core_create_stats(spl); + if (!SUCCESS(rc)) { + platform_error_log("core_mkfs: core_create_stats failed: %s\n", + platform_status_to_string(rc)); + goto deinit_trunk_context; } + rc = core_set_super_block(spl, FALSE, FALSE, TRUE); + if (!SUCCESS(rc)) { + platform_error_log("core_mkfs: core_set_super_block failed: %s\n", + platform_status_to_string(rc)); + goto deinit_stats; + } return STATUS_OK; + +deinit_stats: + core_destroy_stats(spl); +deinit_trunk_context: + trunk_context_deinit(&spl->trunk_context); +deinit_log: + if (spl->cfg.use_log) { + platform_free(spl->heap_id, spl->log); + spl->log = NULL; + } +deinit_memtable_context: + memtable_context_deinit(&spl->mt_ctxt); + return rc; } /* @@ -1750,41 +1939,101 @@ core_mount(core_handle *spl, core_memtable_flush_virtual, spl); if (!SUCCESS(rc)) { + platform_error_log("core_mount: memtable_context_init failed: %s\n", + platform_status_to_string(rc)); return rc; } if (spl->cfg.use_log) { spl->log = log_create(cc, spl->cfg.log_cfg, spl->heap_id); + if (spl->log == NULL) { + platform_error_log("core_mount: log_create failed\n"); + rc = STATUS_NO_MEMORY; + goto deinit_memtable_context; + } } - trunk_context_init( + rc = trunk_context_init( &spl->trunk_context, spl->cfg.trunk_node_cfg, hid, cc, al, ts, root_addr); + if (!SUCCESS(rc)) { + platform_error_log("core_mount: trunk_context_init failed: %s\n", + platform_status_to_string(rc)); + goto deinit_log; + } - core_set_super_block(spl, FALSE, FALSE, FALSE); + rc = core_create_stats(spl); + if (!SUCCESS(rc)) { + platform_error_log("core_mount: core_create_stats failed: %s\n", + platform_status_to_string(rc)); + goto deinit_trunk_context; + } - if (spl->cfg.use_stats) { - spl->stats = TYPED_ARRAY_ZALLOC(spl->heap_id, spl->stats, MAX_THREADS); - platform_assert(spl->stats); - for (uint64 i = 0; i < MAX_THREADS; i++) { - platform_status rc; - rc = histogram_create(spl->heap_id, - LATENCYHISTO_SIZE + 1, - latency_histo_buckets, - &spl->stats[i].insert_latency_histo); - platform_assert_status_ok(rc); - rc = histogram_create(spl->heap_id, - LATENCYHISTO_SIZE + 1, - latency_histo_buckets, - &spl->stats[i].update_latency_histo); - platform_assert_status_ok(rc); - rc = histogram_create(spl->heap_id, - LATENCYHISTO_SIZE + 1, - latency_histo_buckets, - &spl->stats[i].delete_latency_histo); - platform_assert_status_ok(rc); - } + rc = core_set_super_block(spl, FALSE, FALSE, FALSE); + if (!SUCCESS(rc)) { + platform_error_log("core_mount: core_set_super_block failed: %s\n", + platform_status_to_string(rc)); + goto deinit_stats; } return STATUS_OK; + +deinit_stats: + core_destroy_stats(spl); +deinit_trunk_context: + trunk_context_deinit(&spl->trunk_context); +deinit_log: + if (spl->cfg.use_log) { + platform_free(spl->heap_id, spl->log); + spl->log = NULL; + } +deinit_memtable_context: + memtable_context_deinit(&spl->mt_ctxt); + return rc; +} + +static bool32 +core_report_unincorporated_memtables(core_handle *spl) +{ + bool32 found_unincorporated = FALSE; + uint64 start_generation = memtable_generation_retired(&spl->mt_ctxt) + 1; + uint64 end_generation = memtable_generation(&spl->mt_ctxt); + + for (uint64 generation = start_generation; generation < end_generation; + generation++) + { + memtable *mt = core_try_get_memtable(spl, generation); + if (mt == NULL || mt->state == MEMTABLE_STATE_READY) { + continue; + } + + found_unincorporated = TRUE; + uint64 compacted_root = + core_memtable_compacted_branch_root(spl, generation); + if (mt->state == MEMTABLE_STATE_INCORPORATION_FAILED) { + platform_error_log("Shutdown found memtable from failed " + "incorporation: generation=%lu state=%s " + "status=%s memtable_root=%lu " + "compacted_root=%lu\n", + generation, + memtable_state_string(mt->state), + platform_status_to_string(mt->incorporation_status), + mt->root_addr, + compacted_root); + } else { + platform_error_log("Shutdown found unincorporated memtable: " + "generation=%lu state=%s memtable_root=%lu " + "compacted_root=%lu\n", + generation, + memtable_state_string(mt->state), + mt->root_addr, + compacted_root); + } + + if (compacted_root != 0) { + core_memtable_release_compacted_branch(spl, generation); + } + } + + return found_unincorporated; } /* @@ -1810,6 +2059,8 @@ core_prepare_for_shutdown(core_handle *spl) platform_status rc = task_perform_until_quiescent(spl->ts); platform_assert_status_ok(rc); + core_report_unincorporated_memtables(spl); + // destroy memtable context (and its memtables) memtable_context_deinit(&spl->mt_ctxt); @@ -1829,18 +2080,17 @@ core_prepare_for_shutdown(core_handle *spl) platform_status core_unmount(core_handle *spl) { + platform_status rc; + core_prepare_for_shutdown(spl); - core_set_super_block(spl, FALSE, TRUE, FALSE); - trunk_context_deinit(&spl->trunk_context); - if (spl->cfg.use_stats) { - for (uint64 i = 0; i < MAX_THREADS; i++) { - histogram_destroy(spl->heap_id, &spl->stats[i].insert_latency_histo); - histogram_destroy(spl->heap_id, &spl->stats[i].update_latency_histo); - histogram_destroy(spl->heap_id, &spl->stats[i].delete_latency_histo); - } - platform_free(spl->heap_id, spl->stats); + rc = core_set_super_block(spl, FALSE, TRUE, FALSE); + if (!SUCCESS(rc)) { + platform_error_log("core_unmount: failed to update super block: %s\n", + platform_status_to_string(rc)); } - return STATUS_OK; + trunk_context_deinit(&spl->trunk_context); + core_destroy_stats(spl); + return rc; } /* @@ -1854,14 +2104,7 @@ core_destroy(core_handle *spl) // clear out this splinter table from the meta page. allocator_remove_super_addr(spl->al, spl->id); - if (spl->cfg.use_stats) { - for (uint64 i = 0; i < MAX_THREADS; i++) { - histogram_destroy(spl->heap_id, &spl->stats[i].insert_latency_histo); - histogram_destroy(spl->heap_id, &spl->stats[i].update_latency_histo); - histogram_destroy(spl->heap_id, &spl->stats[i].delete_latency_histo); - } - platform_free(spl->heap_id, spl->stats); - } + core_destroy_stats(spl); } @@ -1938,19 +2181,31 @@ core_print_insertion_stats(platform_log_handle *log_handle, const core_handle *s return; } - histogram_handle insert_lat_accum, update_lat_accum, delete_lat_accum; - histogram_create(PROCESS_PRIVATE_HEAP_ID, - LATENCYHISTO_SIZE + 1, - latency_histo_buckets, - &insert_lat_accum); - histogram_create(PROCESS_PRIVATE_HEAP_ID, - LATENCYHISTO_SIZE + 1, - latency_histo_buckets, - &update_lat_accum); - histogram_create(PROCESS_PRIVATE_HEAP_ID, - LATENCYHISTO_SIZE + 1, - latency_histo_buckets, - &delete_lat_accum); + histogram *insert_lat_accum; + histogram *update_lat_accum; + histogram *delete_lat_accum; + insert_lat_accum = + histogram_create(PROCESS_PRIVATE_HEAP_ID, + LATENCYHISTO_SIZE + 1, + latency_histo_buckets); + update_lat_accum = + histogram_create(PROCESS_PRIVATE_HEAP_ID, + LATENCYHISTO_SIZE + 1, + latency_histo_buckets); + delete_lat_accum = + histogram_create(PROCESS_PRIVATE_HEAP_ID, + LATENCYHISTO_SIZE + 1, + latency_histo_buckets); + if (insert_lat_accum == NULL || update_lat_accum == NULL + || delete_lat_accum == NULL) + { + platform_error_log("Out of memory for statistics\n"); + histogram_destroy(PROCESS_PRIVATE_HEAP_ID, insert_lat_accum); + histogram_destroy(PROCESS_PRIVATE_HEAP_ID, update_lat_accum); + histogram_destroy(PROCESS_PRIVATE_HEAP_ID, delete_lat_accum); + platform_free(PROCESS_PRIVATE_HEAP_ID, global); + return; + } for (thr_i = 0; thr_i < MAX_THREADS; thr_i++) { histogram_merge_in(insert_lat_accum, @@ -2006,9 +2261,9 @@ core_print_insertion_stats(platform_log_handle *log_handle, const core_handle *s histogram_print(insert_lat_accum, "Insert Latency Histogram (ns):", log_handle); histogram_print(update_lat_accum, "Update Latency Histogram (ns):", log_handle); histogram_print(delete_lat_accum, "Delete Latency Histogram (ns):", log_handle); - histogram_destroy(PROCESS_PRIVATE_HEAP_ID, &insert_lat_accum); - histogram_destroy(PROCESS_PRIVATE_HEAP_ID, &update_lat_accum); - histogram_destroy(PROCESS_PRIVATE_HEAP_ID, &delete_lat_accum); + histogram_destroy(PROCESS_PRIVATE_HEAP_ID, insert_lat_accum); + histogram_destroy(PROCESS_PRIVATE_HEAP_ID, update_lat_accum); + histogram_destroy(PROCESS_PRIVATE_HEAP_ID, delete_lat_accum); platform_log(log_handle, "Flush Statistics\n"); @@ -2147,33 +2402,15 @@ void core_reset_stats(core_handle *spl) { if (spl->cfg.use_stats) { - for (threadid thr_i = 0; thr_i < MAX_THREADS; thr_i++) { - histogram_destroy(spl->heap_id, - &spl->stats[thr_i].insert_latency_histo); - histogram_destroy(spl->heap_id, - &spl->stats[thr_i].update_latency_histo); - histogram_destroy(spl->heap_id, - &spl->stats[thr_i].delete_latency_histo); - - memset(&spl->stats[thr_i], 0, sizeof(spl->stats[thr_i])); - - platform_status rc; - rc = histogram_create(spl->heap_id, - LATENCYHISTO_SIZE + 1, - latency_histo_buckets, - &spl->stats[thr_i].insert_latency_histo); - platform_assert_status_ok(rc); - rc = histogram_create(spl->heap_id, - LATENCYHISTO_SIZE + 1, - latency_histo_buckets, - &spl->stats[thr_i].update_latency_histo); - platform_assert_status_ok(rc); - rc = histogram_create(spl->heap_id, - LATENCYHISTO_SIZE + 1, - latency_histo_buckets, - &spl->stats[thr_i].delete_latency_histo); - platform_assert_status_ok(rc); + core_stats *new_stats = core_stats_create(spl->heap_id); + if (new_stats == NULL) { + platform_error_log("core_reset_stats: failed to reset stats: %s\n", + platform_status_to_string(STATUS_NO_MEMORY)); + return; } + + core_destroy_stats(spl); + spl->stats = new_stats; } } diff --git a/src/core.h b/src/core.h index d6728710..e44c5328 100644 --- a/src/core.h +++ b/src/core.h @@ -53,9 +53,9 @@ typedef struct core_stats { uint64 updates; uint64 deletions; - histogram_handle insert_latency_histo; - histogram_handle update_latency_histo; - histogram_handle delete_latency_histo; + histogram *insert_latency_histo; + histogram *update_latency_histo; + histogram *delete_latency_histo; uint64 memtable_flushes; uint64 memtable_flush_time_ns; diff --git a/src/histogram.c b/src/histogram.c index 112e4310..4b0bedb6 100644 --- a/src/histogram.c +++ b/src/histogram.c @@ -12,19 +12,19 @@ #include "platform_log.h" #include "poison.h" -platform_status +histogram * histogram_create(platform_heap_id heap_id, uint32 num_buckets, - const int64 *const bucket_limits, - histogram_handle *histo) + const int64 *const bucket_limits) { - histogram_handle hh; - hh = TYPED_MANUAL_MALLOC(heap_id, - hh, - sizeof(hh) // NOLINT(bugprone-sizeof-expression) - + num_buckets * sizeof(hh->count[0])); + histogram *hh; + hh = TYPED_MANUAL_MALLOC( + heap_id, hh, sizeof(*hh) + num_buckets * sizeof(hh->count[0])); if (!hh) { - return STATUS_NO_MEMORY; + platform_error_log("histogram_create: failed to allocate histogram " + "with %u buckets\n", + num_buckets); + return NULL; } hh->num_buckets = num_buckets; hh->bucket_limits = bucket_limits; @@ -34,21 +34,20 @@ histogram_create(platform_heap_id heap_id, hh->num = 0; memset(hh->count, 0, hh->num_buckets * sizeof(hh->count[0])); - *histo = hh; - return STATUS_OK; + return hh; } void -histogram_destroy(platform_heap_id heap_id, histogram_handle *histo_out) +histogram_destroy(platform_heap_id heap_id, histogram *histo) { - platform_assert(histo_out); - histogram_handle histo = *histo_out; + if (histo == NULL) { + return; + } platform_free(heap_id, histo); - *histo_out = NULL; } void -histogram_print(histogram_handle histo, +histogram_print(histogram *histo, const char *name, platform_log_handle *log_handle) { diff --git a/src/histogram.h b/src/histogram.h index b1801205..a6ab84b4 100644 --- a/src/histogram.h +++ b/src/histogram.h @@ -13,7 +13,6 @@ #include "splinterdb/platform_linux/public_platform.h" #include "platform_heap.h" #include "platform_assert.h" -#include "platform_status.h" typedef struct histogram { unsigned int num_buckets; @@ -21,24 +20,23 @@ typedef struct histogram { long min, max, total; unsigned long num; // no. of elements unsigned long count[]; -} *histogram_handle; +} histogram; -platform_status +histogram * histogram_create(platform_heap_id heap_id, uint32 num_buckets, - const int64 *const bucket_limits, - histogram_handle *histo); + const int64 *const bucket_limits); void -histogram_destroy(platform_heap_id heap_id, histogram_handle *histo); +histogram_destroy(platform_heap_id heap_id, histogram *histo); void -histogram_print(histogram_handle histo, +histogram_print(histogram *histo, const char *name, platform_log_handle *log_handle); static inline void -histogram_insert(histogram_handle histo, int64 datum) +histogram_insert(histogram *histo, int64 datum) { int lo = 0, hi = histo->num_buckets - 1; @@ -64,7 +62,7 @@ histogram_insert(histogram_handle histo, int64 datum) } static inline void -histogram_merge_in(histogram_handle dest_histo, histogram_handle src_histo) +histogram_merge_in(histogram *dest_histo, histogram *src_histo) { uint32 i; if (src_histo->num == 0) { diff --git a/src/memtable.c b/src/memtable.c index 0c2c3ff3..41482e68 100644 --- a/src/memtable.c +++ b/src/memtable.c @@ -17,6 +17,33 @@ #define MEMTABLE_INSERT_LOCK_IDX 0 #define MEMTABLE_LOOKUP_LOCK_IDX 1 +const char * +memtable_state_string(memtable_state state) +{ + switch (state) { + case MEMTABLE_STATE_INVALID: + return "INVALID"; + case MEMTABLE_STATE_READY: + return "READY"; + case MEMTABLE_STATE_FINALIZED: + return "FINALIZED"; + case MEMTABLE_STATE_COMPACTED: + return "COMPACTED"; + case MEMTABLE_STATE_COMPACTING: + return "COMPACTING"; + case MEMTABLE_STATE_INCORPORATION_ASSIGNED: + return "INCORPORATION_ASSIGNED"; + case MEMTABLE_STATE_INCORPORATION_FAILED: + return "INCORPORATION_FAILED"; + case MEMTABLE_STATE_INCORPORATING: + return "INCORPORATING"; + case MEMTABLE_STATE_INCORPORATED: + return "INCORPORATED"; + default: + return "UNKNOWN"; + } +} + bool32 memtable_is_full(const memtable_config *cfg, memtable *mt) { @@ -252,10 +279,21 @@ memtable_recycle(memtable_context *ctxt, memtable *mt) memtable_lock_incorporation_lock(ctxt); mt->generation += ctxt->cfg.max_memtables; memtable_unlock_incorporation_lock(ctxt); + mt->incorporation_status = STATUS_OK; memtable_transition(mt, MEMTABLE_STATE_INCORPORATED, MEMTABLE_STATE_READY); memtable_root_dec_ref(ctxt, old_root_addr); } +void +memtable_mark_incorporation_failed(memtable *mt, platform_status status) +{ + platform_assert(!SUCCESS(status)); + mt->incorporation_status = status; + memtable_transition(mt, + MEMTABLE_STATE_INCORPORATION_ASSIGNED, + MEMTABLE_STATE_INCORPORATION_FAILED); +} + uint64 memtable_force_finalize(memtable_context *ctxt) { @@ -281,14 +319,38 @@ memtable_init(memtable *mt, cache *cc, memtable_config *cfg, uint64 generation) mt->cfg = cfg->btree_cfg; mt->root_addr = btree_create(cc, mt->cfg, &mt->mini, PAGE_TYPE_MEMTABLE); mt->state = MEMTABLE_STATE_READY; + mt->incorporation_status = STATUS_OK; platform_assert(generation < UINT64_MAX); mt->generation = generation; } +static bool32 +memtable_mini_needs_release(memtable *mt) +{ + switch (mt->state) { + case MEMTABLE_STATE_READY: + case MEMTABLE_STATE_FINALIZED: + return TRUE; + case MEMTABLE_STATE_COMPACTING: + case MEMTABLE_STATE_COMPACTED: + case MEMTABLE_STATE_INCORPORATION_ASSIGNED: + case MEMTABLE_STATE_INCORPORATION_FAILED: + case MEMTABLE_STATE_INCORPORATING: + case MEMTABLE_STATE_INCORPORATED: + return FALSE; + case MEMTABLE_STATE_INVALID: + default: + platform_assert(0); + return FALSE; + } +} + void memtable_deinit(cache *cc, memtable *mt) { - mini_release(&mt->mini); + if (memtable_mini_needs_release(mt)) { + mini_release(&mt->mini); + } debug_only bool32 freed = btree_dec_ref(cc, mt->cfg, mt->root_addr, PAGE_TYPE_MEMTABLE); debug_assert(freed); diff --git a/src/memtable.h b/src/memtable.h index 00ce04eb..7c82aadd 100644 --- a/src/memtable.h +++ b/src/memtable.h @@ -10,6 +10,7 @@ #pragma once #include "platform_mutex.h" +#include "platform_status.h" #include "task.h" #include "cache.h" #include "btree.h" @@ -25,6 +26,7 @@ typedef enum memtable_state { MEMTABLE_STATE_COMPACTED, MEMTABLE_STATE_COMPACTING, MEMTABLE_STATE_INCORPORATION_ASSIGNED, + MEMTABLE_STATE_INCORPORATION_FAILED, MEMTABLE_STATE_INCORPORATING, MEMTABLE_STATE_INCORPORATED, NUM_MEMTABLE_STATES, @@ -36,6 +38,7 @@ typedef struct memtable { uint64 root_addr; mini_allocator mini; btree_config *cfg; + platform_status incorporation_status; } PLATFORM_CACHELINE_ALIGNED memtable; static inline bool32 @@ -63,7 +66,11 @@ memtable_try_transition(memtable *mt, case MEMTABLE_STATE_INCORPORATION_ASSIGNED: // This occurs after the lookup lock has been acquired in // incorporate_memtable - debug_assert(new_state == MEMTABLE_STATE_INCORPORATING); + debug_assert(new_state == MEMTABLE_STATE_INCORPORATING + || new_state == MEMTABLE_STATE_INCORPORATION_FAILED); + break; + case MEMTABLE_STATE_INCORPORATION_FAILED: + debug_assert(0); break; case MEMTABLE_STATE_INCORPORATING: // This transition happens when incorporation has completed @@ -83,6 +90,8 @@ memtable_try_transition(memtable *mt, case MEMTABLE_STATE_COMPACTED: debug_assert(actual_old_state != MEMTABLE_STATE_INCORPORATION_ASSIGNED); + debug_assert(actual_old_state + != MEMTABLE_STATE_INCORPORATION_FAILED); debug_assert(actual_old_state != MEMTABLE_STATE_INCORPORATING); break; default: @@ -179,6 +188,12 @@ memtable_insert(memtable_context *ctxt, void memtable_recycle(memtable_context *ctxt, memtable *mt); +void +memtable_mark_incorporation_failed(memtable *mt, platform_status status); + +const char * +memtable_state_string(memtable_state state); + uint64 memtable_force_finalize(memtable_context *ctxt); diff --git a/src/platform_linux/laio.c b/src/platform_linux/laio.c index 901e981b..a563f6b8 100644 --- a/src/platform_linux/laio.c +++ b/src/platform_linux/laio.c @@ -181,6 +181,12 @@ laio_read(io_handle *ioh, void *buf, uint64 bytes, uint64 addr) if (ret == bytes) { return STATUS_OK; } + platform_error_log("laio_read: pread failed for addr %lu, bytes %lu, " + "ret %d: %s\n", + addr, + bytes, + ret, + strerror(errno)); return STATUS_IO_ERROR; } @@ -198,6 +204,12 @@ laio_write(io_handle *ioh, void *buf, uint64 bytes, uint64 addr) if (ret == bytes) { return STATUS_OK; } + platform_error_log("laio_write: pwrite failed for addr %lu, bytes %lu, " + "ret %d: %s\n", + addr, + bytes, + ret, + strerror(errno)); return STATUS_IO_ERROR; } @@ -236,8 +248,15 @@ laio_get_thread_context(laio_handle *io) } io->ctx[pid].state = PROCESS_CONTEXT_STATE_INITIALIZED; async_wait_queue_init(&io->ctx[pid].submit_waiters); - pthread_create( + int pthread_status = pthread_create( &io->ctx[pid].io_cleaner, NULL, laio_cleaner, &io->ctx[pid]); + if (pthread_status != 0) { + platform_error_log("pthread_create() failed for laio cleaner " + "PID=%lu: %s\n", + pid, + strerror(pthread_status)); + } + platform_assert(pthread_status == 0); __sync_lock_release(&io->ctx[pid].lock); } return &io->ctx[pid]; @@ -271,7 +290,7 @@ laio_async_state_deinit(io_async_state *ios) { laio_async_state *lios = (laio_async_state *)ios; if (lios->iovs != lios->iov) { - platform_free(lios->io->heap_id, lios->iovs); + platform_free(PROCESS_PRIVATE_HEAP_ID, lios->iovs); } } @@ -283,6 +302,11 @@ laio_async_state_append_page(io_async_state *ios, void *buf) lios->io->cfg->extent_size / lios->io->cfg->page_size; if (lios->iovlen == pages_per_extent) { + platform_error_log("laio_async_state_append_page: request for addr %lu " + "already has %lu pages, limit %lu\n", + lios->addr, + lios->iovlen, + pages_per_extent); return STATUS_LIMIT_EXCEEDED; } @@ -332,6 +356,13 @@ laio_async_run(io_async_state *gios) } ios->pctx = laio_get_thread_context(ios->io); + if (ios->pctx == NULL) { + platform_error_log("laio_async_run: failed to get thread context for " + "addr %lu\n", + ios->addr); + ios->status = -EIO; + async_return(ios); + } if (ios->cmd == io_async_preadv) { io_prep_preadv(&ios->req, ios->io->fd, ios->iovs, ios->iovlen, ios->addr); } else { @@ -440,6 +471,13 @@ laio_async_state_get_result(io_async_state *gios) { laio_async_state *ios = (laio_async_state *)gios; if (ios->status < 0) { + platform_error_log("laio_async_state_get_result: async %s failed for " + "addr %lu, pages %lu, status %d: %s\n", + ios->cmd == io_async_preadv ? "read" : "write", + ios->addr, + ios->iovlen, + ios->status, + strerror(-ios->status)); return STATUS_IO_ERROR; } @@ -486,8 +524,13 @@ laio_async_state_init(io_async_state *state, { ios->iovs = ios->iov; } else { - ios->iovs = TYPED_ARRAY_MALLOC(io->heap_id, ios->iovs, pages_per_extent); + ios->iovs = TYPED_ARRAY_MALLOC( + PROCESS_PRIVATE_HEAP_ID, ios->iovs, pages_per_extent); if (ios->iovs == NULL) { + platform_error_log("laio_async_state_init: failed to allocate iovec " + "array for addr %lu, pages_per_extent %lu\n", + addr, + pages_per_extent); return STATUS_NO_MEMORY; } } @@ -560,6 +603,10 @@ laio_wakeup_cleaner(io_process_context *pctx, io_set_callback(iocb, LAIO_CLEANER_TERMINATE_CALLBACK); int status = io_submit(pctx->ctx, 1, &iocb); if (status != 1) { + platform_error_log("laio_wakeup_cleaner: io_submit failed, status=%d: " + "%s\n", + status, + status < 0 ? strerror(-status) : "unexpected result"); return STATUS_IO_ERROR; } return STATUS_OK; @@ -646,6 +693,7 @@ laio_handle_create(io_config *cfg, platform_heap_id hid) laio_handle *io = TYPED_MALLOC(hid, io); if (io == NULL) { + platform_error_log("laio_handle_create: failed to allocate io handle\n"); return NULL; } @@ -716,11 +764,17 @@ laio_handle_create(io_config *cfg, platform_heap_id hid) io->pecnode.termination = laio_process_termination_callback; io->pecnode.arg = io; - platform_linux_add_process_event_callback(&io->pecnode); + rc = platform_linux_add_process_event_callback(&io->pecnode); + if (!SUCCESS(rc)) { + platform_error_log("failed to register process event callback: %s\n", + platform_status_to_string(rc)); + goto process_event_callback_failed; + } // leave req_hand set to 0 return (io_handle *)io; +process_event_callback_failed: write_failed: seek_failed: no_fallocate_and_bad_size: diff --git a/src/platform_linux/platform_heap.c b/src/platform_linux/platform_heap.c index 624e09b0..d0790659 100644 --- a/src/platform_linux/platform_heap.c +++ b/src/platform_linux/platform_heap.c @@ -13,6 +13,286 @@ */ platform_heap_id Heap_id = NULL; +#define PLATFORM_MEMORY_FAULT_CONFIG_DEFAULT \ + { \ + .mode = PLATFORM_MEMORY_FAULT_RANGE, \ + .range_start = 0, \ + .range_count = 0, \ + .seed = 0, \ + .random_fail_probability = 1000, \ + .random_burst_start_probability = 1000, \ + .random_burst_fail_probability = 850000, \ + .random_burst_min_length = 1, \ + .random_burst_max_length = 4096, \ + .max_failures = UINT64_MAX, \ + .verbose = FALSE, \ + } + +static platform_memory_fault_config Platform_memory_fault_config = + PLATFORM_MEMORY_FAULT_CONFIG_DEFAULT; + +void +platform_memory_fault_config_get(platform_memory_fault_config *cfg) +{ + *cfg = Platform_memory_fault_config; +} + +void +platform_memory_fault_config_set(const platform_memory_fault_config *cfg) +{ + Platform_memory_fault_config = *cfg; + platform_memory_fault_reset_counters(); +} + +#if PLATFORM_MEMORY_FAULT_INJECTION + +static uint32 Platform_memory_fault_enabled; +static uint64 Platform_memory_fault_alloc_count; +static uint64 Platform_memory_fault_failure_count; +static uint64 Platform_memory_fault_burst_remaining; + +static inline uint64 +platform_memory_fault_mix(uint64 x) +{ + x += UINT64_C(0x9e3779b97f4a7c15); + x = (x ^ (x >> 30)) * UINT64_C(0xbf58476d1ce4e5b9); + x = (x ^ (x >> 27)) * UINT64_C(0x94d049bb133111eb); + return x ^ (x >> 31); +} + +static inline bool32 +platform_memory_fault_chance(const platform_memory_fault_config *cfg, + uint64 alloc_no, + uint64 stream, + uint32 probability) +{ + if (probability == 0) { + return FALSE; + } + if (probability >= PLATFORM_MEMORY_FAULT_PROBABILITY_SCALE) { + return TRUE; + } + + uint64 sample = platform_memory_fault_mix( + cfg->seed ^ alloc_no ^ (stream * UINT64_C(0x9e3779b97f4a7c15))); + return sample % PLATFORM_MEMORY_FAULT_PROBABILITY_SCALE < probability; +} + +static inline uint64 +platform_memory_fault_random_below(uint64 seed, uint64 alloc_no, uint64 max) +{ + if (max == 0) { + return 0; + } + return platform_memory_fault_mix(seed ^ alloc_no) % max; +} + +static uint64 +platform_memory_fault_burst_length(const platform_memory_fault_config *cfg, + uint64 alloc_no) +{ + uint64 min_length = cfg->random_burst_min_length; + uint64 max_length = cfg->random_burst_max_length; + + if (max_length < min_length) { + return 0; + } + + uint64 bucket = + platform_memory_fault_random_below(cfg->seed, alloc_no ^ 0xbad51eed, 100); + uint64 low = min_length; + uint64 high; + + if (bucket < 70) { + high = MIN(max_length, MAX(min_length, 10)); + } else if (bucket < 95) { + low = MAX(min_length, 10); + high = MIN(max_length, MAX(low, 100)); + } else { + low = MAX(min_length, 100); + high = max_length; + } + + if (high < low) { + low = min_length; + high = max_length; + } + + return low + + platform_memory_fault_random_below( + cfg->seed, alloc_no ^ 0x51eed5eed, high - low + 1); +} + +static bool32 +platform_memory_fault_consume_burst(void) +{ + for (;;) { + uint64 remaining = Platform_memory_fault_burst_remaining; + if (remaining == 0) { + return FALSE; + } + if (__sync_bool_compare_and_swap( + &Platform_memory_fault_burst_remaining, remaining, remaining - 1)) + { + return TRUE; + } + } +} + +void +platform_memory_fault_enable(void) +{ + __sync_lock_test_and_set(&Platform_memory_fault_enabled, TRUE); +} + +void +platform_memory_fault_disable(void) +{ + __sync_lock_test_and_set(&Platform_memory_fault_enabled, FALSE); + __sync_lock_test_and_set(&Platform_memory_fault_burst_remaining, 0); +} + +void +platform_memory_fault_reset_counters(void) +{ + __sync_lock_test_and_set(&Platform_memory_fault_alloc_count, 0); + __sync_lock_test_and_set(&Platform_memory_fault_failure_count, 0); + __sync_lock_test_and_set(&Platform_memory_fault_burst_remaining, 0); +} + +platform_memory_fault_counters +platform_memory_fault_get_counters(void) +{ + return (platform_memory_fault_counters){ + .alloc_count = + __sync_fetch_and_add(&Platform_memory_fault_alloc_count, 0), + .failure_count = + __sync_fetch_and_add(&Platform_memory_fault_failure_count, 0), + }; +} + +bool32 +platform_memory_fault_should_fail(platform_heap_id heap_id, + size_t size, + const char *objname, + const char *func, + const char *file, + int lineno) +{ + (void)heap_id; + + if (!__sync_fetch_and_add(&Platform_memory_fault_enabled, 0) || size == 0) { + return FALSE; + } + + platform_memory_fault_config cfg = Platform_memory_fault_config; + + uint64 alloc_no = + __sync_add_and_fetch(&Platform_memory_fault_alloc_count, 1); + bool32 should_fail = FALSE; + + switch (cfg.mode) { + case PLATFORM_MEMORY_FAULT_RANGE: + should_fail = alloc_no >= cfg.range_start + && alloc_no - cfg.range_start < cfg.range_count; + break; + + case PLATFORM_MEMORY_FAULT_RANDOM: + { + bool32 in_burst = platform_memory_fault_consume_burst(); + + if (!in_burst + && platform_memory_fault_chance( + &cfg, alloc_no, 1, cfg.random_burst_start_probability)) + { + uint64 burst_length = + platform_memory_fault_burst_length(&cfg, alloc_no); + if (burst_length != 0) { + in_burst = TRUE; + if (burst_length > 1) { + __sync_bool_compare_and_swap( + &Platform_memory_fault_burst_remaining, + 0, + burst_length - 1); + } + } + } + + should_fail = + in_burst ? platform_memory_fault_chance( + &cfg, alloc_no, 2, cfg.random_burst_fail_probability) + : platform_memory_fault_chance( + &cfg, alloc_no, 3, cfg.random_fail_probability); + break; + } + + default: + should_fail = FALSE; + break; + } + + if (!should_fail) { + return FALSE; + } + + uint64 failure_no = + __sync_add_and_fetch(&Platform_memory_fault_failure_count, 1); + if (failure_no > cfg.max_failures) { + __sync_fetch_and_sub(&Platform_memory_fault_failure_count, 1); + return FALSE; + } + + if (cfg.verbose) { + platform_error_log("memory fault: allocation %lu failed" + " (failure %lu, size %zu, object %s, at %s:%d %s)\n", + alloc_no, + failure_no, + size, + objname ? objname : "", + file ? file : "", + lineno, + func ? func : ""); + } + + return TRUE; +} + +#else + +void +platform_memory_fault_enable(void) +{ +} + +void +platform_memory_fault_disable(void) +{ +} + +void +platform_memory_fault_reset_counters(void) +{ +} + +platform_memory_fault_counters +platform_memory_fault_get_counters(void) +{ + return (platform_memory_fault_counters){0}; +} + +bool32 +platform_memory_fault_should_fail(platform_heap_id heap_id, + size_t size, + const char *objname, + const char *func, + const char *file, + int lineno) +{ + return FALSE; +} + +#endif // PLATFORM_MEMORY_FAULT_INJECTION + /* * platform_heap_create() - Create a heap for memory allocation. * diff --git a/src/platform_linux/platform_heap.h b/src/platform_linux/platform_heap.h index 04664240..39999f63 100644 --- a/src/platform_linux/platform_heap.h +++ b/src/platform_linux/platform_heap.h @@ -27,6 +27,66 @@ platform_get_module_id() extern platform_heap_id Heap_id; +#ifndef PLATFORM_MEMORY_FAULT_INJECTION +# define PLATFORM_MEMORY_FAULT_INJECTION 0 +#endif + +typedef enum platform_memory_fault_mode { + PLATFORM_MEMORY_FAULT_RANGE = 0, + PLATFORM_MEMORY_FAULT_RANDOM, +} platform_memory_fault_mode; + +#define PLATFORM_MEMORY_FAULT_PROBABILITY_SCALE (1000000U) + +typedef struct platform_memory_fault_config { + platform_memory_fault_mode mode; + + // Allocation numbers are 1-based. + uint64 range_start; + uint64 range_count; + + uint64 seed; + uint32 random_fail_probability; + uint32 random_burst_start_probability; + uint32 random_burst_fail_probability; + uint64 random_burst_min_length; + uint64 random_burst_max_length; + + uint64 max_failures; + bool32 verbose; +} platform_memory_fault_config; + +typedef struct platform_memory_fault_counters { + uint64 alloc_count; + uint64 failure_count; +} platform_memory_fault_counters; + +void +platform_memory_fault_config_get(platform_memory_fault_config *cfg); + +void +platform_memory_fault_config_set(const platform_memory_fault_config *cfg); + +void +platform_memory_fault_enable(void); + +void +platform_memory_fault_disable(void); + +void +platform_memory_fault_reset_counters(void); + +platform_memory_fault_counters +platform_memory_fault_get_counters(void); + +bool32 +platform_memory_fault_should_fail(platform_heap_id heap_id, + size_t size, + const char *objname, + const char *func, + const char *file, + int lineno); + /* * Provide a tag for callers that do not want to use shared-memory allocation, * when configured but want to fallback to default scheme of allocating @@ -76,6 +136,15 @@ platform_aligned_malloc(const platform_heap_id heap_id, platform_assert(size <= size + alignment - 1); size_t aligned_size = (size + alignment - 1) & ~((uintptr_t)alignment - 1); +#if PLATFORM_MEMORY_FAULT_INJECTION \ + && !defined(PLATFORM_MEMORY_FAULT_INJECTION_DISABLED_IN_THIS_FILE) + if (platform_memory_fault_should_fail( + heap_id, size, objname, func, file, lineno)) + { + return NULL; + } +#endif + if (heap_id) { return shmalloc(heap_id, alignment, size); } else { @@ -96,13 +165,27 @@ platform_aligned_malloc(const platform_heap_id heap_id, * Reallocing from NULL must be equivalent to allocing. */ static inline void * -platform_realloc(const platform_heap_id heap_id, - const size_t oldsize, - void *ptr, // IN - const size_t newsize) // IN +platform_realloc_from_heap(const platform_heap_id heap_id, + const size_t oldsize, + void *ptr, // IN + const size_t newsize, // IN + const char *objname, + const char *func, + const char *file, + const int lineno) { /* FIXME: alignment? */ +#if PLATFORM_MEMORY_FAULT_INJECTION \ + && !defined(PLATFORM_MEMORY_FAULT_INJECTION_DISABLED_IN_THIS_FILE) + if (newsize != 0 + && platform_memory_fault_should_fail( + heap_id, newsize, objname, func, file, lineno)) + { + return NULL; + } +#endif + // Farm control off to shared-memory based realloc, if it's configured if (heap_id) { return shrealloc(heap_id, ptr, newsize); @@ -111,6 +194,10 @@ platform_realloc(const platform_heap_id heap_id, } } +#define platform_realloc(id, oldsize, p, newsize) \ + platform_realloc_from_heap( \ + id, oldsize, p, newsize, STRINGIFY(p), __func__, __FILE__, __LINE__) + static inline void platform_free_from_heap(platform_heap_id heap_id, void *ptr, diff --git a/src/platform_linux/platform_threads.c b/src/platform_linux/platform_threads.c index 451d876b..a6ba64b6 100644 --- a/src/platform_linux/platform_threads.c +++ b/src/platform_linux/platform_threads.c @@ -5,6 +5,7 @@ #include "splinterdb/platform_linux/public_platform.h" #include "platform_log.h" #include +#include #include __thread threadid xxxtid = INVALID_TID; @@ -123,7 +124,7 @@ thread_registration_cleanup_disarm(void) * in the task system structure to indicate that no threads are currently * active. */ -static void +static platform_status id_allocator_init_if_needed(void) { if (id_alloc == NULL) { @@ -135,8 +136,10 @@ id_allocator_init_if_needed(void) -1, 0); if (my_id_alloc == MAP_FAILED) { - platform_error_log("Failed to allocate memory for id allocator"); - return; + platform_error_log("id_allocator_init_if_needed: mmap failed for " + "id allocator: %s\n", + strerror(errno)); + return STATUS_NO_MEMORY; } memset(my_id_alloc, 0x00, sizeof(id_allocator)); memset(my_id_alloc->tid_allocator.available_tids, @@ -146,6 +149,7 @@ id_allocator_init_if_needed(void) munmap(my_id_alloc, sizeof(id_allocator)); } } + return STATUS_OK; } /* @@ -270,6 +274,9 @@ ensure_xxxpid_is_setup(void) __sync_lock_release(&id_alloc->pid_allocator.lock); if (ospid != myospid) { + platform_error_log("ensure_xxxpid_is_setup: no PID slot available for " + "OS pid %d\n", + myospid); return STATUS_BUSY; } @@ -308,11 +315,18 @@ decref_xxxpid(void) __sync_lock_release(&id_alloc->pid_allocator.lock); } -void +platform_status platform_linux_add_process_event_callback( process_event_callback_list_node *node) { - id_allocator_init_if_needed(); + platform_status rc = id_allocator_init_if_needed(); + if (!SUCCESS(rc)) { + platform_error_log("platform_linux_add_process_event_callback: " + "id allocator init failed: %s\n", + platform_status_to_string(rc)); + return rc; + } + while ( __sync_lock_test_and_set(&id_alloc->process_event_callback_list_lock, 1)) { @@ -321,6 +335,7 @@ platform_linux_add_process_event_callback( node->next = id_alloc->process_event_callback_list; id_alloc->process_event_callback_list = node; __sync_lock_release(&id_alloc->process_event_callback_list_lock); + return STATUS_OK; } void @@ -388,24 +403,47 @@ thread_registration_cleanup_function(void *arg) } } -static platform_status -register_thread_common(void) +/* + * platform_register_thread(): Register this new thread. + * + * Registration implies: + * - Acquire a new thread ID (index) for this to-be-active thread + */ +int +platform_register_thread(void) { platform_status status; - threadid thread_tid; + threadid thread_tid = xxxtid; + + // Before registration, all SplinterDB threads' tid will be its default + // value; i.e. INVALID_TID. + platform_assert(thread_tid == INVALID_TID, + "Attempt to register thread that is already " + "registered as thread %lu\n", + thread_tid); - id_allocator_init_if_needed(); + status = id_allocator_init_if_needed(); + if (!SUCCESS(status)) { + platform_error_log("platform_register_thread: id allocator init failed: " + "%s\n", + platform_status_to_string(status)); + return -1; + } status = ensure_xxxpid_is_setup(); if (!SUCCESS(status)) { - return status; + platform_error_log("platform_register_thread: PID setup failed: %s\n", + platform_status_to_string(status)); + return -1; } thread_tid = allocate_threadid(); // Unavailable threads is a temporary state that could go away. if (thread_tid == INVALID_TID) { decref_xxxpid(); - return STATUS_BUSY; + platform_error_log("platform_register_thread: thread id allocation " + "failed\n"); + return -1; } platform_assert(thread_tid < MAX_THREADS); @@ -415,41 +453,12 @@ register_thread_common(void) if (!SUCCESS(status)) { deallocate_threadid(thread_tid); decref_xxxpid(); - return status; - } - - return STATUS_OK; -} - -/* - * platform_register_thread(): Register this new thread. - * - * Registration implies: - * - Acquire a new thread ID (index) for this to-be-active thread - */ -int -platform_register_thread(void) -{ - threadid thread_tid = xxxtid; - - // Before registration, all SplinterDB threads' tid will be its default - // value; i.e. INVALID_TID. - platform_assert(thread_tid == INVALID_TID, - "Attempt to register thread that is already " - "registered as thread %lu\n", - thread_tid); - - return SUCCESS(register_thread_common()) ? 0 : -1; -} - -platform_status -platform_register_thread_auto(void) -{ - if (xxxtid != INVALID_TID) { - return STATUS_OK; + platform_error_log("platform_register_thread: cleanup arm failed: %s\n", + platform_status_to_string(status)); + return -1; } - return register_thread_common(); + return 0; } @@ -483,9 +492,18 @@ platform_thread_create(platform_thread *thread, { int ret; - id_allocator_init_if_needed(); - platform_status rc = ensure_xxxpid_is_setup(); + platform_status rc = id_allocator_init_if_needed(); + if (!SUCCESS(rc)) { + platform_error_log("platform_thread_create: id allocator init failed: " + "%s\n", + platform_status_to_string(rc)); + return rc; + } + + rc = ensure_xxxpid_is_setup(); if (!SUCCESS(rc)) { + platform_error_log("platform_thread_create: PID setup failed: %s\n", + platform_status_to_string(rc)); return rc; } @@ -494,6 +512,8 @@ platform_thread_create(platform_thread *thread, threadid tid = allocate_threadid(); if (tid == INVALID_TID) { decref_xxxpid(); + platform_error_log("platform_thread_create: thread id allocation " + "failed\n"); return STATUS_BUSY; } thread_invocation *thread_inv = &id_alloc->thread_invocations[tid]; @@ -504,6 +524,9 @@ platform_thread_create(platform_thread *thread, if (ret != 0) { deallocate_threadid(tid); decref_xxxpid(); + platform_error_log("platform_thread_create: pthread_create failed: " + "%s\n", + strerror(ret)); return STATUS_NO_MEMORY; } @@ -524,6 +547,12 @@ platform_thread_join(platform_thread *thread) threadid platform_num_threads(void) { - id_allocator_init_if_needed(); + platform_status rc = id_allocator_init_if_needed(); + if (!SUCCESS(rc)) { + platform_error_log("platform_num_threads: id allocator init failed: %s\n", + platform_status_to_string(rc)); + return 0; + } + return id_alloc->tid_allocator.num_threads; } diff --git a/src/platform_linux/platform_threads.h b/src/platform_linux/platform_threads.h index c1b4f3dc..d340e695 100644 --- a/src/platform_linux/platform_threads.h +++ b/src/platform_linux/platform_threads.h @@ -28,9 +28,6 @@ platform_get_tid() return xxxtid; } -platform_status -platform_register_thread_auto(void); - static inline platform_status platform_ensure_thread_registered() { @@ -40,7 +37,7 @@ platform_ensure_thread_registered() return STATUS_OK; } - return platform_register_thread_auto(); + return platform_register_thread() == 0 ? STATUS_OK : STATUS_BUSY; } /* This is not part of the platform API. It is used internally to this platform @@ -60,7 +57,7 @@ typedef struct process_event_callback_list_node { struct process_event_callback_list_node *next; } process_event_callback_list_node; -void +platform_status platform_linux_add_process_event_callback( process_event_callback_list_node *node); diff --git a/src/routing_filter.c b/src/routing_filter.c index 62b2e894..26db8bc2 100644 --- a/src/routing_filter.c +++ b/src/routing_filter.c @@ -699,13 +699,21 @@ routing_filter_prefetch(cache *cc, } } -uint32 +platform_status routing_filter_estimate_unique_fp(cache *cc, const routing_config *cfg, platform_heap_id hid, routing_filter *filter, - uint64 num_filters) + uint64 num_filters, + uint32 *num_unique_fp) { + if (num_unique_fp == NULL) { + platform_error_log("routing_filter_estimate_unique_fp: " + "num_unique_fp must not be NULL\n"); + return STATUS_BAD_PARAM; + } + + *num_unique_fp = 0; platform_assert(num_filters <= MAX_FILTERS); uint32 total_num_fp = 0; for (uint64 i = 0; i != num_filters; i++) { @@ -714,7 +722,13 @@ routing_filter_estimate_unique_fp(cache *cc, uint32 buffer_size = total_num_fp / 12; uint32 alloc_size = buffer_size + cfg->index_size; // NOLINTNEXTLINE(bugprone-sizeof-expression) - uint32 *local = TYPED_ARRAY_ZALLOC(hid, local, alloc_size * sizeof(uint32)); + uint32 *local = TYPED_ARRAY_ZALLOC(hid, local, alloc_size * sizeof(uint32)); + if (local == NULL) { + platform_error_log("routing_filter_estimate_unique_fp: failed to " + "allocate fingerprint work buffer of %u uint32s\n", + alloc_size); + return STATUS_NO_MEMORY; + } uint32 *fp_arr = local; uint32 *count = local + buffer_size; @@ -829,7 +843,8 @@ routing_filter_estimate_unique_fp(cache *cc, } platform_free(hid, local); - return num_unique * 16; + *num_unique_fp = num_unique * 16; + return STATUS_OK; } static inline async_status diff --git a/src/routing_filter.h b/src/routing_filter.h index aa21d90b..79c3a4d0 100644 --- a/src/routing_filter.h +++ b/src/routing_filter.h @@ -168,12 +168,13 @@ uint32 routing_filter_estimate_unique_keys(routing_filter *filter, routing_config *cfg); -uint32 +platform_status routing_filter_estimate_unique_fp(cache *cc, const routing_config *cfg, platform_heap_id hid, routing_filter *filter, - uint64 num_filters); + uint64 num_filters, + uint32 *num_unique_fp); uint64 routing_filter_space_use_bytes(cache *cc, const routing_filter *filter); diff --git a/src/shard_log.c b/src/shard_log.c index d997d998..c0eee131 100644 --- a/src/shard_log.c +++ b/src/shard_log.c @@ -382,8 +382,17 @@ log_create(cache *cc, log_config *lcfg, platform_heap_id hid) { shard_log_config *cfg = (shard_log_config *)lcfg; shard_log *slog = TYPED_MALLOC(hid, slog); - platform_status rc = shard_log_init(slog, cc, cfg); - platform_assert(SUCCESS(rc)); + if (slog == NULL) { + platform_error_log("log_create: failed to allocate shard_log\n"); + return NULL; + } + platform_status rc = shard_log_init(slog, cc, cfg); + if (!SUCCESS(rc)) { + platform_error_log("log_create: shard_log_init failed: %s\n", + platform_status_to_string(rc)); + platform_free(hid, slog); + return NULL; + } return (log_handle *)slog; } @@ -402,6 +411,7 @@ shard_log_iterator_init(cache *cc, uint64 num_valid_pages = 0; uint64 extent_addr; uint64 next_extent_addr; + uint64 contents_size; memset(itor, 0, sizeof(shard_log_iterator)); itor->super.ops = &shard_log_iterator_ops; @@ -431,9 +441,26 @@ shard_log_iterator_init(cache *cc, finished_first_pass: - itor->contents = TYPED_ARRAY_MALLOC( - hid, itor->contents, num_valid_pages * shard_log_page_size(cfg)); - itor->entries = TYPED_ARRAY_MALLOC(hid, itor->entries, itor->num_entries); + contents_size = num_valid_pages * shard_log_page_size(cfg); + if (contents_size != 0) { + itor->contents = TYPED_ARRAY_MALLOC(hid, itor->contents, contents_size); + if (itor->contents == NULL) { + platform_error_log("shard_log_iterator_init: failed to allocate " + "contents buffer of %lu bytes\n", + contents_size); + return STATUS_NO_MEMORY; + } + } + if (itor->num_entries != 0) { + itor->entries = TYPED_ARRAY_MALLOC(hid, itor->entries, itor->num_entries); + if (itor->entries == NULL) { + platform_error_log("shard_log_iterator_init: failed to allocate " + "entries array for %lu entries\n", + itor->num_entries); + platform_free(hid, itor->contents); + return STATUS_NO_MEMORY; + } + } // traverse the log extents again and copy the kv pairs log_entry *cursor = (log_entry *)itor->contents; @@ -467,14 +494,16 @@ shard_log_iterator_init(cache *cc, debug_assert(entry_idx == itor->num_entries); // sort by generation - log_entry *tmp; finished_second_pass: - platform_sort_slow(itor->entries, - itor->num_entries, - sizeof(log_entry *), - shard_log_compare, - NULL, - &tmp); + if (itor->num_entries != 0) { + log_entry *tmp; + platform_sort_slow(itor->entries, + itor->num_entries, + sizeof(log_entry *), + shard_log_compare, + NULL, + &tmp); + } return STATUS_OK; } @@ -482,8 +511,12 @@ shard_log_iterator_init(cache *cc, void shard_log_iterator_deinit(platform_heap_id hid, shard_log_iterator *itor) { - platform_free(hid, itor->contents); - platform_free(hid, itor->entries); + if (itor->contents != NULL) { + platform_free(hid, itor->contents); + } + if (itor->entries != NULL) { + platform_free(hid, itor->entries); + } } void diff --git a/src/splinterdb.c b/src/splinterdb.c index 5ca6dac7..45b70616 100644 --- a/src/splinterdb.c +++ b/src/splinterdb.c @@ -389,11 +389,9 @@ splinterdb_create_or_open(const splinterdb_config *kvs_cfg, // IN kvs->heap_id); } if (!SUCCESS(status)) { - platform_error_log("Failed to %s SplinterDB instance.\n", - (open_existing ? "mount existing" : "initialize")); - - // Return a generic 'something went wrong' error - status = STATUS_INVALID_STATE; + platform_error_log("Failed to %s SplinterDB instance: %s\n", + (open_existing ? "mount existing" : "initialize"), + platform_status_to_string(status)); goto deinit_cache; } @@ -476,7 +474,11 @@ splinterdb_close(splinterdb **kvs_in) // IN * order when these sub-systems were init'ed when a Splinter device was * created or re-opened. Otherwise, asserts will trip. */ - core_unmount(&kvs->spl); + platform_status status = core_unmount(&kvs->spl); + if (!SUCCESS(status)) { + platform_error_log("Failed to close SplinterDB instance cleanly: %s\n", + platform_status_to_string(status)); + } io_wait_all(kvs->io_handle); clockcache_deinit(&kvs->cache_handle); rc_allocator_unmount(&kvs->allocator_handle); diff --git a/src/trunk.c b/src/trunk.c index 9a0e9e12..0e827ba7 100644 --- a/src/trunk.c +++ b/src/trunk.c @@ -522,9 +522,9 @@ trunk_node_copy_init(trunk_node *dst, vector_init(&pivot_bundles, hid); vector_init(&inflight_bundles, hid); - rc = VECTOR_MAP_ELTS(&pivots, trunk_pivot_copy, &src->pivots, hid); + rc = VECTOR_MAP_ELTS_TO_PTRS(&pivots, trunk_pivot_copy, &src->pivots, hid); if (!SUCCESS(rc)) { - platform_error_log("%s():%d: VECTOR_MAP_ELTS() failed: %s", + platform_error_log("%s():%d: VECTOR_MAP_ELTS_TO_PTRS() failed: %s", __func__, __LINE__, platform_status_to_string(rc)); @@ -2294,7 +2294,7 @@ trunk_branch_merger_add_branch(trunk_branch_merger *merger, __LINE__, platform_status_to_string(rc)); } - return STATUS_OK; + return rc; } @@ -3478,7 +3478,7 @@ bundle_compaction_task(task *arg) state, bc, platform_status_to_string(rc)); - goto cleanup; + goto cleanup_branch_merger; } uint64 tuple_bound; @@ -3493,7 +3493,7 @@ bundle_compaction_task(task *arg) state, bc, platform_status_to_string(rc)); - goto cleanup; + goto cleanup_branch_merger; } rc = trunk_branch_merger_build_merge_itor(&merger, bc->merge_mode); @@ -3503,17 +3503,26 @@ bundle_compaction_task(task *arg) state, bc, platform_status_to_string(rc)); - goto cleanup; + goto cleanup_branch_merger; } btree_pack_req pack_req; - btree_pack_req_init(&pack_req, - context->cc, - context->cfg->btree_cfg, - &merger.merge_itor->super, - tuple_bound, - context->cfg->filter_cfg->seed, - context->hid); + rc = btree_pack_req_init(&pack_req, + context->cc, + context->cfg->btree_cfg, + &merger.merge_itor->super, + tuple_bound, + context->cfg->filter_cfg->seed, + TRUE, + context->hid); + if (!SUCCESS(rc)) { + platform_error_log( + "btree_pack_req_init failed for state: %p bc: %p: %s\n", + state, + bc, + platform_status_to_string(rc)); + goto cleanup_branch_merger; + } // This is just a quick shortcut to avoid wasting time on a compaction when // the pivot is already stuck due to an earlier maplet compaction failure. @@ -3522,7 +3531,7 @@ bundle_compaction_task(task *arg) "for state %p\n", state); rc = STATUS_INVALID_STATE; - goto cleanup; + goto cleanup_pack_req; } uint64 pack_start = platform_get_timestamp(); @@ -3532,7 +3541,7 @@ bundle_compaction_task(task *arg) state, bc, platform_status_to_string(rc)); - goto cleanup; + goto cleanup_pack_req; } if (context->stats) { context->stats[tid].compaction_pack_time_ns[state->height] += @@ -3561,8 +3570,9 @@ bundle_compaction_task(task *arg) bc->compaction_time_ns); } -cleanup: +cleanup_pack_req: btree_pack_req_deinit(&pack_req, context->hid); +cleanup_branch_merger: trunk_branch_merger_deinit(&merger); trunk_pivot_state_lock_compactions(state); @@ -3587,8 +3597,9 @@ bundle_compaction_task(task *arg) static platform_status enqueue_bundle_compaction(trunk_context *context, trunk_node *node) { - uint64 height = trunk_node_height(node); - uint64 num_children = trunk_node_num_children(node); + uint64 height = trunk_node_height(node); + uint64 num_children = trunk_node_num_children(node); + platform_status result = STATUS_OK; for (uint64 pivot_num = 0; pivot_num < num_children; pivot_num++) { if (trunk_node_pivot_has_received_bundles(node, pivot_num)) { @@ -3596,8 +3607,10 @@ enqueue_bundle_compaction(trunk_context *context, trunk_node *node) key pivot_key = trunk_node_pivot_key(node, pivot_num); key ubkey = trunk_node_pivot_key(node, pivot_num + 1); bundle *pivot_bundle = trunk_node_pivot_bundle(node, pivot_num); + trunk_pivot_state *state = NULL; + bundle_compaction *bc = NULL; - trunk_pivot_state *state = + state = trunk_pivot_state_map_get_or_create_entry(context, &context->pivot_states, pivot_key, @@ -3611,8 +3624,7 @@ enqueue_bundle_compaction(trunk_context *context, trunk_node *node) goto next; } - bundle_compaction *bc = - bundle_compaction_create(context, node, pivot_num, state); + bc = bundle_compaction_create(context, node, pivot_num, state); if (bc == NULL) { platform_error_log("enqueue_bundle_compaction: " "bundle_compaction_create failed\n"); @@ -3632,13 +3644,17 @@ enqueue_bundle_compaction(trunk_context *context, trunk_node *node) if (!SUCCESS(rc)) { trunk_pivot_state_decref(state); platform_error_log( - "enqueue_bundle_compaction: task_enqueue failed\n"); + "enqueue_bundle_compaction: task_enqueue failed: %s\n", + platform_status_to_string(rc)); } next: if (!SUCCESS(rc) && bc) { bc->state = BUNDLE_COMPACTION_FAILED; } + if (!SUCCESS(rc) && SUCCESS(result)) { + result = rc; + } if (state != NULL) { trunk_pivot_state_map_release_entry( context, &context->pivot_states, state); @@ -3646,7 +3662,7 @@ enqueue_bundle_compaction(trunk_context *context, trunk_node *node) } } - return STATUS_OK; + return result; } static void @@ -3938,12 +3954,19 @@ leaf_estimate_unique_keys(trunk_context *context, *estimate = unfiltered_tuples; if (0 < num_fp) { - uint32 num_globally_unique_fp = - routing_filter_estimate_unique_fp(context->cc, - context->cfg->filter_cfg, - PROCESS_PRIVATE_HEAP_ID, - vector_data(&maplets), - vector_length(&maplets)); + uint32 num_globally_unique_fp; + rc = routing_filter_estimate_unique_fp(context->cc, + context->cfg->filter_cfg, + PROCESS_PRIVATE_HEAP_ID, + vector_data(&maplets), + vector_length(&maplets), + &num_globally_unique_fp); + if (!SUCCESS(rc)) { + platform_error_log("leaf_estimate_unique_keys: " + "routing_filter_estimate_unique_fp failed: %d\n", + rc.r); + goto cleanup; + } num_globally_unique_fp = routing_filter_estimate_unique_keys_from_count( context->cfg->filter_cfg, num_globally_unique_fp); @@ -3959,7 +3982,7 @@ leaf_estimate_unique_keys(trunk_context *context, cleanup: vector_deinit(&maplets); - return STATUS_OK; + return rc; } static platform_status @@ -4555,22 +4578,16 @@ flush_to_one_child(trunk_context *context, // Construct our new pivots for the new children trunk_pivot_vector new_pivots; vector_init(&new_pivots, PROCESS_PRIVATE_HEAP_ID); - rc = vector_ensure_capacity(&new_pivots, vector_length(&new_childrefs)); + rc = VECTOR_MAP_ELTS_TO_PTRS(&new_pivots, + trunk_pivot_create_from_ondisk_node_ref, + &new_childrefs, + PROCESS_PRIVATE_HEAP_ID); if (!SUCCESS(rc)) { - platform_error_log("flush_to_one_child: vector_ensure_capacity failed: " + platform_error_log("flush_to_one_child: VECTOR_MAP_ELTS_TO_PTRS failed: " "%d\n", rc.r); goto cleanup_new_pivots; } - rc = VECTOR_MAP_ELTS(&new_pivots, - trunk_pivot_create_from_ondisk_node_ref, - &new_childrefs, - PROCESS_PRIVATE_HEAP_ID); - if (!SUCCESS(rc)) { - platform_error_log("flush_to_one_child: VECTOR_MAP_ELTS failed: %d\n", - rc.r); - goto cleanup_new_pivots; - } for (uint64 j = 0; j < vector_length(&new_pivots); j++) { trunk_pivot *new_pivot = vector_get(&new_pivots, j); trunk_pivot_set_inflight_bundle_start( @@ -4814,12 +4831,14 @@ build_new_roots(trunk_context *context, rc.r); goto cleanup_pivots; } - rc = VECTOR_MAP_ELTS(&pivots, - trunk_pivot_create_from_ondisk_node_ref, - node_refs, - PROCESS_PRIVATE_HEAP_ID); + rc = VECTOR_MAP_ELTS_TO_PTRS(&pivots, + trunk_pivot_create_from_ondisk_node_ref, + node_refs, + PROCESS_PRIVATE_HEAP_ID); if (!SUCCESS(rc)) { - platform_error_log("build_new_roots: VECTOR_MAP_ELTS failed: %d\n", rc.r); + platform_error_log("build_new_roots: VECTOR_MAP_ELTS_TO_PTRS failed: " + "%d\n", + rc.r); goto cleanup_pivots; } trunk_pivot *ub_pivot = trunk_pivot_create(PROCESS_PRIVATE_HEAP_ID, @@ -5885,6 +5904,10 @@ trunk_context_init(trunk_context *context, if (context->stats == NULL) { platform_error_log("trunk_node_context_init: " "TYPED_ARRAY_MALLOC failed\n"); + if (context->root != NULL) { + trunk_ondisk_node_ref_destroy(context->root, context, hid); + context->root = NULL; + } return STATUS_NO_MEMORY; } memset(context->stats, 0, sizeof(trunk_stats) * MAX_THREADS); @@ -5896,11 +5919,10 @@ trunk_context_init(trunk_context *context, return STATUS_OK; } -platform_status +void trunk_inc_ref(allocator *al, uint64 root_addr) { trunk_addr_inc_ref(al, root_addr); - return STATUS_OK; } platform_status diff --git a/src/trunk.h b/src/trunk.h index 9daf45fd..9ce8c7f5 100644 --- a/src/trunk.h +++ b/src/trunk.h @@ -213,7 +213,7 @@ trunk_context_init(trunk_context *context, task_system *ts, uint64 root_addr); -platform_status +void trunk_inc_ref(allocator *al, uint64 root_addr); platform_status diff --git a/src/vector.h b/src/vector.h index 77c87fec..ee782289 100644 --- a/src/vector.h +++ b/src/vector.h @@ -15,6 +15,7 @@ #include "util.h" +#include "platform_log.h" #define VECTOR(elt_type) \ struct { \ @@ -268,6 +269,50 @@ __vector_replace(writable_buffer *dst, VECTOR_MAP_GENERIC( \ dst, vector_apply_to_ptr, src, func __VA_OPT__(, __VA_ARGS__)) +// forall i: dst[i] = f(src[i], ...) +// f must return a pointer. Stops after the first NULL result. +// Leaves dst length equal to the number of non-NULL results. +#define VECTOR_MAP_ELTS_TO_PTRS(dst, func, src, ...) \ + ({ \ + _Static_assert( \ + __builtin_classify_type((vector_elt_type(dst))0) == 5, \ + "VECTOR_MAP_ELTS_TO_PTRS requires pointer elements in dst"); \ + platform_status __rc; \ + uint64 __len = vector_length(src); \ + uint64 __size = __len * vector_elt_size(dst); \ + __rc = writable_buffer_resize(&(dst)->wb, __size); \ + if (!SUCCESS(__rc)) { \ + platform_error_log("VECTOR_MAP_ELTS_TO_PTRS: resize failed at " \ + "%s:%d: %s\n", \ + __FILE__, \ + __LINE__, \ + platform_status_to_string(__rc)); \ + } else { \ + uint64 __idx = 0; \ + for (; __idx < __len; __idx++) { \ + vector_elt_type(dst) __result = vector_apply_to_elt( \ + src, __idx, func __VA_OPT__(, __VA_ARGS__)); \ + if (__result == NULL) { \ + platform_error_log("VECTOR_MAP_ELTS_TO_PTRS: %s returned NULL " \ + "at index %lu from %s:%d\n", \ + STRINGIFY(func), \ + __idx, \ + __FILE__, \ + __LINE__); \ + __rc = STATUS_NO_MEMORY; \ + break; \ + } \ + vector_set(dst, __idx, __result); \ + } \ + if (!SUCCESS(__rc)) { \ + platform_status __resize_rc = writable_buffer_resize( \ + &(dst)->wb, __idx * vector_elt_size(dst)); \ + platform_assert_status_ok(__resize_rc); \ + } \ + } \ + __rc; \ + }) + /* * Convenience function so you can use vector_apply_to_elements to * free all the elements of a vector of pointers. @@ -481,8 +526,9 @@ _Static_assert(!__builtin_types_compatible_p(platform_status, void), "Uhoh"); init, (v), __old_length __VA_OPT__(, __VA_ARGS__)); \ } \ if (!SUCCESS(__rc)) { \ - __rc = writable_buffer_resize(&(v)->wb, __old_size); \ - platform_assert_status_ok(__rc); \ + platform_status __resize_rc = \ + writable_buffer_resize(&(v)->wb, __old_size); \ + platform_assert_status_ok(__resize_rc); \ } \ __rc; \ }) diff --git a/tests/config.c b/tests/config.c index 463a1228..2fe9a5f5 100644 --- a/tests/config.c +++ b/tests/config.c @@ -4,6 +4,7 @@ #include "config.h" #include "platform_units.h" #include "platform_buffer.h" +#include "platform_heap.h" #include "util.h" /* @@ -167,8 +168,99 @@ config_usage() platform_error_log("\t--num-inserts (%d)\n", TEST_CONFIG_DEFAULT_NUM_INSERTS); platform_error_log("\t--seed (%d)\n", TEST_CONFIG_DEFAULT_SEED); + +#if PLATFORM_MEMORY_FAULT_INJECTION + platform_error_log("\t--fault-alloc-range \n"); + platform_error_log("\t--fault-alloc-random \n"); + platform_error_log("\t--fault-alloc-random-probability \n"); + platform_error_log("\t--fault-alloc-burst-probability \n"); + platform_error_log("\t--fault-alloc-burst-fail-probability \n"); + platform_error_log("\t--fault-alloc-burst-min \n"); + platform_error_log("\t--fault-alloc-burst-max \n"); + platform_error_log("\t--fault-alloc-max-failures \n"); + platform_error_log("\t--fault-alloc-verbose\n"); + platform_error_log("\t--fault-alloc-disable\n"); +#endif +} + +#if PLATFORM_MEMORY_FAULT_INJECTION + +static bool32 +config_parse_uint64_arg(int argc, + char *argv[], + uint64 *idx, + const char *name, + uint64 *result) +{ + if (*idx + 1 == argc || !try_string_to_uint64(argv[++(*idx)], result)) { + platform_error_log("config: failed to parse %s\n", name); + return FALSE; + } + return TRUE; +} + +static bool32 +config_parse_uint32_arg(int argc, + char *argv[], + uint64 *idx, + const char *name, + uint32 *result) +{ + uint64 tmp; + if (!config_parse_uint64_arg(argc, argv, idx, name, &tmp) + || tmp > UINT32_MAX) + { + platform_error_log("config: failed to parse %s\n", name); + return FALSE; + } + *result = (uint32)tmp; + return TRUE; } +static platform_status +config_parse_memory_fault_validate(const platform_memory_fault_config *cfg) +{ + switch (cfg->mode) { + case PLATFORM_MEMORY_FAULT_RANGE: + if (cfg->range_start == 0 || cfg->range_count == 0) { + platform_error_log("config: --fault-alloc-range uses a 1-based " + "start and non-zero count\n"); + return STATUS_BAD_PARAM; + } + break; + + case PLATFORM_MEMORY_FAULT_RANDOM: + if (cfg->random_fail_probability + > PLATFORM_MEMORY_FAULT_PROBABILITY_SCALE + || cfg->random_burst_start_probability + > PLATFORM_MEMORY_FAULT_PROBABILITY_SCALE + || cfg->random_burst_fail_probability + > PLATFORM_MEMORY_FAULT_PROBABILITY_SCALE) + { + platform_error_log("config: fault allocation probabilities are " + "parts per million and must be <= %u\n", + PLATFORM_MEMORY_FAULT_PROBABILITY_SCALE); + return STATUS_BAD_PARAM; + } + if (cfg->random_burst_min_length == 0 + || cfg->random_burst_max_length < cfg->random_burst_min_length) + { + platform_error_log("config: invalid fault allocation burst length " + "range\n"); + return STATUS_BAD_PARAM; + } + break; + + default: + platform_error_log("config: invalid fault allocation mode\n"); + return STATUS_BAD_PARAM; + } + + return STATUS_OK; +} + +#endif // PLATFORM_MEMORY_FAULT_INJECTION + /* * config_parse_use_shmem() - Check if --use-shmem argument was supplied on * the cmdline. Some tests need to know this to setup the shared memory heap @@ -194,6 +286,14 @@ config_parse_use_shmem(int argc, char *argv[]) platform_status config_parse(master_config *cfg, const uint8 num_config, int argc, char *argv[]) { +#if PLATFORM_MEMORY_FAULT_INJECTION + platform_memory_fault_config fault_cfg; + platform_memory_fault_config_get(&fault_cfg); + bool32 fault_option_seen = FALSE; + bool32 fault_config_seen = FALSE; + bool32 fault_disable = FALSE; +#endif + uint64 i; for (i = 0; i < argc; i++) { // Don't be mislead; this is not dead-code. See the config macro expansion @@ -378,6 +478,125 @@ config_parse(master_config *cfg, const uint8 num_config, int argc, char *argv[]) config_set_uint64("num-inserts", cfg, num_inserts) {} config_set_uint64("num-processes", cfg, num_processes) {} +#if PLATFORM_MEMORY_FAULT_INJECTION + config_has_option("fault-alloc-range") + { + fault_option_seen = TRUE; + fault_config_seen = TRUE; + fault_disable = FALSE; + fault_cfg.mode = PLATFORM_MEMORY_FAULT_RANGE; + if (!config_parse_uint64_arg(argc, + argv, + &i, + "--fault-alloc-range", + &fault_cfg.range_start) + || !config_parse_uint64_arg(argc, + argv, + &i, + "--fault-alloc-range", + &fault_cfg.range_count)) + { + return STATUS_BAD_PARAM; + } + } + config_has_option("fault-alloc-random") + { + fault_option_seen = TRUE; + fault_config_seen = TRUE; + fault_disable = FALSE; + fault_cfg.mode = PLATFORM_MEMORY_FAULT_RANDOM; + if (!config_parse_uint64_arg( + argc, argv, &i, "--fault-alloc-random", &fault_cfg.seed)) + { + return STATUS_BAD_PARAM; + } + } + config_has_option("fault-alloc-random-probability") + { + fault_option_seen = TRUE; + if (!config_parse_uint32_arg(argc, + argv, + &i, + "--fault-alloc-random-probability", + &fault_cfg.random_fail_probability)) + { + return STATUS_BAD_PARAM; + } + } + config_has_option("fault-alloc-burst-probability") + { + fault_option_seen = TRUE; + if (!config_parse_uint32_arg( + argc, + argv, + &i, + "--fault-alloc-burst-probability", + &fault_cfg.random_burst_start_probability)) + { + return STATUS_BAD_PARAM; + } + } + config_has_option("fault-alloc-burst-fail-probability") + { + fault_option_seen = TRUE; + if (!config_parse_uint32_arg( + argc, + argv, + &i, + "--fault-alloc-burst-fail-probability", + &fault_cfg.random_burst_fail_probability)) + { + return STATUS_BAD_PARAM; + } + } + config_has_option("fault-alloc-burst-min") + { + fault_option_seen = TRUE; + if (!config_parse_uint64_arg(argc, + argv, + &i, + "--fault-alloc-burst-min", + &fault_cfg.random_burst_min_length)) + { + return STATUS_BAD_PARAM; + } + } + config_has_option("fault-alloc-burst-max") + { + fault_option_seen = TRUE; + if (!config_parse_uint64_arg(argc, + argv, + &i, + "--fault-alloc-burst-max", + &fault_cfg.random_burst_max_length)) + { + return STATUS_BAD_PARAM; + } + } + config_has_option("fault-alloc-max-failures") + { + fault_option_seen = TRUE; + if (!config_parse_uint64_arg(argc, + argv, + &i, + "--fault-alloc-max-failures", + &fault_cfg.max_failures)) + { + return STATUS_BAD_PARAM; + } + } + config_has_option("fault-alloc-verbose") + { + fault_option_seen = TRUE; + fault_cfg.verbose = TRUE; + } + config_has_option("fault-alloc-disable") + { + fault_option_seen = TRUE; + fault_disable = TRUE; + } +#endif + config_set_else { platform_error_log("config: invalid option: %s\n", argv[i]); @@ -385,6 +604,27 @@ config_parse(master_config *cfg, const uint8 num_config, int argc, char *argv[]) } } +#if PLATFORM_MEMORY_FAULT_INJECTION + if (fault_option_seen) { + if (fault_disable) { + platform_memory_fault_disable(); + } else { + if (!fault_config_seen) { + platform_error_log("config: fault allocation tuning options " + "require --fault-alloc-range or " + "--fault-alloc-random\n"); + return STATUS_BAD_PARAM; + } + platform_status rc = config_parse_memory_fault_validate(&fault_cfg); + if (!SUCCESS(rc)) { + return rc; + } + platform_memory_fault_config_set(&fault_cfg); + platform_memory_fault_enable(); + } + } +#endif + // Validate consistency of config parameters provided. for (uint8 cfg_idx = 0; cfg_idx < num_config; cfg_idx++) { if (cfg[cfg_idx].extent_size % cfg[cfg_idx].page_size != 0) { diff --git a/tests/functional/btree_test.c b/tests/functional/btree_test.c index 2e690d95..be858267 100644 --- a/tests/functional/btree_test.c +++ b/tests/functional/btree_test.c @@ -657,7 +657,8 @@ test_btree_basic(cache *cc, platform_default_log("btree iterator init time %luns\n", platform_timestamp_elapsed(start_time)); btree_pack_req req; - rc = btree_pack_req_init(&req, cc, btree_cfg, (iterator *)&itor, 0, 0, NULL); + rc = btree_pack_req_init( + &req, cc, btree_cfg, (iterator *)&itor, 0, 0, FALSE, NULL); platform_assert_status_ok(rc); btree_print_tree_stats(Platform_default_log_handle, @@ -838,7 +839,8 @@ test_btree_create_packed_trees(cache *cc, 0); btree_pack_req req; - rc = btree_pack_req_init(&req, cc, btree_cfg, &itor.super, 0, 0, hid); + rc = btree_pack_req_init( + &req, cc, btree_cfg, &itor.super, 0, 0, FALSE, hid); platform_assert_status_ok(rc); rc = btree_pack(&req); @@ -1079,7 +1081,7 @@ test_btree_merge_basic(cache *cc, btree_pack_req req; rc = btree_pack_req_init( - &req, cc, btree_cfg, &merge_itor->super, 0, 0, hid); + &req, cc, btree_cfg, &merge_itor->super, 0, 0, FALSE, hid); platform_assert_status_ok(rc); btree_pack(&req); output_addr[pivot_no] = req.root_addr; @@ -1461,7 +1463,7 @@ test_btree_merge_perf(cache *cc, btree_pack_req req; rc = btree_pack_req_init( - &req, cc, btree_cfg, &merge_itor->super, 0, 0, hid); + &req, cc, btree_cfg, &merge_itor->super, 0, 0, FALSE, hid); platform_assert_status_ok(rc); btree_pack(&req); diff --git a/tests/functional/filter_test.c b/tests/functional/filter_test.c index d689200c..1b2f6034 100644 --- a/tests/functional/filter_test.c +++ b/tests/functional/filter_test.c @@ -81,8 +81,15 @@ test_filter_basic(cache *cc, filter[i + 1].num_unique); } - uint32 num_unique = - routing_filter_estimate_unique_fp(cc, cfg, hid, filter + 1, num_values); + uint32 num_unique; + rc = routing_filter_estimate_unique_fp( + cc, cfg, hid, filter + 1, num_values, &num_unique); + if (!SUCCESS(rc)) { + for (uint64 i = 0; i < num_values; i++) { + routing_filter_dec_ref(cc, &filter[i + 1]); + } + goto out; + } num_unique = routing_filter_estimate_unique_keys_from_count(cfg, num_unique); platform_default_log("across filters: num input keys %8u estimate %8u\n", num_input_keys[num_values - 1], diff --git a/tests/unit/btree_stress_test.c b/tests/unit/btree_stress_test.c index c30ef2b3..27eecec6 100644 --- a/tests/unit/btree_stress_test.c +++ b/tests/unit/btree_stress_test.c @@ -19,6 +19,7 @@ #include "splinterdb/data.h" #include "../config.h" #include "platform_io.h" +#include "platform_threads.h" #include "platform_units.h" #include "rc_allocator.h" #include "clockcache.h" @@ -808,7 +809,7 @@ pack_tests(cache *cc, platform_status rc = STATUS_TEST_FAILED; btree_pack_req req; - rc = btree_pack_req_init(&req, cc, cfg, iter, nkvs, 0, hid); + rc = btree_pack_req_init(&req, cc, cfg, iter, nkvs, 0, FALSE, hid); ASSERT_TRUE(SUCCESS(rc)); if (!SUCCESS(btree_pack(&req))) { diff --git a/tests/unit/btree_test.c b/tests/unit/btree_test.c index bf3d0014..35f59259 100644 --- a/tests/unit/btree_test.c +++ b/tests/unit/btree_test.c @@ -15,6 +15,7 @@ #include "test_data.h" #include "splinterdb/data.h" #include "platform_io.h" +#include "platform_threads.h" #include "platform_units.h" #include "clockcache.h" #include "btree_private.h" diff --git a/tests/unit/config_parse_test.c b/tests/unit/config_parse_test.c index b5131cfa..633743e3 100644 --- a/tests/unit/config_parse_test.c +++ b/tests/unit/config_parse_test.c @@ -16,6 +16,7 @@ */ #include "core.h" #include "functional/test.h" +#include "platform_threads.h" #include "test_common.h" #include "unit_tests.h" #include "ctest.h" // This is required for all test-case files. diff --git a/tests/unit/limitations_test.c b/tests/unit/limitations_test.c index c15637bf..33740b3b 100644 --- a/tests/unit/limitations_test.c +++ b/tests/unit/limitations_test.c @@ -19,6 +19,7 @@ #include "splinterdb/default_data_config.h" #include "unit_tests.h" #include "ctest.h" // This is required for all test-case files. +#include "platform_threads.h" #include "platform_units.h" static void diff --git a/tests/unit/misc_test.c b/tests/unit/misc_test.c index 6f65b26e..23cf7914 100644 --- a/tests/unit/misc_test.c +++ b/tests/unit/misc_test.c @@ -12,6 +12,7 @@ #include "unit_tests.h" #include "ctest.h" // This is required for all test-case files. #include "platform_log.h" +#include "platform_threads.h" #include "platform_units.h" #include "platform_assert.h" diff --git a/tests/unit/platform_apis_test.c b/tests/unit/platform_apis_test.c index d7b49d4b..078ec58d 100644 --- a/tests/unit/platform_apis_test.c +++ b/tests/unit/platform_apis_test.c @@ -22,6 +22,7 @@ #include "platform_threads.h" #include "platform_units.h" #include "platform_buffer.h" +#include "platform_typed_alloc.h" #include "platform_spinlock.h" #include "platform_mutex.h" #include "platform_condvar.h" @@ -50,10 +51,17 @@ CTEST_SETUP(platform_api) CTEST_TEARDOWN(platform_api) { + platform_memory_fault_disable(); platform_heap_destroy(&data->hid); platform_deregister_thread(); } +#if PLATFORM_MEMORY_FAULT_INJECTION +# define CTEST_MEMORY_FAULT CTEST2 +#else +# define CTEST_MEMORY_FAULT CTEST2_SKIP +#endif + /* * Test platform_buffer_init() and platform_buffer_deinit(). */ @@ -105,6 +113,93 @@ CTEST2(platform_api, test_platform_buffer_init_fails_for_very_large_length) set_log_streams_for_tests(MSG_LEVEL_INFO); } +CTEST_MEMORY_FAULT(platform_api, test_memory_fault_range) +{ + platform_memory_fault_disable(); + + platform_memory_fault_config cfg; + platform_memory_fault_config_get(&cfg); + cfg.mode = PLATFORM_MEMORY_FAULT_RANGE; + cfg.range_start = 2; + cfg.range_count = 2; + platform_memory_fault_config_set(&cfg); + + char *p0 = TYPED_ARRAY_MALLOC(data->hid, p0, 8); + platform_memory_fault_counters unarmed_counters = + platform_memory_fault_get_counters(); + platform_memory_fault_enable(); + + char *p1 = TYPED_ARRAY_MALLOC(data->hid, p1, 8); + char *p2 = TYPED_ARRAY_MALLOC(data->hid, p2, 8); + char *p3 = TYPED_ARRAY_MALLOC(data->hid, p3, 8); + char *p4 = TYPED_ARRAY_MALLOC(data->hid, p4, 8); + + bool32 p0_ok = p0 != NULL; + bool32 p1_ok = p1 != NULL; + bool32 p2_ok = p2 != NULL; + bool32 p3_ok = p3 != NULL; + bool32 p4_ok = p4 != NULL; + platform_memory_fault_counters counters = + platform_memory_fault_get_counters(); + + platform_memory_fault_disable(); + platform_memory_fault_counters disabled_counters = + platform_memory_fault_get_counters(); + platform_free(data->hid, p1); + platform_free(data->hid, p4); + platform_free(data->hid, p0); + + ASSERT_TRUE(p0_ok); + ASSERT_EQUAL(0, unarmed_counters.alloc_count); + ASSERT_EQUAL(0, unarmed_counters.failure_count); + ASSERT_TRUE(p1_ok); + ASSERT_FALSE(p2_ok); + ASSERT_FALSE(p3_ok); + ASSERT_TRUE(p4_ok); + ASSERT_EQUAL(4, counters.alloc_count); + ASSERT_EQUAL(2, counters.failure_count); + ASSERT_EQUAL(counters.alloc_count, disabled_counters.alloc_count); + ASSERT_EQUAL(counters.failure_count, disabled_counters.failure_count); +} + +CTEST_MEMORY_FAULT(platform_api, test_memory_fault_random_respects_max_failures) +{ + platform_memory_fault_disable(); + + platform_memory_fault_config cfg; + platform_memory_fault_config_get(&cfg); + cfg.mode = PLATFORM_MEMORY_FAULT_RANDOM; + cfg.seed = 42; + cfg.random_fail_probability = PLATFORM_MEMORY_FAULT_PROBABILITY_SCALE; + cfg.random_burst_start_probability = 0; + cfg.max_failures = 2; + platform_memory_fault_config_set(&cfg); + platform_memory_fault_enable(); + + char *p1 = TYPED_ARRAY_MALLOC(data->hid, p1, 8); + char *p2 = TYPED_ARRAY_MALLOC(data->hid, p2, 8); + char *p3 = TYPED_ARRAY_MALLOC(data->hid, p3, 8); + + bool32 p1_ok = p1 != NULL; + bool32 p2_ok = p2 != NULL; + bool32 p3_ok = p3 != NULL; + platform_memory_fault_counters counters = + platform_memory_fault_get_counters(); + + platform_memory_fault_disable(); + platform_memory_fault_counters disabled_counters = + platform_memory_fault_get_counters(); + platform_free(data->hid, p3); + + ASSERT_FALSE(p1_ok); + ASSERT_FALSE(p2_ok); + ASSERT_TRUE(p3_ok); + ASSERT_EQUAL(3, counters.alloc_count); + ASSERT_EQUAL(2, counters.failure_count); + ASSERT_EQUAL(counters.alloc_count, disabled_counters.alloc_count); + ASSERT_EQUAL(counters.failure_count, disabled_counters.failure_count); +} + /* * Exercise platform_semaphore_init() to ensure that changes basically work. diff --git a/tests/unit/splinter_test.c b/tests/unit/splinter_test.c index 13ea7fed..5377cde9 100644 --- a/tests/unit/splinter_test.c +++ b/tests/unit/splinter_test.c @@ -26,6 +26,7 @@ #include "allocator.h" #include "rc_allocator.h" #include "task.h" +#include "platform_threads.h" #include "functional/test.h" #include "functional/test_async.h" #include "test_common.h" diff --git a/tests/unit/splinterdb_stress_test.c b/tests/unit/splinterdb_stress_test.c index e8fff4f4..c3b5b4be 100644 --- a/tests/unit/splinterdb_stress_test.c +++ b/tests/unit/splinterdb_stress_test.c @@ -12,6 +12,7 @@ #include "splinterdb/default_data_config.h" #include "splinterdb/splinterdb.h" +#include "platform_threads.h" #include "unit_tests.h" #include "../functional/random.h" #include "config.h" diff --git a/tests/unit/task_system_test.c b/tests/unit/task_system_test.c index aa514c85..f9e61308 100644 --- a/tests/unit/task_system_test.c +++ b/tests/unit/task_system_test.c @@ -26,6 +26,7 @@ #include "unit_tests.h" #include "ctest.h" // This is required for all test-case files. #include "platform.h" +#include "platform_threads.h" #include "config.h" // Reqd for definition of master_config{} #include "task.h" @@ -582,4 +583,4 @@ exec_user_thread_loop_for_stop(void *arg) CTEST_LOG_INFO("Last user thread ID=%lu, created on line=%d exiting ...\n", this_threads_idx, thread_cfg->line); -} \ No newline at end of file +} diff --git a/tests/unit/util_test.c b/tests/unit/util_test.c index 46409339..bc57c638 100644 --- a/tests/unit/util_test.c +++ b/tests/unit/util_test.c @@ -10,6 +10,7 @@ */ #include "util.h" #include "ctest.h" // This is required for all test-case files. +#include "platform_threads.h" static const char debug_hex_encode_sample_data[10] = {0, 1, 2, 3, 0xa4, 5, 0xd6, 7, 8, 9}; diff --git a/tests/unit/vector_test.c b/tests/unit/vector_test.c index d3ad2ebd..a32e210b 100644 --- a/tests/unit/vector_test.c +++ b/tests/unit/vector_test.c @@ -10,8 +10,10 @@ */ #include "vector.h" #include "ctest.h" +#include "platform_threads.h" typedef VECTOR(uint64) uint64_vector; +typedef VECTOR(uint64 *) uint64_ptr_vector; CTEST_DATA(vector) { @@ -191,6 +193,61 @@ CTEST2(vector, map_ptrs) } } +uint64 * +ptr_at(uint64 x, uint64 *values) +{ + return &values[x]; +} + +CTEST2(vector, map_elts_to_ptrs) +{ + uint64 values[10]; + for (uint64 i = 0; i < ARRAY_SIZE(values); i++) { + values[i] = i; + } + + uint64_ptr_vector ptrs; + vector_init(&ptrs, platform_get_heap_id()); + platform_status rc = + VECTOR_MAP_ELTS_TO_PTRS(&ptrs, ptr_at, &data->ten, values); + + ASSERT_TRUE(SUCCESS(rc)); + ASSERT_EQUAL(10, vector_length(&ptrs)); + for (int i = 0; i < vector_length(&ptrs); i++) { + ASSERT_TRUE(vector_get(&ptrs, i) == &values[i]); + } + vector_deinit(&ptrs); +} + +uint64 * +ptr_at_fail_after_5(uint64 x, uint64 *values) +{ + if (x < 5) { + return &values[x]; + } + return NULL; +} + +CTEST2(vector, map_elts_to_ptrs_fails_on_null) +{ + uint64 values[10]; + for (uint64 i = 0; i < ARRAY_SIZE(values); i++) { + values[i] = i; + } + + uint64_ptr_vector ptrs; + vector_init(&ptrs, platform_get_heap_id()); + platform_status rc = + VECTOR_MAP_ELTS_TO_PTRS(&ptrs, ptr_at_fail_after_5, &data->ten, values); + + ASSERT_TRUE(STATUS_IS_EQ(rc, STATUS_NO_MEMORY)); + ASSERT_EQUAL(5, vector_length(&ptrs)); + for (int i = 0; i < vector_length(&ptrs); i++) { + ASSERT_TRUE(vector_get(&ptrs, i) == &values[i]); + } + vector_deinit(&ptrs); +} + uint64 add(uint64 acc, uint64_vector *v, uint64 idx) { diff --git a/tests/unit/writable_buffer_test.c b/tests/unit/writable_buffer_test.c index dd978046..a3740c0b 100644 --- a/tests/unit/writable_buffer_test.c +++ b/tests/unit/writable_buffer_test.c @@ -9,6 +9,7 @@ * ----------------------------------------------------------------------------- */ #include "platform_units.h" +#include "platform_threads.h" #include "config.h" #include "unit_tests.h" #include "ctest.h" // This is required for all test-case files.