libcruft-util/job/queue.hpp

217 lines
6.2 KiB
C++

/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Copyright 2017-2018 Danny Robson <danny@nerdcruft.net>
*/
#ifndef CRUFT_UTIL_JOB_QUEUE_HPP
#define CRUFT_UTIL_JOB_QUEUE_HPP
#include "../pool.hpp"
#include "../thread/ticketlock.hpp"
#include "../thread/semaphore.hpp"
#include "../thread/flag.hpp"
#include "../thread/monitor.hpp"
#include "../parallel/queue.hpp"

#include <array>
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <new>
#include <thread>
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>
namespace util::job {
/// A job queue. Callers `submit` a functor plus arguments; the call is
/// recorded in a `task` allocated from an internal store, appended to a
/// pending list, and a `cookie` referring to that task is returned.
///
/// NOTE(review): the constructors, destructor, `flush`, `finish`, `loop`
/// and `reap` are defined outside this header. Anything said here about
/// how worker threads consume tasks is inferred from member names and
/// should be confirmed against the implementation.
class queue {
public:
    queue ();
    queue (unsigned thread_count, unsigned pool_size);
    ~queue ();

    // The queue holds std::thread objects and hands out raw pointers into
    // its own task store, so it is neither copyable nor movable.
    queue (const queue&) = delete;
    queue (queue&&) = delete;
    queue& operator= (const queue&) = delete;
    queue& operator= (queue&&) = delete;

    /// the number of entries in the worker thread list.
    auto parallelism (void) const { return m_threads.size (); }

    /// the capacity reported by the task store.
    auto capacity (void) const { return m_tasks.store.capacity (); }

    struct task;

    /// a move-only handle to a submitted task.
    ///
    /// destroying a non-empty cookie waits on the task's `done` flag and
    /// then releases the task's `references` semaphore. the type is
    /// marked nodiscard because discarding the result of `submit` would
    /// run this destructor immediately.
    struct [[nodiscard]] cookie {
        ~cookie ()
        {
            // a moved-from cookie holds nullptr and has nothing to wait on
            if (data) {
                data->done.wait ();
                data->references.release ();
            }
        }

        /// member access to the referenced task.
        ///
        /// returns a pointer: an overloaded operator-> that yields a
        /// plain reference cannot be used with `->` syntax at all.
        /// the result is nullptr for a moved-from cookie.
        task*
        operator-> (void)
        {
            return data;
        }

        cookie (task &_data):
            data (&_data)
        { ; }

        /// take over the task pointer from `rhs`, leaving `rhs` empty so
        /// that only one cookie waits on and releases the task.
        cookie (cookie &&rhs) noexcept:
            data (std::exchange (rhs.data, nullptr))
        { ; }

        cookie& operator= (cookie&&) = delete;
        cookie (const cookie&) = delete;
        cookie& operator= (const cookie&) = delete;

        task *data;
    };

    /// submit overload taking a parent task; declared here, defined
    /// elsewhere.
    template <class Function, typename ...Args>
    cookie
    submit (task &parent, Function&&, Args&&...);

    /// record a functor and a set of parameters to execute at some point
    /// in the future by an arbitrary available thread.
    ///
    /// the task is constructed in the task store, its pointer is appended
    /// to the pending list, and the pending semaphore is released once.
    /// must not be called once the queue has begun stopping (checked via
    /// CHECK).
    template <class Function, typename ...Args>
    cookie
    submit (Function &&func, Args &&...args)
    {
        CHECK (!m_stopping);

        auto ptr = m_tasks.store.construct (
            std::forward<Function> (func),
            std::forward<Args> (args)...
        );

        m_tasks.pending->push_back (ptr);
        m_pending.release ();

        return cookie (*ptr);
    }

    // block until all jobs currently queued have been started
    void flush (void);

    // block until there are no more jobs queued or executing
    void finish (void);

    /// stores a functor and associated arguments in a fixed size buffer
    /// for later execution.
    ///
    /// for ease of implementation the arguments are currently restricted
    /// as follows:
    /// * trivial types (memcpy'able)
    /// * a fixed total maximum size of around one cache line
    /// these limitations will be eliminated at some point in the future
    ///
    /// the user supplied functor is wrapped with our own that unpacks and
    /// forwards the arguments from the data buffer. the wrapper is stored
    /// in `function` and must be called with the owning task as its only
    /// argument.
    struct task {
        task () = default;

        /// notifies `done`, then acquires `references`.
        ///
        /// NOTE(review): presumably this blocks until the outstanding
        /// cookie has released the task; confirm against the semantics
        /// of thread::semaphore and thread::flag.
        ~task ()
        {
            done.notify ();
            references.acquire ();
        }

        task (const task&) = delete;
        task (task&&) = delete;
        task& operator= (const task&) = delete;
        task& operator= (task&&) = delete;

        /// pack `func` and a decayed copy of each of `params` into the
        /// inline buffer and install a wrapper that applies them.
        ///
        /// the packed tuple is never destroyed, hence the requirement
        /// that every argument is trivially copyable.
        template <class FunctionT, typename ...Args>
        task (FunctionT &&func, Args&&...params)
        {
            static_assert (
                (std::is_trivially_copyable_v<std::decay_t<Args>> && ...),
                "task arguments must be trivially copyable"
            );

            using tuple_t = std::tuple<
                std::decay_t<FunctionT>,
                std::tuple<
                    std::decay_t<Args>...
                >
            >;

            static_assert (
                sizeof (tuple_t) <= sizeof data,
                "functor and arguments do not fit in the task buffer"
            );

            // the buffer is aligned to max_align_t; anything stricter
            // would be constructed at a misaligned address.
            static_assert (
                alignof (tuple_t) <= alignof (std::max_align_t),
                "functor and arguments are over-aligned for the task buffer"
            );

            new (reinterpret_cast<tuple_t*> (&data)) tuple_t (
                std::forward<FunctionT> (func),
                { std::forward<Args> (params)... }
            );

            // a captureless lambda: everything it needs is read back out
            // of the task it is handed.
            function = [] (task &base) {
                auto &punned = *reinterpret_cast<tuple_t*> (&base.data);
                std::apply (std::get<0> (punned), std::get<1> (punned));
            };
        }

        auto acquire (void) { return --references; }
        auto release (void) { return ++references; }

        // GCC: switch to hardware_destructive_interference_size when it
        // becomes available in libstdc++. Until then we use a sensible
        // guess.
        //
        // the storage is explicitly aligned because a std::tuple is
        // placement-constructed into it; a bare char array only
        // guarantees an alignment of one.
        alignas (std::max_align_t) std::array<char,64> data;

        std::function<void(task&)> function;

        thread::semaphore references = 0;
        thread::flag done;
    };

private:
    // thread entry points; defined elsewhere.
    void loop ();
    void reap ();

    // checked by `submit` to reject work once the queue is stopping.
    std::atomic<bool> m_stopping = false;
    std::atomic<int> m_running = 0;

    struct {
        // task pointers appended by `submit`
        thread::monitor<
            std::deque<task*>,
            thread::ticketlock
        > pending;

        // backing storage that tasks are constructed in
        pool<task> store;

        parallel::queue<task*> finishing;
        std::vector<task*> notified;
    } m_tasks;

    // released once for every task appended to `m_tasks.pending`
    thread::semaphore m_pending;
    std::vector<std::thread> m_threads;

    thread::semaphore m_doomed;
    std::thread m_reaper;
};
}
#endif