job/queue: use a reaper thread to clear finished tasks

Clearing the tasks on the worker threads can cause the queue to stall
while the cookie is notified, released, and deleted. We punt the cleanup
off to a reaper thread so that the workers can continue.
This commit is contained in:
Danny Robson 2018-03-22 14:59:03 +11:00
parent 3689b08535
commit 9bfefb3dab
3 changed files with 124 additions and 50 deletions

View File

@ -24,19 +24,17 @@ using util::job::queue;
///////////////////////////////////////////////////////////////////////////////
queue::queue ():
queue (std::thread::hardware_concurrency () ?: 1)
{ ; }
//-----------------------------------------------------------------------------
queue::queue (unsigned thread_count):
queue::queue (unsigned thread_count, unsigned pool_size):
m_tasks {
{}, util::pool<task> {4096}
{}, util::pool<task> {pool_size}, {}, {}
},
m_pending (0),
m_threads (thread_count)
m_threads (thread_count),
m_doomed (0),
m_reaper (&queue::reap, this)
{
CHECK_GE (m_tasks.finishing.capacity (), thread_count);
for (auto &t: m_threads)
t = std::thread (&queue::loop, this);
}
@ -55,10 +53,18 @@ queue::~queue ()
// if things deadlock then it's the user's fault currently.
for (auto &t: m_threads)
t.join ();
// wake the reaper up with enough tasks that it's guaranteed to resume.
// it will bail early and drain the queue, so the validity of the increment
// isn't super important.
m_doomed.release (m_tasks.finishing.capacity ());
m_reaper.join ();
}
///////////////////////////////////////////////////////////////////////////////
#include <iostream>
void
queue::loop ()
{
@ -79,9 +85,62 @@ queue::loop ()
} ();
util::scoped_function cleanup ([&, this] () {
m_tasks.store.destroy (todo);
while (!m_tasks.finishing.push (todo))
;
m_doomed.release ();
});
todo->function (*todo);
}
}
//-----------------------------------------------------------------------------
void
queue::reap ()
{
while (true) {
// wait until we might have something to work with
m_doomed.acquire ();
if (m_stopping)
break;
// pop and notify as many doomed tasks as we can
int count = 0;
for (task *item; m_tasks.finishing.pop (item); ++count) {
item->done.notify ();
m_tasks.notified.push_back (item);
}
// destroy any tasks that have a zero reference count
m_tasks.notified.erase (
std::remove_if (
m_tasks.notified.begin (),
m_tasks.notified.end (),
[&] (auto i) {
if (i->references.value () < 1)
return false;
m_tasks.store.destroy (i);
return true;
}
),
m_tasks.notified.end ()
);
// subtract the number of tasks we've dealt with (remembering we've
// already claimed one in the process of waking).
m_doomed.acquire (count - 1);
}
// pre-notify everyone. this lets any observers release the task before
// we wait for them at destruction time. otherwise we tend to find
// deadlocks amongst remaining tasks.
std::vector<task*> doomed;
for (auto &i: m_tasks.notified)
m_tasks.store.destroy (i);
for (auto &i: doomed)
i->done.notify ();
for (auto &i: doomed)
m_tasks.store.destroy (i);
}

View File

@ -24,6 +24,8 @@
#include "flag.hpp"
#include "monitor.hpp"
#include "../parallel/queue.hpp"
#include <array>
#include <deque>
#include <thread>
@ -40,8 +42,11 @@
namespace util::job {
class queue {
public:
queue ();
explicit queue (unsigned thread_count);
queue ():
queue (std::thread::hardware_concurrency (), 1024)
{ ; }
queue (unsigned thread_count, unsigned pool_size);
~queue ();
queue (const queue&) = delete;
@ -50,6 +55,7 @@ namespace util::job {
queue& operator= (queue&&) = delete;
auto parallelism (void) const { return m_threads.size (); }
auto capacity (void) const { return m_tasks.store.capacity (); }
struct task;
@ -68,15 +74,13 @@ namespace util::job {
return *data;
}
cookie (task &_data, queue &_runner):
data (&_data),
runner (_runner)
cookie (task &_data):
data (&_data)
{ ; }
cookie (cookie &&rhs):
data (nullptr),
runner (rhs.runner)
data (nullptr)
{
std::swap (data, rhs.data);
}
@ -87,7 +91,6 @@ namespace util::job {
cookie& operator= (const cookie&) = delete;
task *data;
queue &runner;
};
template <class Function, typename ...Args>
@ -111,7 +114,7 @@ namespace util::job {
m_pending.release ();
return cookie (*ptr, *this);
return cookie (*ptr);
}
@ -150,32 +153,32 @@ namespace util::job {
template <class FunctionT, typename ...Args>
task (FunctionT &&func, Args&&...params)
{
using tuple_t = std::tuple<std::remove_reference_t <Args>...>;
static_assert ((
(
std::is_trivially_copyable_v<Args> ||
std::is_scalar_v<std::remove_reference_t<Args>> ||
is_same_template_template_v<std::reference_wrapper, Args>
) && ...)
);
static_assert (sizeof (tuple_t) <= sizeof data);
static_assert ((std::is_trivially_copyable_v<std::decay_t<Args>> && ...));
tuple_t &punned = *reinterpret_cast<tuple_t*> (&data);
punned = tuple_t (params...);
using tuple_t = std::tuple<
std::decay_t<FunctionT>,
std::tuple<
std::decay_t<Args>...
>
>;
if constexpr (std::is_function_v<std::remove_reference_t<FunctionT>>) {
function = [f=std::ref(func)] (task &base) {
std::apply (f, *reinterpret_cast<tuple_t*> (&base.data));
};
} else {
function = [func] (task &base) {
std::apply (func, *reinterpret_cast<tuple_t*> (&base.data));
};
{
static_assert (sizeof (tuple_t) <= sizeof data);
//tuple_t &punned = *reinterpret_cast<tuple_t*> (&data);
new (reinterpret_cast<tuple_t*> (&data)) tuple_t (
std::forward<FunctionT> (func),
{ std::forward<Args> (params)... }
);
}
function = [] (task &base) {
auto &punned = *reinterpret_cast<tuple_t*> (&base.data);
std::apply (std::get<0> (punned), std::get<1> (punned));
};
}
void acquire (void) { --references; }
void release (void) { ++references; }
auto acquire (void) { return --references; }
auto release (void) { return ++references; }
// GCC: switch to hardware_destructive_interference_size when it
// becomes available in libstdc++. Until then we use a sensible
@ -189,6 +192,7 @@ namespace util::job {
private:
void loop ();
void reap ();
std::atomic<bool> m_stopping = false;
std::atomic<int> m_running = 0;
@ -200,11 +204,16 @@ namespace util::job {
> pending;
pool<task> store;
parallel::queue<task*> finishing;
std::vector<task*> notified;
} m_tasks;
semaphore m_pending;
std::vector<std::thread> m_threads;
semaphore m_doomed;
std::thread m_reaper;
};
}

View File

@ -6,6 +6,14 @@
#include <iostream>
///////////////////////////////////////////////////////////////////////////////
/// The smallest useful unit of work for exercising the queue: atomically
/// bump the supplied counter so the caller can check every submitted job
/// ran exactly once.
void sleep_inc (std::atomic<int> &count) noexcept
{
    count.fetch_add (1);
}
///////////////////////////////////////////////////////////////////////////////
int
main (void)
@ -17,30 +25,28 @@ main (void)
// executed, and finished. it's not definitive, but executing this many
// items this many times seems reasonably reliable in exposing deadlocks.
bool success = true;
constexpr int OUTTER = 1;
constexpr int INNER = 1;
constexpr int OUTTER = 4;
constexpr int INNER = 1<<10;
for (auto i = 0; i < OUTTER && success; ++i) {
std::atomic<int> count = 0;
{
util::job::queue q {1};
util::job::queue q {std::thread::hardware_concurrency (), INNER};
std::vector<util::job::queue::cookie> cookies;
for (int j = 0; j < INNER; ++j) {
cookies.push_back (
q.submit ([&count] (int sleep_for) noexcept {
std::this_thread::sleep_for (std::chrono::microseconds (sleep_for % 25));
++count;
}, j)
q.submit (
sleep_inc,
std::ref (count)
)
);
}
}
std::cout << count << '\n';
success = count == INNER && success;
}
tap.expect (success, "trivial increment jobs");
tap.expect (success, "%! trivial increment jobs of size %!", OUTTER, INNER);
return tap.status ();
}