libcruft-util/job/queue.cpp

171 lines
4.6 KiB
C++

/*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*
* Copyright 2018-2019 Danny Robson <danny@nerdcruft.net>
*/
#include "queue.hpp"
#include "../parse/value.hpp"
#include "../scoped.hpp"
#include "../log.hpp"
using cruft::job::queue;
///////////////////////////////////////////////////////////////////////////////
/// Choose how many worker threads a default-constructed queue should use.
///
/// The `JOB_THREADS` environment variable takes precedence when it is set
/// and parses as an unsigned integer. If parsing throws, an error is logged
/// and the detected hardware concurrency is used instead.
///
/// `std::thread::hardware_concurrency` is permitted to return 0 when the
/// value cannot be determined; in that case a single thread is reported so
/// the fallback never yields a queue without workers. An explicit
/// `JOB_THREADS` value is passed through unmodified.
unsigned
queue::default_parallelism (void) noexcept
{
    try {
        if (auto var = getenv ("JOB_THREADS")) {
            return cruft::parse::from_string<unsigned> (var);
        }
    } catch (...) {
        LOG_ERROR ("Unable to parse JOB_THREADS. Using the default");
    }

    // zero means "unknown", not "no threads"
    auto const detected = std::thread::hardware_concurrency ();
    return detected ? detected : 1u;
}
//-----------------------------------------------------------------------------
static unsigned
default_depth (void)
{
if (auto var = getenv ("JOB_DEPTH")) {
return cruft::parse::from_string<unsigned> (var);
}
return 1024;
}
///////////////////////////////////////////////////////////////////////////////
// Default construction: delegate to the sizing constructor with the thread
// count from default_parallelism and the pool size from default_depth.
queue::queue ()
    : queue (default_parallelism (), default_depth ())
{}
//-----------------------------------------------------------------------------
// Construct a queue with `thread_count` worker threads and a task pool
// built from `pool_size`.
//
// m_reaper is constructed with queue::reap in the initialiser list; the
// worker threads are started in the constructor body.
queue::queue (unsigned thread_count, unsigned pool_size)
    : m_tasks {
        {}, cruft::pool<task> {pool_size}, {}, {}
    }
    , m_pending (0)
    , m_threads (thread_count)
    , m_doomed (0)
    , m_reaper (&queue::reap, this)
{
    // the finishing queue needs at least one slot per worker thread
    CHECK_GE (m_tasks.finishing.capacity (), thread_count);

    // m_threads holds `thread_count` default-constructed threads at this
    // point; replace each with one running the worker loop.
    for (auto &worker: m_threads) {
        worker = std::thread (&queue::loop, this);
    }
}
//-----------------------------------------------------------------------------
// Stop the workers, then the reaper, joining each thread in turn.
queue::~queue ()
{
    m_stopping = true;

    // post the pending semaphore once per worker so that every worker
    // blocked in loop() gets to wake up and observe m_stopping.
    for (auto remaining = m_threads.size (); remaining > 0; --remaining)
        m_pending.release ();

    // block until every worker has exited. there is no timeout here, so a
    // deadlocked task is currently the user's problem.
    for (auto &worker: m_threads)
        worker.join ();

    // post the doomed semaphore generously so the reaper is guaranteed to
    // wake. it checks m_stopping and bails out of its main loop, so the
    // exact count posted here isn't important.
    m_doomed.release (m_tasks.finishing.capacity ());
    m_reaper.join ();
}
///////////////////////////////////////////////////////////////////////////////
// Worker thread body: repeatedly wait on the pending semaphore, take the
// front task from the pending list, run it, and pass it to the finishing
// queue. Exits once m_stopping is observed after a wakeup.
void
queue::loop ()
{
    for (;;) {
        // block until the pending semaphore is raised (the destructor also
        // raises it to get us to notice m_stopping).
        m_pending.acquire ();
        if (m_stopping)
            break;

        cruft::scoped::increment active (m_running);

        CHECK (!m_tasks.pending->empty ());

        // detach the front task. the object returned by acquire() only
        // lives for the duration of this lambda.
        auto job = [this] () {
            auto access = m_tasks.pending.acquire ();
            auto head = access->front ();
            access->pop_front ();
            return head;
        } ();

        // NOTE(review): cruft::scoped::function presumably runs the callable
        // when this scope ends -- confirm against scoped.hpp. the callable
        // retries until the finishing queue accepts the task, then raises
        // the doomed semaphore for the reaper.
        cruft::scoped::function on_exit ([&job, this] () noexcept {
            while (!m_tasks.finishing.push (job)) {
                // retry until the push succeeds
            }
            m_doomed.release ();
        });

        job->function (*job);
    }
}
//-----------------------------------------------------------------------------
void
queue::reap ()
{
while (true) {
// wait until we might have something to work with
m_doomed.acquire ();
if (m_stopping)
break;
// pop and notify as many doomed tasks as we can
int count = 0;
for (task *item; m_tasks.finishing.pop (item); ++count) {
item->done.notify_all ();
m_tasks.notified.push_back (item);
}
// destroy any tasks that have a zero reference count
m_tasks.notified.erase (
std::remove_if (
m_tasks.notified.begin (),
m_tasks.notified.end (),
[&] (auto i) {
if (i->references.value () < 1)
return false;
m_tasks.store.destroy (i);
return true;
}
),
m_tasks.notified.end ()
);
// subtract the number of tasks we've dealt with (remembering we've
// already claimed one in the process of waking).
m_doomed.acquire (count - 1);
}
// pre-notify everyone. this lets any observers release the task before
// we wait for them at destruction time. otherwise we tend to find
// deadlocks amongst remaining tasks.
std::vector<task*> doomed;
for (auto &i: m_tasks.notified)
m_tasks.store.destroy (i);
for (auto &i: doomed)
i->done.notify_all ();
for (auto &i: doomed)
m_tasks.store.destroy (i);
}