From 4773e4be210d1c870bca444e622a41d4253e2f5d Mon Sep 17 00:00:00 2001 From: Danny Robson Date: Wed, 21 Mar 2018 18:53:24 +1100 Subject: [PATCH] parallel: initial single-producer/multiple-consumer queue --- CMakeLists.txt | 3 + parallel/spmc.cpp | 0 parallel/spmc.hpp | 140 +++++++++++++++++++++++++++++++++++++++++ test/parallel/spmc.cpp | 72 +++++++++++++++++++++ 4 files changed, 215 insertions(+) create mode 100644 parallel/spmc.cpp create mode 100644 parallel/spmc.hpp create mode 100644 test/parallel/spmc.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 4fed243c..ba8b51cd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -322,6 +322,8 @@ list ( memory/deleter.cpp memory/deleter.hpp nocopy.hpp + parallel/spmc.cpp + parallel/spmc.hpp parse.cpp parse.hpp pascal.cpp @@ -509,6 +511,7 @@ if (TESTS) maths/fast matrix memory/deleter + parallel/spmc parse point polynomial diff --git a/parallel/spmc.cpp b/parallel/spmc.cpp new file mode 100644 index 00000000..e69de29b diff --git a/parallel/spmc.hpp b/parallel/spmc.hpp new file mode 100644 index 00000000..eb2fa0da --- /dev/null +++ b/parallel/spmc.hpp @@ -0,0 +1,140 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ *
+ * Copyright 2018 Danny Robson
+ */
+
+#ifndef CRUFT_UTIL_PARALLEL_SPMC_HPP
+#define CRUFT_UTIL_PARALLEL_SPMC_HPP
+
+#include <atomic>
+#include <cstdint>
+
+namespace util::parallel {
+    /// a fixed-size, lock-free, single-producer/multiple-consumer queue.
+    ///
+    /// SizeV must be a power of two: the cursors are free-running
+    /// uint32_t values reduced modulo SizeV on access, and only a
+    /// power-of-two modulus divides 2^32 evenly — anything else would
+    /// glitch when the counters overflow and wrap.
+    ///
+    /// one slot is always left unused so that a full queue can be
+    /// distinguished from an empty one; usable capacity is SizeV-1.
+    template <typename ValueT, uint32_t SizeV>
+    class spmc {
+    public:
+        static_assert (SizeV > 1 && (SizeV & (SizeV - 1)) == 0,
+                       "SizeV must be a power of two so wrapping survives counter overflow");
+
+        spmc ():
+            m_write (0),
+            m_read { {0}, {0} }
+        { ; }
+
+
+        /// attempts to push a value to the queue, returning false if the
+        /// queue was full.
+        ///
+        /// algorithm proceeds as follows:
+        /// * check if there appears to be space
+        /// * increment the write pointer
+        /// * serialise the data
+        /// * update the high read pointer
+        ///
+        /// if there is no space we bail and return false. else, we will
+        /// be able to commit the data and return true.
+        bool
+        push (const ValueT &val)
+        {
+            do {
+                // find our current and next indices, checking there is
+                // actually room available to store. if not, bail.
+                uint32_t const curr = m_write;
+                uint32_t const next = curr + 1;
+
+                // we _must_ wrap the indices here to account for the
+                // following case:
+                //
+                // |lo--------wr|
+                //
+                // wr's increment won't be arithmetically equal to lo, but
+                // they have equivalent wrapped indices.
+                if (wrap (next) == wrap (m_read.lo))
+                    return false;
+
+                // try to bump the write cursor to claim that slot
+                if (uint32_t orig = curr; !m_write.compare_exchange_weak (orig, next))
+                    continue;
+
+                // write the data to the buffer
+                m_data[wrap (curr)] = val;
+
+                // keep trying to bump the new high read pointer. if it
+                // doesnt succeed then it means that another write is pending
+                // and should be realised _very_ soon, so spin on this.
+                //
+                // NOTE(review): the 'pause' mnemonic is x86 specific; a
+                // portable fallback is needed before use on other targets.
+                do {
+                    uint32_t orig = curr;
+                    if (m_read.hi.compare_exchange_weak (orig, next))
+                        break;
+                    asm volatile ("pause;");
+                } while (1);
+
+                return true;
+            } while (1);
+        }
+
+
+        /// attempts to pop a value off the queue into the provided
+        /// reference, returning false if the queue was empty.
+        ///
+        /// algorithm proceeds as follows:
+        /// * check if there appears to be data available
+        /// * optimistically read the data
+        /// * update the read pointer just past this slot
+        ///
+        /// if the queue is empty we bail and return false; if another
+        /// consumer claims the slot first we retry the whole sequence.
+        /// importantly: the contents of the provided reference may have been
+        /// overwritten, but should not be used unless true was returned.
+        bool
+        pop (ValueT &out)
+        {
+            do
+            {
+                // find the current and next read indices. if there aren't
+                // any read slots available then bail.
+                uint32_t curr = m_read.lo;
+                uint32_t next = curr + 1;
+
+                if (wrap (curr) == wrap (m_read.hi))
+                    return false;
+
+                // optimistically read the data into the destination pointer
+                out = m_data[wrap (curr)];
+
+                // try to bump the read pointer past the entry we just read.
+                // if this succeeds then indicate success, else retry the
+                // entire sequence.
+                if (uint32_t orig = curr; m_read.lo.compare_exchange_weak (orig, next))
+                    return true;
+
+                asm volatile ("pause;");
+            } while (1);
+        }
+
+
+    private:
+        // backing store; the slot for free-running index i is m_data[wrap (i)]
+        ValueT m_data[SizeV];
+
+        // reduce a free-running cursor to a storage index
+        static constexpr uint32_t
+        wrap (uint32_t idx)
+        {
+            return idx % SizeV;
+        }
+
+        /// all indices are not wrapped to the data size. instead they are
+        /// allowed to overflow; because SizeV is a power of two the wrapped
+        /// values remain consistent across the overflow, so it all works
+        /// out the same in the end.
+        std::atomic<uint32_t> m_write;
+
+        // hi: slots below this have been published by the producer
+        // lo: slots below this have been consumed
+        struct {
+            std::atomic<uint32_t> hi;
+            std::atomic<uint32_t> lo;
+        } m_read;
+    };
+}
+
+#endif
diff --git a/test/parallel/spmc.cpp b/test/parallel/spmc.cpp
new file mode 100644
index 00000000..89286389
--- /dev/null
+++ b/test/parallel/spmc.cpp
@@ -0,0 +1,72 @@
+#include "parallel/spmc.hpp"
+#include "job/flag.hpp"
+#include "debug.hpp"
+
+#include <algorithm>
+#include <cstdlib>
+#include <functional>
+#include <iostream>
+#include <thread>
+#include <vector>
+
+
+static constexpr uint32_t slots = 4;
+static constexpr int parallelism = 15;
+static constexpr int chunk_size = 1<<16;
+
+// released once every consumer has been spawned so they start together
+util::job::flag start;
+
+using queue_t = util::parallel::spmc<int,slots>;
+
+
+///////////////////////////////////////////////////////////////////////////////
+// drain chunk_size values from src into dst, spinning while the queue is
+// empty. one instance runs per consumer thread.
+void transfer (queue_t &src, std::vector<int> &dst)
+{
+    start.wait ();
+
+    for (int i = 0; i < chunk_size; ++i) {
+        int last;
+        while (!src.pop (last))
+            ;
+        dst[i] = last;
+    }
+}
+
+
+///////////////////////////////////////////////////////////////////////////////
+int
+main ()
+{
+    queue_t src;
+
+    std::vector<std::vector<int>> dst (parallelism);
+    std::vector<std::thread> workers;
+
+    for (int i = 0; i < parallelism; ++i) {
+        dst[i].resize (chunk_size);
+        workers.emplace_back (transfer, std::ref (src), std::ref (dst[i]));
+    }
+
+    start.notify ();
+
+    // publish every value, spinning whenever the consumers lag and the
+    // queue fills up.
+    for (int i = 0; i < parallelism * chunk_size; ++i) {
+        while (!src.push (i))
+            ;
+    }
+
+    for (auto &t: workers)
+        t.join ();
+
+    // a complete, duplicate-free transfer yields, after sorting, exactly
+    // the sequence [0, parallelism * chunk_size).
+    std::vector<int> tally;
+    tally.reserve (parallelism * chunk_size);
+    for (auto &d: dst)
+        tally.insert (tally.end (), d.begin (), d.end ());
+
+    std::sort (tally.begin (), tally.end ());
+
+    int missing = 0;
+    for (int i = 0; i < parallelism * chunk_size; ++i)
+        if (tally[i] != i)
+            ++missing;
+
+    std::clog << "missing: " << missing << '\n';
+    return missing ? EXIT_FAILURE : EXIT_SUCCESS;
+}