Danny Robson
f6056153e3
This places, at long last, the core library code into the same namespace as the extended library code.
161 lines
5.3 KiB
C++
/*
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 *
 * Copyright 2018 Danny Robson <danny@nerdcruft.net>
 */

#ifndef CRUFT_UTIL_PARALLEL_QUEUE_HPP
#define CRUFT_UTIL_PARALLEL_QUEUE_HPP

#include "../maths.hpp"

#include <atomic>
#include <type_traits>


namespace cruft::parallel {
    /// A statically sized, thread-safe, lock-free, multi-producer
    /// multi-consumer queue for trivial types.
    ///
    /// \tparam ValueT the type of values to store
    /// \tparam SizeV  the number of elements in the underlying store
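    ///
    /// A minimal usage sketch; the element type and size here are purely
    /// illustrative:
    ///
    /// \code
    /// cruft::parallel::queue<int, 64> q;
    ///
    /// if (!q.push (7))
    ///     ; // the queue was full
    ///
    /// int out;
    /// if (q.pop (out))
    ///     ; // out now holds 7
    /// \endcode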
    template <typename ValueT, uint32_t SizeV = 32>
    class queue {
    public:
        // We optimise specifically for memcpyable types only.
        static_assert (
            std::is_trivially_destructible_v<ValueT> &&
            std::is_trivially_copyable_v<ValueT>
        );

        // We need to wrap indices to the bounds of the data array quite often.
        // If it's a power of two then we can use bitmasks to avoid the
        // modulus.
        static_assert (cruft::is_pow2 (SizeV));

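        // As a concrete check of the claim above: for a power-of-two size
        // the modulus and the bitmask agree, eg 37 % 32 == 37 & 31 == 5.
        static_assert ((37u % 32u) == (37u & 31u));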

        // We need at least 2 data slots: one for the read pointer, and one
        // for the write pointer. 4 gives a little safety margin for future
        // optimisations.
        static_assert (SizeV >= 4);

        queue ():
            m_write (0),
            m_read { {0}, {0} }
        { ; }


        static constexpr auto capacity (void) { return SizeV; }


        /// Attempts to push a value onto the queue.
        ///
        /// The algorithm proceeds as follows:
        /// * check if there appears to be space
        /// * increment the write pointer
        /// * serialise the data
        /// * update the high read pointer
        ///
        /// If there is no space available we bail and return false;
        /// otherwise, once the write pointer has been bumped, we are
        /// guaranteed to commit the data and return true.
        bool
        push [[nodiscard]] (const ValueT &val)
        {
            do {
                // Find our current and next indices, checking there is
                // actually room available to store. If not, bail.
                uint32_t const curr = m_write;
                uint32_t const next = curr + 1;

                // We _must_ wrap the indices here to account for the
                // following case:
                //
                //     |lo--------wr|
                //
                // wr's increment won't be arithmetically equal to lo, but
                // they have equivalent wrapped indices.
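                //
                // Concretely, with SizeV == 32: lo == 0 and wr == 31 give
                // next == 32, and wrap (32) == 0 == wrap (lo), so the queue
                // correctly reports itself as full.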
                if (wrap (next) == wrap (m_read.lo))
                    return false;

                // Try to bump the write cursor to claim that slot.
                if (uint32_t orig = curr; !m_write.compare_exchange_weak (orig, next)) {
                    // x86 spin-loop hint while we retry under contention.
                    asm volatile ("pause;");
                    continue;
                }

                // Write the data to the buffer.
                m_data[wrap (curr)] = val;

                // Keep trying to bump the new high read pointer. If it
                // doesn't succeed then it means that another write is pending
                // and should be realised _very_ soon, so spin on this.
                do {
                    uint32_t orig = curr;
                    if (m_read.hi.compare_exchange_weak (orig, next))
                        break;
                    asm volatile ("pause;");
                } while (1);

                return true;
            } while (1);
        }
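
        // A hypothetical producer-side pattern; `job`, `make_job`, and the
        // yield-based backoff are illustrative only:
        //
        //     job j = make_job ();
        //     while (!q.push (j))
        //         std::this_thread::yield (); // full; let consumers drain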


        /// Attempts to pop a value off the queue into the provided reference.
        ///
        /// The algorithm proceeds as follows:
        /// * check if there appears to be data available
        /// * optimistically read the data
        /// * update the read pointer to just past this slot
        ///
        /// If any part of the operation fails then we bail and return false.
        /// Importantly: the contents of the provided reference may have been
        /// overwritten, but should not be used unless true was returned.
        bool
        pop [[nodiscard]] (ValueT &out)
        {
            do {
                // Find the current and next read indices. If there aren't
                // any entries available to read then bail.
                uint32_t curr = m_read.lo;
                uint32_t next = curr + 1;

                if (wrap (curr) == wrap (m_read.hi))
                    return false;

                // Optimistically read the data into the destination.
                out = m_data[wrap (curr)];

                // Try to bump the read pointer past the entry we just read.
                // If this succeeds then indicate success, else retry the
                // entire sequence.
                if (uint32_t orig = curr; m_read.lo.compare_exchange_weak (orig, next))
                    return true;

                asm volatile ("pause;");
            } while (1);
        }
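
        // A hypothetical consumer-side drain loop; `consume` is
        // illustrative only:
        //
        //     ValueT v;
        //     while (q.pop (v))
        //         consume (v);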


    private:
        ValueT m_data[SizeV];

        static constexpr uint32_t
        wrap (uint32_t idx)
        {
            return idx % SizeV;
        }

        /// Indices are not wrapped to the size of the data store; instead
        /// they are allowed to overflow. Because SizeV is a power of two it
        /// divides 2^32 exactly, so an overflowed counter still wraps to the
        /// slot it would have reached without overflow.
        std::atomic<uint32_t> m_write;

        struct {
            std::atomic<uint32_t> hi;
            std::atomic<uint32_t> lo;
        } m_read;
    };
}

#endif