thread/semaphore: specalise the implementation for win32

This commit is contained in:
Danny Robson 2018-08-15 17:00:29 +10:00
parent 2eb1f2b919
commit 6c5a7cc5fa
4 changed files with 65 additions and 48 deletions

View File

@ -28,9 +28,12 @@ main ()
tap.expect_eq (sem.lock (), 0, "bare acquire decrements without blocking"); tap.expect_eq (sem.lock (), 0, "bare acquire decrements without blocking");
} }
// test that two threads can cooperate on a single semaphore.
{ {
cruft::thread::semaphore sem (1); cruft::thread::semaphore sem (1);
// the spawned thread attempts to double acquire a semaphore with
// only one available and so should get blocked immediately.
std::atomic<int> test = 0; std::atomic<int> test = 0;
std::thread t ([&] () { std::thread t ([&] () {
sem.lock (); sem.lock ();
@ -38,11 +41,15 @@ main ()
test = 1; test = 1;
}); });
// wait until we know the thread should have been blocked. it should
// not have touched the 'test' variable at this point.
while (sem.value () > 0) while (sem.value () > 0)
std::this_thread::yield (); std::this_thread::yield ();
tap.expect_eq (test, 0, "locking blocks in foreign thread"); tap.expect_eq (test, 0, "locking blocks in foreign thread");
// unlock the semaphore, wait for the thread to finish, and check it
// touched the 'test' variable.
sem.unlock (); sem.unlock ();
t.join (); t.join ();
tap.expect_eq (test, 1, "unlocking resumes foreign thread"); tap.expect_eq (test, 1, "unlocking resumes foreign thread");

View File

@ -2,6 +2,8 @@
#if defined(PLATFORM_LINUX) #if defined(PLATFORM_LINUX)
#include "semaphore_linux.hpp" #include "semaphore_linux.hpp"
#else #elif defined(PLATFORM_WIN32)
#include "semaphore_win32.hpp" #include "semaphore_win32.hpp"
#else
#error "Unsupported threading platform"
#endif #endif

View File

@ -8,8 +8,12 @@
#include "semaphore_win32.hpp" #include "semaphore_win32.hpp"
#include "../win32/except.hpp"
#include "../debug.hpp" #include "../debug.hpp"
#include <windows.h>
using cruft::thread::semaphore; using cruft::thread::semaphore;
@ -20,38 +24,39 @@ semaphore::semaphore ():
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
semaphore::semaphore (int initial): semaphore::semaphore (value_type initial):
m_value (initial) m_handle (
{ ; } win32::error::try_call (
CreateSemaphore,
nullptr,
initial,
std::numeric_limits<LONG>::max (),
nullptr
)
)
{
m_value = initial;
}
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
int semaphore::value_type
semaphore::acquire (int count) semaphore::acquire (value_type const count)
{ {
CHECK_GE (count, 0); CHECK_GE (count, 0);
do { for (value_type i = 0; i < count; ++i) {
int now = m_value; if (WAIT_OBJECT_0 != WaitForSingleObject(m_handle.native(), INFINITE))
win32::error::throw_code();
--m_value;
}
// if our value is positive then attempt to decrement it and return, return m_value;
// else retry because someone interfered with us.
if (now - count >= 0) {
if (m_value.compare_exchange_weak (now, now - count))
return now - count;
continue;
}
// the count doesn't appear to allow us to acquire. sleep until
// there's been a modification and retry.
std::unique_lock lk (m_mutex);
m_cv.wait (lk, [&, this] () { return m_value - count >= 0; });
} while (1);
} }
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
int semaphore::value_type
semaphore::acquire (void) semaphore::acquire (void)
{ {
return acquire (1); return acquire (1);
@ -59,18 +64,22 @@ semaphore::acquire (void)
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
int semaphore::value_type
semaphore::release (int count) semaphore::release (value_type count)
{ {
auto res = m_value += count; LONG previous;
if (res > 0) for (value_type i = 0; i < count; ++i) {
m_cv.notify_one(); if (!ReleaseSemaphore (m_handle.native(), count, &previous))
return res; win32::error::throw_code ();
++m_value;
}
return previous + 1;
} }
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
int semaphore::value_type
semaphore::release (void) semaphore::release (void)
{ {
return release (1); return release (1);
@ -78,7 +87,7 @@ semaphore::release (void)
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
int semaphore::value_type
semaphore::value (void) const semaphore::value (void) const
{ {
return m_value; return m_value;
@ -86,7 +95,7 @@ semaphore::value (void) const
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
int semaphore::value_type
semaphore::operator++ (void) semaphore::operator++ (void)
{ {
return release (); return release ();
@ -94,10 +103,8 @@ semaphore::operator++ (void)
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
int semaphore::value_type
semaphore::operator-- (void) semaphore::operator-- (void)
{ {
// we don't need to wake anyone because this will only serve to delay return acquire ();
// their wakeup.
return --m_value;
} }

View File

@ -8,16 +8,18 @@
#pragma once #pragma once
#include "../win32/handle.hpp"
#include <atomic> #include <atomic>
#include <condition_variable>
#include <mutex>
namespace cruft::thread { namespace cruft::thread {
/// Explicitly does not conform to BasicLockable. /// Explicitly does not conform to BasicLockable.
class semaphore { class semaphore {
public: public:
semaphore (int initial); using value_type = LONG;
semaphore (value_type initial);
semaphore (); semaphore ();
semaphore (const semaphore&) = delete; semaphore (const semaphore&) = delete;
@ -25,22 +27,21 @@ namespace cruft::thread {
semaphore (semaphore&&) = delete; semaphore (semaphore&&) = delete;
semaphore& operator= (semaphore&&) = delete; semaphore& operator= (semaphore&&) = delete;
int acquire (void); value_type acquire (void);
int acquire (int count); value_type acquire (value_type count);
int release (void); value_type release (void);
int release (int count); value_type release (value_type count);
auto lock (void) { return acquire (); } auto lock (void) { return acquire (); }
auto unlock (void) { return release (); } auto unlock (void) { return release (); }
int value (void) const; value_type value (void) const;
int operator++ (void); value_type operator++ (void);
int operator-- (void); value_type operator-- (void);
private: private:
std::atomic<int> m_value; std::atomic<LONG> m_value;
std::mutex m_mutex; win32::handle m_handle;
std::condition_variable m_cv;
}; };
}; };