From c9dea1888110bb9fb03cccaca4410322a21bc982 Mon Sep 17 00:00:00 2001 From: Danny Robson Date: Wed, 14 Mar 2018 18:12:34 +1100 Subject: [PATCH] job/semaphore: add a basic semaphore implementation --- CMakeLists.txt | 4 ++ job/semaphore.hpp | 50 ++++++++++++++++++ job/semaphore_linux.cpp | 114 ++++++++++++++++++++++++++++++++++++++++ job/semaphore_win32.cpp | 0 test/job/semaphore.cpp | 71 +++++++++++++++++++++++++ 5 files changed, 239 insertions(+) create mode 100644 job/semaphore.hpp create mode 100644 job/semaphore_linux.cpp create mode 100644 job/semaphore_win32.cpp create mode 100644 test/job/semaphore.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 0be96064..6cf92988 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -89,6 +89,7 @@ if (LINUX) list (APPEND UTIL_FILES job/event_linux.cpp job/flag_linux.cpp + job/semaphore_linux.cpp ) endif () @@ -126,6 +127,7 @@ if (WINDOWS) io_win32.hpp job/event_win32.cpp job/flag_win32.cpp + job/semaphore_win32.cpp library_win32.cpp library_win32.hpp time_win32.cpp @@ -270,6 +272,7 @@ list ( job/flag.hpp job/queue.cpp job/queue.hpp + job/semaphore.hpp job/ticketlock.cpp job/ticketlock.hpp job/spinlock.cpp @@ -484,6 +487,7 @@ if (TESTS) job/event job/flag job/queue + job/semaphore job/spinlock job/ticketlock json_types diff --git a/job/semaphore.hpp b/job/semaphore.hpp new file mode 100644 index 00000000..df468351 --- /dev/null +++ b/job/semaphore.hpp @@ -0,0 +1,50 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Copyright 2018 Danny Robson + */ + +#ifndef CRUFT_UTIL_JOB_SEMAPHORE_HPP +#define CRUFT_UTIL_JOB_SEMAPHORE_HPP + +#include + +namespace util::job { + /// Explicitly does not conform to BasicLockable. + class semaphore { + public: + semaphore (int initial); + semaphore (); + + semaphore (const semaphore&) = delete; + semaphore& operator= (const semaphore&) = delete; + semaphore (semaphore&&) = delete; + semaphore& operator= (semaphore&&) = delete; + + int acquire (void); + int release (void); + + auto lock (void) { return acquire (); } + auto unlock (void) { return release (); } + + int value (void) const; + + int operator++ (void); + int operator-- (void); + + private: + std::atomic m_value; + }; +}; + +#endif diff --git a/job/semaphore_linux.cpp b/job/semaphore_linux.cpp new file mode 100644 index 00000000..6e2c3147 --- /dev/null +++ b/job/semaphore_linux.cpp @@ -0,0 +1,114 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Copyright 2018 Danny Robson + */ + +#include "semaphore.hpp" + +#include "../cast.hpp" + +#include +#include +#include +#include +#include +#include + +using util::job::semaphore; + + +/////////////////////////////////////////////////////////////////////////////// +static long +sys_futex (void *addr1, int op, int val1, struct timespec *timeout, void *addr2, int val3) +{ + return syscall (SYS_futex, addr1, op | FUTEX_PRIVATE_FLAG, val1, timeout, addr2, val3); +} + + +/////////////////////////////////////////////////////////////////////////////// +semaphore::semaphore (): + semaphore (1) +{ ; } + + +//----------------------------------------------------------------------------- +semaphore::semaphore (int initial): + m_value (initial) +{ ; } + + +/////////////////////////////////////////////////////////////////////////////// +int +semaphore::acquire (void) +{ + do { + int now = m_value; + + // if our value is positive then attempt to decrement it and return, + // else retry because someone interfered with us. + if (now > 0) { + if (m_value.compare_exchange_weak (now, now - 1)) + return now - 1; + continue; + } + + // the count doesn't appear to allow us to acquire. sleep until + // there's been a modification and retry. + if (-1 == sys_futex (&m_value, FUTEX_WAIT, now, nullptr, nullptr, 0)) { + switch (errno) { + case EAGAIN: break; + case EINTR: break; + default: + posix::error::throw_code (); + } + } + } while (1); +} + + +//----------------------------------------------------------------------------- +int +semaphore::release (void) +{ + auto res = ++m_value; + if (sys_futex (&m_value, FUTEX_WAKE, 1, nullptr, nullptr, 0) < 0) + posix::error::throw_code (); + return res; +} + + +/////////////////////////////////////////////////////////////////////////////// +int +semaphore::value (void) const +{ + return m_value; +} + + +//----------------------------------------------------------------------------- +int +semaphore::operator++ (void) +{ + return release (); +} + + +//----------------------------------------------------------------------------- +int +semaphore::operator-- (void) +{ + // we don't need to wake anyone because this will only serve to delay + // their wakeup. + return --m_value; +} diff --git a/job/semaphore_win32.cpp b/job/semaphore_win32.cpp new file mode 100644 index 00000000..e69de29b diff --git a/test/job/semaphore.cpp b/test/job/semaphore.cpp new file mode 100644 index 00000000..7ed7a598 --- /dev/null +++ b/test/job/semaphore.cpp @@ -0,0 +1,71 @@ +#include "job/semaphore.hpp" +#include "job/flag.hpp" +#include "tap.hpp" + +#include +#include + + +/////////////////////////////////////////////////////////////////////////////// +void +fight (util::job::semaphore &sem, const int iterations) +{ + for (int i = 0; i < iterations; ++i) + std::lock_guard {sem}; +} + + +/////////////////////////////////////////////////////////////////////////////// +int +main () +{ + util::TAP::logger tap; + + { + util::job::semaphore sem (0); + tap.expect_eq (sem.value (), 0, "initialisation is respected"); + tap.expect_eq (sem.unlock (), 1, "bare release increments without blocking"); + tap.expect_eq (sem.lock (), 0, "bare acquire decrements without blocking"); + } + + { + util::job::semaphore sem (1); + + std::atomic test = 0; + std::thread t ([&] () { + sem.lock (); + sem.lock (); + test = 1; + }); + + while (sem.value () > 0) + std::this_thread::yield (); + + tap.expect_eq (test, 0, "locking blocks in foreign thread"); + + sem.unlock (); + t.join (); + tap.expect_eq (test, 1, "unlocking resumes foreign thread"); + } + + { + const auto parallelism = std::thread::hardware_concurrency (); + constexpr int iterations = 1 << 16; + std::vector threads; + + util::job::semaphore sem (0); + + for (unsigned i = 0; i < parallelism; ++i) + threads.emplace_back (fight, std::ref (sem), iterations); + + sem.unlock (); + sem.unlock (); + + for (auto &t: threads) + t.join (); + + tap.expect (true, "high concurrency didn't deadlock"); + } + + return tap.status (); +} \ No newline at end of file