diff --git a/parallel/spmc.hpp b/parallel/spmc.hpp
index eb2fa0da..1ddd4f34 100644
--- a/parallel/spmc.hpp
+++ b/parallel/spmc.hpp
@@ -17,12 +17,35 @@
 #ifndef CRUFT_UTIL_PARALLEL_SPMC_HPP
 #define CRUFT_UTIL_PARALLEL_SPMC_HPP
 
+#include "../maths.hpp"
+
 #include <atomic>
+#include <type_traits>
+
 namespace util::parallel {
+    /// A statically sized thread-safe, lock-free, multi-producer
+    /// multi-consumer queue for trivial types.
+    ///
+    /// \tparam ValueT the type of values to store
+    /// \tparam SizeV the number of elements in the underlying store
     template <typename ValueT, size_t SizeV>
     class spmc {
     public:
+        // We optimise specifically for memcpyable types only
+        static_assert (
+            std::is_trivially_destructible_v<ValueT> && std::is_trivially_copyable_v<ValueT>
+        );
+
+        // We need to wrap indices to the bounds of the data array quite often.
+        // If it's a power of two then we can use bitmasks to avoid the
+        // modulus.
+        static_assert (util::is_pow2 (SizeV));
+
+        // We need at least 2 data slots; one for read, and one for write
+        // pointers. 4 gives a little safety margin for future optimisations.
+        static_assert (SizeV >= 4);
+
         spmc ():
             m_write (0),
             m_read { {0}, {0} }
@@ -59,8 +82,10 @@
                 return false;
 
             // try to bump the write cursor to claim that slot
-            if (uint32_t orig = curr; !m_write.compare_exchange_weak (orig, next))
+            if (uint32_t orig = curr; !m_write.compare_exchange_weak (orig, next)) {
+                asm volatile ("pause;");
                 continue;
+            }
 
             // write the data to the buffer
             m_data[wrap (curr)] = val;
diff --git a/test/parallel/spmc.cpp b/test/parallel/spmc.cpp
index 89286389..4dad89d2 100644
--- a/test/parallel/spmc.cpp
+++ b/test/parallel/spmc.cpp
@@ -1,6 +1,8 @@
 #include "parallel/spmc.hpp"
 #include "job/flag.hpp"
+#include "job/semaphore.hpp"
 #include "debug.hpp"
+#include "tap.hpp"
 
 #include <thread>
 #include <vector>
@@ -8,8 +10,8 @@
 
 static constexpr uint32_t slots = 4;
-static constexpr int parallelism = 15;
-static constexpr int chunk_size = 1<<16;
+static constexpr int parallelism = 8;
+static constexpr int chunk_size = 1<<12;
 
 util::job::flag start;
 
 using queue_t = util::parallel::spmc<int,slots>;
@@ -17,7 +19,22 @@
 
 ///////////////////////////////////////////////////////////////////////////////
-void transfer (queue_t &src, std::vector<int> &dst)
+// push numbers in the range [first,first+chunk_size) to the queue
+void
+produce (queue_t &dst, const int first)
+{
+    start.wait ();
+
+    for (int i = first; i < first + chunk_size; ++i)
+        while (!dst.push (i))
+            ;
+}
+
+
+//-----------------------------------------------------------------------------
+// pop chunk_size elements from the queue to a vector
+void
+consume (queue_t &src, util::view<int*> dst)
 {
     (void)dst;
     start.wait ();
 
@@ -33,40 +50,54 @@
 ///////////////////////////////////////////////////////////////////////////////
+// * create n producers which push the c integers over [n*c,(n+1)*c) into a queue
+// * create n consumers which pop c integers from a queue into a store
+// * ensure all integers we expected are in the store
+//
+// we use an artificially small queue size to force contention.
+//
+// to stress test we increase the threads, increase the data length, and
+// decrease the queue size.
 int
 main ()
 {
+    util::TAP::logger tap;
+
     queue_t src;
-    std::vector<std::vector<int>> dst (parallelism + 1);
-    std::vector<std::thread> workers;
+    // start n consumers, and n producers that fill an array with integers
+    std::vector<int> dst (parallelism * chunk_size);
+    std::vector<std::thread> consumers;
+    std::vector<std::thread> producers;
 
     for (int i = 0; i < parallelism; ++i) {
-        dst[i].resize (chunk_size);
-        workers.emplace_back (transfer, std::ref (src), std::ref (dst[i]));
+        consumers.emplace_back (
+            consume,
+            std::ref (src),
+            util::view{
+                dst.data() + i * chunk_size,
+                chunk_size
+            }
+        );
+
+        producers.emplace_back (produce, std::ref (src), i * chunk_size);
     }
 
     start.notify ();
 
-    for (int i = 0; i < parallelism * chunk_size; ++i) {
-        while (!src.push (i))
-            ;
-    }
-
-    for (auto &t: workers)
+    // wait for everyone to complete
+    for (auto &t: producers)
+        t.join ();
+    for (auto &t: consumers)
         t.join ();
 
-    std::vector<int> tally;
-    for (auto &d: dst)
-        tally.insert (tally.end (), d.begin (), d.end ());
-
-    std::sort (tally.begin (), tally.end ());
+    // sort the result and ensure all values are present
+    std::sort (dst.begin (), dst.end ());
 
     int missing = 0;
     for (int i = 0; i < parallelism * chunk_size; ++i)
-        if (tally[i] != i)
+        if (dst[i] != i)
             ++missing;
 
-    std::clog << "missing: " << missing << '\n';
-    return 0;
+    tap.expect_eq (missing, 0, "no lost values in parallel produce/consume queue");
+    return tap.status ();
 }