diff --git a/job/queue.cpp b/job/queue.cpp index 1d1cf7e5..f0014fc6 100644 --- a/job/queue.cpp +++ b/job/queue.cpp @@ -24,19 +24,17 @@ using util::job::queue; /////////////////////////////////////////////////////////////////////////////// -queue::queue (): - queue (std::thread::hardware_concurrency () ?: 1) -{ ; } - - -//----------------------------------------------------------------------------- -queue::queue (unsigned thread_count): +queue::queue (unsigned thread_count, unsigned pool_size): m_tasks { - {}, util::pool {4096} + {}, util::pool {pool_size}, {}, {} }, m_pending (0), - m_threads (thread_count) + m_threads (thread_count), + m_doomed (0), + m_reaper (&queue::reap, this) { + CHECK_GE (m_tasks.finishing.capacity (), thread_count); + for (auto &t: m_threads) t = std::thread (&queue::loop, this); } @@ -55,10 +53,18 @@ queue::~queue () // if things deadlock then it's the users fault currently. for (auto &t: m_threads) t.join (); + + // wake the reaper up with enough tasks that it's guaranteed to resume. + // it will bail early and drain the queue, so the validity of the increment + // isn't super important. + m_doomed.release (m_tasks.finishing.capacity ()); + m_reaper.join (); } /////////////////////////////////////////////////////////////////////////////// +#include + void queue::loop () { @@ -79,9 +85,62 @@ queue::loop () } (); util::scoped_function cleanup ([&, this] () { - m_tasks.store.destroy (todo); + while (!m_tasks.finishing.push (todo)) + ; + m_doomed.release (); }); todo->function (*todo); } } + + +//----------------------------------------------------------------------------- +void +queue::reap () +{ + while (true) { + // wait until we might have something to work with + m_doomed.acquire (); + if (m_stopping) + break; + + // pop and notify as many doomed tasks as we can + int count = 0; + for (task *item; m_tasks.finishing.pop (item); ++count) { + item->done.notify (); + m_tasks.notified.push_back (item); + } + + // destroy any tasks that have a zero reference count + m_tasks.notified.erase ( + std::remove_if ( + m_tasks.notified.begin (), + m_tasks.notified.end (), + [&] (auto i) { + if (i->references.value () < 1) + return false; + + m_tasks.store.destroy (i); + return true; + } + ), + m_tasks.notified.end () + ); + + // subtract the number of tasks we've dealt with (remembering we've + // already claimed one in the process of waking). + m_doomed.acquire (count - 1); + } + + // pre-notify everyone. this lets any observers release the task before + // we wait for them at destruction time. otherwise we tend to find + // deadlocks amongst remaining tasks. + std::vector doomed; + for (auto &i: m_tasks.notified) + m_tasks.store.destroy (i); + for (auto &i: doomed) + i->done.notify (); + for (auto &i: doomed) + m_tasks.store.destroy (i); +} diff --git a/job/queue.hpp b/job/queue.hpp index abd90489..cec6b93f 100644 --- a/job/queue.hpp +++ b/job/queue.hpp @@ -24,6 +24,8 @@ #include "flag.hpp" #include "monitor.hpp" +#include "../parallel/queue.hpp" + #include #include #include @@ -40,8 +42,11 @@ namespace util::job { class queue { public: - queue (); - explicit queue (unsigned thread_count); + queue (): + queue (std::thread::hardware_concurrency (), 1024) + { ; } + + queue (unsigned thread_count, unsigned pool_size); ~queue (); queue (const queue&) = delete; @@ -50,6 +55,7 @@ namespace util::job { queue& operator= (queue&&) = delete; auto parallelism (void) const { return m_threads.size (); } + auto capacity (void) const { return m_tasks.store.capacity (); } struct task; @@ -68,15 +74,13 @@ namespace util::job { return *data; } - cookie (task &_data, queue &_runner): - data (&_data), - runner (_runner) + cookie (task &_data): + data (&_data) { ; } cookie (cookie &&rhs): - data (nullptr), - runner (rhs.runner) + data (nullptr) { std::swap (data, rhs.data); } @@ -87,7 +91,6 @@ namespace util::job { cookie& operator= (const cookie&) = delete; task *data; - queue &runner; }; template @@ -111,7 +114,7 @@ namespace util::job { m_pending.release (); - return cookie (*ptr, *this); + return cookie (*ptr); } @@ -150,32 +153,32 @@ namespace util::job { template task (FunctionT &&func, Args&&...params) { - using tuple_t = std::tuple...>; - static_assert (( - ( - std::is_trivially_copyable_v || - std::is_scalar_v> || - is_same_template_template_v - ) && ...) - ); - static_assert (sizeof (tuple_t) <= sizeof data); + static_assert ((std::is_trivially_copyable_v> && ...)); - tuple_t &punned = *reinterpret_cast (&data); - punned = tuple_t (params...); + using tuple_t = std::tuple< + std::decay_t, + std::tuple< + std::decay_t... + > + >; - if constexpr (std::is_function_v>) { - function = [f=std::ref(func)] (task &base) { - std::apply (f, *reinterpret_cast (&base.data)); - }; - } else { - function = [func] (task &base) { - std::apply (func, *reinterpret_cast (&base.data)); - }; + { + static_assert (sizeof (tuple_t) <= sizeof data); + //tuple_t &punned = *reinterpret_cast (&data); + new (reinterpret_cast (&data)) tuple_t ( + std::forward (func), + { std::forward (params)... } + ); } + + function = [] (task &base) { + auto &punned = *reinterpret_cast (&base.data); + std::apply (std::get<0> (punned), std::get<1> (punned)); + }; } - void acquire (void) { --references; } - void release (void) { ++references; } + auto acquire (void) { return --references; } + auto release (void) { return ++references; } // GCC: switch to hardware_destructive_interference_size when it // becomes available in libstdc++. Until then we use a sensible @@ -189,6 +192,7 @@ namespace util::job { private: void loop (); + void reap (); std::atomic m_stopping = false; std::atomic m_running = 0; @@ -200,11 +204,16 @@ namespace util::job { > pending; pool store; + parallel::queue finishing; + std::vector notified; } m_tasks; semaphore m_pending; std::vector m_threads; + + semaphore m_doomed; + std::thread m_reaper; }; } diff --git a/test/job/queue.cpp b/test/job/queue.cpp index e0f38359..a56e9ddf 100644 --- a/test/job/queue.cpp +++ b/test/job/queue.cpp @@ -6,6 +6,14 @@ #include + +/////////////////////////////////////////////////////////////////////////////// +void sleep_inc (std::atomic &count) noexcept +{ + ++count; +} + + /////////////////////////////////////////////////////////////////////////////// int main (void) @@ -17,30 +25,28 @@ main (void) // executed, and finished. it's not definitive, but executing this many // items this many times seems reasonably reliable in exposing deadlocks. bool success = true; - constexpr int OUTTER = 1; - constexpr int INNER = 1; + constexpr int OUTTER = 4; + constexpr int INNER = 1<<10; for (auto i = 0; i < OUTTER && success; ++i) { std::atomic count = 0; { - util::job::queue q {1}; + util::job::queue q {std::thread::hardware_concurrency (), INNER}; std::vector cookies; for (int j = 0; j < INNER; ++j) { cookies.push_back ( - q.submit ([&count] (int sleep_for) noexcept { - std::this_thread::sleep_for (std::chrono::microseconds (sleep_for % 25)); - ++count; - }, j) + q.submit ( + sleep_inc, + std::ref (count) + ) ); } } - std::cout << count << '\n'; - success = count == INNER && success; } - tap.expect (success, "trivial increment jobs"); + tap.expect (success, "%! trivial increment jobs of size %!", OUTTER, INNER); return tap.status (); } \ No newline at end of file