job/queue: rework locking for reliability

This commit is contained in:
Danny Robson 2018-03-15 15:21:05 +11:00
parent 0c824919de
commit f3f3666877
3 changed files with 189 additions and 115 deletions

View File

@ -11,11 +11,13 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
* *
* Copyright 2017 Danny Robson <danny@nerdcruft.net> * Copyright 2018 Danny Robson <danny@nerdcruft.net>
*/ */
#include "./queue.hpp" #include "./queue.hpp"
#include "../raii.hpp"
#include <iostream> #include <iostream>
using util::job::queue; using util::job::queue;
@ -29,56 +31,57 @@ queue::queue ():
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
queue::queue (unsigned thread_count): queue::queue (unsigned thread_count):
m_loop ([] (store &s) { m_tasks {
args obj; {}, util::pool<task> {4096}
},
while (true) { m_pending (0),
// acquire the work lock and see if we need to quit, continue,
// or sleep
std::unique_lock<std::mutex> lk (s.mutex);
if (s.pending.empty ()) {
s.cv.wait (lk, [&] () {
return s.stopping.load () || !s.pending.empty ();
});
}
if (s.stopping.load ())
break;
// extract the arguments and forward them to the functor
obj = std::move (s.pending.front ());
s.pending.pop_front ();
lk.unlock ();
s.cv.notify_one ();
obj.function (obj);
}
}),
m_threads (thread_count) m_threads (thread_count)
{ {
for (auto &t: m_threads) for (auto &t: m_threads)
t = std::thread (m_loop, std::ref (m_store)); t = std::thread (&queue::loop, this);
} }
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
queue::~queue () queue::~queue ()
{ {
// tell everyone we want to quit m_stopping = true;
{
std::lock_guard<std::mutex> lk {m_store.mutex}; // raise the semaphore enough times to resume all the worker threads
m_store.stopping.store (true); for (size_t i = 0; i < m_threads.size (); ++i)
} m_pending.release ();
m_store.cv.notify_all ();
// wait for everyone to tidy up. perhaps we'd like to use a timeout, but // wait for everyone to tidy up. perhaps we'd like to use a timeout, but
// if things deadlock then it's the user's fault currently. // if things deadlock then it's the user's fault currently.
std::for_each ( for (auto &t: m_threads)
std::begin (m_threads),
std::end (m_threads),
[] (auto &t)
{
t.join (); t.join ();
});
} }
///////////////////////////////////////////////////////////////////////////////
void
queue::loop ()
{
while (true) {
m_pending.acquire ();
if (m_stopping)
return;
util::scoped_counter running_count (m_running);
CHECK (!m_tasks.pending->empty ());
auto todo = [this] () {
auto obj = m_tasks.pending.acquire ();
auto res = obj->front ();
obj->pop_front ();
return res;
} ();
util::scoped_function cleanup ([&, this] () {
m_tasks.store.destroy (todo);
});
todo->function (*todo);
}
}

View File

@ -17,6 +17,15 @@
#ifndef CRUFT_UTIL_JOB_QUEUE_HPP #ifndef CRUFT_UTIL_JOB_QUEUE_HPP
#define CRUFT_UTIL_JOB_QUEUE_HPP #define CRUFT_UTIL_JOB_QUEUE_HPP
#include "../pool.hpp"
#include "../tuple.hpp"
#include "ticketlock.hpp"
#include "semaphore.hpp"
#include "flag.hpp"
#include "monitor.hpp"
#include <array> #include <array>
#include <deque> #include <deque>
#include <thread> #include <thread>
@ -28,6 +37,8 @@
#include <mutex> #include <mutex>
#include <condition_variable> #include <condition_variable>
#include <iostream>
namespace util::job { namespace util::job {
class queue { class queue {
public: public:
@ -35,66 +46,83 @@ namespace util::job {
explicit queue (unsigned thread_count); explicit queue (unsigned thread_count);
~queue (); ~queue ();
using cookie = int; queue (const queue&) = delete;
queue (queue&&) = delete;
queue& operator= (const queue&) = delete;
queue& operator= (queue&&) = delete;
auto parallelism (void) const { return m_threads.size (); }
struct task;
/// Handle returned by `submit` that refers to a single queued task.
///
/// If the cookie still refers to a task when it is destroyed, the
/// destructor calls `wait` on that task's `done` flag and then `release`
/// on the task's `references` semaphore. Cookies can be move-constructed
/// but never copied or assigned; a moved-from cookie refers to no task and
/// its destructor does nothing.
struct [[nodiscard]] cookie {
    /// Refer to `_data`, remembering `_runner` as the queue it belongs to.
    cookie (task &_data, queue &_runner):
        data (&_data),
        runner (_runner)
    { ; }

    /// Take over the task referenced by `rhs`, leaving `rhs` empty so
    /// that only one cookie ever waits on and releases the task.
    ///
    /// noexcept: this only swaps a raw pointer and binds a reference.
    cookie (cookie &&rhs) noexcept:
        data (nullptr),
        runner (rhs.runner)
    {
        std::swap (data, rhs.data);
    }

    cookie& operator= (cookie&&) = delete;
    cookie (const cookie&) = delete;
    cookie& operator= (const cookie&) = delete;

    ~cookie ()
    {
        // moved-from cookies have nothing to wait for
        if (data) {
            data->done.wait ();
            data->references.release ();
        }
    }

    /// Member access to the referenced task.
    ///
    /// An overloaded `operator->` has to yield a raw pointer (or a class
    /// that itself provides `operator->`). Returning `task&` made every
    /// `cookie->member` expression ill-formed, so the pointer is returned
    /// directly. The result is null for a moved-from cookie.
    task*
    operator-> (void)
    {
        return data;
    }

    task *data;     ///< referenced task; nullptr once moved from
    queue &runner;  ///< queue passed in at construction
};
template <class Function, typename ...Args> template <class Function, typename ...Args>
cookie cookie
submit (cookie &parent, Function&&, Args&&...); submit (task &parent, Function&&, Args&&...);
/// record a functor and a set of parameters to execute at some point /// record a functor and a set of parameters to execute at some point
/// in the future by an arbitrary available thread. /// in the future by an arbitrary available thread.
template <class Function, typename ...Args> template <class Function, typename ...Args>
cookie cookie
submit (Function &&func, Args &&...params) submit (Function &&func, Args &&...args)
{ {
{ CHECK (!m_stopping);
std::unique_lock<std::mutex> lk (m_store.mutex);
m_store.pending.emplace_back ( auto ptr = m_tasks.store.construct (
std::forward<Function> (func), std::forward<Function> (func),
std::forward<Args> (params)... std::forward<Args> (args)...
); );
}
m_store.cv.notify_one (); m_tasks.pending->push_back (ptr);
// TODO: return a useful identifier to allow waiting m_pending.release ();
return 0;
}
void wait (cookie); return cookie (*ptr, *this);
// HACK: this doesn't actually implement a proper barrier and may not
// even guarantee that all jobs are flushed. it's provided to make
// some amount of testing slightly more convenient by draining most
// of the queue on demand.
void
flush (void)
{
// setup a cv and completion flag. this is a poor man's barrier.
std::mutex m;
std::unique_lock<std::mutex> lk (m);
std::condition_variable cv;
std::atomic<bool> done = false;
// submit a job to the back of the queue that sets the done flag
// and wakes us back up again.
submit ([&] (void) {
{
std::lock_guard<std::mutex> _{m};
done.store (true);
}
cv.notify_one ();
});
// wait until the flag is set then exit.
do {
cv.wait (lk, [&] () { return done.load (); });
} while (!done.load ());
} }
private: // block until all jobs currently queued have been started
void flush (void);
// block until there are no more jobs queued or executing
void finish (void);
/// stores a functor and associated arguments in a fixed size buffer /// stores a functor and associated arguments in a fixed size buffer
/// for later execution. /// for later execution.
/// ///
@ -107,46 +135,77 @@ namespace util::job {
/// the user supplied functor is wrapped with our own that unpacks and /// the user supplied functor is wrapped with our own that unpacks and
/// forwards the arguments from the data buffer. this function must /// forwards the arguments from the data buffer. this function must
/// be passed a copy of the current arg object as the only argument. /// be passed a copy of the current arg object as the only argument.
struct args { struct task {
args () = default; task () = default;
template <class Function, typename ...Args> ~task ()
args (Function &&func, Args&&...params)
{ {
using tuple_t = std::tuple<std::decay_t<Args>...>; done.notify ();
static_assert ((std::is_trivial_v<std::decay_t<decltype(params)>> && ...)); references.acquire ();
}
task (const task&) = delete;
task (task&&) = delete;
task& operator= (const task&) = delete;
task& operator= (task&&) = delete;
template <class FunctionT, typename ...Args>
task (FunctionT &&func, Args&&...params)
{
using tuple_t = std::tuple<std::remove_reference_t <Args>...>;
static_assert ((
(
std::is_trivially_copyable_v<Args> ||
std::is_scalar_v<std::remove_reference_t<Args>> ||
is_same_template_template_v<std::reference_wrapper, Args>
) && ...)
);
static_assert (sizeof (tuple_t) <= sizeof data); static_assert (sizeof (tuple_t) <= sizeof data);
union { tuple_t &punned = *reinterpret_cast<tuple_t*> (&data);
decltype(data) *byte_ptr; punned = tuple_t (params...);
tuple_t *args_ptr;
};
byte_ptr = &data;
*args_ptr = std::make_tuple (params...);
function = [func] (args &base) { if constexpr (std::is_function_v<std::remove_reference_t<FunctionT>>) {
function = [f=std::ref(func)] (task &base) {
std::apply (f, *reinterpret_cast<tuple_t*> (&base.data));
};
} else {
function = [func] (task &base) {
std::apply (func, *reinterpret_cast<tuple_t*> (&base.data)); std::apply (func, *reinterpret_cast<tuple_t*> (&base.data));
}; };
}; }
}
// applies pre-decrement to the `references` semaphore.
// NOTE(review): presumably this takes one count and may block; confirm
// against semaphore.hpp.
void acquire (void) { --references; }
// applies pre-increment to the `references` semaphore.
// NOTE(review): presumably this returns one count; confirm against
// semaphore.hpp.
void release (void) { ++references; }
// GCC: switch to hardware_destructive_interference_size when it // GCC: switch to hardware_destructive_interference_size when it
// becomes available in libstdc++. Until then we use a sensible // becomes available in libstdc++. Until then we use a sensible
// guess. // guess.
std::array<char,64> data; std::array<char,64> data;
std::function<void(args&)> function; std::function<void(task&)> function;
semaphore references = 0;
flag done;
}; };
struct store { private:
std::atomic<bool> stopping = false; void loop ();
std::deque<args> pending;
std::condition_variable cv; std::atomic<bool> m_stopping = false;
std::mutex mutex; std::atomic<int> m_running = 0;
};
struct {
monitor<
std::deque<task*>,
ticketlock
> pending;
pool<task> store;
} m_tasks;
semaphore m_pending;
store m_store;
std::function<void(store&)> m_loop;
std::vector<std::thread> m_threads; std::vector<std::thread> m_threads;
}; };
} }

View File

@ -1,8 +1,12 @@
///////////////////////////////////////////////////////////////////////////////
#include "job/queue.hpp" #include "job/queue.hpp"
#include "tap.hpp" #include "tap.hpp"
#include <unistd.h> #include <chrono>
#include <iostream>
///////////////////////////////////////////////////////////////////////////////
int int
main (void) main (void)
{ {
@ -13,18 +17,26 @@ main (void)
// executed, and finished. it's not definitive, but executing this many // executed, and finished. it's not definitive, but executing this many
// items this many times seems reasonably reliable in exposing deadlocks. // items this many times seems reasonably reliable in exposing deadlocks.
bool success = true; bool success = true;
constexpr int OUTTER = 16; constexpr int OUTTER = 1;
constexpr int INNER = 1024; constexpr int INNER = 1;
for (auto i = 0; i < OUTTER && success; ++i) { for (auto i = 0; i < OUTTER && success; ++i) {
std::atomic<int> count = 0; std::atomic<int> count = 0;
{ {
util::job::queue q {}; util::job::queue q {1};
for (int j = 0; j < INNER; ++j) std::vector<util::job::queue::cookie> cookies;
q.submit ([&count] () noexcept { ++count; }); for (int j = 0; j < INNER; ++j) {
q.flush (); cookies.push_back (
q.submit ([&count] (int sleep_for) noexcept {
std::this_thread::sleep_for (std::chrono::microseconds (sleep_for % 25));
++count;
}, j)
);
} }
}
std::cout << count << '\n';
success = count == INNER && success; success = count == INNER && success;
} }