/* * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. * * Copyright 2018 Danny Robson <danny@nerdcruft.net> */ #ifndef CRUFT_UTIL_PARALLEL_QUEUE_HPP #define CRUFT_UTIL_PARALLEL_QUEUE_HPP #include "../maths.hpp" #include <atomic> #include <type_traits> namespace cruft::parallel { /// A statically sized thread-safe, lock-free, multi-producer /// multi-consumer queue for trivial types. /// /// \tparam ValueT the type of values to store /// \tparam SizeV the number of elements in the underlying store template <typename ValueT, uint32_t SizeV = 32> class queue { public: // We optimise specifically for memcpyable types only static_assert ( std::is_trivially_destructible_v<ValueT> && std::is_trivially_copyable_v<ValueT> ); // We need to wrap indices to the bounds of the data array quite often. // If it's a power of two then we can use bitmasks to avoid the // modulus. static_assert (cruft::is_pow2 (SizeV)); // We need at least 2 data slots; one for read, and one for write // pointers. 4 gives a little safety margin for future optimisations. static_assert (SizeV >= 4); queue (): m_write (0), m_read { {0}, {0} } { ; } static constexpr auto capacity (void) { return SizeV; } /// attempts to push a value to the queue /// /// algorithm proceeds as follows: /// * check if there appears to be space /// * increment the write pointer /// * serialise the data /// * update the high read pointer /// /// if updating the write pointer fails we bail and return false. /// else, we will be able to commit the data and return true. bool push [[nodiscard]] (const ValueT &val) { do { // find our current and next indices, checking there is // actually room available to store. if not, retry. uint32_t const curr = m_write; uint32_t const next = curr + 1; // we _must_ wrap the indices here to account for the // following case: // // |lo--------wr| // // wr's increment won't be arithmetically equal to lo, but // they have equivalent wrapped indices. if (wrap (next) == wrap (m_read.lo)) return false; // try to bump the write cursor to claim that slot if (uint32_t orig = curr; !m_write.compare_exchange_weak (orig, next)) { asm volatile ("pause;"); continue; } // write the data to the buffer m_data[wrap (curr)] = val; // keep trying to bump the new high read pointer. if it // doesnt succeed then it means that another write is pending // and should be realised _very_ soon, so spin on this. do { uint32_t orig = curr; if (m_read.hi.compare_exchange_weak (orig, next)) break; asm volatile ("pause;"); } while (1); return true; } while (1); } /// attempts to pop a value off the queue into the provided reference /// /// algorithm proceeds as follows: /// * check if there appears to be space /// * optimistically read the data /// * update the read pointer just past this slot /// /// if any part of the operation fails then we bail and return false. /// importantly: the contents of the provided reference may have been /// overwritten, but should not be used unless true was returned. bool pop [[nodiscard]] (ValueT &out) { do { // find the current and next read indices. if there aren't // any read slots available then retry. uint32_t curr = m_read.lo; uint32_t next = curr + 1; if (wrap (curr) == wrap (m_read.hi)) return false; // optimistically read the data into the destination pointer out = m_data[wrap (curr)]; // try to bump the read pointer past the entry we just read. // if this succeeds then indicate success, else retry the // entire sequence. if (uint32_t orig = curr; m_read.lo.compare_exchange_weak (orig, next)) return true; asm volatile ("pause;"); } while (1); } private: ValueT m_data[SizeV]; static constexpr uint32_t wrap (uint32_t idx) { return idx % SizeV; } /// all indices are not wrapped to the data size. instead they are /// allowed to overflow. it all works out the same in the end. std::atomic<uint32_t> m_write; struct { std::atomic<uint32_t> hi; std::atomic<uint32_t> lo; } m_read; }; } #endif