libcruft-util/test/parallel/queue.cpp
Danny Robson f6056153e3 rename root namespace from util to cruft
This places, at long last, the core library code into the same namespace
as the extended library code.
2018-08-05 14:42:02 +10:00

103 lines
2.5 KiB
C++

#include "parallel/queue.hpp"
#include "thread/flag.hpp"
#include "thread/semaphore.hpp"
#include "debug.hpp"
#include "tap.hpp"
#include <algorithm>
#include <thread>
// second template argument of the queue under test. the notes on `main`
// describe the queue size as deliberately small so that workers contend.
static constexpr uint32_t slots { 4 };

// number of producer threads, and also the number of consumer threads.
static constexpr int parallelism { 8 };

// how many integers each producer pushes, and each consumer pops.
static constexpr int chunk_size { 1 << 12 };

// every worker waits on this before touching the queue; `main` notifies it
// once all of the threads have been created.
cruft::thread::flag start;

// the queue type under test
using queue_t = cruft::parallel::queue<int, slots>;
///////////////////////////////////////////////////////////////////////////////
// producer thread body: feed the integers [first,first+chunk_size) into the
// queue, in ascending order.
//
// dst:   the queue to push into
// first: the first value to push
void
produce (queue_t &dst, const int first)
{
    // hold here until `start` is notified
    start.wait ();

    const int end = first + chunk_size;

    for (int value = first; value < end; ++value) {
        // a false result from push means the value was not accepted; spin and
        // retry until it is, so no value is ever skipped.
        while (!dst.push (value)) {
        }
    }
}
//-----------------------------------------------------------------------------
// consumer thread body: pop chunk_size elements from the queue and store them,
// in the order they were popped, into dst.
//
// src: the queue to pop from
// dst: destination view; indices [0,chunk_size) are written, so it must
//      provide at least chunk_size elements
void
consume (queue_t &src, cruft::view<int*> dst)
{
    // hold here until `start` is notified
    start.wait ();

    for (int i = 0; i < chunk_size; ++i) {
        // scoped to the iteration and initialised so that we never hold an
        // indeterminate value; a successful pop is expected to overwrite it.
        int value = 0;

        // a false result from pop means nothing was retrieved; spin and retry
        // until we get an element.
        while (!src.pop (value))
            ;

        dst[i] = value;
    }
}
///////////////////////////////////////////////////////////////////////////////
// * create n producers which push the c integers over [n*c,n*(c+1)) into a queue
// * create n consumers which pop c integers from a queue into a store
// * ensure all integers we expected are in the store
//
// we use an artificially small queue size to force contention.
//
// to stress test we increase the threads, increase the data length, and
// decrease the queue size.
int
main ()
{
cruft::TAP::logger tap;
queue_t src;
// start n consumers, and n producers that fill an array with integers
std::vector<int> dst (parallelism * chunk_size);
std::vector<std::thread> consumers;
std::vector<std::thread> producers;
for (int i = 0; i < parallelism; ++i) {
consumers.emplace_back (
consume,
std::ref (src),
cruft::view{
dst.data() + i * chunk_size,
chunk_size
}
);
producers.emplace_back (produce, std::ref (src), i * chunk_size);
}
start.notify ();
// wait for everyone to complete
for (auto &t: producers)
t.join ();
for (auto &t: consumers)
t.join ();
// sort the result and ensure all values are present
std::sort (dst.begin (), dst.end ());
int missing = 0;
for (int i = 0; i < parallelism * chunk_size; ++i)
if (dst[i] != i)
++missing;
tap.expect_eq (missing, 0, "no lost values in parallel produce/consume queue");
return tap.status ();
}