From f3f36668775f1e4854e2f26218a4c1f3202f6f25 Mon Sep 17 00:00:00 2001 From: Danny Robson Date: Thu, 15 Mar 2018 15:21:05 +1100 Subject: [PATCH] job/queue: rework locking for reliability --- job/queue.cpp | 81 ++++++++++--------- job/queue.hpp | 197 +++++++++++++++++++++++++++++---------------- test/job/queue.cpp | 26 ++++-- 3 files changed, 189 insertions(+), 115 deletions(-) diff --git a/job/queue.cpp b/job/queue.cpp index 97b0eee0..1d1cf7e5 100644 --- a/job/queue.cpp +++ b/job/queue.cpp @@ -11,11 +11,13 @@ * See the License for the specific language governing permissions and * limitations under the License. * - * Copyright 2017 Danny Robson + * Copyright 2018 Danny Robson */ #include "./queue.hpp" +#include "../raii.hpp" + #include using util::job::queue; @@ -29,56 +31,57 @@ queue::queue (): //----------------------------------------------------------------------------- queue::queue (unsigned thread_count): - m_loop ([] (store &s) { - args obj; - - while (true) { - // acquire the work lock and see if we need to quit, continue, - // or sleep - std::unique_lock lk (s.mutex); - if (s.pending.empty ()) { - s.cv.wait (lk, [&] () { - return s.stopping.load () || !s.pending.empty (); - }); - } - - if (s.stopping.load ()) - break; - - // extract the arguments and forward them to the functor - obj = std::move (s.pending.front ()); - s.pending.pop_front (); - lk.unlock (); - s.cv.notify_one (); - - obj.function (obj); - } - }), + m_tasks { + {}, util::pool {4096} + }, + m_pending (0), m_threads (thread_count) { for (auto &t: m_threads) - t = std::thread (m_loop, std::ref (m_store)); + t = std::thread (&queue::loop, this); } //----------------------------------------------------------------------------- queue::~queue () { - // tell everyone we want to quit - { - std::lock_guard lk {m_store.mutex}; - m_store.stopping.store (true); - } - m_store.cv.notify_all (); + m_stopping = true; + + // raise the semaphore enough times to resume all the worker threads + for (size_t i = 0; i < m_threads.size (); ++i) + m_pending.release (); // wait for everyone to tidy up. perhaps we'd like to use a timeout, but // if things deadlock then it's the users fault currently. - std::for_each ( - std::begin (m_threads), - std::end (m_threads), - [] (auto &t) - { + for (auto &t: m_threads) t.join (); - }); } + +/////////////////////////////////////////////////////////////////////////////// +void +queue::loop () +{ + while (true) { + m_pending.acquire (); + if (m_stopping) + return; + + util::scoped_counter running_count (m_running); + + CHECK (!m_tasks.pending->empty ()); + + auto todo = [this] () { + auto obj = m_tasks.pending.acquire (); + auto res = obj->front (); + obj->pop_front (); + return res; + } (); + + util::scoped_function cleanup ([&, this] () { + m_tasks.store.destroy (todo); + }); + + todo->function (*todo); + } +} diff --git a/job/queue.hpp b/job/queue.hpp index ddc3194e..f1de0337 100644 --- a/job/queue.hpp +++ b/job/queue.hpp @@ -17,6 +17,15 @@ #ifndef CRUFT_UTIL_JOB_QUEUE_HPP #define CRUFT_UTIL_JOB_QUEUE_HPP +#include "../pool.hpp" + +#include "../tuple.hpp" + +#include "ticketlock.hpp" +#include "semaphore.hpp" +#include "flag.hpp" +#include "monitor.hpp" + #include #include #include @@ -28,6 +37,8 @@ #include #include +#include + namespace util::job { class queue { public: @@ -35,66 +46,83 @@ namespace util::job { explicit queue (unsigned thread_count); ~queue (); - using cookie = int; + queue (const queue&) = delete; + queue (queue&&) = delete; + queue& operator= (const queue&) = delete; + queue& operator= (queue&&) = delete; + + auto parallelism (void) const { return m_threads.size (); } + + struct task; + + struct [[nodiscard]] cookie { + ~cookie () + { + if (data) { + data->done.wait (); + data->references.release (); + } + } + + task& + operator-> (void) + { + return *data; + } + + cookie (task &_data, queue &_runner): + data (&_data), + runner (_runner) + { ; } + + + cookie (cookie &&rhs): + data (nullptr), + runner (rhs.runner) + { + std::swap (data, rhs.data); + } + + cookie& operator= (cookie&&) = delete; + + cookie (const cookie&) = delete; + cookie& operator= (const cookie&) = delete; + + task *data; + queue &runner; + }; template cookie - submit (cookie &parent, Function&&, Args&&...); + submit (task &parent, Function&&, Args&&...); /// record a functor and a set of parameters to execute at some point /// in the future by an arbitrary available thread. template cookie - submit (Function &&func, Args &&...params) + submit (Function &&func, Args &&...args) { - { - std::unique_lock lk (m_store.mutex); - m_store.pending.emplace_back ( - std::forward (func), - std::forward (params)... - ); - } + CHECK (!m_stopping); - m_store.cv.notify_one (); + auto ptr = m_tasks.store.construct ( + std::forward (func), + std::forward (args)... + ); - // TODO: return a useful identifier to allow waiting - return 0; - } + m_tasks.pending->push_back (ptr); - void wait (cookie); + m_pending.release (); - // HACK: this doesn't actually implement a proper barrier and may not - // even guarantee that all jobs are flushed. it's provided to make - // some amount of testing slightly more convenient by draining most - // of the queue on demand. - void - flush (void) - { - // setup a cv and completion flag. this is a poor man's barrier. - std::mutex m; - std::unique_lock lk (m); - std::condition_variable cv; - std::atomic done = false; - - // submit a job to the back of the queue that sets the done flag - // and wakes us back up again. - submit ([&] (void) { - { - std::lock_guard _{m}; - done.store (true); - } - - cv.notify_one (); - }); - - // wait until the flag is set then exit. - do { - cv.wait (lk, [&] () { return done.load (); }); - } while (!done.load ()); + return cookie (*ptr, *this); } - private: + // block until all jobs currently queued have been started + void flush (void); + + // block until there are no more jobs queued or executing + void finish (void); + /// stores a functor and associated arguments in a fixed size buffer /// for later execution. /// @@ -107,46 +135,77 @@ namespace util::job { /// the user supplied functor is wrapped with our own that unpacks and /// forwards the arguments from the data buffer. this function must /// be passed a copy of the current arg object as the only argument. - struct args { - args () = default; + struct task { + task () = default; - template - args (Function &&func, Args&&...params) + ~task () { - using tuple_t = std::tuple...>; - static_assert ((std::is_trivial_v> && ...)); + done.notify (); + references.acquire (); + } + + task (const task&) = delete; + task (task&&) = delete; + task& operator= (const task&) = delete; + task& operator= (task&&) = delete; + + template + task (FunctionT &&func, Args&&...params) + { + using tuple_t = std::tuple...>; + static_assert (( + ( + std::is_trivially_copyable_v || + std::is_scalar_v> || + is_same_template_template_v + ) && ...) + ); static_assert (sizeof (tuple_t) <= sizeof data); - union { - decltype(data) *byte_ptr; - tuple_t *args_ptr; - }; - byte_ptr = &data; - *args_ptr = std::make_tuple (params...); + tuple_t &punned = *reinterpret_cast (&data); + punned = tuple_t (params...); - function = [func] (args &base) { - std::apply (func, *reinterpret_cast (&base.data)); - }; - }; + if constexpr (std::is_function_v>) { + function = [f=std::ref(func)] (task &base) { + std::apply (f, *reinterpret_cast (&base.data)); + }; + } else { + function = [func] (task &base) { + std::apply (func, *reinterpret_cast (&base.data)); + }; + } + } + + void acquire (void) { --references; } + void release (void) { ++references; } // GCC: switch to hardware_destructive_interference_size when it // becomes available in libstdc++. Until then we use a sensible // guess. std::array data; - std::function function; + std::function function; + semaphore references = 0; + flag done; }; - struct store { - std::atomic stopping = false; - std::deque pending; + private: + void loop (); - std::condition_variable cv; - std::mutex mutex; - }; + std::atomic m_stopping = false; + std::atomic m_running = 0; + + struct { + monitor< + std::deque, + ticketlock + > pending; + + pool store; + } m_tasks; + + semaphore m_pending; - store m_store; - std::function m_loop; std::vector m_threads; }; } diff --git a/test/job/queue.cpp b/test/job/queue.cpp index c17b5372..e0f38359 100644 --- a/test/job/queue.cpp +++ b/test/job/queue.cpp @@ -1,8 +1,12 @@ +/////////////////////////////////////////////////////////////////////////////// #include "job/queue.hpp" #include "tap.hpp" -#include +#include +#include + +/////////////////////////////////////////////////////////////////////////////// int main (void) { @@ -13,19 +17,27 @@ main (void) // executed, and finished. it's not definitive, but executing this many // items this many times seems reasonably reliable in exposing deadlocks. bool success = true; - constexpr int OUTTER = 16; - constexpr int INNER = 1024; + constexpr int OUTTER = 1; + constexpr int INNER = 1; for (auto i = 0; i < OUTTER && success; ++i) { std::atomic count = 0; { - util::job::queue q {}; - for (int j = 0; j < INNER; ++j) - q.submit ([&count] () noexcept { ++count; }); - q.flush (); + util::job::queue q {1}; + std::vector cookies; + for (int j = 0; j < INNER; ++j) { + cookies.push_back ( + q.submit ([&count] (int sleep_for) noexcept { + std::this_thread::sleep_for (std::chrono::microseconds (sleep_for % 25)); + ++count; + }, j) + ); + } } + std::cout << count << '\n'; + success = count == INNER && success; }