From 320fe6c3780aabf40d1f3c6ebf736a3a51287c6d Mon Sep 17 00:00:00 2001 From: Danny Robson Date: Mon, 3 Jul 2017 17:05:01 +1000 Subject: [PATCH] job/queue: add trivial job queuing system --- CMakeLists.txt | 3 + job/queue.cpp | 84 +++++++++++++++++++++++++ job/queue.hpp | 153 +++++++++++++++++++++++++++++++++++++++++++++ test/job/queue.cpp | 34 ++++++++++ 4 files changed, 274 insertions(+) create mode 100644 job/queue.cpp create mode 100644 job/queue.hpp create mode 100644 test/job/queue.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index dae941d7..f143c93a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -306,6 +306,8 @@ list ( io.hpp io.ipp iterator.hpp + job/queue.cpp + job/queue.hpp json/fwd.hpp json/except.cpp json/except.hpp @@ -481,6 +483,7 @@ if (TESTS) hton introspection iterator + job/queue json_types maths matrix diff --git a/job/queue.cpp b/job/queue.cpp new file mode 100644 index 00000000..97b0eee0 --- /dev/null +++ b/job/queue.cpp @@ -0,0 +1,84 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Copyright 2017 Danny Robson + */ + +#include "./queue.hpp" + +#include + +using util::job::queue; + + +/////////////////////////////////////////////////////////////////////////////// +queue::queue (): + queue (std::thread::hardware_concurrency () ?: 1) +{ ; } + + +//----------------------------------------------------------------------------- +queue::queue (unsigned thread_count): + m_loop ([] (store &s) { + args obj; + + while (true) { + // acquire the work lock and see if we need to quit, continue, + // or sleep + std::unique_lock lk (s.mutex); + if (s.pending.empty ()) { + s.cv.wait (lk, [&] () { + return s.stopping.load () || !s.pending.empty (); + }); + } + + if (s.stopping.load ()) + break; + + // extract the arguments and forward them to the functor + obj = std::move (s.pending.front ()); + s.pending.pop_front (); + lk.unlock (); + s.cv.notify_one (); + + obj.function (obj); + } + }), + m_threads (thread_count) +{ + for (auto &t: m_threads) + t = std::thread (m_loop, std::ref (m_store)); +} + + +//----------------------------------------------------------------------------- +queue::~queue () +{ + // tell everyone we want to quit + { + std::lock_guard lk {m_store.mutex}; + m_store.stopping.store (true); + } + m_store.cv.notify_all (); + + // wait for everyone to tidy up. perhaps we'd like to use a timeout, but + // if things deadlock then it's the users fault currently. + std::for_each ( + std::begin (m_threads), + std::end (m_threads), + [] (auto &t) + { + t.join (); + }); +} + diff --git a/job/queue.hpp b/job/queue.hpp new file mode 100644 index 00000000..ad0432e4 --- /dev/null +++ b/job/queue.hpp @@ -0,0 +1,153 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Copyright 2017 Danny Robson + */ + +#ifndef CRUFT_UTIL_JOB_QUEUE_HPP +#define CRUFT_UTIL_JOB_QUEUE_HPP + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace util::job { + class queue { + public: + queue (); + queue (unsigned thread_count); + ~queue (); + + using cookie = int; + + template + cookie + submit (cookie &parent, Function&&, Args&&...); + + /// record a functor and a set of parameters to execute at some point + /// in the future by an arbitrary available thread. + template + cookie + submit (Function &&func, Args &&...params) + { + { + std::unique_lock lk (m_store.mutex); + m_store.pending.emplace_back ( + std::forward (func), + std::forward (params)... + ); + } + + m_store.cv.notify_one (); + + // TODO: return a useful identifier to allow waiting + return 0; + } + + void wait (cookie); + + // HACK: this doesn't actually implement a proper barrier and may not + // even guarantee that all jobs are flushed. it's provided to make + // some amount of testing slightly more convenient by draining most + // of the queue on demand. + void + flush (void) + { + // setup a cv and completion flag. this is a poor man's barrier. + std::mutex m; + std::unique_lock lk (m); + std::condition_variable cv; + std::atomic done = false; + + // submit a job to the back of the queue that sets the done flag + // and wakes us back up again. + submit ([&] (void) { + { + std::lock_guard _{m}; + done.store (true); + } + + cv.notify_one (); + }); + + // wait until the flag is set then exit. + do { + cv.wait (lk, [&] () { return done.load (); }); + } while (!done.load ()); + } + + private: + /// stores a functor and associated arguments in a fixed size buffer + /// for later execution. + /// + /// for ease of implementation the arguments are currently restricted + /// as follows: + /// * trivial types (memcpy'able) + /// * a fixed total maximum size of around one cache line + /// these limitations will be eliminated at some point in the future + /// + /// the user supplied functor is wrapped with our own that unpacks and + /// forwards the arguments from the data buffer. this function must + /// be passed a copy of the current arg object as the only argument. + struct args { + args () = default; + + template + args (Function &&func, Args&&...params) + { + using tuple_t = std::tuple; + static_assert ((std::is_trivial_v && ...)); + static_assert (sizeof (tuple_t) <= sizeof data); + + union { + decltype(data) *byte_ptr; + tuple_t *args_ptr; + }; + byte_ptr = &data; + *args_ptr = std::make_tuple (std::forward (params)...); + + function = [func] (args &base) { + std::apply (func, *reinterpret_cast (&base.data)); + }; + }; + + // GCC: switch to hardware_destructive_interference_size when it + // becomes available in libstdc++. Until then we use a sensible + // guess. + std::array data; + + std::function function; + }; + + struct store { + std::atomic stopping = false; + std::deque pending; + + std::condition_variable cv; + std::mutex mutex; + }; + + store m_store; + std::function m_loop; + std::vector m_threads; + }; +} + +#endif diff --git a/test/job/queue.cpp b/test/job/queue.cpp new file mode 100644 index 00000000..c17b5372 --- /dev/null +++ b/test/job/queue.cpp @@ -0,0 +1,34 @@ +#include "job/queue.hpp" +#include "tap.hpp" + +#include + +int +main (void) +{ + util::TAP::logger tap; + + // dispatch `INNER' simple jobs `OUTTER' times that simply increment an + // atomic variable and quit. this tests that all threads are created, + // executed, and finished. it's not definitive, but executing this many + // items this many times seems reasonably reliable in exposing deadlocks. + bool success = true; + constexpr int OUTTER = 16; + constexpr int INNER = 1024; + + for (auto i = 0; i < OUTTER && success; ++i) { + std::atomic count = 0; + + { + util::job::queue q {}; + for (int j = 0; j < INNER; ++j) + q.submit ([&count] () noexcept { ++count; }); + q.flush (); + } + + success = count == INNER && success; + } + + tap.expect (success, "trivial increment jobs"); + return tap.status (); +} \ No newline at end of file