From b607e94c8e50785a101ed114c3e4a893fc38371c Mon Sep 17 00:00:00 2001 From: Djordje Nedic Date: Thu, 7 Dec 2023 14:55:19 +0100 Subject: [PATCH 1/3] feat(tests): Introduce more multi-threaded tests This introduces multi-threaded tests for the RingBuffer, Queue and BipartiteBuf. --- tests/spsc/bipartite_buf.cpp | 51 ++++++++++++++++++++++++++++++++++++ tests/spsc/queue.cpp | 38 +++++++++++++++++++++++++++ tests/spsc/ring_buf.cpp | 49 +++++++++++++++++++++++++++++++++- 3 files changed, 137 insertions(+), 1 deletion(-) diff --git a/tests/spsc/bipartite_buf.cpp b/tests/spsc/bipartite_buf.cpp index 29d6850..0c2270f 100644 --- a/tests/spsc/bipartite_buf.cpp +++ b/tests/spsc/bipartite_buf.cpp @@ -1,5 +1,6 @@ #include #include +#include #include "lockfree.hpp" @@ -234,3 +235,53 @@ TEST_CASE("std::span API test", "[bb_std_span_api]") { REQUIRE(read_pair.first == read_span.data()); } + +TEST_CASE("Multithreaded read/write multiple", "[bb_multithread_multiple]") { + std::vector threads; + lockfree::spsc::BipartiteBuf bb; + std::vector written; + std::vector read; + + const size_t data_size = 59; // Intentionally a prime number + const size_t elements_to_transfer = 2048; + + // consumer + threads.emplace_back([&]() { + size_t read_count = 0; + do { + std::pair read_region = bb.ReadAcquire(); + if (read_region.second) { + read.insert(read.end(), &read_region.first[0], + &read_region.first[read_region.second]); + bb.ReadRelease(read_region.second); + read_count += read_region.second; + } + } while (read_count < elements_to_transfer); + }); + + // producer + threads.emplace_back([&]() { + unsigned int data[data_size] = {0}; + for (unsigned int i = 0; i < data_size; i++) { + data[i] = rand(); + } + + size_t write_count = 0; + do { + unsigned int *write_region = bb.WriteAcquire(data_size); + if (write_region != nullptr) { + std::copy(&data[0], &data[data_size], write_region); + bb.WriteRelease(data_size); + written.insert(written.end(), &data[0], &data[data_size]); + write_count += data_size; + } + } while (write_count < elements_to_transfer); + }); + + for (auto &t : threads) { + t.join(); + } + + REQUIRE( + std::equal(std::begin(written), std::end(written), std::begin(read))); +} diff --git a/tests/spsc/queue.cpp b/tests/spsc/queue.cpp index b9c29e3..3b32fbb 100644 --- a/tests/spsc/queue.cpp +++ b/tests/spsc/queue.cpp @@ -1,5 +1,6 @@ #include #include +#include #include @@ -87,3 +88,40 @@ TEST_CASE("Optional API", "[q_optional_api]") { REQUIRE(queue.PopOptional() == -1024); } + +TEST_CASE("Multithreaded read/write", "[q_multithread]") { + std::vector threads; + lockfree::spsc::Queue queue; + std::vector written; + std::vector read; + + // consumer + threads.emplace_back([&]() { + uint64_t element = 0; + do { + bool read_success = queue.Pop(element); + if (read_success) { + read.push_back(element); + } + } while (element < 2047); + }); + + // producer + threads.emplace_back([&]() { + uint64_t element = 0; + do { + bool push_success = queue.Push(element); + if (push_success) { + written.push_back(element); + element++; + } + } while (element < 2048); + }); + + for (auto &t : threads) { + t.join(); + } + + REQUIRE( + std::equal(std::begin(written), std::end(written), std::begin(read))); +} diff --git a/tests/spsc/ring_buf.cpp b/tests/spsc/ring_buf.cpp index 326d7a2..9d08f84 100644 --- a/tests/spsc/ring_buf.cpp +++ b/tests/spsc/ring_buf.cpp @@ -324,7 +324,7 @@ TEST_CASE("Peek std::span", "[rb_peek_span]") { std::begin(test_data_read))); } -TEST_CASE("Multithreaded read/write", "[rb_multi]") { +TEST_CASE("Multithreaded read/write", "[rb_multithread]") { std::vector threads; lockfree::spsc::RingBuf rb; std::vector written; @@ -357,3 +357,50 @@ TEST_CASE("Multithreaded read/write", "[rb_multi]") { REQUIRE( std::equal(std::begin(written), std::end(written), std::begin(read))); } + +TEST_CASE("Multithreaded read/write multiple", "[rb_multithread_multiple]") { + std::vector threads; + lockfree::spsc::RingBuf rb; + std::vector written; + std::vector read; + + const size_t data_size = 59; // Intentionally a prime number + const size_t elements_to_transfer = 2048; + + // consumer + threads.emplace_back([&]() { + unsigned int data[data_size] = {0}; + size_t read_count = 0; + do { + bool read_success = rb.Read(data, data_size); + if (read_success) { + read.insert(read.end(), &data[0], &data[data_size]); + read_count += data_size; + } + } while (read_count < elements_to_transfer); + }); + + // producer + threads.emplace_back([&]() { + unsigned int data[data_size] = {0}; + for (unsigned int i = 0; i < data_size; i++) { + data[i] = rand(); + } + + size_t write_count = 0; + do { + bool write_success = rb.Write(data, data_size); + if (write_success) { + written.insert(written.end(), &data[0], &data[data_size]); + write_count += data_size; + } + } while (write_count < elements_to_transfer); + }); + + for (auto &t : threads) { + t.join(); + } + + REQUIRE( + std::equal(std::begin(written), std::end(written), std::begin(read))); +} From d879260a2eb66a6d4c298b6da6d6f800c2e8b9c3 Mon Sep 17 00:00:00 2001 From: Djordje Nedic Date: Thu, 7 Dec 2023 15:04:50 +0100 Subject: [PATCH 2/3] feat(tests): Introduce tests README --- tests/README.md | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 tests/README.md diff --git a/tests/README.md b/tests/README.md new file mode 100644 index 0000000..8959917 --- /dev/null +++ b/tests/README.md @@ -0,0 +1,26 @@ +# Tests + +The library contains tests for all data structures and their respective features. +Each data structure has it's own test file, split into `spsc` and `mpmc` folders. + +## Building and running + +In order to build tests, simply run CMake in the library root: +``` +cmake -B build +cmake --build build +``` + +> **Note:** Due to `std::span` tests, a compiler with C++20 support is required for building tests + +After that, you can run tests either with `ctest`: +``` +ctest --output-on-failure --test-dir build/tests +``` +or by executing the `build/tests/tests` binary. + +## Writing tests +If adding a new feature, or fixing a bug, it is necessary to add tests in order to avoid future regressions. +You can take a look at existing tests for examples. + +[Catch2](https://github.com/catchorg/Catch2) is used as the testing framework, you can read the documentation of the library [here](https://github.com/catchorg/Catch2/blob/devel/docs/tutorial.md#writing-tests). From 37bd4c35a4e4c91a1a7d4b5cab56fb6a1e09f56c Mon Sep 17 00:00:00 2001 From: Djordje Nedic Date: Thu, 7 Dec 2023 15:05:13 +0100 Subject: [PATCH 3/3] feat(tests): Parametrize multi-threaded tests --- tests/CMakeLists.txt | 6 ++++++ tests/README.md | 9 +++++++++ tests/spsc/bipartite_buf.cpp | 5 ++--- tests/spsc/queue.cpp | 4 ++-- tests/spsc/ring_buf.cpp | 9 ++++----- 5 files changed, 23 insertions(+), 10 deletions(-) diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 1ed8eeb..beb4a90 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -22,6 +22,12 @@ add_executable(tests mpmc/priority_queue.cpp ) +if (NOT DEFINED TEST_MT_TRANSFER_CNT) + set(TEST_MT_TRANSFER_CNT 10240) +endif() + +target_compile_definitions(tests PRIVATE TEST_MT_TRANSFER_CNT=${TEST_MT_TRANSFER_CNT}) + # Required in order to test the std::span API as well target_compile_features(tests PRIVATE cxx_std_20) diff --git a/tests/README.md b/tests/README.md index 8959917..265a04f 100644 --- a/tests/README.md +++ b/tests/README.md @@ -19,6 +19,15 @@ ctest --output-on-failure --test-dir build/tests ``` or by executing the `build/tests/tests` binary. +## Multi-threaded tests +Apart from regular unit tests, the library also contains multi-threaded tests. +As these tests are not deterministic by their nature, and can give false negatives, the number of elements copied is parametrized. + +To define the number of elements to transfer in multi-threaded tests, pass the `TEST_MT_TRANSFER_CNT` variable to CMake: +``` +cmake -DTEST_MT_TRANSFER_CNT=100000 -B build +``` + ## Writing tests If adding a new feature, or fixing a bug, it is necessary to add tests in order to avoid future regressions. You can take a look at existing tests for examples. diff --git a/tests/spsc/bipartite_buf.cpp b/tests/spsc/bipartite_buf.cpp index 0c2270f..f90c34f 100644 --- a/tests/spsc/bipartite_buf.cpp +++ b/tests/spsc/bipartite_buf.cpp @@ -243,7 +243,6 @@ TEST_CASE("Multithreaded read/write multiple", "[bb_multithread_multiple]") { std::vector read; const size_t data_size = 59; // Intentionally a prime number - const size_t elements_to_transfer = 2048; // consumer threads.emplace_back([&]() { @@ -256,7 +255,7 @@ TEST_CASE("Multithreaded read/write multiple", "[bb_multithread_multiple]") { bb.ReadRelease(read_region.second); read_count += read_region.second; } - } while (read_count < elements_to_transfer); + } while (read_count < TEST_MT_TRANSFER_CNT); }); // producer @@ -275,7 +274,7 @@ TEST_CASE("Multithreaded read/write multiple", "[bb_multithread_multiple]") { written.insert(written.end(), &data[0], &data[data_size]); write_count += data_size; } - } while (write_count < elements_to_transfer); + } while (write_count < TEST_MT_TRANSFER_CNT); }); for (auto &t : threads) { diff --git a/tests/spsc/queue.cpp b/tests/spsc/queue.cpp index 3b32fbb..6b6480f 100644 --- a/tests/spsc/queue.cpp +++ b/tests/spsc/queue.cpp @@ -103,7 +103,7 @@ TEST_CASE("Multithreaded read/write", "[q_multithread]") { if (read_success) { read.push_back(element); } - } while (element < 2047); + } while (element < TEST_MT_TRANSFER_CNT); }); // producer @@ -115,7 +115,7 @@ TEST_CASE("Multithreaded read/write", "[q_multithread]") { written.push_back(element); element++; } - } while (element < 2048); + } while (element < TEST_MT_TRANSFER_CNT + 1); }); for (auto &t : threads) { diff --git a/tests/spsc/ring_buf.cpp b/tests/spsc/ring_buf.cpp index 9d08f84..8e2f45f 100644 --- a/tests/spsc/ring_buf.cpp +++ b/tests/spsc/ring_buf.cpp @@ -338,7 +338,7 @@ TEST_CASE("Multithreaded read/write", "[rb_multithread]") { if (read_success) { read.push_back(data[0]); } - } while (data[0] < 2047); + } while (data[0] < TEST_MT_TRANSFER_CNT); }); // producer threads.emplace_back([&]() { @@ -349,7 +349,7 @@ TEST_CASE("Multithreaded read/write", "[rb_multithread]") { written.push_back(cnt); cnt++; } - } while (cnt < 2048); + } while (cnt < TEST_MT_TRANSFER_CNT + 1); }); for (auto &t : threads) { t.join(); @@ -365,7 +365,6 @@ TEST_CASE("Multithreaded read/write multiple", "[rb_multithread_multiple]") { std::vector read; const size_t data_size = 59; // Intentionally a prime number - const size_t elements_to_transfer = 2048; // consumer threads.emplace_back([&]() { @@ -377,7 +376,7 @@ TEST_CASE("Multithreaded read/write multiple", "[rb_multithread_multiple]") { read.insert(read.end(), &data[0], &data[data_size]); read_count += data_size; } - } while (read_count < elements_to_transfer); + } while (read_count < TEST_MT_TRANSFER_CNT); }); // producer @@ -394,7 +393,7 @@ TEST_CASE("Multithreaded read/write multiple", "[rb_multithread_multiple]") { written.insert(written.end(), &data[0], &data[data_size]); write_count += data_size; } - } while (write_count < elements_to_transfer); + } while (write_count < TEST_MT_TRANSFER_CNT); }); for (auto &t : threads) {