parallel/queue: comments and type checks

This commit is contained in:
Danny Robson 2018-03-22 13:02:51 +11:00
parent 4773e4be21
commit fc148fa7a4
2 changed files with 78 additions and 22 deletions

View File

@ -17,12 +17,35 @@
#ifndef CRUFT_UTIL_PARALLEL_SPMC_HPP #ifndef CRUFT_UTIL_PARALLEL_SPMC_HPP
#define CRUFT_UTIL_PARALLEL_SPMC_HPP #define CRUFT_UTIL_PARALLEL_SPMC_HPP
#include "../maths.hpp"
#include <atomic> #include <atomic>
#include <type_traits>
namespace util::parallel { namespace util::parallel {
/// A statically sized thread-safe, lock-free, multi-producer
/// multi-consumer queue for trivial types.
///
/// \tparam ValueT the type of values to store
/// \tparam SizeV the number of elements in the underlying store
template <typename ValueT, uint32_t SizeV = 32> template <typename ValueT, uint32_t SizeV = 32>
class spmc { class spmc {
public: public:
// We optimise specifically for memcpyable types only
static_assert (
std::is_trivially_destructible_v<ValueT> && std::is_trivially_copyable_v<ValueT>
);
// We need to wrap indices to the bounds of the data array quite often.
// If it's a power of two then we can use bitmasks to avoid the
// modulus.
static_assert (util::is_pow2 (SizeV));
// We need at least 2 data slots; one for read, and one for write
// pointers. 4 gives a little safety margin for future optimisations.
static_assert (SizeV >= 4);
spmc (): spmc ():
m_write (0), m_write (0),
m_read { {0}, {0} } m_read { {0}, {0} }
@ -59,8 +82,10 @@ namespace util::parallel {
return false; return false;
// try to bump the write cursor to claim that slot // try to bump the write cursor to claim that slot
if (uint32_t orig = curr; !m_write.compare_exchange_weak (orig, next)) if (uint32_t orig = curr; !m_write.compare_exchange_weak (orig, next)) {
asm volatile ("pause;");
continue; continue;
}
// write the data to the buffer // write the data to the buffer
m_data[wrap (curr)] = val; m_data[wrap (curr)] = val;

View File

@ -1,6 +1,8 @@
#include "parallel/spmc.hpp" #include "parallel/spmc.hpp"
#include "job/flag.hpp" #include "job/flag.hpp"
#include "job/semaphore.hpp"
#include "debug.hpp" #include "debug.hpp"
#include "tap.hpp"
#include <algorithm> #include <algorithm>
#include <thread> #include <thread>
@ -8,8 +10,8 @@
static constexpr uint32_t slots = 4; static constexpr uint32_t slots = 4;
static constexpr int parallelism = 15; static constexpr int parallelism = 8;
static constexpr int chunk_size = 1<<16; static constexpr int chunk_size = 1<<12;
util::job::flag start; util::job::flag start;
@ -17,7 +19,22 @@ using queue_t = util::parallel::spmc<int,slots>;
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
void transfer (queue_t &src, std::vector<int> &dst) // push numbers in the range [first,first+chunk_size) to the queue
void
produce (queue_t &dst, const int first)
{
start.wait ();
for (int i = first; i < first + chunk_size; ++i)
while (!dst.push (i))
;
}
//-----------------------------------------------------------------------------
// pop chunk_size elements from the queue to a vector
void
consume (queue_t &src, util::view<int*> dst)
{ {
(void)dst; (void)dst;
start.wait (); start.wait ();
@ -33,40 +50,54 @@ void transfer (queue_t &src, std::vector<int> &dst)
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
// * create n producers which push the c integers over [n*c,n*(c+1)) into a queue
// * create n consumers which pop c integers from a queue into a store
// * ensure all integers we expected are in the store
//
// we use an artificially small queue size to force contention.
//
// to stress test we increase the threads, increase the data length, and
// decrease the queue size.
int int
main () main ()
{ {
util::TAP::logger tap;
queue_t src; queue_t src;
std::vector<std::vector<int>> dst (parallelism + 1); // start n consumers, and n producers that fill an array with integers
std::vector<std::thread> workers; std::vector<int> dst (parallelism * chunk_size);
std::vector<std::thread> consumers;
std::vector<std::thread> producers;
for (int i = 0; i < parallelism; ++i) { for (int i = 0; i < parallelism; ++i) {
dst[i].resize (chunk_size); consumers.emplace_back (
workers.emplace_back (transfer, std::ref (src), std::ref (dst[i])); consume,
std::ref (src),
util::view{
dst.data() + i * chunk_size,
chunk_size
}
);
producers.emplace_back (produce, std::ref (src), i * chunk_size);
} }
start.notify (); start.notify ();
for (int i = 0; i < parallelism * chunk_size; ++i) { // wait for everyone to complete
while (!src.push (i)) for (auto &t: producers)
; t.join ();
} for (auto &t: consumers)
for (auto &t: workers)
t.join (); t.join ();
std::vector<int> tally; // sort the result and ensure all values are present
for (auto &d: dst) std::sort (dst.begin (), dst.end ());
tally.insert (tally.end (), d.begin (), d.end ());
std::sort (tally.begin (), tally.end ());
int missing = 0; int missing = 0;
for (int i = 0; i < parallelism * chunk_size; ++i) for (int i = 0; i < parallelism * chunk_size; ++i)
if (tally[i] != i) if (dst[i] != i)
++missing; ++missing;
std::clog << "missing: " << missing << '\n'; tap.expect_eq (missing, 0, "no lost values in parallel produce/consume queue");
return 0; return tap.status ();
} }