/*
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 *
 * Copyright 2017-2018 Danny Robson <danny@nerdcruft.net>
 */

#ifndef CRUFT_UTIL_JOB_QUEUE_HPP
#define CRUFT_UTIL_JOB_QUEUE_HPP

#include "../pool.hpp"
#include "../debug.hpp" // assumed source of the CHECK macro used in submit

#include "../thread/ticketlock.hpp"
#include "../thread/semaphore.hpp"
#include "../thread/flag.hpp"
#include "../thread/monitor.hpp"

#include "../parallel/queue.hpp"

#include <array>
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <new>
#include <thread>
#include <tuple>
#include <type_traits>
#include <vector>

namespace cruft::job {
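    /// runs submitted functors asynchronously on a fixed set of worker
    /// threads, with task storage drawn from a fixed-size pool.
    ///
    /// a minimal usage sketch; the sizes and `do_work` are hypothetical,
    /// not part of this header:
    ///
    ///     cruft::job::queue q (4, 1024);   // 4 workers, 1024 task slots
    ///     auto c = q.submit ([] (int i) { do_work (i); }, 42);
    ///     q.finish ();                     // block until nothing remains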
    class queue {
    public:
        queue ();
        queue (unsigned thread_count, unsigned pool_size);
        ~queue ();

        queue (const queue&) = delete;
        queue (queue&&) = delete;
        queue& operator= (const queue&) = delete;
        queue& operator= (queue&&) = delete;

        auto parallelism (void) const { return m_threads.size (); }
        auto capacity (void) const { return m_tasks.store.capacity (); }

        struct task;

        /// a movable handle to a submitted task. the destructor waits
        /// until the task has signalled completion, then releases our
        /// reference so the task object can be reclaimed.
        struct [[nodiscard]] cookie {
            ~cookie ()
            {
                if (data) {
                    data->done.wait ();
                    data->references.release ();
                }
            }

            task*
            operator-> (void)
            {
                return data;
            }

            cookie (task &_data):
                data (&_data)
            { ; }

            cookie (cookie &&rhs):
                data (nullptr)
            {
                std::swap (data, rhs.data);
            }

            cookie& operator= (cookie &&rhs)
            {
                std::swap (data, rhs.data);
                return *this;
            }

            cookie (const cookie&) = delete;
            cookie& operator= (const cookie&) = delete;

            task *data;
        };
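
        // a sketch of the destructor semantics above; `q` and `work` are
        // hypothetical stand-ins:
        //
        //     {
        //         auto c = q.submit (work);
        //     }   // destroying `c` waits here until `work` has completed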

        /// record a functor and a set of parameters, associated with the
        /// given parent task, to execute at some point in the future by an
        /// arbitrary available thread.
        template <class Function, typename ...Args>
        cookie
        submit (task &parent, Function&&, Args&&...);

        /// record a functor and a set of parameters to execute at some point
        /// in the future by an arbitrary available thread.
        template <class Function, typename ...Args>
        cookie
        submit (Function &&func, Args &&...args)
        {
            // submission after shutdown has begun is a usage error
            CHECK (!m_stopping);

            auto ptr = m_tasks.store.construct (
                std::forward<Function> (func),
                std::forward<Args> (args)...
            );

            m_tasks.pending->push_back (ptr);
            m_pending.release ();

            return cookie (*ptr);
        }
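
        // a sketch of argument handling; `render`, `frame_id` and
        // `viewport` are hypothetical, and must be trivially copyable per
        // the restrictions documented on `task` below:
        //
        //     auto c = q.submit (render, frame_id, viewport);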

        // block until all jobs currently queued have been started
        void flush (void);

        // block until there are no more jobs queued or executing
        void finish (void);
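
        // put concretely: after submitting N jobs, `flush` returns once
        // all N have been picked up by a worker, while `finish` returns
        // only once all N have also run to completion.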

        /// stores a functor and associated arguments in a fixed size buffer
        /// for later execution.
        ///
        /// for ease of implementation the arguments are currently restricted
        /// as follows:
        /// * trivial types (memcpy'able)
        /// * a fixed total maximum size of around one cache line
        /// these limitations will be eliminated at some point in the future.
        ///
        /// the user supplied functor is wrapped with our own that unpacks and
        /// forwards the arguments from the data buffer. this wrapper must be
        /// passed the task object itself as its only argument.
        struct task {
            task () = default;

            ~task ()
            {
                done.notify_all ();
                references.acquire ();
            }

            task (const task&) = delete;
            task (task&&) = delete;
            task& operator= (const task&) = delete;
            task& operator= (task&&) = delete;

            template <class FunctionT, typename ...Args>
            task (FunctionT &&func, Args&&...params)
            {
                static_assert ((std::is_trivially_copyable_v<std::decay_t<Args>> && ...));

                using tuple_t = std::tuple<
                    std::decay_t<FunctionT>,
                    std::tuple<
                        std::decay_t<Args>...
                    >
                >;

                // copy the functor and arguments into the inline buffer;
                // the static_assert guarantees they actually fit.
                {
                    static_assert (sizeof (tuple_t) <= sizeof data);
                    new (reinterpret_cast<tuple_t*> (&data)) tuple_t (
                        std::forward<FunctionT> (func),
                        { std::forward<Args> (params)... }
                    );
                }

                // type-erased trampoline: recover the concrete tuple type
                // from the buffer and invoke the functor with the stored
                // arguments.
                function = [] (task &base) {
                    auto &punned = *reinterpret_cast<tuple_t*> (&base.data);
                    std::apply (std::get<0> (punned), std::get<1> (punned));
                };
            }
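
            // e.g. a call satisfying the restrictions documented above:
            //
            //     task ([] (int a, float b) { /* ... */ }, 1, 2.f);
            //
            // passing a std::string argument instead would fail the
            // trivially-copyable static_assert.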

            // manual reference counting over the `references` semaphore;
            // the destructor above blocks until all holders (eg, the
            // cookie) have released their references.
            auto acquire (void) { return --references; }
            auto release (void) { return ++references; }

            // GCC: switch to hardware_destructive_interference_size when it
            // becomes available in libstdc++. Until then we use a sensible
            // guess.
            std::aligned_storage_t<64, alignof (std::max_align_t)> data;

            std::function<void(task&)> function;
            thread::semaphore references = 0;
            thread::flag done;
        };

    private:
        void loop ();
        void reap ();

        std::atomic<bool> m_stopping = false;
        std::atomic<int> m_running = 0;

        struct {
            thread::monitor<
                std::deque<task*>,
                thread::ticketlock
            > pending;

            pool<task> store;
            parallel::queue<task*> finishing;
            std::vector<task*> notified;
        } m_tasks;

        thread::semaphore m_pending;

        std::vector<std::thread> m_threads;

        thread::semaphore m_doomed;
        std::thread m_reaper;
    };
}

#endif