parallel: initial single-producer/multiple-consumer queue
This commit is contained in:
parent
82d63f68b1
commit
4773e4be21
@ -322,6 +322,8 @@ list (
|
|||||||
memory/deleter.cpp
|
memory/deleter.cpp
|
||||||
memory/deleter.hpp
|
memory/deleter.hpp
|
||||||
nocopy.hpp
|
nocopy.hpp
|
||||||
|
parallel/spmc.cpp
|
||||||
|
parallel/spmc.hpp
|
||||||
parse.cpp
|
parse.cpp
|
||||||
parse.hpp
|
parse.hpp
|
||||||
pascal.cpp
|
pascal.cpp
|
||||||
@ -509,6 +511,7 @@ if (TESTS)
|
|||||||
maths/fast
|
maths/fast
|
||||||
matrix
|
matrix
|
||||||
memory/deleter
|
memory/deleter
|
||||||
|
parallel/spmc
|
||||||
parse
|
parse
|
||||||
point
|
point
|
||||||
polynomial
|
polynomial
|
||||||
|
0
parallel/spmc.cpp
Normal file
0
parallel/spmc.cpp
Normal file
140
parallel/spmc.hpp
Normal file
140
parallel/spmc.hpp
Normal file
@ -0,0 +1,140 @@
|
|||||||
|
/*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*
|
||||||
|
* Copyright 2018 Danny Robson <danny@nerdcruft.net>
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef CRUFT_UTIL_PARALLEL_SPMC_HPP
|
||||||
|
#define CRUFT_UTIL_PARALLEL_SPMC_HPP
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
|
|
||||||
|
namespace util::parallel {
|
||||||
|
template <typename ValueT, uint32_t SizeV = 32>
|
||||||
|
class spmc {
|
||||||
|
public:
|
||||||
|
spmc ():
|
||||||
|
m_write (0),
|
||||||
|
m_read { {0}, {0} }
|
||||||
|
{ ; }
|
||||||
|
|
||||||
|
|
||||||
|
/// attempts to push a value to the queue
|
||||||
|
///
|
||||||
|
/// algorithm proceeds as follows:
|
||||||
|
/// * check if there appears to be space
|
||||||
|
/// * increment the write pointer
|
||||||
|
/// * serialise the data
|
||||||
|
/// * update the high read pointer
|
||||||
|
///
|
||||||
|
/// if updating the write pointer fails we bail and return false.
|
||||||
|
/// else, we will be able to commit the data and return true.
|
||||||
|
bool
|
||||||
|
push (const ValueT &val)
|
||||||
|
{
|
||||||
|
do {
|
||||||
|
// find our current and next indices, checking there is
|
||||||
|
// actually room available to store. if not, retry.
|
||||||
|
uint32_t const curr = m_write;
|
||||||
|
uint32_t const next = curr + 1;
|
||||||
|
|
||||||
|
// we _must_ wrap the indices here to account for the
|
||||||
|
// following case:
|
||||||
|
//
|
||||||
|
// |lo--------wr|
|
||||||
|
//
|
||||||
|
// wr's increment won't be arithmetically equal to lo, but
|
||||||
|
// they have equivalent wrapped indices.
|
||||||
|
if (wrap (next) == wrap (m_read.lo))
|
||||||
|
return false;
|
||||||
|
|
||||||
|
// try to bump the write cursor to claim that slot
|
||||||
|
if (uint32_t orig = curr; !m_write.compare_exchange_weak (orig, next))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
// write the data to the buffer
|
||||||
|
m_data[wrap (curr)] = val;
|
||||||
|
|
||||||
|
// keep trying to bump the new high read pointer. if it
|
||||||
|
// doesnt succeed then it means that another write is pending
|
||||||
|
// and should be realised _very_ soon, so spin on this.
|
||||||
|
do {
|
||||||
|
uint32_t orig = curr;
|
||||||
|
if (m_read.hi.compare_exchange_weak (orig, next))
|
||||||
|
break;
|
||||||
|
asm volatile ("pause;");
|
||||||
|
} while (1);
|
||||||
|
|
||||||
|
return true;
|
||||||
|
} while (1);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// attempts to pop a value off the queue into the provided reference
|
||||||
|
///
|
||||||
|
/// algorithm proceeds as follows:
|
||||||
|
/// * check if there appears to be space
|
||||||
|
/// * optimistically read the data
|
||||||
|
/// * update the read pointer just past this slot
|
||||||
|
///
|
||||||
|
/// if any part of the operation fails then we bail and return false.
|
||||||
|
/// importantly: the contents of the provided reference may have been
|
||||||
|
/// overwritten, but should not be used unless true was returned.
|
||||||
|
bool
|
||||||
|
pop (ValueT &out)
|
||||||
|
{
|
||||||
|
do
|
||||||
|
{
|
||||||
|
// find the current and next read indices. if there aren't
|
||||||
|
// any read slots available then retry.
|
||||||
|
uint32_t curr = m_read.lo;
|
||||||
|
uint32_t next = curr + 1;
|
||||||
|
|
||||||
|
if (wrap (curr) == wrap (m_read.hi))
|
||||||
|
return false;
|
||||||
|
|
||||||
|
// optimistically read the data into the destination pointer
|
||||||
|
out = m_data[wrap (curr)];
|
||||||
|
|
||||||
|
// try to bump the read pointer past the entry we just read.
|
||||||
|
// if this succeeds then indicate success, else retry the
|
||||||
|
// entire sequence.
|
||||||
|
if (uint32_t orig = curr; m_read.lo.compare_exchange_weak (orig, next))
|
||||||
|
return true;
|
||||||
|
|
||||||
|
asm volatile ("pause;");
|
||||||
|
} while (1);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private:
|
||||||
|
ValueT m_data[SizeV];
|
||||||
|
|
||||||
|
static constexpr uint32_t
|
||||||
|
wrap (uint32_t idx)
|
||||||
|
{
|
||||||
|
return idx % SizeV;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// all indices are not wrapped to the data size. instead they are
|
||||||
|
/// allowed to overflow. it all works out the same in the end.
|
||||||
|
std::atomic<uint32_t> m_write;
|
||||||
|
|
||||||
|
struct {
|
||||||
|
std::atomic<uint32_t> hi;
|
||||||
|
std::atomic<uint32_t> lo;
|
||||||
|
} m_read;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif
|
72
test/parallel/spmc.cpp
Normal file
72
test/parallel/spmc.cpp
Normal file
@ -0,0 +1,72 @@
|
|||||||
|
#include "parallel/spmc.hpp"
|
||||||
|
#include "job/flag.hpp"
|
||||||
|
#include "debug.hpp"
|
||||||
|
|
||||||
|
#include <algorithm>
|
||||||
|
#include <thread>
|
||||||
|
#include <iostream>
|
||||||
|
|
||||||
|
|
||||||
|
static constexpr uint32_t slots = 4;
|
||||||
|
static constexpr int parallelism = 15;
|
||||||
|
static constexpr int chunk_size = 1<<16;
|
||||||
|
|
||||||
|
util::job::flag start;
|
||||||
|
|
||||||
|
using queue_t = util::parallel::spmc<int,slots>;
|
||||||
|
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////
|
||||||
|
void transfer (queue_t &src, std::vector<int> &dst)
|
||||||
|
{
|
||||||
|
(void)dst;
|
||||||
|
start.wait ();
|
||||||
|
|
||||||
|
int last;
|
||||||
|
|
||||||
|
for (int i = 0; i < chunk_size; ++i) {
|
||||||
|
while (!src.pop (last))
|
||||||
|
;
|
||||||
|
dst[i] = last;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////
|
||||||
|
int
|
||||||
|
main ()
|
||||||
|
{
|
||||||
|
queue_t src;
|
||||||
|
|
||||||
|
std::vector<std::vector<int>> dst (parallelism + 1);
|
||||||
|
std::vector<std::thread> workers;
|
||||||
|
|
||||||
|
for (int i = 0; i < parallelism; ++i) {
|
||||||
|
dst[i].resize (chunk_size);
|
||||||
|
workers.emplace_back (transfer, std::ref (src), std::ref (dst[i]));
|
||||||
|
}
|
||||||
|
|
||||||
|
start.notify ();
|
||||||
|
|
||||||
|
for (int i = 0; i < parallelism * chunk_size; ++i) {
|
||||||
|
while (!src.push (i))
|
||||||
|
;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (auto &t: workers)
|
||||||
|
t.join ();
|
||||||
|
|
||||||
|
std::vector<int> tally;
|
||||||
|
for (auto &d: dst)
|
||||||
|
tally.insert (tally.end (), d.begin (), d.end ());
|
||||||
|
|
||||||
|
std::sort (tally.begin (), tally.end ());
|
||||||
|
|
||||||
|
int missing = 0;
|
||||||
|
for (int i = 0; i < parallelism * chunk_size; ++i)
|
||||||
|
if (tally[i] != i)
|
||||||
|
++missing;
|
||||||
|
|
||||||
|
std::clog << "missing: " << missing << '\n';
|
||||||
|
return 0;
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user