/* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * * Copyright 2017-2018 Danny Robson */ #ifndef CRUFT_UTIL_JOB_QUEUE_HPP #define CRUFT_UTIL_JOB_QUEUE_HPP #include "../pool.hpp" #include "../thread/ticketlock.hpp" #include "../thread/semaphore.hpp" #include "../thread/flag.hpp" #include "../thread/monitor.hpp" #include "../parallel/queue.hpp" #include #include #include #include #include #include #include #include #include #include namespace util::job { class queue { public: queue (); queue (unsigned thread_count, unsigned pool_size); ~queue (); queue (const queue&) = delete; queue (queue&&) = delete; queue& operator= (const queue&) = delete; queue& operator= (queue&&) = delete; auto parallelism (void) const { return m_threads.size (); } auto capacity (void) const { return m_tasks.store.capacity (); } struct task; struct [[nodiscard]] cookie { ~cookie () { if (data) { data->done.wait (); data->references.release (); } } task& operator-> (void) { return *data; } cookie (task &_data): data (&_data) { ; } cookie (cookie &&rhs): data (nullptr) { std::swap (data, rhs.data); } cookie& operator= (cookie&&) = delete; cookie (const cookie&) = delete; cookie& operator= (const cookie&) = delete; task *data; }; template cookie submit (task &parent, Function&&, Args&&...); /// record a functor and a set of parameters to execute at some point /// in the future by an arbitrary available thread. template cookie submit (Function &&func, Args &&...args) { CHECK (!m_stopping); auto ptr = m_tasks.store.construct ( std::forward (func), std::forward (args)... ); m_tasks.pending->push_back (ptr); m_pending.release (); return cookie (*ptr); } // block until all jobs currently queued have been started void flush (void); // block until there are no more jobs queued or executing void finish (void); /// stores a functor and associated arguments in a fixed size buffer /// for later execution. /// /// for ease of implementation the arguments are currently restricted /// as follows: /// * trivial types (memcpy'able) /// * a fixed total maximum size of around one cache line /// these limitations will be eliminated at some point in the future /// /// the user supplied functor is wrapped with our own that unpacks and /// forwards the arguments from the data buffer. this function must /// be passed a copy of the current arg object as the only argument. struct task { task () = default; ~task () { done.notify (); references.acquire (); } task (const task&) = delete; task (task&&) = delete; task& operator= (const task&) = delete; task& operator= (task&&) = delete; template task (FunctionT &&func, Args&&...params) { static_assert ((std::is_trivially_copyable_v> && ...)); using tuple_t = std::tuple< std::decay_t, std::tuple< std::decay_t... > >; { static_assert (sizeof (tuple_t) <= sizeof data); //tuple_t &punned = *reinterpret_cast (&data); new (reinterpret_cast (&data)) tuple_t ( std::forward (func), { std::forward (params)... } ); } function = [] (task &base) { auto &punned = *reinterpret_cast (&base.data); std::apply (std::get<0> (punned), std::get<1> (punned)); }; } auto acquire (void) { return --references; } auto release (void) { return ++references; } // GCC: switch to hardware_destructive_interference_size when it // becomes available in libstdc++. Until then we use a sensible // guess. std::array data; std::function function; thread::semaphore references = 0; thread::flag done; }; private: void loop (); void reap (); std::atomic m_stopping = false; std::atomic m_running = 0; struct { thread::monitor< std::deque, thread::ticketlock > pending; pool store; parallel::queue finishing; std::vector notified; } m_tasks; thread::semaphore m_pending; std::vector m_threads; thread::semaphore m_doomed; std::thread m_reaper; }; } #endif