libcruft-util/parallel/queue.hpp
Danny Robson 3689b08535 parallel/queue: add nodiscard to push/pop
Users _must_ check the return value otherwise they've no indication
whether anything happened... This is a pretty common mistake because it
looks a lot like a typical push/pop operation.
2018-03-22 14:57:34 +11:00

169 lines
5.7 KiB
C++

/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Copyright 2018 Danny Robson <danny@nerdcruft.net>
*/
#ifndef CRUFT_UTIL_PARALLEL_QUEUE_HPP
#define CRUFT_UTIL_PARALLEL_QUEUE_HPP
#include "../maths.hpp"
#include <atomic>
#include <type_traits>
namespace util::parallel {
/// A statically sized, thread-safe, lock-free, multi-producer
/// multi-consumer queue for trivial types.
///
/// Values live in a fixed ring of `SizeV` slots. Three free-running
/// (unwrapped) cursors track progress through the ring:
///   * `m_write`    the next slot a producer will try to claim
///   * `m_read.hi`  one past the newest slot whose data has been published
///   * `m_read.lo`  the next slot a consumer will try to take
///
/// `push` refuses to let the write cursor land on the same wrapped index as
/// `m_read.lo`, so one slot is always left unused and at most `SizeV - 1`
/// values are queued at any one time.
///
/// \tparam ValueT  the type of values to store
/// \tparam SizeV   the number of elements in the underlying store
template <typename ValueT, uint32_t SizeV = 32>
class queue {
public:
    // We optimise specifically for memcpyable types only
    static_assert (
        std::is_trivially_destructible_v<ValueT> && std::is_trivially_copyable_v<ValueT>
    );

    // We need to wrap indices to the bounds of the data array quite often.
    // If it's a power of two then we can use bitmasks to avoid the
    // modulus.
    static_assert (util::is_pow2 (SizeV));

    // We need at least 2 data slots; one for read, and one for write
    // pointers. 4 gives a little safety margin for future optimisations.
    static_assert (SizeV >= 4);


    /// Constructs an empty queue: every cursor starts at zero. The backing
    /// array is deliberately left uninitialised.
    queue ():
        m_write (0),
        m_read { {0}, {0} }
    { ; }


    /// Returns `SizeV`, the number of slots in the backing array.
    ///
    /// Note that this is one more than the number of values that can be
    /// queued simultaneously (see the class description).
    static constexpr auto capacity (void) { return SizeV; }


    /// Attempts to push a value to the queue.
    ///
    /// The algorithm proceeds as follows:
    ///   * check if there appears to be space
    ///   * increment the write pointer
    ///   * serialise the data
    ///   * update the high read pointer
    ///
    /// If another producer wins the race for the write pointer we simply
    /// start again from the top.
    ///
    /// \param val  the value to copy into the queue
    /// \return     true if the value was stored; false if the queue was
    ///             full, in which case nothing was modified.
    [[nodiscard]] bool
    push (const ValueT &val)
    {
        for (;;) {
            // find our current and next indices, checking there is
            // actually room available to store. if not, give up.
            uint32_t const curr = m_write;
            uint32_t const next = curr + 1;

            // we _must_ wrap the indices here to account for the
            // following case:
            //
            // |lo--------wr|
            //
            // wr's increment won't be arithmetically equal to lo, but
            // they have equivalent wrapped indices.
            if (wrap (next) == wrap (m_read.lo))
                return false;

            // try to bump the write cursor to claim that slot. the CAS
            // overwrites `orig` on failure, so it is a throwaway copy.
            if (uint32_t orig = curr; !m_write.compare_exchange_weak (orig, next)) {
                spin_pause ();
                continue;
            }

            // the slot is ours alone now; write the data to the buffer
            m_data[wrap (curr)] = val;

            // keep trying to bump the high read pointer from our slot to
            // the next. if it doesn't succeed then it means that an
            // earlier write is still pending and should be realised
            // _very_ soon, so spin on this. this keeps publication in the
            // same order the slots were claimed.
            for (;;) {
                uint32_t orig = curr;
                if (m_read.hi.compare_exchange_weak (orig, next))
                    break;
                spin_pause ();
            }

            return true;
        }
    }


    /// Attempts to pop a value off the queue into the provided reference.
    ///
    /// The algorithm proceeds as follows:
    ///   * check if there appears to be data
    ///   * optimistically read the data
    ///   * update the read pointer just past this slot
    ///
    /// If another consumer wins the race for the read pointer we retry the
    /// entire sequence.
    ///
    /// Importantly: the contents of the provided reference may have been
    /// overwritten, but should not be used unless true was returned.
    ///
    /// \param out  destination for the popped value
    /// \return     true if a value was popped into `out`; false if the
    ///             queue was empty.
    [[nodiscard]] bool
    pop (ValueT &out)
    {
        for (;;) {
            // find the current and next read indices. if there aren't
            // any published slots available then give up.
            uint32_t const curr = m_read.lo;
            uint32_t const next = curr + 1;

            if (wrap (curr) == wrap (m_read.hi))
                return false;

            // optimistically read the data into the destination.
            //
            // NOTE(review): this copy happens before we own the slot, so
            // it can overlap with a producer rewriting the same slot after
            // another consumer released it. the CAS below is what rejects
            // such a stale copy -- confirm this is acceptable for ValueT.
            out = m_data[wrap (curr)];

            // try to bump the read pointer past the entry we just read.
            // if this succeeds then indicate success, else retry the
            // entire sequence.
            if (uint32_t orig = curr; m_read.lo.compare_exchange_weak (orig, next))
                return true;

            spin_pause ();
        }
    }


private:
    /// Hints to the CPU that we are inside a spin-wait loop.
    ///
    /// Emits `pause` on x86 and `yield` on AArch64. On any other
    /// architecture it does nothing, which leaves a plain busy loop rather
    /// than a build failure.
    static void
    spin_pause (void)
    {
#if defined(__i386__) || defined(__x86_64__)
        asm volatile ("pause;");
#elif defined(__aarch64__)
        asm volatile ("yield");
#endif
    }


    /// Backing storage for the ring.
    ValueT m_data[SizeV];


    /// Maps a free-running cursor value to an index into `m_data`.
    ///
    /// `SizeV` is asserted to be a power of two, so masking with
    /// `SizeV - 1` is equivalent to `idx % SizeV`.
    static constexpr uint32_t
    wrap (uint32_t idx)
    {
        return idx & (SizeV - 1);
    }


    /// all indices are not wrapped to the data size. instead they are
    /// allowed to overflow. it all works out the same in the end.
    std::atomic<uint32_t> m_write;

    struct {
        std::atomic<uint32_t> hi;
        std::atomic<uint32_t> lo;
    } m_read;
};
}
#endif