From 31c739853332bd962a991832404586fd7913bd32 Mon Sep 17 00:00:00 2001 From: caomengxuan666 <2507560089@qq.com> Date: Sun, 3 May 2026 15:08:53 +0800 Subject: [PATCH] Add MPMC comparison benchmark --- .gitignore | 1 + CMakeLists.txt | 52 ++++- README.md | 58 +++++- README.zh.md | 61 +++++- benchmarks/bench_mpsc_vs_mpmc_google.cpp | 236 +++++++++++++++++++++++ 5 files changed, 398 insertions(+), 10 deletions(-) create mode 100644 benchmarks/bench_mpsc_vs_mpmc_google.cpp diff --git a/.gitignore b/.gitignore index 1f99f9d..65f4a46 100644 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,7 @@ compile_commands.json CTestTestfile.cmake _deps CMakeUserPresets.json +out/ # CLion # JetBrains specific template is maintained in a separate JetBrains.gitignore that can diff --git a/CMakeLists.txt b/CMakeLists.txt index 2d7b224..1b28383 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -17,8 +17,20 @@ FetchContent_Declare( FIND_PACKAGE_ARGS NAMES benchmark CONFIG ) set(BENCHMARK_ENABLE_TESTING OFF) +set(BENCHMARK_ENABLE_WERROR OFF CACHE BOOL "" FORCE) FetchContent_MakeAvailable(googlebenchmark) +if(TARGET benchmark) + target_compile_options(benchmark PRIVATE + $<$:-Wno-c2y-extensions -Wno-invalid-offsetof> + ) +endif() +if(TARGET benchmark_main) + target_compile_options(benchmark_main PRIVATE + $<$:-Wno-c2y-extensions -Wno-invalid-offsetof> + ) +endif() + # Google Test enable_testing() FetchContent_Declare( @@ -39,8 +51,19 @@ set(HDR_HISTOGRAM_BUILD_SHARED OFF CACHE BOOL "" FORCE) set(HDR_LOG_REQUIRED "DISABLED" CACHE STRING "" FORCE) FetchContent_MakeAvailable(hdrhistogram) +# moodycamel ConcurrentQueue +FetchContent_Declare( + concurrentqueue + URL https://github.com/cameron314/concurrentqueue/archive/refs/tags/v1.0.5.zip +) +FetchContent_MakeAvailable(concurrentqueue) + find_package(Threads REQUIRED) +set(ATOMIC_LIBRARY + $<$>,$>>:atomic> +) + set(COMMON_TARGET_PROPERTIES PRIVATE ${DEFAULT_RELEASE_OPTS} @@ -58,7 +81,7 @@ target_link_libraries(mpsc_bench_throughput PRIVATE 
benchmark::benchmark_main Threads::Threads - $<$>:atomic> + ${ATOMIC_LIBRARY} ) target_compile_options(mpsc_bench_throughput ${COMMON_TARGET_PROPERTIES}) @@ -74,7 +97,7 @@ target_link_libraries(mpsc_bench_latency benchmark::benchmark_main hdr_histogram_static Threads::Threads - $<$>:atomic> + ${ATOMIC_LIBRARY} ) target_compile_options(mpsc_bench_latency ${COMMON_TARGET_PROPERTIES}) @@ -88,24 +111,39 @@ target_link_libraries(mpsc_bench_linearizable PRIVATE benchmark::benchmark_main Threads::Threads - $<$>:atomic> + ${ATOMIC_LIBRARY} ) target_compile_options(mpsc_bench_linearizable ${COMMON_TARGET_PROPERTIES}) +add_executable(mpsc_vs_mpmc_benchmark benchmarks/bench_mpsc_vs_mpmc_google.cpp) +target_include_directories(mpsc_vs_mpmc_benchmark + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${CMAKE_CURRENT_SOURCE_DIR} + ${concurrentqueue_SOURCE_DIR} +) +target_link_libraries(mpsc_vs_mpmc_benchmark + PRIVATE + benchmark::benchmark_main + Threads::Threads + ${ATOMIC_LIBRARY} +) +target_compile_options(mpsc_vs_mpmc_benchmark ${COMMON_TARGET_PROPERTIES}) + # TEST add_executable(mpsc_tests tests/test_MPSC.cpp) target_include_directories(mpsc_tests PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include ${CMAKE_CURRENT_SOURCE_DIR}) -target_link_libraries(mpsc_tests PRIVATE GTest::gtest_main GTest::gmock Threads::Threads $<$>:atomic>) +target_link_libraries(mpsc_tests PRIVATE GTest::gtest_main GTest::gmock Threads::Threads ${ATOMIC_LIBRARY}) include(GoogleTest) gtest_discover_tests(mpsc_tests) #EXAMPLE add_executable(mpsc_log_system_example examples/log_system.cpp) target_include_directories(mpsc_log_system_example PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include ${CMAKE_CURRENT_SOURCE_DIR}) -target_link_libraries(mpsc_log_system_example PRIVATE Threads::Threads $<$>:atomic>) +target_link_libraries(mpsc_log_system_example PRIVATE Threads::Threads ${ATOMIC_LIBRARY}) target_compile_options(mpsc_log_system_example ${COMMON_TARGET_PROPERTIES}) add_executable(mpsc_command_dispatcher_example 
examples/command_dispatcher.cpp) target_include_directories(mpsc_command_dispatcher_example PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include ${CMAKE_CURRENT_SOURCE_DIR}) -target_link_libraries(mpsc_command_dispatcher_example PRIVATE Threads::Threads $<$>:atomic>) -target_compile_options(mpsc_command_dispatcher_example ${COMMON_TARGET_PROPERTIES}) \ No newline at end of file +target_link_libraries(mpsc_command_dispatcher_example PRIVATE Threads::Threads ${ATOMIC_LIBRARY}) +target_compile_options(mpsc_command_dispatcher_example ${COMMON_TARGET_PROPERTIES}) diff --git a/README.md b/README.md index ffdb08f..e4f1c74 100644 --- a/README.md +++ b/README.md @@ -186,8 +186,64 @@ This scenario tests the throughput when producers use the `enqueue_bulk` interfa | **moodycamel** | **2** | **1** | **68.9161** | **Best performance in this section (P=2)** | | | moodycamel | 16 | 1 | 43.4848 | No bulk advantage at 16P | +**Part IV: Additional Google Benchmark Comparison (Clang Release)** -**Part IV: Enqueue/Dequeue Latency** +This comparison is produced by `mpsc_vs_mpmc_benchmark`, using Google Benchmark in a Clang Release build. It uses `moodycamel::ConcurrentQueue` v1.0.5 as a generic MPMC baseline under the same MPSC workloads. + +Run on (20 X 2688 MHz CPU s) +CPU Caches: +L1 Data 48 KiB (x10) +L1 Instruction 32 KiB (x10) +L2 Unified 1280 KiB (x10) +L3 Unified 24576 KiB (x1) +Compiler: Clang 22.1.1 Release + +Command: + +```powershell +cmake -S . 
-B out/build/clang-local -G Ninja -DCMAKE_CXX_COMPILER=clang++ -DCMAKE_BUILD_TYPE=Release +cmake --build out/build/clang-local --target mpsc_vs_mpmc_benchmark +.\out\build\clang-local\mpsc_vs_mpmc_benchmark.exe --benchmark_min_time=0.1s --benchmark_repetitions=1 --benchmark_counters_tabular=true +``` + +Uniform single-element enqueue: + +| Queue | P (Producers) | C (Consumer) | **Throughput (M items/s)** | +| :--- | :--- | :--- | :--- | +| **daking** | 1 | 1 | **86.78** | +| daking | 2 | 1 | **32.29** | +| daking | 4 | 1 | **32.52** | +| daking | 8 | 1 | 31.16 | +| moodycamel | 1 | 1 | 22.74 | +| moodycamel | 2 | 1 | 27.82 | +| moodycamel | 4 | 1 | 29.79 | +| **moodycamel** | 8 | 1 | **32.07** | + +Uneven sequential burst: + +| Queue | P (Producers) | C (Consumer) | Relay % | **Throughput (M items/s)** | +| :--- | :--- | :--- | :--- | :--- | +| daking | 4 | 1 | $50.0\%$ | **33.89** | +| daking | 4 | 1 | $90.0\%$ | **61.72** | +| daking | 4 | 1 | $98.0\%$ | **73.72** | +| moodycamel | 4 | 1 | $50.0\%$ | 23.84 | +| moodycamel | 4 | 1 | $90.0\%$ | 22.25 | +| moodycamel | 4 | 1 | $98.0\%$ | 17.64 | + +Bulk enqueue: + +| Queue | P (Producers) | C (Consumer) | **Throughput (M items/s)** | +| :--- | :--- | :--- | :--- | +| **daking** | 1 | 1 | **102.25** | +| **daking** | 2 | 1 | **102.37** | +| **daking** | 4 | 1 | **85.23** | +| **daking** | 8 | 1 | **77.16** | +| moodycamel | 1 | 1 | 17.02 | +| moodycamel | 2 | 1 | 18.85 | +| moodycamel | 4 | 1 | 18.06 | +| moodycamel | 8 | 1 | 16.02 | + +**Part V: Enqueue/Dequeue Latency** (Based on HdrHistogram, Test on Linux) We get below performance: diff --git a/README.zh.md b/README.zh.md index 4f83f92..3422e84 100644 --- a/README.zh.md +++ b/README.zh.md @@ -186,7 +186,64 @@ Compiler: MSVC -O2 | **moodycamel** | **2** | **1** | **68.9161** | **本部分最高表现 (P=2)** | | moodycamel | 16 | 1 | 43.4848 | 16P时无明显批量优势 | -**第四部分:Enqueue/Dequeue Latency** +**第四部分:额外 Google Benchmark 对比(Clang Release)** + +此对比由 `mpsc_vs_mpmc_benchmark` 产生,使用 
Google Benchmark 和 Clang Release 构建。这里将 `moodycamel::ConcurrentQueue` v1.0.5 作为通用 MPMC 基线,并在相同 MPSC 负载下进行对比。 + +Run on (20 X 2688 MHz CPU s) +CPU Caches: +L1 Data 48 KiB (x10) +L1 Instruction 32 KiB (x10) +L2 Unified 1280 KiB (x10) +L3 Unified 24576 KiB (x1) +Compiler: Clang 22.1.1 Release + +运行命令: + +```powershell +cmake -S . -B out/build/clang-local -G Ninja -DCMAKE_CXX_COMPILER=clang++ -DCMAKE_BUILD_TYPE=Release +cmake --build out/build/clang-local --target mpsc_vs_mpmc_benchmark +.\out\build\clang-local\mpsc_vs_mpmc_benchmark.exe --benchmark_min_time=0.1s --benchmark_repetitions=1 --benchmark_counters_tabular=true +``` + +均匀单元素入队: + +| 队列 | P (生产者) | C (消费者) | **吞吐量 (M items/s)** | +| :--- | :--- | :--- | :--- | +| **daking** | 1 | 1 | **86.78** | +| daking | 2 | 1 | **32.29** | +| daking | 4 | 1 | **32.52** | +| daking | 8 | 1 | 31.16 | +| moodycamel | 1 | 1 | 22.74 | +| moodycamel | 2 | 1 | 27.82 | +| moodycamel | 4 | 1 | 29.79 | +| **moodycamel** | 8 | 1 | **32.07** | + +不均匀顺序爆发: + +| 队列 | P (生产者) | C (消费者) | 接力百分比 | **吞吐量 (M items/s)** | +| :--- | :--- | :--- | :--- | :--- | +| daking | 4 | 1 | $50.0\%$ | **33.89** | +| daking | 4 | 1 | $90.0\%$ | **61.72** | +| daking | 4 | 1 | $98.0\%$ | **73.72** | +| moodycamel | 4 | 1 | $50.0\%$ | 23.84 | +| moodycamel | 4 | 1 | $90.0\%$ | 22.25 | +| moodycamel | 4 | 1 | $98.0\%$ | 17.64 | + +批量入队: + +| 队列 | P (生产者) | C (消费者) | **吞吐量 (M items/s)** | +| :--- | :--- | :--- | :--- | +| **daking** | 1 | 1 | **102.25** | +| **daking** | 2 | 1 | **102.37** | +| **daking** | 4 | 1 | **85.23** | +| **daking** | 8 | 1 | **77.16** | +| moodycamel | 1 | 1 | 17.02 | +| moodycamel | 2 | 1 | 18.85 | +| moodycamel | 4 | 1 | 18.06 | +| moodycamel | 8 | 1 | 16.02 | + +**第五部分:Enqueue/Dequeue Latency** (此部分基于HdrHistogram,在Linux平台测试) 我们得到了以下延迟表现: @@ -323,4 +380,4 @@ daking::MPSC_queue queue3; ## 许可证 (LICENSE) -MPSC\_queue 使用 [MIT 许可证](https://www.google.com/search?q=./LICENSE.txt) 授权。 \ No newline at end of file +MPSC\_queue 使用 [MIT 
许可证](https://www.google.com/search?q=./LICENSE.txt) 授权。
diff --git a/benchmarks/bench_mpsc_vs_mpmc_google.cpp b/benchmarks/bench_mpsc_vs_mpmc_google.cpp
new file mode 100644
index 0000000..d02c6e0
--- /dev/null
+++ b/benchmarks/bench_mpsc_vs_mpmc_google.cpp
@@ -0,0 +1,247 @@
+#include <benchmark/benchmark.h>
+
+#include <algorithm>
+#include <array>
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <functional>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include "concurrentqueue.h"
+#include "daking/MPSC_queue.hpp"
+
+namespace {
+
+// Total items moved per benchmark iteration, and the batch size used by the
+// bulk-enqueue scenario.
+constexpr std::size_t kTotalOps = 2000000;
+constexpr std::size_t kBulkSize = 32;
+
+struct Message {
+    int producer_id;
+    std::uint64_t sequence;
+};
+
+// Uniform facade over the two queue implementations so each scenario is
+// written once and instantiated per queue type.
+template <typename Queue>
+struct QueueTraits;
+
+template <typename T>
+struct QueueTraits<daking::MPSC_queue<T>> {
+    static constexpr const char* name = "daking_mpsc";
+
+    static void enqueue(daking::MPSC_queue<T>& queue, T value) {
+        queue.enqueue(std::move(value));
+    }
+
+    static bool try_dequeue(daking::MPSC_queue<T>& queue, T& value) {
+        return queue.try_dequeue(value);
+    }
+
+    template <typename Iterator>
+    static void enqueue_bulk(daking::MPSC_queue<T>& queue, Iterator begin, std::size_t count) {
+        queue.enqueue_bulk(begin, count);
+    }
+};
+
+template <typename T>
+struct QueueTraits<moodycamel::ConcurrentQueue<T>> {
+    static constexpr const char* name = "moody_mpmc";
+
+    static void enqueue(moodycamel::ConcurrentQueue<T>& queue, T value) {
+        queue.enqueue(std::move(value));
+    }
+
+    static bool try_dequeue(moodycamel::ConcurrentQueue<T>& queue, T& value) {
+        return queue.try_dequeue(value);
+    }
+
+    template <typename Iterator>
+    static void enqueue_bulk(moodycamel::ConcurrentQueue<T>& queue, Iterator begin, std::size_t count) {
+        queue.enqueue_bulk(begin, count);
+    }
+};
+
+// Single consumer: spin until `start` is raised, then pop exactly
+// `total_items` messages, yielding while the queue is empty.
+template <typename Queue>
+void drain_consumer(Queue& queue, std::size_t total_items, std::atomic_bool& start) {
+    using Traits = QueueTraits<Queue>;
+
+    Message message{};
+    std::size_t popped_count = 0;
+    while (!start.load(std::memory_order_acquire)) {
+        std::this_thread::yield();
+    }
+    while (popped_count < total_items) {
+        if (Traits::try_dequeue(queue, message)) {
+            ++popped_count;
+        }
+        else {
+            std::this_thread::yield();
+        }
+    }
+}
+
+// Scenario 1: all producers start together and enqueue single messages at a
+// uniform rate.
+template <typename Queue>
+void run_uniform_single(std::size_t producer_count) {
+    using Traits = QueueTraits<Queue>;
+
+    Queue queue;
+    std::atomic_bool start{ false };
+    const std::size_t items_per_producer = kTotalOps / producer_count;
+    const std::size_t total_items = items_per_producer * producer_count;
+
+    std::thread consumer(drain_consumer<Queue>, std::ref(queue), total_items, std::ref(start));
+
+    std::vector<std::thread> producers;
+    producers.reserve(producer_count);
+    for (std::size_t producer = 0; producer < producer_count; ++producer) {
+        producers.emplace_back([&, producer] {
+            while (!start.load(std::memory_order_acquire)) {
+                std::this_thread::yield();
+            }
+            for (std::uint64_t sequence = 0; sequence < items_per_producer; ++sequence) {
+                Traits::enqueue(queue, Message{ static_cast<int>(producer), sequence });
+            }
+        });
+    }
+
+    start.store(true, std::memory_order_release);
+    for (auto& producer : producers) {
+        producer.join();
+    }
+    consumer.join();
+}
+
+// Scenario 2: producers start one after another; producer N releases producer
+// N+1 once it has pushed `1 - 1/relay_denominator` of its own items.
+template <typename Queue>
+void run_uneven_wave(std::size_t relay_denominator) {
+    using Traits = QueueTraits<Queue>;
+
+    Queue queue;
+    constexpr std::size_t producer_count = 4;
+    const std::size_t items_per_producer = kTotalOps / producer_count;
+    const std::size_t total_items = items_per_producer * producer_count;
+    std::vector<std::atomic_bool> producer_start(producer_count);
+    std::atomic_bool consumer_start{ false };
+
+    std::thread consumer(drain_consumer<Queue>, std::ref(queue), total_items, std::ref(consumer_start));
+
+    std::vector<std::thread> producers;
+    producers.reserve(producer_count);
+    for (std::size_t producer = 0; producer < producer_count; ++producer) {
+        producers.emplace_back([&, producer] {
+            while (!producer_start[producer].load(std::memory_order_acquire)) {
+                std::this_thread::yield();
+            }
+            const std::size_t relay_at = items_per_producer - items_per_producer / relay_denominator;
+            for (std::uint64_t sequence = 0; sequence < items_per_producer; ++sequence) {
+                Traits::enqueue(queue, Message{ static_cast<int>(producer), sequence });
+                if (sequence == relay_at && producer + 1 < producer_count) {
+                    producer_start[producer + 1].store(true, std::memory_order_release);
+                }
+            }
+        });
+    }
+
+    consumer_start.store(true, std::memory_order_release);
+    producer_start[0].store(true, std::memory_order_release);
+
+    for (auto& producer : producers) {
+        producer.join();
+    }
+    consumer.join();
+}
+
+// Scenario 3: producers push fixed-size batches through enqueue_bulk.
+template <typename Queue>
+void run_bulk(std::size_t producer_count) {
+    using Traits = QueueTraits<Queue>;
+
+    Queue queue;
+    std::atomic_bool start{ false };
+    const std::size_t items_per_producer = kTotalOps / producer_count;
+    const std::size_t total_items = items_per_producer * producer_count;
+
+    std::thread consumer(drain_consumer<Queue>, std::ref(queue), total_items, std::ref(start));
+
+    std::vector<std::thread> producers;
+    producers.reserve(producer_count);
+    for (std::size_t producer = 0; producer < producer_count; ++producer) {
+        producers.emplace_back([&, producer] {
+            std::array<Message, kBulkSize> batch{};
+            for (std::size_t index = 0; index < batch.size(); ++index) {
+                batch[index] = Message{ static_cast<int>(producer), static_cast<std::uint64_t>(index) };
+            }
+
+            while (!start.load(std::memory_order_acquire)) {
+                std::this_thread::yield();
+            }
+            for (std::size_t sent = 0; sent < items_per_producer; sent += kBulkSize) {
+                const std::size_t count = std::min(kBulkSize, items_per_producer - sent);
+                Traits::enqueue_bulk(queue, batch.begin(), count);
+            }
+        });
+    }
+
+    start.store(true, std::memory_order_release);
+    for (auto& producer : producers) {
+        producer.join();
+    }
+    consumer.join();
+}
+
+template <typename Queue>
+void bm_uniform_single(benchmark::State& state) {
+    const std::size_t producer_count = static_cast<std::size_t>(state.range(0));
+    for (auto _ : state) {
+        run_uniform_single<Queue>(producer_count);
+    }
+    state.SetItemsProcessed(static_cast<std::int64_t>(kTotalOps * state.iterations()));
+    state.SetLabel(std::string(QueueTraits<Queue>::name) + ", uniform single enqueue");
+}
+
+template <typename Queue>
+void bm_uneven_wave(benchmark::State& state) {
+    const std::size_t relay_denominator = static_cast<std::size_t>(state.range(0));
+    for (auto _ : state) {
+        run_uneven_wave<Queue>(relay_denominator);
+    }
+    state.SetItemsProcessed(static_cast<std::int64_t>(kTotalOps * state.iterations()));
+    state.SetLabel(
+        std::string(QueueTraits<Queue>::name)
+        + ", 4P uneven relay "
+        + std::to_string(1.0 - 1.0 / static_cast<double>(relay_denominator))
+    );
+}
+
+template <typename Queue>
+void bm_bulk(benchmark::State& state) {
+    const std::size_t producer_count = static_cast<std::size_t>(state.range(0));
+    for (auto _ : state) {
+        run_bulk<Queue>(producer_count);
+    }
+    state.SetItemsProcessed(static_cast<std::int64_t>(kTotalOps * state.iterations()));
+    state.SetLabel(std::string(QueueTraits<Queue>::name) + ", bulk enqueue");
+}
+
+using DakingQueue = daking::MPSC_queue<Message>;
+using MoodyQueue = moodycamel::ConcurrentQueue<Message>;
+
+}  // namespace
+
+BENCHMARK_TEMPLATE(bm_uniform_single, DakingQueue)->Arg(1)->Arg(2)->Arg(4)->Arg(8)->UseRealTime();
+BENCHMARK_TEMPLATE(bm_uniform_single, MoodyQueue)->Arg(1)->Arg(2)->Arg(4)->Arg(8)->UseRealTime();
+
+BENCHMARK_TEMPLATE(bm_uneven_wave, DakingQueue)->Arg(2)->Arg(10)->Arg(50)->UseRealTime();
+BENCHMARK_TEMPLATE(bm_uneven_wave, MoodyQueue)->Arg(2)->Arg(10)->Arg(50)->UseRealTime();
+
+BENCHMARK_TEMPLATE(bm_bulk, DakingQueue)->Arg(1)->Arg(2)->Arg(4)->Arg(8)->UseRealTime();
+BENCHMARK_TEMPLATE(bm_bulk, MoodyQueue)->Arg(1)->Arg(2)->Arg(4)->Arg(8)->UseRealTime();