/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Copyright 2018 Danny Robson
 */

#include "./queue.hpp"

#include "../raii.hpp"

#include <algorithm>
#include <cstddef>
// NOTE(review): one system include in this file had lost its header name;
// <thread> is what the code below visibly requires. Confirm against history
// whether a different header was originally listed here.
#include <thread>

using util::job::queue;


///////////////////////////////////////////////////////////////////////////////
/// Construct a queue with one worker per hardware thread.
///
/// std::thread::hardware_concurrency is allowed to return 0 when the value
/// cannot be determined, so the count is clamped to a minimum of one worker.
queue::queue ():
    queue (std::max (1u, std::thread::hardware_concurrency ()))
{ ; }


//-----------------------------------------------------------------------------
/// Construct a queue with `thread_count` worker threads.
///
/// The task storage is initialised with an empty pending container and a
/// util::pool constructed with 4096, the pending counter starts at zero, and
/// every element of m_threads is replaced with a thread running queue::loop
/// on this object.
///
/// \param thread_count  number of worker threads to start
queue::queue (unsigned thread_count):
    m_tasks {
        {}, util::pool {4096}
    },
    m_pending (0),
    m_threads (thread_count)
{
    // m_threads was sized above with default-constructed (non-running)
    // threads; start a worker in each slot.
    for (auto &t: m_threads)
        t = std::thread (&queue::loop, this);
}


//-----------------------------------------------------------------------------
/// Ask every worker to stop and wait for each of them to exit.
///
/// Workers test m_stopping immediately after acquiring m_pending, so the flag
/// is set first and then m_pending is released once per worker.
queue::~queue ()
{
    m_stopping = true;

    // raise the semaphore enough times to resume all the worker threads
    for (size_t i = 0; i < m_threads.size (); ++i)
        m_pending.release ();

    // wait for everyone to tidy up. perhaps we'd like to use a timeout, but
    // if things deadlock then it's the user's fault currently.
    for (auto &t: m_threads)
        t.join ();
}


///////////////////////////////////////////////////////////////////////////////
/// Worker thread body: repeatedly take one pending task and execute it.
///
/// Each iteration acquires m_pending, returns if m_stopping has been set, and
/// otherwise removes the front task from m_tasks.pending, invokes it, and
/// hands it to m_tasks.store.destroy.
void
queue::loop ()
{
    while (true) {
        m_pending.acquire ();
        if (m_stopping)
            return;

        // presumably tracks the number of workers currently executing a task
        // for the lifetime of this object -- confirm against util::scoped_counter
        util::scoped_counter running_count (m_running);

        CHECK (!m_tasks.pending->empty ());

        // The pop is wrapped in an immediately-invoked lambda so that `obj`
        // (the handle returned by m_tasks.pending.acquire) is destroyed
        // before the task itself is run.
        auto todo = [this] () {
            auto obj = m_tasks.pending.acquire ();
            auto res = obj->front ();
            obj->pop_front ();
            return res;
        } ();

        // presumably invokes the callable when `cleanup` leaves scope, so the
        // task is destroyed however the call below exits -- confirm against
        // util::scoped_function
        util::scoped_function cleanup ([&, this] () {
            m_tasks.store.destroy (todo);
        });

        todo->function (*todo);
    }
}