/*
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 *
 * Copyright 2018-2019 Danny Robson
 */

#include "queue.hpp"

#include "../parse/value.hpp"
#include "../scoped.hpp"
#include "../log.hpp"

#include <algorithm>
#include <cstddef>
#include <cstdlib>
#include <thread>
#include <vector>

using cruft::job::queue;


///////////////////////////////////////////////////////////////////////////////
/// Returns the number of worker threads a default-constructed queue uses.
///
/// If the `JOB_THREADS` environment variable is set its value is parsed and
/// returned. If parsing throws, an error is logged and the detected hardware
/// concurrency is used instead.
///
/// NOTE(review): the explicit `<unsigned>` template argument is assumed from
/// the return type; confirm against parse/value.hpp.
unsigned
queue::default_parallelism (void) noexcept
{
    try {
        if (auto var = getenv ("JOB_THREADS")) {
            return cruft::parse::from_string<unsigned> (var);
        }
    } catch (...) {
        LOG_ERROR ("Unable to parse JOB_THREADS. Using the default");
    }

    // hardware_concurrency is allowed to return zero when the value cannot
    // be determined. a queue with no workers would never run anything, so
    // fall back to a single thread in that case.
    auto const detected = std::thread::hardware_concurrency ();
    return detected ? detected : 1u;
}


//-----------------------------------------------------------------------------
/// Returns the task pool size used by a default-constructed queue.
///
/// If the `JOB_DEPTH` environment variable is set its value is parsed and
/// returned, otherwise 1024 is used. Unlike `JOB_THREADS`, nothing is caught
/// here: whatever the parser throws propagates to the caller.
static unsigned
default_depth (void)
{
    if (auto var = getenv ("JOB_DEPTH")) {
        return cruft::parse::from_string<unsigned> (var);
    }

    return 1024;
}


///////////////////////////////////////////////////////////////////////////////
/// Constructs a queue using the environment-derived (or default) thread
/// count and pool size.
queue::queue ():
    queue (default_parallelism (), default_depth ())
{ ; }


//-----------------------------------------------------------------------------
/// Constructs a queue with `thread_count` worker threads and a task pool
/// sized by `pool_size`.
///
/// The reaper thread is started from the initialiser list; the workers are
/// started in the body once the remaining members have been initialised.
queue::queue (unsigned thread_count, unsigned pool_size):
    m_tasks {
        {}, cruft::pool<task> {pool_size}, {}, {}
    },
    m_pending (0),
    m_threads (thread_count),
    m_doomed (0),
    m_reaper (&queue::reap, this)
{
    // every worker pushes into `finishing` when its task ends, so there
    // must be room for at least one entry per worker.
    CHECK_GE (m_tasks.finishing.capacity (), thread_count);

    for (auto &t: m_threads)
        t = std::thread (&queue::loop, this);
}


//-----------------------------------------------------------------------------
/// Stops and joins every worker thread, then wakes and joins the reaper.
queue::~queue ()
{
    m_stopping = true;

    // raise the semaphore enough times to resume all the worker threads
    for (size_t i = 0; i < m_threads.size (); ++i)
        m_pending.release ();

    // wait for everyone to tidy up. perhaps we'd like to use a timeout, but
    // if things deadlock then it's the users fault currently.
    for (auto &t: m_threads)
        t.join ();

    // wake the reaper up with enough tasks that it's guaranteed to resume.
    // it will bail early and drain the queue, so the validity of the increment
    // isn't super important.
    m_doomed.release (m_tasks.finishing.capacity ());
    m_reaper.join ();
}


///////////////////////////////////////////////////////////////////////////////
/// Worker thread body: repeatedly waits for a pending task, runs it, and
/// hands it to the reaper. Returns once `m_stopping` is observed after a
/// wake-up.
void
queue::loop ()
{
    while (true) {
        m_pending.acquire ();
        if (m_stopping)
            return;

        cruft::scoped::increment running_count (m_running);

        CHECK (!m_tasks.pending->empty ());

        // take the task at the front of the pending list. the handle from
        // `acquire` only lives for the duration of this lambda.
        auto todo = [this] () {
            auto obj = m_tasks.pending.acquire ();
            auto res = obj->front ();
            obj->pop_front ();
            return res;
        } ();

        // registered before the task is invoked so that the task is always
        // queued for the reaper when this scope ends. the push is retried
        // until it succeeds.
        cruft::scoped::function cleanup ([&todo, this] () noexcept {
            while (!m_tasks.finishing.push (todo))
                ;

            m_doomed.release ();
        });

        todo->function (*todo);
    }
}


//-----------------------------------------------------------------------------
/// Reaper thread body: notifies waiters of finished tasks and destroys the
/// tasks that are ready to be destroyed. When `m_stopping` is observed it
/// drains whatever is left and destroys every remaining task.
void
queue::reap ()
{
    while (true) {
        // wait until we might have something to work with
        m_doomed.acquire ();
        if (m_stopping)
            break;

        // pop and notify as many doomed tasks as we can
        int count = 0;
        for (task *item; m_tasks.finishing.pop (item); ++count) {
            item->done.notify_all ();
            m_tasks.notified.push_back (item);
        }

        // destroy every notified task whose `references` value has reached
        // at least one; tasks still below one stay in the list for a later
        // pass.
        //
        // NOTE(review): this was previously described as "a zero reference
        // count". the meaning of `references` is defined outside this file;
        // confirm which wording is intended.
        m_tasks.notified.erase (
            std::remove_if (
                m_tasks.notified.begin (),
                m_tasks.notified.end (),
                [&] (auto i)
                {
                    if (i->references.value () < 1)
                        return false;

                    m_tasks.store.destroy (i);
                    return true;
                }
            ),
            m_tasks.notified.end ()
        );

        // subtract the number of tasks we've dealt with (remembering we've
        // already claimed one in the process of waking).
        m_doomed.acquire (count - 1);
    }

    // collect every task that finished but was never picked up by the loop
    // above. without this `doomed` stays empty and those tasks are neither
    // notified nor destroyed.
    //
    // NOTE(review): the loop above can leave on a wake-up triggered by a
    // worker rather than the destructor, in which case a worker may still
    // push after this drain. confirm whether that window matters.
    std::vector<task*> doomed;
    for (task *item; m_tasks.finishing.pop (item); )
        doomed.push_back (item);

    // pre-notify everyone. this lets any observers release the task before
    // we wait for them at destruction time. otherwise we tend to find
    // deadlocks amongst remaining tasks.
    for (auto &i: doomed)
        i->done.notify_all ();

    for (auto &i: m_tasks.notified)
        m_tasks.store.destroy (i);
    for (auto &i: doomed)
        m_tasks.store.destroy (i);
}