/*
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 *
 * Copyright 2017-2018 Danny Robson <danny@nerdcruft.net>
 */

#ifndef CRUFT_UTIL_JOB_QUEUE_HPP
#define CRUFT_UTIL_JOB_QUEUE_HPP

#include "../pool.hpp"

#include "../thread/ticketlock.hpp"
#include "../thread/semaphore.hpp"
#include "../thread/flag.hpp"
#include "../thread/monitor.hpp"

#include "../parallel/queue.hpp"

#include <array>
#include <deque>
#include <thread>
#include <vector>
#include <new>
#include <cstddef>
#include <functional>
#include <atomic>
#include <mutex>
#include <condition_variable>


namespace cruft::job {
    class queue {
    public:
        queue ();
        queue (unsigned thread_count, unsigned pool_size);
        ~queue ();

        queue (const queue&) = delete;
        queue (queue&&) = delete;
        queue& operator= (const queue&) = delete;
        queue& operator= (queue&&) = delete;

        auto parallelism (void) const { return m_threads.size (); }
        auto capacity (void) const { return m_tasks.store.capacity (); }

        struct task;

        struct [[nodiscard]] cookie {
            ~cookie ()
            {
                if (data) {
                    data->done.wait ();
                    data->references.release ();
                }
            }

            task&
            operator-> (void)
            {
                return *data;
            }

            cookie (task &_data):
                data (&_data)
            { ; }


            cookie (cookie &&rhs):
                data (nullptr)
            {
                std::swap (data, rhs.data);
            }

            cookie& operator= (cookie &&rhs)
            {
                std::swap (data, rhs.data);
                return *this;
            }

            cookie (const cookie&) = delete;
            cookie& operator= (const cookie&) = delete;

            task *data;
        };

        template <class Function, typename ...Args>
        cookie
        submit (task &parent, Function&&, Args&&...);

        /// record a functor and a set of parameters to execute at some point
        /// in the future by an arbitrary available thread.
        template <class Function, typename ...Args>
        cookie
        submit (Function &&func, Args &&...args)
        {
            CHECK (!m_stopping);

            auto ptr = m_tasks.store.construct (
                std::forward<Function> (func),
                std::forward<Args> (args)...
            );

            m_tasks.pending->push_back (ptr);

            m_pending.release ();

            return cookie (*ptr);
        }


        // block until all jobs currently queued have been started
        void flush (void);

        // block until there are no more jobs queued or executing
        void finish (void);

        /// stores a functor and associated arguments in a fixed size buffer
        /// for later execution.
        ///
        /// for ease of implementation the arguments are currently restricted
        /// as follows:
        ///   * trivial types (memcpy'able)
        ///   * a fixed total maximum size of around one cache line
        /// these limitations will be eliminated at some point in the future
        ///
        /// the user supplied functor is wrapped with our own that unpacks and
        /// forwards the arguments from the data buffer. this function must
        /// be passed a copy of the current arg object as the only argument.
        struct task {
            task () = default;

            ~task ()
            {
                done.notify_all ();
                references.acquire ();
            }

            task (const task&) = delete;
            task (task&&) = delete;
            task& operator= (const task&) = delete;
            task& operator= (task&&) = delete;

            template <class FunctionT, typename ...Args>
            task (FunctionT &&func, Args&&...params)
            {
                static_assert ((std::is_trivially_copyable_v<std::decay_t<Args>> && ...));

                using tuple_t = std::tuple<
                    std::decay_t<FunctionT>,
                    std::tuple<
                        std::decay_t<Args>...
                    >
                >;

                {
                    static_assert (sizeof (tuple_t) <= sizeof data);
                    //tuple_t &punned = *reinterpret_cast<tuple_t*> (&data);
                    new (reinterpret_cast<tuple_t*> (&data)) tuple_t (
                        std::forward<FunctionT> (func),
                        { std::forward<Args> (params)... }
                    );
                }

                function = [] (task &base) {
                    auto &punned = *reinterpret_cast<tuple_t*> (&base.data);
                    std::apply (std::get<0> (punned), std::get<1> (punned));
                };
            }

            auto acquire (void) { return --references; }
            auto release (void) { return ++references; }

            // GCC: switch to hardware_destructive_interference_size when it
            // becomes available in libstdc++. Until then we use a sensible
            // guess.
            std::aligned_storage_t<64,alignof(std::max_align_t)> data;

            std::function<void(task&)> function;
            thread::semaphore references = 0;
            thread::flag done;
        };

    private:
        void loop ();
        void reap ();

        std::atomic<bool> m_stopping = false;
        std::atomic<int> m_running = 0;

        struct {
            thread::monitor<
                std::deque<task*>,
                thread::ticketlock
            > pending;

            pool<task> store;
            parallel::queue<task*> finishing;
            std::vector<task*> notified;
        } m_tasks;

        thread::semaphore m_pending;

        std::vector<std::thread> m_threads;

        thread::semaphore m_doomed;
        std::thread m_reaper;
    };
}

#endif