diff --git a/.travis.yml b/.travis.yml deleted file mode 100755 index 310edf10..00000000 --- a/.travis.yml +++ /dev/null @@ -1,31 +0,0 @@ -language: cpp -os: linux -dist: xenial - -addons: - apt: - sources: - - ubuntu-toolchain-r-test - packages: - - g++-8 - -matrix: - include: - - compiler: gcc - env: - - MATRIX_EVAL="CC=gcc-8 && CXX=g++-8" - - compiler: clang - -before_install: - - eval "${MATRIX_EVAL}" - -script: - - mkdir -p ./build && cd ./build - - cmake -DCMAKE_BUILD_TYPE=Release -DLIBIPC_BUILD_TESTS=ON .. - - make -j`nproc` - - export LD_LIBRARY_PATH=./lib:$LD_LIBRARY_PATH && ./bin/test-ipc - -notifications: - slack: - on_success: never - on_failure: never diff --git a/CMakeLists.txt b/CMakeLists.txt index b4845b07..6d9b2a57 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -13,7 +13,7 @@ if(NOT MSVC) set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O2") endif() -if (MSVC AND LIBIPC_USE_STATIC_CRT) +if (MSVC) set(CompilerFlags CMAKE_CXX_FLAGS CMAKE_CXX_FLAGS_DEBUG @@ -22,9 +22,17 @@ if (MSVC AND LIBIPC_USE_STATIC_CRT) CMAKE_C_FLAGS_DEBUG CMAKE_C_FLAGS_RELEASE ) - foreach(CompilerFlag ${CompilerFlags}) - string(REPLACE "/MD" "/MT" ${CompilerFlag} "${${CompilerFlag}}") - endforeach() + if (LIBIPC_USE_STATIC_CRT) + foreach(CompilerFlag ${CompilerFlags}) + string(REPLACE "/MD" "/MT" ${CompilerFlag} "${${CompilerFlag}}") + string(REPLACE "/MDd" "/MTd" ${CompilerFlag} "${${CompilerFlag}}") + endforeach() + else() + foreach(CompilerFlag ${CompilerFlags}) + string(REPLACE "/MT" "/MD" ${CompilerFlag} "${${CompilerFlag}}") + string(REPLACE "/MTd" "/MDd" ${CompilerFlag} "${${CompilerFlag}}") + endforeach() + endif() endif() set(LIBRARY_OUTPUT_PATH ${CMAKE_BINARY_DIR}/bin) @@ -50,6 +58,7 @@ endif() if (LIBIPC_BUILD_DEMOS) add_subdirectory(demo/chat) add_subdirectory(demo/msg_que) + add_subdirectory(demo/send_recv) endif() install( diff --git a/demo/msg_que/main.cpp b/demo/msg_que/main.cpp index 5635101d..eda7f517 100644 --- a/demo/msg_que/main.cpp +++ b/demo/msg_que/main.cpp @@ -20,8 +20,8 @@ constexpr char const mode_r__[] = "r"; constexpr std::size_t const min_sz = 128; constexpr std::size_t const max_sz = 1024 * 16; -std::atomic is_quit__{ false }; -std::atomic size_counter__{ 0 }; +std::atomic is_quit__ {false}; +std::atomic size_counter__ {0}; using msg_que_t = ipc::chan; @@ -127,10 +127,10 @@ int main(int argc, char ** argv) { ::signal(SIGHUP , exit); #endif - if (std::string{ argv[1] } == mode_s__) { + std::string mode {argv[1]}; + if (mode == mode_s__) { do_send(); - } - else if (std::string{ argv[1] } == mode_r__) { + } else if (mode == mode_r__) { do_recv(); } return 0; diff --git a/demo/send_recv/CMakeLists.txt b/demo/send_recv/CMakeLists.txt new file mode 100644 index 00000000..9d3f0fbd --- /dev/null +++ b/demo/send_recv/CMakeLists.txt @@ -0,0 +1,11 @@ +project(send_recv) + +include_directories( + ${LIBIPC_PROJECT_DIR}/3rdparty) + +file(GLOB SRC_FILES ./*.cpp) +file(GLOB HEAD_FILES ./*.h) + +add_executable(${PROJECT_NAME} ${SRC_FILES} ${HEAD_FILES}) + +target_link_libraries(${PROJECT_NAME} ipc) diff --git a/demo/send_recv/main.cpp b/demo/send_recv/main.cpp new file mode 100644 index 00000000..52287c2e --- /dev/null +++ b/demo/send_recv/main.cpp @@ -0,0 +1,72 @@ + +#include + +#include +#include +#include +#include +#include + +#include "libipc/ipc.h" + +namespace { + +std::atomic is_quit__ {false}; +ipc::channel *ipc__ = nullptr; + +void do_send(int size, int interval) { + ipc::channel ipc {"ipc", ipc::sender}; + ipc__ = &ipc; + std::string buffer(size, 'A'); + while (!is_quit__.load(std::memory_order_acquire)) { + std::cout << "send size: " << buffer.size() + 1 << "\n"; + ipc.send(buffer, 0/*tm*/); + std::this_thread::sleep_for(std::chrono::milliseconds(interval)); + } +} + +void do_recv(int interval) { + ipc::channel ipc {"ipc", ipc::receiver}; + ipc__ = &ipc; + while (!is_quit__.load(std::memory_order_acquire)) { + ipc::buff_t recv; + for (int k = 1; recv.empty(); ++k) { + std::cout << "recv waiting... " << k << "\n"; + recv = ipc.recv(interval); + if (is_quit__.load(std::memory_order_acquire)) return; + } + std::cout << "recv size: " << recv.size() << "\n"; + } +} + +} // namespace + +int main(int argc, char ** argv) { + if (argc < 3) return -1; + + auto exit = [](int) { + is_quit__.store(true, std::memory_order_release); + if (ipc__ != nullptr) ipc__->disconnect(); + }; + ::signal(SIGINT , exit); + ::signal(SIGABRT , exit); + ::signal(SIGSEGV , exit); + ::signal(SIGTERM , exit); +#if defined(WIN64) || defined(_WIN64) || defined(__WIN64__) || \ + defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) || \ + defined(WINCE) || defined(_WIN32_WCE) + ::signal(SIGBREAK, exit); +#else + ::signal(SIGHUP , exit); +#endif + + std::string mode {argv[1]}; + if (mode == "send") { + if (argc < 4) return -1; + do_send(std::stoi(argv[2]) /*size*/, + std::stoi(argv[3]) /*interval*/); + } else if (mode == "recv") { + do_recv(std::stoi(argv[2]) /*interval*/); + } + return 0; +} diff --git a/include/libipc/condition.h b/include/libipc/condition.h new file mode 100644 index 00000000..d3b3a593 --- /dev/null +++ b/include/libipc/condition.h @@ -0,0 +1,39 @@ +#pragma once + +#include // std::uint64_t + +#include "libipc/export.h" +#include "libipc/def.h" +#include "libipc/mutex.h" + +namespace ipc { +namespace sync { + +class IPC_EXPORT condition { + condition(condition const &) = delete; + condition &operator=(condition const &) = delete; + +public: + condition(); + explicit condition(char const *name); + ~condition(); + + void const *native() const noexcept; + void *native() noexcept; + + bool valid() const noexcept; + + bool open(char const *name) noexcept; + void close() noexcept; + + bool wait(ipc::sync::mutex &mtx, std::uint64_t tm = ipc::invalid_value) noexcept; + bool notify() noexcept; + bool broadcast() noexcept; + +private: + class condition_; + condition_* p_; +}; + +} // namespace sync +} // namespace ipc diff --git a/include/libipc/def.h b/include/libipc/def.h index 2b80b81b..8c1a72ba 100755 --- a/include/libipc/def.h +++ b/include/libipc/def.h @@ -25,13 +25,16 @@ using uint_t = typename uint::type; // constants +enum : std::uint32_t { + invalid_value = (std::numeric_limits::max)(), + default_timeout = 100, // ms +}; + enum : std::size_t { - invalid_value = (std::numeric_limits::max)(), data_length = 64, large_msg_limit = data_length, large_msg_align = 1024, large_msg_cache = 32, - default_timeout = 100 // ms }; enum class relat { // multiplicity of the relationship diff --git a/include/libipc/ipc.h b/include/libipc/ipc.h index f6380ae3..64b262c9 100755 --- a/include/libipc/ipc.h +++ b/include/libipc/ipc.h @@ -27,12 +27,12 @@ struct IPC_EXPORT chan_impl { static char const * name(ipc::handle_t h); static std::size_t recv_count(ipc::handle_t h); - static bool wait_for_recv(ipc::handle_t h, std::size_t r_count, std::size_t tm); + static bool wait_for_recv(ipc::handle_t h, std::size_t r_count, std::uint64_t tm); - static bool send(ipc::handle_t h, void const * data, std::size_t size, std::size_t tm); - static buff_t recv(ipc::handle_t h, std::size_t tm); + static bool send(ipc::handle_t h, void const * data, std::size_t size, std::uint64_t tm); + static buff_t recv(ipc::handle_t h, std::uint64_t tm); - static bool try_send(ipc::handle_t h, void const * data, std::size_t size, std::size_t tm); + static bool try_send(ipc::handle_t h, void const * data, std::size_t size, std::uint64_t tm); static buff_t try_recv(ipc::handle_t h); }; @@ -120,41 +120,41 @@ class chan_wrapper { return detail_t::recv_count(h_); } - bool wait_for_recv(std::size_t r_count, std::size_t tm = invalid_value) const { + bool wait_for_recv(std::size_t r_count, std::uint64_t tm = invalid_value) const { return detail_t::wait_for_recv(h_, r_count, tm); } - static bool wait_for_recv(char const * name, std::size_t r_count, std::size_t tm = invalid_value) { + static bool wait_for_recv(char const * name, std::size_t r_count, std::uint64_t tm = invalid_value) { return chan_wrapper(name).wait_for_recv(r_count, tm); } /** * If timeout, this function would call 'force_push' to send the data forcibly. */ - bool send(void const * data, std::size_t size, std::size_t tm = default_timeout) { + bool send(void const * data, std::size_t size, std::uint64_t tm = default_timeout) { return detail_t::send(h_, data, size, tm); } - bool send(buff_t const & buff, std::size_t tm = default_timeout) { + bool send(buff_t const & buff, std::uint64_t tm = default_timeout) { return this->send(buff.data(), buff.size(), tm); } - bool send(std::string const & str, std::size_t tm = default_timeout) { + bool send(std::string const & str, std::uint64_t tm = default_timeout) { return this->send(str.c_str(), str.size() + 1, tm); } /** * If timeout, this function would just return false. */ - bool try_send(void const * data, std::size_t size, std::size_t tm = default_timeout) { + bool try_send(void const * data, std::size_t size, std::uint64_t tm = default_timeout) { return detail_t::try_send(h_, data, size, tm); } - bool try_send(buff_t const & buff, std::size_t tm = default_timeout) { + bool try_send(buff_t const & buff, std::uint64_t tm = default_timeout) { return this->try_send(buff.data(), buff.size(), tm); } - bool try_send(std::string const & str, std::size_t tm = default_timeout) { + bool try_send(std::string const & str, std::uint64_t tm = default_timeout) { return this->try_send(str.c_str(), str.size() + 1, tm); } - buff_t recv(std::size_t tm = invalid_value) { + buff_t recv(std::uint64_t tm = invalid_value) { return detail_t::recv(h_, tm); } diff --git a/include/libipc/mutex.h b/include/libipc/mutex.h new file mode 100644 index 00000000..2d4781ff --- /dev/null +++ b/include/libipc/mutex.h @@ -0,0 +1,39 @@ +#pragma once + +#include // std::uint64_t +#include + +#include "libipc/export.h" +#include "libipc/def.h" + +namespace ipc { +namespace sync { + +class IPC_EXPORT mutex { + mutex(mutex const &) = delete; + mutex &operator=(mutex const &) = delete; + +public: + mutex(); + explicit mutex(char const *name); + ~mutex(); + + void const *native() const noexcept; + void *native() noexcept; + + bool valid() const noexcept; + + bool open(char const *name) noexcept; + void close() noexcept; + + bool lock(std::uint64_t tm = ipc::invalid_value) noexcept; + bool try_lock() noexcept(false); // std::system_error + bool unlock() noexcept; + +private: + class mutex_; + mutex_* p_; +}; + +} // namespace sync +} // namespace ipc diff --git a/include/libipc/rw_lock.h b/include/libipc/rw_lock.h index 5757b23f..31ed9de2 100755 --- a/include/libipc/rw_lock.h +++ b/include/libipc/rw_lock.h @@ -72,7 +72,7 @@ inline void yield(K& k) noexcept { ++k; } -template +template inline void sleep(K& k, F&& f) { if (k < static_cast(N)) { std::this_thread::yield(); @@ -84,7 +84,7 @@ inline void sleep(K& k, F&& f) { ++k; } -template +template inline void sleep(K& k) { sleep(k, [] { std::this_thread::sleep_for(std::chrono::milliseconds(1)); diff --git a/include/libipc/semaphore.h b/include/libipc/semaphore.h new file mode 100644 index 00000000..7f557e2f --- /dev/null +++ b/include/libipc/semaphore.h @@ -0,0 +1,37 @@ +#pragma once + +#include // std::uint64_t + +#include "libipc/export.h" +#include "libipc/def.h" + +namespace ipc { +namespace sync { + +class IPC_EXPORT semaphore { + semaphore(semaphore const &) = delete; + semaphore &operator=(semaphore const &) = delete; + +public: + semaphore(); + explicit semaphore(char const *name, std::uint32_t count = 0); + ~semaphore(); + + void const *native() const noexcept; + void *native() noexcept; + + bool valid() const noexcept; + + bool open(char const *name, std::uint32_t count = 0) noexcept; + void close() noexcept; + + bool wait(std::uint64_t tm = ipc::invalid_value) noexcept; + bool post(std::uint32_t count = 1) noexcept; + +private: + class semaphore_; + semaphore_* p_; +}; + +} // namespace sync +} // namespace ipc diff --git a/include/libipc/shm.h b/include/libipc/shm.h index 91a68e4c..12e32237 100755 --- a/include/libipc/shm.h +++ b/include/libipc/shm.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include "libipc/export.h" @@ -14,11 +15,14 @@ enum : unsigned { open = 0x02 }; -IPC_EXPORT id_t acquire(char const * name, std::size_t size, unsigned mode = create | open); -IPC_EXPORT void * get_mem(id_t id, std::size_t * size); -IPC_EXPORT void release(id_t id); -IPC_EXPORT void remove (id_t id); -IPC_EXPORT void remove (char const * name); +IPC_EXPORT id_t acquire(char const * name, std::size_t size, unsigned mode = create | open); +IPC_EXPORT void * get_mem(id_t id, std::size_t * size); +IPC_EXPORT std::int32_t release(id_t id); +IPC_EXPORT void remove (id_t id); +IPC_EXPORT void remove (char const * name); + +IPC_EXPORT std::int32_t get_ref(id_t id); +IPC_EXPORT void sub_ref(id_t id); class IPC_EXPORT handle { public: @@ -31,12 +35,15 @@ class IPC_EXPORT handle { void swap(handle& rhs); handle& operator=(handle rhs); - bool valid() const; - std::size_t size () const; - char const * name () const; + bool valid() const noexcept; + std::size_t size () const noexcept; + char const * name () const noexcept; + + std::int32_t ref() const noexcept; + void sub_ref() noexcept; bool acquire(char const * name, std::size_t size, unsigned mode = create | open); - void release(); + std::int32_t release(); void* get() const; diff --git a/include/libipc/waiter.h b/include/libipc/waiter.h deleted file mode 100755 index a4c3c09b..00000000 --- a/include/libipc/waiter.h +++ /dev/null @@ -1,93 +0,0 @@ -#pragma once - -#include "libipc/export.h" -#include "libipc/def.h" - -namespace ipc { - -class condition; -class IPC_EXPORT mutex { -public: - mutex(); - explicit mutex(char const * name); - mutex(mutex&& rhs); - - ~mutex(); - - static void remove(char const * name); - - void swap(mutex& rhs); - mutex& operator=(mutex rhs); - - bool valid() const; - char const * name () const; - - bool open (char const * name); - void close(); - - bool lock (); - bool unlock(); - -private: - class mutex_; - mutex_* p_; - - friend class condition; -}; - -class IPC_EXPORT semaphore { -public: - semaphore(); - explicit semaphore(char const * name); - semaphore(semaphore&& rhs); - - ~semaphore(); - - static void remove(char const * name); - - void swap(semaphore& rhs); - semaphore& operator=(semaphore rhs); - - bool valid() const; - char const * name () const; - - bool open (char const * name, long count = 0); - void close(); - - bool wait(std::size_t tm = invalid_value); - bool post(long count = 1); - -private: - class semaphore_; - semaphore_* p_; -}; - -class IPC_EXPORT condition { -public: - condition(); - explicit condition(char const * name); - condition(condition&& rhs); - - ~condition(); - - static void remove(char const * name); - - void swap(condition& rhs); - condition& operator=(condition rhs); - - bool valid() const; - char const * name () const; - - bool open (char const * name); - void close(); - - bool wait(mutex&, std::size_t tm = invalid_value); - bool notify(); - bool broadcast(); - -private: - class condition_; - condition_* p_; -}; - -} // namespace ipc diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index ad601201..00dd78e6 100755 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -5,7 +5,8 @@ if(UNIX) else() file(GLOB SRC_FILES ${LIBIPC_PROJECT_DIR}/src/libipc/platform/*_win.cpp) endif() -aux_source_directory(${LIBIPC_PROJECT_DIR}/src SRC_FILES) +aux_source_directory(${LIBIPC_PROJECT_DIR}/src/libipc SRC_FILES) +aux_source_directory(${LIBIPC_PROJECT_DIR}/src/libipc/sync SRC_FILES) file(GLOB HEAD_FILES ${LIBIPC_PROJECT_DIR}/include/libipc/*.h @@ -37,8 +38,8 @@ set_target_properties(${PROJECT_NAME} # set version set_target_properties(${PROJECT_NAME} PROPERTIES - VERSION 1.0.0 - SOVERSION 1) + VERSION 1.1.0 + SOVERSION 2) target_include_directories(${PROJECT_NAME} PUBLIC ${LIBIPC_PROJECT_DIR}/include diff --git a/src/buffer.cpp b/src/libipc/buffer.cpp similarity index 100% rename from src/buffer.cpp rename to src/libipc/buffer.cpp diff --git a/src/ipc.cpp b/src/libipc/ipc.cpp similarity index 93% rename from src/ipc.cpp rename to src/libipc/ipc.cpp index dcf60b34..2713de3a 100755 --- a/src/ipc.cpp +++ b/src/libipc/ipc.cpp @@ -17,6 +17,7 @@ #include "libipc/queue.h" #include "libipc/policy.h" #include "libipc/rw_lock.h" +#include "libipc/waiter.h" #include "libipc/utility/log.h" #include "libipc/utility/id_pool.h" @@ -24,10 +25,7 @@ #include "libipc/utility/utility.h" #include "libipc/memory/resource.h" - #include "libipc/platform/detail.h" -#include "libipc/platform/waiter_wrapper.h" - #include "libipc/circ/elem_array.h" namespace { @@ -271,7 +269,7 @@ struct conn_info_head { ipc::string name_; msg_id_t cc_id_; // connection-info id - ipc::waiter cc_waiter_, wt_waiter_, rd_waiter_; + ipc::detail::waiter cc_waiter_, wt_waiter_, rd_waiter_; ipc::shm::handle acc_h_; conn_info_head(char const * name) @@ -300,18 +298,16 @@ struct conn_info_head { }; template -bool wait_for(W& waiter, F&& pred, std::size_t tm) { +bool wait_for(W& waiter, F&& pred, std::uint64_t tm) { if (tm == 0) return !pred(); for (unsigned k = 0; pred();) { - bool loop = true, ret = true; - ipc::sleep(k, [&k, &loop, &ret, &waiter, &pred, tm] { - ret = waiter.wait_if([&loop, &pred] { - return loop = pred(); - }, tm); - k = 0; + bool ret = true; + ipc::sleep(k, [&k, &ret, &waiter, &pred, tm] { + ret = waiter.wait_if(std::forward(pred), tm); + k = 0; }); - if (!ret ) return false; // timeout or fail - if (!loop) break; + if (!ret) return false; // timeout or fail + if (k == 0) break; // k has been reset } return true; } @@ -414,7 +410,7 @@ static std::size_t recv_count(ipc::handle_t h) noexcept { return que->conn_count(); } -static bool wait_for_recv(ipc::handle_t h, std::size_t r_count, std::size_t tm) { +static bool wait_for_recv(ipc::handle_t h, std::size_t r_count, std::uint64_t tm) { auto que = queue_of(h); if (que == nullptr) { return false; @@ -487,7 +483,7 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s return true; } -static bool send(ipc::handle_t h, void const * data, std::size_t size, std::size_t tm) { +static bool send(ipc::handle_t h, void const * data, std::size_t size, std::uint64_t tm) { return send([tm](auto info, auto que, auto msg_id) { return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) { if (!wait_for(info->wt_waiter_, [&] { @@ -508,7 +504,7 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size, std::size }, h, data, size); } -static bool try_send(ipc::handle_t h, void const * data, std::size_t size, std::size_t tm) { +static bool try_send(ipc::handle_t h, void const * data, std::size_t size, std::uint64_t tm) { return send([tm](auto info, auto que, auto msg_id) { return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) { if (!wait_for(info->wt_waiter_, [&] { @@ -524,7 +520,7 @@ static bool try_send(ipc::handle_t h, void const * data, std::size_t size, std:: }, h, data, size); } -static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) { +static ipc::buff_t recv(ipc::handle_t h, std::uint64_t tm) { auto que = queue_of(h); if (que == nullptr) { ipc::error("fail: recv, queue_of(h) == nullptr\n"); @@ -666,22 +662,22 @@ std::size_t chan_impl::recv_count(ipc::handle_t h) { } template -bool chan_impl::wait_for_recv(ipc::handle_t h, std::size_t r_count, std::size_t tm) { +bool chan_impl::wait_for_recv(ipc::handle_t h, std::size_t r_count, std::uint64_t tm) { return detail_impl>::wait_for_recv(h, r_count, tm); } template -bool chan_impl::send(ipc::handle_t h, void const * data, std::size_t size, std::size_t tm) { +bool chan_impl::send(ipc::handle_t h, void const * data, std::size_t size, std::uint64_t tm) { return detail_impl>::send(h, data, size, tm); } template -buff_t chan_impl::recv(ipc::handle_t h, std::size_t tm) { +buff_t chan_impl::recv(ipc::handle_t h, std::uint64_t tm) { return detail_impl>::recv(h, tm); } template -bool chan_impl::try_send(ipc::handle_t h, void const * data, std::size_t size, std::size_t tm) { +bool chan_impl::try_send(ipc::handle_t h, void const * data, std::size_t size, std::uint64_t tm) { return detail_impl>::try_send(h, data, size, tm); } diff --git a/src/libipc/memory/resource.h b/src/libipc/memory/resource.h index 063e8dcc..38b6fb68 100755 --- a/src/libipc/memory/resource.h +++ b/src/libipc/memory/resource.h @@ -45,14 +45,17 @@ constexpr char const * pf(long double) { return "%Lf" ; } } // internal-linkage +template +struct hash : public std::hash {}; + template using unordered_map = std::unordered_map< - Key, T, std::hash, std::equal_to, ipc::mem::allocator> + Key, T, ipc::hash, std::equal_to, ipc::mem::allocator> >; template using map = std::map< - Key, T, std::less, ipc::mem::allocator> + Key, T, std::less, ipc::mem::allocator> >; template @@ -63,6 +66,18 @@ using basic_string = std::basic_string< using string = basic_string; using wstring = basic_string; +template <> struct hash { + std::size_t operator()(string const &val) const noexcept { + return std::hash{}(val.c_str()); + } +}; + +template <> struct hash { + std::size_t operator()(wstring const &val) const noexcept { + return std::hash{}(val.c_str()); + } +}; + template ipc::string to_string(T val) { char buf[std::numeric_limits::digits10 + 1] {}; diff --git a/src/libipc/platform/condition_linux.h b/src/libipc/platform/condition_linux.h new file mode 100644 index 00000000..d9d4280d --- /dev/null +++ b/src/libipc/platform/condition_linux.h @@ -0,0 +1,140 @@ +#pragma once + +#include +#include + +#include + +#include "libipc/platform/get_wait_time.h" +#include "libipc/utility/log.h" +#include "libipc/utility/scope_guard.h" +#include "libipc/mutex.h" +#include "libipc/shm.h" + +namespace ipc { +namespace detail { +namespace sync { + +class condition { + ipc::shm::handle shm_; + pthread_cond_t *cond_ = nullptr; + + pthread_cond_t *acquire_cond(char const *name) { + if (!shm_.acquire(name, sizeof(pthread_cond_t))) { + ipc::error("[acquire_cond] fail shm.acquire: %s\n", name); + return nullptr; + } + return static_cast(shm_.get()); + } + +public: + condition() = default; + ~condition() = default; + + pthread_cond_t const *native() const noexcept { + return cond_; + } + + pthread_cond_t *native() noexcept { + return cond_; + } + + bool valid() const noexcept { + static const char tmp[sizeof(pthread_cond_t)] {}; + return (cond_ != nullptr) + && (std::memcmp(tmp, cond_, sizeof(pthread_cond_t)) != 0); + } + + bool open(char const *name) noexcept { + close(); + if ((cond_ = acquire_cond(name)) == nullptr) { + return false; + } + if (shm_.ref() > 1) { + return valid(); + } + ::pthread_cond_destroy(cond_); + auto finally = ipc::guard([this] { close(); }); // close when failed + // init condition + int eno; + pthread_condattr_t cond_attr; + if ((eno = ::pthread_condattr_init(&cond_attr)) != 0) { + ipc::error("fail pthread_condattr_init[%d]\n", eno); + return false; + } + IPC_UNUSED_ auto guard_cond_attr = unique_ptr(&cond_attr, ::pthread_condattr_destroy); + if ((eno = ::pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED)) != 0) { + ipc::error("fail pthread_condattr_setpshared[%d]\n", eno); + return false; + } + *cond_ = PTHREAD_COND_INITIALIZER; + if ((eno = ::pthread_cond_init(cond_, &cond_attr)) != 0) { + ipc::error("fail pthread_cond_init[%d]\n", eno); + return false; + } + finally.dismiss(); + return valid(); + } + + void close() noexcept { + if ((shm_.ref() <= 1) && cond_ != nullptr) { + int eno; + if ((eno = ::pthread_cond_destroy(cond_)) != 0) { + ipc::error("fail pthread_cond_destroy[%d]\n", eno); + } + } + shm_.release(); + cond_ = nullptr; + } + + bool wait(ipc::sync::mutex &mtx, std::uint64_t tm) noexcept { + if (!valid()) return false; + switch (tm) { + case invalid_value: { + int eno; + if ((eno = ::pthread_cond_wait(cond_, static_cast(mtx.native()))) != 0) { + ipc::error("fail pthread_cond_wait[%d]\n", eno); + return false; + } + } + break; + default: { + auto ts = detail::make_timespec(tm); + int eno; + if ((eno = ::pthread_cond_timedwait(cond_, static_cast(mtx.native()), &ts)) != 0) { + if (eno != ETIMEDOUT) { + ipc::error("fail pthread_cond_timedwait[%d]: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n", + eno, tm, ts.tv_sec, ts.tv_nsec); + } + return false; + } + } + break; + } + return true; + } + + bool notify() noexcept { + if (!valid()) return false; + int eno; + if ((eno = ::pthread_cond_signal(cond_)) != 0) { + ipc::error("fail pthread_cond_signal[%d]\n", eno); + return false; + } + return true; + } + + bool broadcast() noexcept { + if (!valid()) return false; + int eno; + if ((eno = ::pthread_cond_broadcast(cond_)) != 0) { + ipc::error("fail pthread_cond_broadcast[%d]\n", eno); + return false; + } + return true; + } +}; + +} // namespace sync +} // namespace detail +} // namespace ipc diff --git a/src/libipc/platform/condition_win.h b/src/libipc/platform/condition_win.h new file mode 100644 index 00000000..5d82d471 --- /dev/null +++ b/src/libipc/platform/condition_win.h @@ -0,0 +1,119 @@ +#pragma once + +#include +#include +#include + +#include + +#include "libipc/utility/log.h" +#include "libipc/utility/scope_guard.h" +#include "libipc/platform/detail.h" +#include "libipc/mutex.h" +#include "libipc/semaphore.h" +#include "libipc/shm.h" + +namespace ipc { +namespace detail { +namespace sync { + +class condition { + ipc::sync::semaphore sem_; + ipc::sync::mutex lock_; + ipc::shm::handle shm_; + + std::int32_t &counter() { + return *static_cast(shm_.get()); + } + +public: + condition() = default; + ~condition() noexcept = default; + + auto native() noexcept { + return sem_.native(); + } + + auto native() const noexcept { + return sem_.native(); + } + + bool valid() const noexcept { + return sem_.valid() && lock_.valid() && shm_.valid(); + } + + bool open(char const *name) noexcept { + close(); + if (!sem_.open((std::string{"_cond_sem_"} + name).c_str())) { + return false; + } + auto finally_sem = ipc::guard([this] { sem_.close(); }); // close when failed + if (!lock_.open((std::string{"_cond_lock_"} + name).c_str())) { + return false; + } + auto finally_lock = ipc::guard([this] { lock_.close(); }); // close when failed + if (!shm_.acquire((std::string{"_cond_shm_"} + name).c_str(), sizeof(std::int32_t))) { + return false; + } + finally_lock.dismiss(); + finally_sem.dismiss(); + return valid(); + } + + void close() noexcept { + if (!valid()) return; + sem_.close(); + lock_.close(); + shm_.release(); + } + + bool wait(ipc::sync::mutex &mtx, std::uint64_t tm) noexcept { + if (!valid()) return false; + auto &cnt = counter(); + { + IPC_UNUSED_ std::lock_guard guard {lock_}; + cnt = (cnt < 0) ? 1 : cnt + 1; + } + DWORD ms = (tm == invalid_value) ? INFINITE : static_cast(tm); + /** + * @see + * - https://www.microsoft.com/en-us/research/wp-content/uploads/2004/12/ImplementingCVs.pdf + * - https://docs.microsoft.com/en-us/windows/win32/api/synchapi/nf-synchapi-signalobjectandwait + */ + bool rs = ::SignalObjectAndWait(mtx.native(), sem_.native(), ms, FALSE) == WAIT_OBJECT_0; + bool rl = mtx.lock(); // INFINITE + if (!rs) { + IPC_UNUSED_ std::lock_guard guard {lock_}; + cnt -= 1; + } + return rs && rl; + } + + bool notify() noexcept { + if (!valid()) return false; + auto &cnt = counter(); + if (!lock_.lock()) return false; + bool ret = false; + if (cnt > 0) { + ret = sem_.post(1); + cnt -= 1; + } + return lock_.unlock() && ret; + } + + bool broadcast() noexcept { + if (!valid()) return false; + auto &cnt = counter(); + if (!lock_.lock()) return false; + bool ret = false; + if (cnt > 0) { + ret = sem_.post(cnt); + cnt = 0; + } + return lock_.unlock() && ret; + } +}; + +} // namespace sync +} // namespace detail +} // namespace ipc diff --git a/src/libipc/platform/detail.h b/src/libipc/platform/detail.h index 141de6e2..8f4c4f57 100755 --- a/src/libipc/platform/detail.h +++ b/src/libipc/platform/detail.h @@ -22,6 +22,18 @@ # error "IPC_CONSTEXPR_ has been defined." #endif +// detect platform + +#if defined(WIN64) || defined(_WIN64) || defined(__WIN64__) || \ + defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) || \ + defined(WINCE) || defined(_WIN32_WCE) +# define IPC_OS_WINDOWS_ +#endif/*WIN*/ + +#if defined(__linux__) || defined(__linux) +# define IPC_OS_LINUX_ +#endif/*linux*/ + #if __cplusplus >= 201703L #define IPC_UNUSED_ [[maybe_unused]] diff --git a/src/libipc/platform/get_wait_time.h b/src/libipc/platform/get_wait_time.h new file mode 100644 index 00000000..785cd754 --- /dev/null +++ b/src/libipc/platform/get_wait_time.h @@ -0,0 +1,39 @@ +#pragma once + +#include +#include + +#include +#include +#include + +#include "libipc/utility/log.h" + +namespace ipc { +namespace detail { + +inline bool calc_wait_time(timespec &ts, std::uint64_t tm /*ms*/) noexcept { + timeval now; + int eno = ::gettimeofday(&now, NULL); + if (eno != 0) { + ipc::error("fail gettimeofday [%d]\n", eno); + return false; + } + ts.tv_nsec = (now.tv_usec + (tm % 1000) * 1000) * 1000; + ts.tv_sec = now.tv_sec + (tm / 1000) + (ts.tv_nsec / 1000000000l); + ts.tv_nsec %= 1000000000l; + return true; +} + +inline timespec make_timespec(std::uint64_t tm /*ms*/) noexcept(false) { + timespec ts {}; + if (!calc_wait_time(ts, tm)) { + ipc::error("fail calc_wait_time: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n", + tm, ts.tv_sec, ts.tv_nsec); + throw std::system_error{static_cast(errno), std::system_category()}; + } + return ts; +} + +} // namespace detail +} // namespace ipc diff --git a/src/libipc/platform/mutex_linux.h b/src/libipc/platform/mutex_linux.h new file mode 100644 index 00000000..9080f585 --- /dev/null +++ b/src/libipc/platform/mutex_linux.h @@ -0,0 +1,236 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include + +#include "libipc/platform/get_wait_time.h" +#include "libipc/platform/detail.h" +#include "libipc/utility/log.h" +#include "libipc/utility/scope_guard.h" +#include "libipc/memory/resource.h" +#include "libipc/shm.h" + +namespace ipc { +namespace detail { +namespace sync { + +class mutex { + ipc::shm::handle *shm_ = nullptr; + std::atomic *ref_ = nullptr; + pthread_mutex_t *mutex_ = nullptr; + + struct curr_prog { + struct shm_data { + ipc::shm::handle shm; + std::atomic ref; + + struct init { + char const *name; + std::size_t size; + }; + shm_data(init arg) + : shm{arg.name, arg.size}, ref{0} {} + }; + ipc::map mutex_handles; + std::mutex lock; + + static curr_prog &get() { + static curr_prog info; + return info; + } + }; + + pthread_mutex_t *acquire_mutex(char const *name) { + if (name == nullptr) { + return nullptr; + } + auto &info = curr_prog::get(); + IPC_UNUSED_ std::lock_guard guard {info.lock}; + auto it = info.mutex_handles.find(name); + if (it == info.mutex_handles.end()) { + it = curr_prog::get().mutex_handles.emplace(name, + curr_prog::shm_data::init{name, sizeof(pthread_mutex_t)}).first; + } + shm_ = &it->second.shm; + ref_ = &it->second.ref; + if (shm_ == nullptr) { + return nullptr; + } + return static_cast(shm_->get()); + } + + template + void release_mutex(ipc::string const &name, F &&clear) { + if (name.empty()) return; + IPC_UNUSED_ std::lock_guard guard {curr_prog::get().lock}; + auto it = curr_prog::get().mutex_handles.find(name); + if (it == curr_prog::get().mutex_handles.end()) { + return; + } + if (clear()) { + curr_prog::get().mutex_handles.erase(it); + } + } + +public: + mutex() = default; + ~mutex() = default; + + pthread_mutex_t const *native() const noexcept { + return mutex_; + } + + pthread_mutex_t *native() noexcept { + return mutex_; + } + + bool valid() const noexcept { + static const char tmp[sizeof(pthread_mutex_t)] {}; + return (shm_ != nullptr) && (ref_ != nullptr) && (mutex_ != nullptr) + && (std::memcmp(tmp, mutex_, sizeof(pthread_mutex_t)) != 0); + } + + bool open(char const *name) noexcept { + close(); + if ((mutex_ = acquire_mutex(name)) == nullptr) { + return false; + } + auto self_ref = ref_->fetch_add(1, std::memory_order_relaxed); + if (shm_->ref() > 1 || self_ref > 0) { + return valid(); + } + ::pthread_mutex_destroy(mutex_); + auto finally = ipc::guard([this] { close(); }); // close when failed + // init mutex + int eno; + pthread_mutexattr_t mutex_attr; + if ((eno = ::pthread_mutexattr_init(&mutex_attr)) != 0) { + ipc::error("fail pthread_mutexattr_init[%d]\n", eno); + return false; + } + IPC_UNUSED_ auto guard_mutex_attr = unique_ptr(&mutex_attr, ::pthread_mutexattr_destroy); + if ((eno = ::pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED)) != 0) { + ipc::error("fail pthread_mutexattr_setpshared[%d]\n", eno); + return false; + } + if ((eno = ::pthread_mutexattr_setrobust(&mutex_attr, PTHREAD_MUTEX_ROBUST)) != 0) { + ipc::error("fail pthread_mutexattr_setrobust[%d]\n", eno); + return false; + } + *mutex_ = PTHREAD_MUTEX_INITIALIZER; + if ((eno = ::pthread_mutex_init(mutex_, &mutex_attr)) != 0) { + ipc::error("fail pthread_mutex_init[%d]\n", eno); + return false; + } + finally.dismiss(); + return valid(); + } + + void close() noexcept { + if ((ref_ != nullptr) && (shm_ != nullptr) && (mutex_ != nullptr)) { + if (shm_->name() != nullptr) { + release_mutex(shm_->name(), [this] { + auto self_ref = ref_->fetch_sub(1, std::memory_order_relaxed); + if ((shm_->ref() <= 1) && (self_ref <= 1)) { + int eno; + if ((eno = ::pthread_mutex_destroy(mutex_)) != 0) { + ipc::error("fail pthread_mutex_destroy[%d]\n", eno); + } + return true; + } + return false; + }); + } else shm_->release(); + } + shm_ = nullptr; + ref_ = nullptr; + mutex_ = nullptr; + } + + bool lock(std::uint64_t tm) noexcept { + if (!valid()) return false; + for (;;) { + auto ts = detail::make_timespec(tm); + int eno = (tm == invalid_value) + ? ::pthread_mutex_lock(mutex_) + : ::pthread_mutex_timedlock(mutex_, &ts); + switch (eno) { + case 0: + return true; + case ETIMEDOUT: + return false; + case EOWNERDEAD: { + if (shm_->ref() > 1) { + shm_->sub_ref(); + } + int eno2 = ::pthread_mutex_consistent(mutex_); + if (eno2 != 0) { + ipc::error("fail pthread_mutex_lock[%d], pthread_mutex_consistent[%d]\n", eno, eno2); + return false; + } + int eno3 = ::pthread_mutex_unlock(mutex_); + if (eno3 != 0) { + ipc::error("fail pthread_mutex_lock[%d], pthread_mutex_unlock[%d]\n", eno, eno3); + return false; + } + } + break; // loop again + default: + ipc::error("fail pthread_mutex_lock[%d]\n", eno); + return false; + } + } + } + + bool try_lock() noexcept(false) { + if (!valid()) return false; + auto ts = detail::make_timespec(0); + int eno = ::pthread_mutex_timedlock(mutex_, &ts); + switch (eno) { + case 0: + return true; + case ETIMEDOUT: + return false; + case EOWNERDEAD: { + if (shm_->ref() > 1) { + shm_->sub_ref(); + } + int eno2 = ::pthread_mutex_consistent(mutex_); + if (eno2 != 0) { + ipc::error("fail pthread_mutex_timedlock[%d], pthread_mutex_consistent[%d]\n", eno, eno2); + break; + } + int eno3 = ::pthread_mutex_unlock(mutex_); + if (eno3 != 0) { + ipc::error("fail pthread_mutex_timedlock[%d], pthread_mutex_unlock[%d]\n", eno, eno3); + break; + } + } + break; + default: + ipc::error("fail pthread_mutex_timedlock[%d]\n", eno); + break; + } + throw std::system_error{eno, std::system_category()}; + } + + bool unlock() noexcept { + if (!valid()) return false; + int eno; + if ((eno = ::pthread_mutex_unlock(mutex_)) != 0) { + ipc::error("fail pthread_mutex_unlock[%d]\n", eno); + return false; + } + return true; + } +}; + +} // namespace sync +} // namespace detail +} // namespace ipc diff --git a/src/libipc/platform/mutex_win.h b/src/libipc/platform/mutex_win.h new file mode 100644 index 00000000..b648c3b3 --- /dev/null +++ b/src/libipc/platform/mutex_win.h @@ -0,0 +1,96 @@ +#pragma once + +#include +#include + +#include + +#include "libipc/utility/log.h" + +#include "libipc/platform/to_tchar.h" +#include "libipc/platform/get_sa.h" + +namespace ipc { +namespace detail { +namespace sync { + +class mutex { + HANDLE h_ = NULL; + +public: + mutex() noexcept = default; + ~mutex() noexcept = default; + + HANDLE native() const noexcept { + return h_; + } + + bool valid() const noexcept { + return h_ != NULL; + } + + bool open(char const *name) noexcept { + close(); + h_ = ::CreateMutex(detail::get_sa(), FALSE, ipc::detail::to_tchar(name).c_str()); + if (h_ == NULL) { + ipc::error("fail CreateMutex[%lu]: %s\n", ::GetLastError(), name); + return false; + } + return true; + } + + void close() noexcept { + if (!valid()) return; + ::CloseHandle(h_); + h_ = NULL; + } + + bool lock(std::uint64_t tm) noexcept { + DWORD ret, ms = (tm == invalid_value) ? INFINITE : static_cast(tm); + for(;;) { + switch ((ret = ::WaitForSingleObject(h_, ms))) { + case WAIT_OBJECT_0: + return true; + case WAIT_TIMEOUT: + return false; + case WAIT_ABANDONED: + ipc::log("fail WaitForSingleObject[%lu]: WAIT_ABANDONED, try again.\n", ::GetLastError()); + if (!unlock()) { + return false; + } + break; // loop again + default: + ipc::error("fail WaitForSingleObject[%lu]: 0x%08X\n", ::GetLastError(), ret); + return false; + } + } + } + + bool try_lock() noexcept(false) { + DWORD ret = ::WaitForSingleObject(h_, 0); + switch (ret) { + case WAIT_OBJECT_0: + return true; + case WAIT_TIMEOUT: + return false; + case WAIT_ABANDONED: + unlock(); + IPC_FALLTHROUGH_; + default: + ipc::error("fail WaitForSingleObject[%lu]: 0x%08X\n", ::GetLastError(), ret); + throw std::system_error{static_cast(ret), std::system_category()}; + } + } + + bool unlock() noexcept { + if (!::ReleaseMutex(h_)) { + ipc::error("fail ReleaseMutex[%lu]\n", ::GetLastError()); + return false; + } + return true; + } +}; + +} // namespace sync +} // namespace detail +} // namespace ipc diff --git a/src/libipc/platform/semaphore_linux.h b/src/libipc/platform/semaphore_linux.h new file mode 100644 index 00000000..cbc49732 --- /dev/null +++ b/src/libipc/platform/semaphore_linux.h @@ -0,0 +1,98 @@ +#pragma once + +#include + +#include /* For O_* constants */ +#include /* For mode constants */ +#include +#include + +#include "libipc/utility/log.h" +#include "libipc/platform/get_wait_time.h" +#include "libipc/shm.h" + +namespace ipc { +namespace detail { +namespace sync { + +class semaphore { + ipc::shm::handle shm_; + sem_t *h_ = SEM_FAILED; + +public: + semaphore() = default; + ~semaphore() noexcept = default; + + sem_t *native() const noexcept { + return h_; + } + + bool valid() const noexcept { + return h_ != SEM_FAILED; + } + + bool open(char const *name, std::uint32_t count) noexcept { + close(); + if (!shm_.acquire(name, 1)) { + ipc::error("[open_semaphore] fail shm.acquire: %s\n", name); + return false; + } + h_ = ::sem_open(name, O_CREAT, 0666, static_cast(count)); + if (h_ == SEM_FAILED) { + ipc::error("fail sem_open[%d]: %s\n", errno, name); + return false; + } + return true; + } + + void close() noexcept { + if (!valid()) return; + if (::sem_close(h_) != 0) { + ipc::error("fail sem_close[%d]: %s\n", errno); + } + h_ = SEM_FAILED; + if (shm_.name() != nullptr) { + std::string name = shm_.name(); + if (shm_.release() <= 1) { + if (::sem_unlink(name.c_str()) != 0) { + ipc::error("fail sem_unlink[%d]: %s, name: %s\n", errno, name.c_str()); + } + } + } + } + + bool wait(std::uint64_t tm) noexcept { + if (!valid()) return false; + if (tm == invalid_value) { + if (::sem_wait(h_) != 0) { + ipc::error("fail sem_wait[%d]: %s\n", errno); + return false; + } + } else { + auto ts = detail::make_timespec(tm); + if (::sem_timedwait(h_, &ts) != 0) { + if (errno != ETIMEDOUT) { + ipc::error("fail sem_timedwait[%d]: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n", + errno, tm, ts.tv_sec, ts.tv_nsec); + } + return false; + } + } + return true; + } + + bool post(std::uint32_t count) noexcept { + if (!valid()) return false; + for (std::uint32_t i = 0; i < count; ++i) { + if (::sem_post(h_) != 0) { + ipc::error("fail sem_post[%d]: %s\n", errno); + return false; + } + } + return true; + } +}; + +} // namespace sync +} // namespace detail +} // namespace ipc diff --git a/src/libipc/platform/semaphore_win.h b/src/libipc/platform/semaphore_win.h new file mode 100644 index 00000000..9a91ebae --- /dev/null +++ b/src/libipc/platform/semaphore_win.h @@ -0,0 +1,74 @@ +#pragma once + +#include + +#include + +#include "libipc/utility/log.h" + +#include "libipc/platform/to_tchar.h" +#include "libipc/platform/get_sa.h" + +namespace ipc { +namespace detail { +namespace sync { + +class semaphore { + HANDLE h_ = NULL; + +public: + semaphore() noexcept = default; + ~semaphore() noexcept = default; + + HANDLE native() const noexcept { + return h_; + } + + bool valid() const noexcept { + return h_ != NULL; + } + + bool open(char const *name, std::uint32_t count) noexcept { + close(); + h_ = ::CreateSemaphore(detail::get_sa(), + static_cast(count), LONG_MAX, + ipc::detail::to_tchar(name).c_str()); + if (h_ == NULL) { + ipc::error("fail CreateSemaphore[%lu]: %s\n", ::GetLastError(), name); + return false; + } + return true; + } + + void close() noexcept { + if (!valid()) return; + ::CloseHandle(h_); + h_ = NULL; + } + + bool wait(std::uint64_t tm) noexcept { + DWORD ret, ms = (tm == invalid_value) ? INFINITE : static_cast(tm); + switch ((ret = ::WaitForSingleObject(h_, ms))) { + case WAIT_OBJECT_0: + return true; + case WAIT_TIMEOUT: + return false; + case WAIT_ABANDONED: + default: + ipc::error("fail WaitForSingleObject[%lu]: 0x%08X\n", ::GetLastError(), ret); + return false; + } + } + + bool post(std::uint32_t count) noexcept { + if (!::ReleaseSemaphore(h_, static_cast(count), NULL)) { + ipc::error("fail ReleaseSemaphore[%lu]\n", ::GetLastError()); + return false; + } + return true; + } +}; + +} // namespace sync +} // namespace detail +} // namespace ipc diff --git a/src/libipc/platform/shm_linux.cpp b/src/libipc/platform/shm_linux.cpp index 9b523d62..4baf8b55 100755 --- a/src/libipc/platform/shm_linux.cpp +++ b/src/libipc/platform/shm_linux.cpp @@ -22,7 +22,7 @@ namespace { struct info_t { - std::atomic_size_t acc_; + std::atomic acc_; }; struct id_info_t { @@ -81,6 +81,30 @@ id_t acquire(char const * name, std::size_t size, unsigned mode) { return ii; } +std::int32_t get_ref(id_t id) { + if (id == nullptr) { + return 0; + } + auto ii = static_cast(id); + if (ii->mem_ == nullptr || ii->size_ == 0) { + return 0; + } + return acc_of(ii->mem_, ii->size_).load(std::memory_order_acquire); +} + +void sub_ref(id_t id) { + if (id == nullptr) { + ipc::error("fail sub_ref: invalid id (null)\n"); + return; + } + auto ii = static_cast(id); + if (ii->mem_ == nullptr || ii->size_ == 0) { + ipc::error("fail sub_ref: invalid id (mem = %p, size = %zd)\n", ii->mem_, ii->size_); + return; + } + acc_of(ii->mem_, ii->size_).fetch_sub(1, std::memory_order_acq_rel); +} + void * get_mem(id_t id, std::size_t * size) { if (id == nullptr) { ipc::error("fail get_mem: invalid id (null)\n"); @@ -93,7 +117,7 @@ void * get_mem(id_t id, std::size_t * size) { } int fd = ii->fd_; if (fd == -1) { - ipc::error("fail to_mem: invalid id (fd = -1)\n"); + ipc::error("fail get_mem: invalid id (fd = -1)\n"); return nullptr; } if (ii->size_ == 0) { @@ -104,7 +128,7 @@ void * get_mem(id_t id, std::size_t * size) { } ii->size_ = static_cast(st.st_size); if ((ii->size_ <= sizeof(info_t)) || (ii->size_ % sizeof(info_t))) { - ipc::error("fail to_mem: %s, invalid size = %zd\n", ii->name_.c_str(), ii->size_); + ipc::error("fail get_mem: %s, invalid size = %zd\n", ii->name_.c_str(), ii->size_); return nullptr; } } @@ -128,16 +152,17 @@ void * get_mem(id_t id, std::size_t * size) { return mem; } -void release(id_t id) { +std::int32_t release(id_t id) { if (id == nullptr) { ipc::error("fail release: invalid id (null)\n"); - return; + return -1; } + std::int32_t ret = -1; auto ii = static_cast(id); if (ii->mem_ == nullptr || ii->size_ == 0) { ipc::error("fail release: invalid id (mem = %p, size = %zd)\n", ii->mem_, ii->size_); } - else if (acc_of(ii->mem_, ii->size_).fetch_sub(1, std::memory_order_acquire) == 1) { + else if ((ret = acc_of(ii->mem_, ii->size_).fetch_sub(1, std::memory_order_acq_rel)) <= 1) { ::munmap(ii->mem_, ii->size_); if (!ii->name_.empty()) { ::shm_unlink(ii->name_.c_str()); @@ -145,6 +170,7 @@ void release(id_t id) { } else ::munmap(ii->mem_, ii->size_); mem::free(ii); + return ret; } void remove(id_t id) { diff --git a/src/libipc/platform/shm_win.cpp b/src/libipc/platform/shm_win.cpp index 389372d5..ae492689 100755 --- a/src/libipc/platform/shm_win.cpp +++ b/src/libipc/platform/shm_win.cpp @@ -58,6 +58,14 @@ id_t acquire(char const * name, std::size_t size, unsigned mode) { return ii; } +std::int32_t get_ref(id_t) { + return 0; +} + +void sub_ref(id_t) { + // Do Nothing. +} + void * get_mem(id_t id, std::size_t * size) { if (id == nullptr) { ipc::error("fail get_mem: invalid id (null)\n"); @@ -88,10 +96,10 @@ void * get_mem(id_t id, std::size_t * size) { return static_cast(mem); } -void release(id_t id) { +std::int32_t release(id_t id) { if (id == nullptr) { ipc::error("fail release: invalid id (null)\n"); - return; + return -1; } auto ii = static_cast(id); if (ii->mem_ == nullptr || ii->size_ == 0) { @@ -103,6 +111,7 @@ void release(id_t id) { } else ::CloseHandle(ii->h_); mem::free(ii); + return 0; } void remove(id_t id) { diff --git a/src/libipc/platform/to_tchar.h b/src/libipc/platform/to_tchar.h index 3cc28403..61def065 100755 --- a/src/libipc/platform/to_tchar.h +++ b/src/libipc/platform/to_tchar.h @@ -41,7 +41,7 @@ constexpr auto to_tchar(ipc::string &&str) -> IsSameChar -#include -#include -#include -#include -#include - -#include -#include -#include -#include - -#include "libipc/def.h" -#include "libipc/waiter_helper.h" - -#include "libipc/utility/log.h" -#include "libipc/platform/detail.h" -#include "libipc/memory/resource.h" - -namespace ipc { -namespace detail { - -inline static bool calc_wait_time(timespec& ts, std::size_t tm /*ms*/) { - timeval now; - int eno = ::gettimeofday(&now, NULL); - if (eno != 0) { - ipc::error("fail gettimeofday [%d]\n", eno); - return false; - } - ts.tv_nsec = (now.tv_usec + (tm % 1000) * 1000) * 1000; - ts.tv_sec = now.tv_sec + (tm / 1000) + (ts.tv_nsec / 1000000000); - ts.tv_nsec %= 1000000000; - return true; -} - -#pragma push_macro("IPC_PTHREAD_FUNC_") -#undef IPC_PTHREAD_FUNC_ -#define IPC_PTHREAD_FUNC_(CALL, ...) \ - int eno; \ - if ((eno = ::CALL(__VA_ARGS__)) != 0) { \ - ipc::error("fail " #CALL " [%d]\n", eno); \ - return false; \ - } \ - return true - -class mutex { - pthread_mutex_t mutex_ = PTHREAD_MUTEX_INITIALIZER; - -public: - pthread_mutex_t& native() { - return mutex_; - } - - bool open() { - int eno; - // init mutex - pthread_mutexattr_t mutex_attr; - if ((eno = ::pthread_mutexattr_init(&mutex_attr)) != 0) { - ipc::error("fail pthread_mutexattr_init[%d]\n", eno); - return false; - } - IPC_UNUSED_ auto guard_mutex_attr = unique_ptr(&mutex_attr, ::pthread_mutexattr_destroy); - if ((eno = ::pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED)) != 0) { - ipc::error("fail pthread_mutexattr_setpshared[%d]\n", eno); - return false; - } - if ((eno = ::pthread_mutexattr_setrobust(&mutex_attr, PTHREAD_MUTEX_ROBUST)) != 0) { - ipc::error("fail pthread_mutexattr_setrobust[%d]\n", eno); - return false; - } - if ((eno = ::pthread_mutex_init(&mutex_, &mutex_attr)) != 0) { - ipc::error("fail pthread_mutex_init[%d]\n", eno); - return false; - } - return true; - } - - bool close() { - IPC_PTHREAD_FUNC_(pthread_mutex_destroy, &mutex_); - } - - bool lock() { - for (;;) { - int eno = ::pthread_mutex_lock(&mutex_); - switch (eno) { - case 0: - return true; - case EOWNERDEAD: - if (::pthread_mutex_consistent(&mutex_) == 0) { - ::pthread_mutex_unlock(&mutex_); - break; - } - IPC_FALLTHROUGH_; - case ENOTRECOVERABLE: - if (close() && open()) { - break; - } - IPC_FALLTHROUGH_; - default: - ipc::error("fail pthread_mutex_lock[%d]\n", eno); - return false; - } - } - } - - bool unlock() { - IPC_PTHREAD_FUNC_(pthread_mutex_unlock, &mutex_); - } -}; - -class condition { - pthread_cond_t cond_ = PTHREAD_COND_INITIALIZER; - -public: - bool open() { - int eno; - // init condition - pthread_condattr_t cond_attr; - if ((eno = ::pthread_condattr_init(&cond_attr)) != 0) { - ipc::error("fail pthread_condattr_init[%d]\n", eno); - return false; - } - IPC_UNUSED_ auto guard_cond_attr = unique_ptr(&cond_attr, ::pthread_condattr_destroy); - if ((eno = ::pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED)) != 0) { - ipc::error("fail pthread_condattr_setpshared[%d]\n", eno); - return false; - } - if ((eno = ::pthread_cond_init(&cond_, &cond_attr)) != 0) { - ipc::error("fail pthread_cond_init[%d]\n", eno); - return false; - } - return true; - } - - bool close() { - IPC_PTHREAD_FUNC_(pthread_cond_destroy, &cond_); - } - - bool wait(mutex& mtx, std::size_t tm = invalid_value) { - switch (tm) { - case 0: - return true; - case invalid_value: - IPC_PTHREAD_FUNC_(pthread_cond_wait, &cond_, &mtx.native()); - default: { - timespec ts; - if (!calc_wait_time(ts, tm)) { - ipc::error("fail calc_wait_time: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n", - tm, ts.tv_sec, ts.tv_nsec); - return false; - } - int eno; - if ((eno = ::pthread_cond_timedwait(&cond_, &mtx.native(), &ts)) != 0) { - if (eno != ETIMEDOUT) { - ipc::error("fail pthread_cond_timedwait[%d]: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n", - eno, tm, ts.tv_sec, ts.tv_nsec); - } - return false; - } - } - return true; - } - } - - bool notify() { - IPC_PTHREAD_FUNC_(pthread_cond_signal, &cond_); - } - - bool broadcast() { - IPC_PTHREAD_FUNC_(pthread_cond_broadcast, &cond_); - } -}; - -#pragma pop_macro("IPC_PTHREAD_FUNC_") - -class sem_helper { -public: - using handle_t = sem_t*; - - constexpr static handle_t invalid() noexcept { - return SEM_FAILED; - } - - static handle_t open(char const * name, long count) { - handle_t sem = ::sem_open(name, O_CREAT, 0666, count); - if (sem == SEM_FAILED) { - ipc::error("fail sem_open[%d]: %s\n", errno, name); - return invalid(); - } - return sem; - } - -#pragma push_macro("IPC_SEMAPHORE_FUNC_") -#undef IPC_SEMAPHORE_FUNC_ -#define IPC_SEMAPHORE_FUNC_(CALL, ...) \ - if (::CALL(__VA_ARGS__) != 0) { \ - ipc::error("fail " #CALL "[%d]\n", errno); \ - return false; \ - } \ - return true - - static bool close(handle_t h) { - if (h == invalid()) return false; - IPC_SEMAPHORE_FUNC_(sem_close, h); - } - - static bool destroy(char const * name) { - IPC_SEMAPHORE_FUNC_(sem_unlink, name); - } - - static bool post(handle_t h, long count) { - if (h == invalid()) return false; - auto spost = [](handle_t h) { - IPC_SEMAPHORE_FUNC_(sem_post, h); - }; - for (long i = 0; i < count; ++i) { - if (!spost(h)) return false; - } - return true; - } - - static bool wait(handle_t h, std::size_t tm = invalid_value) { - if (h == invalid()) return false; - switch (tm) { - case 0: - return true; - case invalid_value: - IPC_SEMAPHORE_FUNC_(sem_wait, h); - default: { - timespec ts; - if (!calc_wait_time(ts, tm)) { - ipc::error("fail calc_wait_time: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n", - tm, ts.tv_sec, ts.tv_nsec); - return false; - } - if (::sem_timedwait(h, &ts) != 0) { - if (errno != ETIMEDOUT) { - ipc::error("fail sem_timedwait [%d]: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n", - errno, tm, ts.tv_sec, ts.tv_nsec); - } - return false; - } - } - return true; - } - } - -#pragma pop_macro("IPC_SEMAPHORE_FUNC_") -}; - -class waiter_holder { -public: - using handle_t = std::tuple< - ipc::string, - sem_helper::handle_t /* sema */, - sem_helper::handle_t /* handshake */>; - - static handle_t invalid() noexcept { - return std::make_tuple( - ipc::string{}, - sem_helper::invalid(), - sem_helper::invalid()); - } - -private: - using wait_flags = waiter_helper::wait_flags; - using wait_counter = waiter_helper::wait_counter; - - mutex lock_; - wait_counter cnt_; - - struct contrl { - waiter_holder * me_; - wait_flags * flags_; - handle_t const & h_; - - wait_flags & flags() noexcept { - assert(flags_ != nullptr); - return *flags_; - } - - wait_counter & counter() noexcept { - return me_->cnt_; - } - - auto get_lock() { - return ipc::detail::unique_lock(me_->lock_); - } - - bool sema_wait(std::size_t tm) { - return sem_helper::wait(std::get<1>(h_), tm); - } - - bool sema_post(long count) { - return sem_helper::post(std::get<1>(h_), count); - } - - bool handshake_wait(std::size_t tm) { - return sem_helper::wait(std::get<2>(h_), tm); - } - - bool handshake_post(long count) { - return sem_helper::post(std::get<2>(h_), count); - } - }; - -public: - handle_t open_h(ipc::string && name) { - auto sem = sem_helper::open(("__WAITER_HELPER_SEM__" + name).c_str(), 0); - if (sem == sem_helper::invalid()) { - return invalid(); - } - auto han = sem_helper::open(("__WAITER_HELPER_HAN__" + name).c_str(), 0); - if (han == sem_helper::invalid()) { - return invalid(); - } - return std::make_tuple(std::move(name), sem, han); - } - - void release_h(handle_t const & h) { - sem_helper::close(std::get<2>(h)); - sem_helper::close(std::get<1>(h)); - } - - void close_h(handle_t const & h) { - auto const & name = std::get<0>(h); - sem_helper::destroy(("__WAITER_HELPER_HAN__" + name).c_str()); - sem_helper::destroy(("__WAITER_HELPER_SEM__" + name).c_str()); - } - - bool open() { - return lock_.open(); - } - - void close() { - lock_.close(); - } - - template - bool wait_if(handle_t const & h, wait_flags * flags, F&& pred, std::size_t tm = invalid_value) { - assert(flags != nullptr); - contrl ctrl { this, flags, h }; - - class non_mutex { - public: - void lock () noexcept {} - void unlock() noexcept {} - } nm; - - return waiter_helper::wait_if(ctrl, nm, std::forward(pred), tm); - } - - bool notify(handle_t const & h) { - contrl ctrl { this, nullptr, h }; - return waiter_helper::notify(ctrl); - } - - bool broadcast(handle_t const & h) { - contrl ctrl { this, nullptr, h }; - return waiter_helper::broadcast(ctrl); - } - - bool quit_waiting(handle_t const & h, wait_flags * flags) { - assert(flags != nullptr); - contrl ctrl { this, flags, h }; - return waiter_helper::quit_waiting(ctrl); - } -}; - -class waiter { - waiter_holder helper_; - std::atomic opened_ { 0 }; - -public: - using handle_t = waiter_holder::handle_t; - - static handle_t invalid() noexcept { - return waiter_holder::invalid(); - } - - handle_t open(char const * name) { - if (name == nullptr || name[0] == '\0') { - return invalid(); - } - if ((opened_.fetch_add(1, std::memory_order_acq_rel) == 0) && !helper_.open()) { - return invalid(); - } - return helper_.open_h(name); - } - - void close(handle_t h) { - if (h == invalid()) return; - helper_.release_h(h); - if (opened_.fetch_sub(1, std::memory_order_release) == 1) { - helper_.close_h(h); - helper_.close(); - } - } - - template - bool wait_if(handle_t h, waiter_helper::wait_flags * flags, F && pred, std::size_t tm = invalid_value) { - if (h == invalid()) return false; - return helper_.wait_if(h, flags, std::forward(pred), tm); - } - - bool notify(handle_t h) { - if (h == invalid()) return false; - return helper_.notify(h); - } - - bool broadcast(handle_t h) { - if (h == invalid()) return false; - return helper_.broadcast(h); - } - - bool quit_waiting(handle_t h, waiter_helper::wait_flags * flags) { - if (h == invalid()) return false; - return helper_.quit_waiting(h, flags); - } -}; - -} // namespace detail -} // namespace ipc diff --git a/src/libipc/platform/waiter_win.h b/src/libipc/platform/waiter_win.h deleted file mode 100755 index 4f3d0802..00000000 --- a/src/libipc/platform/waiter_win.h +++ /dev/null @@ -1,233 +0,0 @@ -#pragma once - -#include - -#include -#include -#include -#include - -#include "libipc/rw_lock.h" -#include "libipc/pool_alloc.h" -#include "libipc/shm.h" -#include "libipc/waiter_helper.h" - -#include "libipc/utility/log.h" -#include "libipc/platform/to_tchar.h" -#include "libipc/platform/get_sa.h" -#include "libipc/platform/detail.h" -#include "libipc/memory/resource.h" - -namespace ipc { -namespace detail { - -class semaphore { - HANDLE h_ = NULL; - -public: - static void remove(char const * /*name*/) {} - - bool open(ipc::string && name, long count = 0, long limit = LONG_MAX) { - h_ = ::CreateSemaphore(detail::get_sa(), count, limit, ipc::detail::to_tchar(std::move(name)).c_str()); - if (h_ == NULL) { - ipc::error("fail CreateSemaphore[%lu]: %s\n", ::GetLastError(), name.c_str()); - return false; - } - return true; - } - - void close() { - ::CloseHandle(h_); - } - - bool wait(std::size_t tm = invalid_value) { - DWORD ret, ms = (tm == invalid_value) ? INFINITE : static_cast(tm); - switch ((ret = ::WaitForSingleObject(h_, ms))) { - case WAIT_OBJECT_0: - return true; - case WAIT_TIMEOUT: - return false; - case WAIT_ABANDONED: - default: - ipc::error("fail WaitForSingleObject[%lu]: 0x%08X\n", ::GetLastError(), ret); - return false; - } - } - - bool post(long count = 1) { - if (::ReleaseSemaphore(h_, count, NULL)) { - return true; - } - ipc::error("fail ReleaseSemaphore[%lu]\n", ::GetLastError()); - return false; - } -}; - -class mutex : public semaphore { - using semaphore::wait; - using semaphore::post; - -public: - bool open(ipc::string && name) { - return semaphore::open(std::move(name), 1, 1); - } - - bool lock () { return semaphore::wait(); } - bool unlock() { return semaphore::post(); } -}; - -class condition { - using wait_flags = waiter_helper::wait_flags; - using wait_counter = waiter_helper::wait_counter; - - mutex lock_; - semaphore sema_, handshake_; - wait_counter * cnt_ = nullptr; - - struct contrl { - condition * me_; - wait_flags * flags_; - - wait_flags & flags() noexcept { - assert(flags_ != nullptr); - return *flags_; - } - - wait_counter & counter() noexcept { - assert(me_->cnt_ != nullptr); - return *(me_->cnt_); - } - - auto get_lock() { - return ipc::detail::unique_lock(me_->lock_); - } - - bool sema_wait(std::size_t tm) { - return me_->sema_.wait(tm); - } - - bool sema_post(long count) { - return me_->sema_.post(count); - } - - bool handshake_wait(std::size_t tm) { - return me_->handshake_.wait(tm); - } - - bool handshake_post(long count) { - return me_->handshake_.post(count); - } - }; - -public: - friend bool operator==(condition const & c1, condition const & c2) { - return c1.cnt_ == c2.cnt_; - } - - friend bool operator!=(condition const & c1, condition const & c2) { - return !(c1 == c2); - } - - static void remove(char const * name) { - semaphore::remove((ipc::string{ "__COND_HAN__" } + name).c_str()); - semaphore::remove((ipc::string{ "__COND_SEM__" } + name).c_str()); - mutex ::remove((ipc::string{ "__COND_MTX__" } + name).c_str()); - } - - bool open(ipc::string const & name, wait_counter * cnt) { - if (lock_ .open("__COND_MTX__" + name) && - sema_ .open("__COND_SEM__" + name) && - handshake_.open("__COND_HAN__" + name)) { - cnt_ = cnt; - return true; - } - return false; - } - - void close() { - handshake_.close(); - sema_ .close(); - lock_ .close(); - } - - template - bool wait_if(Mutex & mtx, wait_flags * flags, F && pred, std::size_t tm = invalid_value) { - assert(flags != nullptr); - contrl ctrl { this, flags }; - return waiter_helper::wait_if(ctrl, mtx, std::forward(pred), tm); - } - - bool notify() { - contrl ctrl { this, nullptr }; - return waiter_helper::notify(ctrl); - } - - bool broadcast() { - contrl ctrl { this, nullptr }; - return waiter_helper::broadcast(ctrl); - } - - bool quit_waiting(wait_flags * flags) { - assert(flags != nullptr); - contrl ctrl { this, flags }; - return waiter_helper::quit_waiting(ctrl); - } -}; - -class waiter { - waiter_helper::wait_counter cnt_; - -public: - using handle_t = condition; - - static handle_t invalid() { - return condition {}; - } - - handle_t open(char const * name) { - if (name == nullptr || name[0] == '\0') { - return invalid(); - } - condition cond; - if (cond.open(name, &cnt_)) { - return cond; - } - return invalid(); - } - - void close(handle_t& h) { - if (h == invalid()) return; - h.close(); - } - - template - bool wait_if(handle_t& h, waiter_helper::wait_flags * flags, F&& pred, std::size_t tm = invalid_value) { - if (h == invalid()) return false; - - class non_mutex { - public: - void lock () noexcept {} - void unlock() noexcept {} - } nm; - - return h.wait_if(nm, flags, std::forward(pred), tm); - } - - bool notify(handle_t& h) { - if (h == invalid()) return false; - return h.notify(); - } - - bool broadcast(handle_t& h) { - if (h == invalid()) return false; - return h.broadcast(); - } - - bool quit_waiting(handle_t& h, waiter_helper::wait_flags * flags) { - if (h == invalid()) return false; - return h.quit_waiting(flags); - } -}; - -} // namespace detail -} // namespace ipc diff --git a/src/libipc/platform/waiter_wrapper.h b/src/libipc/platform/waiter_wrapper.h deleted file mode 100755 index 553d2e1b..00000000 --- a/src/libipc/platform/waiter_wrapper.h +++ /dev/null @@ -1,292 +0,0 @@ -#pragma once - -#include -#include -#include - -#include "libipc/shm.h" - -#include "libipc/memory/resource.h" -#include "libipc/platform/detail.h" -#if defined(WIN64) || defined(_WIN64) || defined(__WIN64__) || \ - defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) || \ - defined(WINCE) || defined(_WIN32_WCE) - -#include "libipc/platform/waiter_win.h" - -namespace ipc { -namespace detail { - -using mutex_impl = ipc::detail::mutex; -using semaphore_impl = ipc::detail::semaphore; - -class condition_impl : public ipc::detail::condition { - using base_t = ipc::detail::condition; - - ipc::shm::handle cnt_h_; - waiter_helper::wait_flags flags_; - -public: - static void remove(char const * name) { - base_t::remove(name); - ipc::string n = name; - ipc::shm::remove((n + "__COND_CNT__" ).c_str()); - ipc::shm::remove((n + "__COND_WAIT__").c_str()); - } - - bool open(char const * name) { - if (cnt_h_ .acquire( - (ipc::string { name } + "__COND_CNT__" ).c_str(), - sizeof(waiter_helper::wait_counter))) { - flags_.is_closed_.store(false, std::memory_order_release); - return base_t::open(name, - static_cast(cnt_h_.get())); - } - return false; - } - - void close() { - flags_.is_closed_.store(true, std::memory_order_release); - base_t::quit_waiting(&flags_); - base_t::close(); - cnt_h_.release(); - } - - bool wait(mutex_impl& mtx, std::size_t tm = invalid_value) { - return base_t::wait_if(mtx, &flags_, [] { return true; }, tm); - } -}; - -} // namespace detail -} // namespace ipc - -#else /*!WIN*/ - -#include "libipc/platform/waiter_linux.h" - -namespace ipc { -namespace detail { - -template -class object_impl { - ipc::shm::handle h_; - - struct info_t { - T object_; - std::atomic opened_; - }; - -public: - static void remove(char const * name) { - { - ipc::shm::handle h { name, sizeof(info_t) }; - if (h.valid()) { - auto info = static_cast(h.get()); - info->object_.close(); - } - } - ipc::shm::remove(name); - } - - T& object() { - return static_cast(h_.get())->object_; - } - - template - bool open(char const * name, P&&... params) { - if (!h_.acquire(name, sizeof(info_t))) { - return false; - } - auto info = static_cast(h_.get()); - if ((info->opened_.fetch_add(1, std::memory_order_acq_rel) == 0) && - !info->object_.open(std::forward

(params)...)) { - return false; - } - return true; - } - - void close() { - if (!h_.valid()) return; - auto info = static_cast(h_.get()); - if (info->opened_.fetch_sub(1, std::memory_order_release) == 1) { - info->object_.close(); - } - h_.release(); - } -}; - -class mutex_impl : public object_impl { -public: - bool lock () { return object().lock (); } - bool unlock() { return object().unlock(); } -}; - -class condition_impl : public object_impl { -public: - bool wait(mutex_impl& mtx, std::size_t tm = invalid_value) { - return object().wait(mtx.object(), tm); - } - - bool notify () { return object().notify (); } - bool broadcast() { return object().broadcast(); } -}; - -class semaphore_impl { - sem_helper::handle_t h_; - ipc::shm::handle opened_; // std::atomic - ipc::string name_; - - auto cnt() { - return static_cast*>(opened_.get()); - } - -public: - static void remove(char const * name) { - sem_helper::destroy((ipc::string{ "__SEMAPHORE_IMPL_SEM__" } + name).c_str()); - ipc::shm::remove ((ipc::string{ "__SEMAPHORE_IMPL_CNT__" } + name).c_str()); - } - - bool open(char const * name, long count) { - name_ = name; - if (!opened_.acquire(("__SEMAPHORE_IMPL_CNT__" + name_).c_str(), sizeof(std::atomic))) { - return false; - } - if ((h_ = sem_helper::open(("__SEMAPHORE_IMPL_SEM__" + name_).c_str(), count)) == sem_helper::invalid()) { - return false; - } - cnt()->fetch_add(1, std::memory_order_acq_rel); - return true; - } - - void close() { - if (h_ == sem_helper::invalid()) return; - sem_helper::close(h_); - if (cnt() == nullptr) return; - if (cnt()->fetch_sub(1, std::memory_order_release) == 1) { - sem_helper::destroy(("__SEMAPHORE_IMPL_SEM__" + name_).c_str()); - } - opened_.release(); - } - - bool wait(std::size_t tm = invalid_value) { - return sem_helper::wait(h_, tm); - } - - bool post(long count) { - return sem_helper::post(h_, count); - } -}; - -} // namespace detail -} // namespace ipc - -#endif/*!WIN*/ - -namespace ipc { -namespace detail { - -class waiter_wrapper { -public: - using waiter_t = detail::waiter; - -private: - waiter_t* w_ = nullptr; - waiter_t::handle_t h_ = waiter_t::invalid(); - waiter_helper::wait_flags flags_; - -public: - waiter_wrapper() = default; - explicit waiter_wrapper(waiter_t* w) { - attach(w); - } - waiter_wrapper(const waiter_wrapper&) = delete; - waiter_wrapper& operator=(const waiter_wrapper&) = delete; - - waiter_t * waiter() { return w_; } - waiter_t const * waiter() const { return w_; } - - void attach(waiter_t* w) { - close(); - w_ = w; - } - - bool valid() const { - return (w_ != nullptr) && (h_ != waiter_t::invalid()); - } - - bool open(char const * name) { - if (w_ == nullptr) return false; - close(); - flags_.is_closed_.store(false, std::memory_order_release); - h_ = w_->open(name); - return valid(); - } - - void close() { - if (!valid()) return; - flags_.is_closed_.store(true, std::memory_order_release); - quit_waiting(); - w_->close(h_); - h_ = waiter_t::invalid(); - } - - void quit_waiting() { - w_->quit_waiting(h_, &flags_); - } - - template - bool wait_if(F && pred, std::size_t tm = invalid_value) { - if (!valid()) return false; - return w_->wait_if(h_, &flags_, std::forward(pred), tm); - } - - bool notify() { - if (!valid()) return false; - w_->notify(h_); - return true; - } - - bool broadcast() { - if (!valid()) return false; - w_->broadcast(h_); - return true; - } -}; - -} // namespace detail - -class waiter : public detail::waiter_wrapper { - - shm::handle shm_; - - using detail::waiter_wrapper::attach; - -public: - waiter() = default; - waiter(char const * name) { - open(name); - } - - ~waiter() { - close(); - } - - bool open(char const * name) { - if (name == nullptr || name[0] == '\0') { - return false; - } - close(); - if (!shm_.acquire((ipc::string{ "__SHM_WAITER__" } + name).c_str(), sizeof(waiter_t))) { - return false; - } - attach(static_cast(shm_.get())); - return detail::waiter_wrapper::open((ipc::string{ "__IMP_WAITER__" } + name).c_str()); - } - - void close() { - detail::waiter_wrapper::close(); - shm_.release(); - } -}; - -} // namespace ipc diff --git a/src/pool_alloc.cpp b/src/libipc/pool_alloc.cpp similarity index 100% rename from src/pool_alloc.cpp rename to src/libipc/pool_alloc.cpp diff --git a/src/shm.cpp b/src/libipc/shm.cpp similarity index 75% rename from src/shm.cpp rename to src/libipc/shm.cpp index a2eed938..92f96ef4 100755 --- a/src/shm.cpp +++ b/src/libipc/shm.cpp @@ -47,18 +47,26 @@ handle& handle::operator=(handle rhs) { return *this; } -bool handle::valid() const { +bool handle::valid() const noexcept { return impl(p_)->m_ != nullptr; } -std::size_t handle::size() const { +std::size_t handle::size() const noexcept { return impl(p_)->s_; } -char const * handle::name() const { +char const * handle::name() const noexcept { return impl(p_)->n_.c_str(); } +std::int32_t handle::ref() const noexcept { + return shm::get_ref(impl(p_)->id_); +} + +void handle::sub_ref() noexcept { + shm::sub_ref(impl(p_)->id_); +} + bool handle::acquire(char const * name, std::size_t size, unsigned mode) { release(); impl(p_)->id_ = shm::acquire((impl(p_)->n_ = name).c_str(), size, mode); @@ -66,9 +74,9 @@ bool handle::acquire(char const * name, std::size_t size, unsigned mode) { return valid(); } -void handle::release() { - if (impl(p_)->id_ == nullptr) return; - shm::release(detach()); +std::int32_t handle::release() { + if (impl(p_)->id_ == nullptr) return -1; + return shm::release(detach()); } void* handle::get() const { diff --git a/src/libipc/sync/condition.cpp b/src/libipc/sync/condition.cpp new file mode 100644 index 00000000..2859d213 --- /dev/null +++ b/src/libipc/sync/condition.cpp @@ -0,0 +1,70 @@ + +#include "libipc/condition.h" + +#include "libipc/utility/pimpl.h" +#include "libipc/memory/resource.h" +#include "libipc/platform/detail.h" +#if defined(IPC_OS_WINDOWS_) +#include "libipc/platform/condition_win.h" +#elif defined(IPC_OS_LINUX_) +#include "libipc/platform/condition_linux.h" +#else/*linux*/ +# error "Unsupported platform." +#endif + +namespace ipc { +namespace sync { + +class condition::condition_ : public ipc::pimpl { +public: + ipc::detail::sync::condition cond_; +}; + +condition::condition() + : p_(p_->make()) { +} + +condition::condition(char const * name) + : condition() { + open(name); +} + +condition::~condition() { + close(); + p_->clear(); +} + +void const *condition::native() const noexcept { + return impl(p_)->cond_.native(); +} + +void *condition::native() noexcept { + return impl(p_)->cond_.native(); +} + +bool condition::valid() const noexcept { + return impl(p_)->cond_.valid(); +} + +bool condition::open(char const *name) noexcept { + return impl(p_)->cond_.open(name); +} + +void condition::close() noexcept { + impl(p_)->cond_.close(); +} + +bool condition::wait(ipc::sync::mutex &mtx, std::uint64_t tm) noexcept { + return impl(p_)->cond_.wait(mtx, tm); +} + +bool condition::notify() noexcept { + return impl(p_)->cond_.notify(); +} + +bool condition::broadcast() noexcept { + return impl(p_)->cond_.broadcast(); +} + +} // namespace sync +} // namespace ipc diff --git a/src/libipc/sync/mutex.cpp b/src/libipc/sync/mutex.cpp new file mode 100644 index 00000000..813e3340 --- /dev/null +++ b/src/libipc/sync/mutex.cpp @@ -0,0 +1,70 @@ + +#include "libipc/mutex.h" + +#include "libipc/utility/pimpl.h" +#include "libipc/memory/resource.h" +#include "libipc/platform/detail.h" +#if defined(IPC_OS_WINDOWS_) +#include "libipc/platform/mutex_win.h" +#elif defined(IPC_OS_LINUX_) +#include "libipc/platform/mutex_linux.h" +#else/*linux*/ +# error "Unsupported platform." +#endif + +namespace ipc { +namespace sync { + +class mutex::mutex_ : public ipc::pimpl { +public: + ipc::detail::sync::mutex lock_; +}; + +mutex::mutex() + : p_(p_->make()) { +} + +mutex::mutex(char const * name) + : mutex() { + open(name); +} + +mutex::~mutex() { + close(); + p_->clear(); +} + +void const *mutex::native() const noexcept { + return impl(p_)->lock_.native(); +} + +void *mutex::native() noexcept { + return impl(p_)->lock_.native(); +} + +bool mutex::valid() const noexcept { + return impl(p_)->lock_.valid(); +} + +bool mutex::open(char const *name) noexcept { + return impl(p_)->lock_.open(name); +} + +void mutex::close() noexcept { + impl(p_)->lock_.close(); +} + +bool mutex::lock(std::uint64_t tm) noexcept { + return impl(p_)->lock_.lock(tm); +} + +bool mutex::try_lock() noexcept(false) { + return impl(p_)->lock_.try_lock(); +} + +bool mutex::unlock() noexcept { + return impl(p_)->lock_.unlock(); +} + +} // namespace sync +} // namespace ipc diff --git a/src/libipc/sync/semaphore.cpp b/src/libipc/sync/semaphore.cpp new file mode 100644 index 00000000..6e868233 --- /dev/null +++ b/src/libipc/sync/semaphore.cpp @@ -0,0 +1,66 @@ + +#include "libipc/semaphore.h" + +#include "libipc/utility/pimpl.h" +#include "libipc/memory/resource.h" +#include "libipc/platform/detail.h" +#if defined(IPC_OS_WINDOWS_) +#include "libipc/platform/semaphore_win.h" +#elif defined(IPC_OS_LINUX_) +#include "libipc/platform/semaphore_linux.h" +#else/*linux*/ +# error "Unsupported platform." +#endif + +namespace ipc { +namespace sync { + +class semaphore::semaphore_ : public ipc::pimpl { +public: + ipc::detail::sync::semaphore sem_; +}; + +semaphore::semaphore() + : p_(p_->make()) { +} + +semaphore::semaphore(char const * name, std::uint32_t count) + : semaphore() { + open(name, count); +} + +semaphore::~semaphore() { + close(); + p_->clear(); +} + +void const *semaphore::native() const noexcept { + return impl(p_)->sem_.native(); +} + +void *semaphore::native() noexcept { + return impl(p_)->sem_.native(); +} + +bool semaphore::valid() const noexcept { + return impl(p_)->sem_.valid(); +} + +bool semaphore::open(char const *name, std::uint32_t count) noexcept { + return impl(p_)->sem_.open(name, count); +} + +void semaphore::close() noexcept { + impl(p_)->sem_.close(); +} + +bool semaphore::wait(std::uint64_t tm) noexcept { + return impl(p_)->sem_.wait(tm); +} + +bool semaphore::post(std::uint32_t count) noexcept { + return impl(p_)->sem_.post(count); +} + +} // namespace sync +} // namespace ipc diff --git a/src/libipc/utility/pimpl.h b/src/libipc/utility/pimpl.h index 25074e71..ca0edd3c 100755 --- a/src/libipc/utility/pimpl.h +++ b/src/libipc/utility/pimpl.h @@ -3,6 +3,7 @@ #include #include +#include "libipc/platform/detail.h" #include "libipc/utility/concept.h" #include "libipc/pool_alloc.h" @@ -17,49 +18,45 @@ template using IsImplUncomfortable = ipc::require<(sizeof(T) > sizeof(T*)), R>; template -constexpr auto make_impl(P&&... params) -> IsImplComfortable { +IPC_CONSTEXPR_ auto make_impl(P&&... params) -> IsImplComfortable { T* buf {}; ::new (&buf) T { std::forward

(params)... }; return buf; } template -constexpr auto impl(T* const (& p)) -> IsImplComfortable { +IPC_CONSTEXPR_ auto impl(T* const (& p)) -> IsImplComfortable { return reinterpret_cast(&const_cast(reinterpret_cast(p))); } template -constexpr auto clear_impl(T* p) -> IsImplComfortable { +IPC_CONSTEXPR_ auto clear_impl(T* p) -> IsImplComfortable { if (p != nullptr) impl(p)->~T(); } template -constexpr auto make_impl(P&&... params) -> IsImplUncomfortable { +IPC_CONSTEXPR_ auto make_impl(P&&... params) -> IsImplUncomfortable { return mem::alloc(std::forward

(params)...); } template -constexpr auto clear_impl(T* p) -> IsImplUncomfortable { +IPC_CONSTEXPR_ auto clear_impl(T* p) -> IsImplUncomfortable { mem::free(p); } template -constexpr auto impl(T* const (& p)) -> IsImplUncomfortable { +IPC_CONSTEXPR_ auto impl(T* const (& p)) -> IsImplUncomfortable { return p; } template struct pimpl { template - constexpr static T* make(P&&... params) { + IPC_CONSTEXPR_ static T* make(P&&... params) { return make_impl(std::forward

(params)...); } -#if __cplusplus >= 201703L - constexpr void clear() { -#else /*__cplusplus < 201703L*/ - void clear() { -#endif/*__cplusplus < 201703L*/ + IPC_CONSTEXPR_ void clear() { clear_impl(static_cast(const_cast(this))); } }; diff --git a/src/libipc/waiter.h b/src/libipc/waiter.h new file mode 100644 index 00000000..79837026 --- /dev/null +++ b/src/libipc/waiter.h @@ -0,0 +1,81 @@ +#pragma once + +#include +#include +#include +#include + +#include "libipc/def.h" +#include "libipc/mutex.h" +#include "libipc/condition.h" +#include "libipc/platform/detail.h" + +namespace ipc { +namespace detail { + +class waiter { + ipc::sync::condition cond_; + ipc::sync::mutex lock_; + std::atomic quit_ {false}; + +public: + waiter() = default; + waiter(char const *name) { + open(name); + } + + ~waiter() { + close(); + } + + bool valid() const noexcept { + return cond_.valid() && lock_.valid(); + } + + bool open(char const *name) noexcept { + quit_.store(false, std::memory_order_relaxed); + if (!cond_.open((std::string{"_waiter_cond_"} + name).c_str())) { + return false; + } + if (!lock_.open((std::string{"_waiter_lock_"} + name).c_str())) { + cond_.close(); + return false; + } + return valid(); + } + + void close() noexcept { + cond_.close(); + lock_.close(); + } + + template + bool wait_if(F &&pred, std::uint64_t tm = ipc::invalid_value) noexcept { + IPC_UNUSED_ std::lock_guard guard {lock_}; + while ([this, &pred] { + return !quit_.load(std::memory_order_relaxed) + && std::forward(pred)(); + }()) { + if (!cond_.wait(lock_, tm)) return false; + } + return true; + } + + bool notify() noexcept { + std::lock_guard{lock_}; // barrier + return cond_.notify(); + } + + bool broadcast() noexcept { + std::lock_guard{lock_}; // barrier + return cond_.broadcast(); + } + + bool quit_waiting() { + quit_.store(true, std::memory_order_release); + return broadcast(); + } +}; + +} // namespace detail +} // namespace ipc diff --git a/src/libipc/waiter_helper.h b/src/libipc/waiter_helper.h deleted file mode 100644 index a32035b7..00000000 --- a/src/libipc/waiter_helper.h +++ /dev/null @@ -1,129 +0,0 @@ -#pragma once - -#include -#include -#include - -#include "libipc/def.h" -#include "libipc/utility/scope_guard.h" - -namespace ipc { -namespace detail { - -struct waiter_helper { - - struct wait_counter { - std::atomic waiting_ { 0 }; - long counter_ = 0; - }; - - struct wait_flags { - std::atomic is_waiting_ { false }; - std::atomic is_closed_ { true }; - std::atomic need_dest_ { false }; - }; - - template - static bool wait_if(Ctrl & ctrl, Mutex & mtx, F && pred, std::size_t tm) { - auto & flags = ctrl.flags(); - if (flags.is_closed_.load(std::memory_order_acquire)) { - return false; - } - - auto & counter = ctrl.counter(); - counter.waiting_.fetch_add(1, std::memory_order_release); - flags.is_waiting_.store(true, std::memory_order_relaxed); - auto finally = ipc::guard([&counter, &flags] { - counter.waiting_.fetch_sub(1, std::memory_order_release); - flags.is_waiting_.store(false, std::memory_order_relaxed); - }); - { - IPC_UNUSED_ auto guard = ctrl.get_lock(); - if (!std::forward(pred)()) return true; - counter.counter_ += 1; - } - mtx.unlock(); - - bool ret = false; - do { - bool is_waiting = flags.is_waiting_.load(std::memory_order_relaxed); - bool is_closed = flags.is_closed_ .load(std::memory_order_acquire); - if (!is_waiting || is_closed) { - flags.need_dest_.store(false, std::memory_order_release); - ret = false; - break; - } - else if (flags.need_dest_.exchange(false, std::memory_order_release)) { - ret = false; - ctrl.sema_wait(default_timeout); - break; - } - else { - ret = ctrl.sema_wait(tm); - } - } while (flags.need_dest_.load(std::memory_order_acquire)); - finally.do_exit(); - ret = ctrl.handshake_post(1) && ret; - - mtx.lock(); - return ret; - } - - template - static bool notify(Ctrl & ctrl) { - auto & counter = ctrl.counter(); - if ((counter.waiting_.load(std::memory_order_acquire)) == 0) { - return true; - } - bool ret = true; - IPC_UNUSED_ auto guard = ctrl.get_lock(); - if (counter.counter_ > 0) { - ret = ctrl.sema_post(1); - counter.counter_ -= 1; - ret = ret && ctrl.handshake_wait(default_timeout); - } - return ret; - } - - template - static bool broadcast(Ctrl & ctrl) { - auto & counter = ctrl.counter(); - if ((counter.waiting_.load(std::memory_order_acquire)) == 0) { - return true; - } - bool ret = true; - IPC_UNUSED_ auto guard = ctrl.get_lock(); - if (counter.counter_ > 0) { - ret = ctrl.sema_post(counter.counter_); - do { - counter.counter_ -= 1; - ret = ret && ctrl.handshake_wait(default_timeout); - } while (counter.counter_ > 0); - } - return ret; - } - - template - static bool quit_waiting(Ctrl & ctrl) { - auto & flags = ctrl.flags(); - flags.need_dest_.store(true, std::memory_order_relaxed); - if (!flags.is_waiting_.exchange(false, std::memory_order_release)) { - return true; - } - auto & counter = ctrl.counter(); - if ((counter.waiting_.load(std::memory_order_acquire)) == 0) { - return true; - } - bool ret = true; - IPC_UNUSED_ auto guard = ctrl.get_lock(); - if (counter.counter_ > 0) { - ret = ctrl.sema_post(counter.counter_); - counter.counter_ -= 1; - ret = ret && ctrl.handshake_wait(default_timeout); - } - return ret; - } -}; - -} // namespace detail -} // namespace ipc diff --git a/src/libipc/waiter_template.inc b/src/libipc/waiter_template.inc deleted file mode 100755 index 976b3d4d..00000000 --- a/src/libipc/waiter_template.inc +++ /dev/null @@ -1,71 +0,0 @@ - -#undef IPC_OBJECT_TYPE_P_ -#undef IPC_OBJECT_TYPE_I_ - -#define IPC_OBJECT_TYPE_P_ IPC_PP_JOIN_(IPC_OBJECT_TYPE_, _) -#define IPC_OBJECT_TYPE_I_ IPC_PP_JOIN_(IPC_OBJECT_TYPE_, _impl) - -class IPC_OBJECT_TYPE_::IPC_OBJECT_TYPE_P_ : public pimpl { -public: - std::string n_; - ipc::detail::IPC_OBJECT_TYPE_I_ h_; -}; - -void IPC_OBJECT_TYPE_::remove(char const * name) { - detail::IPC_OBJECT_TYPE_I_::remove(name); -} - -IPC_OBJECT_TYPE_::IPC_OBJECT_TYPE_() - : p_(p_->make()) { -} - -IPC_OBJECT_TYPE_::IPC_OBJECT_TYPE_(char const * name) - : IPC_OBJECT_TYPE_() { - open(name); -} - -IPC_OBJECT_TYPE_::IPC_OBJECT_TYPE_(IPC_OBJECT_TYPE_&& rhs) - : IPC_OBJECT_TYPE_() { - swap(rhs); -} - -IPC_OBJECT_TYPE_::~IPC_OBJECT_TYPE_() { - close(); - p_->clear(); -} - -void IPC_OBJECT_TYPE_::swap(IPC_OBJECT_TYPE_& rhs) { - std::swap(p_, rhs.p_); -} - -IPC_OBJECT_TYPE_& IPC_OBJECT_TYPE_::operator=(IPC_OBJECT_TYPE_ rhs) { - swap(rhs); - return *this; -} - -bool IPC_OBJECT_TYPE_::valid() const { - return (p_ != nullptr) && !impl(p_)->n_.empty(); -} - -char const * IPC_OBJECT_TYPE_::name() const { - return impl(p_)->n_.c_str(); -} - -bool IPC_OBJECT_TYPE_::open(char const * name IPC_OBJECT_TYPE_OPEN_PARS_) { - if (name == nullptr || name[0] == '\0') { - return false; - } - if (impl(p_)->n_ == name) return true; - close(); - if (impl(p_)->h_.open(name IPC_OBJECT_TYPE_OPEN_ARGS_)) { - impl(p_)->n_ = name; - return true; - } - return false; -} - -void IPC_OBJECT_TYPE_::close() { - if (!valid()) return; - impl(p_)->h_.close(); - impl(p_)->n_.clear(); -} diff --git a/src/waiter.cpp b/src/waiter.cpp deleted file mode 100755 index 24a13823..00000000 --- a/src/waiter.cpp +++ /dev/null @@ -1,77 +0,0 @@ - -#include - -#include "libipc/waiter.h" - -#include "libipc/utility/pimpl.h" -#include "libipc/platform/waiter_wrapper.h" - -#undef IPC_PP_CAT_ -#undef IPC_PP_JOIN_T__ -#undef IPC_PP_JOIN_ - -#define IPC_PP_CAT_(X, ...) X##__VA_ARGS__ -#define IPC_PP_JOIN_T__(X, ...) IPC_PP_CAT_(X, __VA_ARGS__) -#define IPC_PP_JOIN_(X, ...) IPC_PP_JOIN_T__(X, __VA_ARGS__) - -namespace ipc { - -#undef IPC_OBJECT_TYPE_ -#undef IPC_OBJECT_TYPE_OPEN_PARS_ -#undef IPC_OBJECT_TYPE_OPEN_ARGS_ - -#define IPC_OBJECT_TYPE_ mutex -#define IPC_OBJECT_TYPE_OPEN_PARS_ -#define IPC_OBJECT_TYPE_OPEN_ARGS_ - -#include "libipc/waiter_template.inc" - -bool mutex::lock() { - return impl(p_)->h_.lock(); -} - -bool mutex::unlock() { - return impl(p_)->h_.unlock(); -} - -#undef IPC_OBJECT_TYPE_ -#undef IPC_OBJECT_TYPE_OPEN_PARS_ -#undef IPC_OBJECT_TYPE_OPEN_ARGS_ - -#define IPC_OBJECT_TYPE_ semaphore -#define IPC_OBJECT_TYPE_OPEN_PARS_ , long count -#define IPC_OBJECT_TYPE_OPEN_ARGS_ , count - -#include "libipc/waiter_template.inc" - -bool semaphore::wait(std::size_t tm) { - return impl(p_)->h_.wait(tm); -} - -bool semaphore::post(long count) { - return impl(p_)->h_.post(count); -} - -#undef IPC_OBJECT_TYPE_ -#undef IPC_OBJECT_TYPE_OPEN_PARS_ -#undef IPC_OBJECT_TYPE_OPEN_ARGS_ - -#define IPC_OBJECT_TYPE_ condition -#define IPC_OBJECT_TYPE_OPEN_PARS_ -#define IPC_OBJECT_TYPE_OPEN_ARGS_ - -#include "libipc/waiter_template.inc" - -bool condition::wait(mutex& mtx, std::size_t tm) { - return impl(p_)->h_.wait(impl(mtx.p_)->h_, tm); -} - -bool condition::notify() { - return impl(p_)->h_.notify(); -} - -bool condition::broadcast() { - return impl(p_)->h_.broadcast(); -} - -} // namespace ipc diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 4b695eb8..9398aa30 100755 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -15,7 +15,10 @@ include_directories( ${LIBIPC_PROJECT_DIR}/3rdparty ${LIBIPC_PROJECT_DIR}/3rdparty/gtest/include) -file(GLOB SRC_FILES ${LIBIPC_PROJECT_DIR}/test/*.cpp) +file(GLOB SRC_FILES + ${LIBIPC_PROJECT_DIR}/test/*.cpp + # ${LIBIPC_PROJECT_DIR}/test/profiler/*.cpp + ) file(GLOB HEAD_FILES ${LIBIPC_PROJECT_DIR}/test/*.h) add_executable(${PROJECT_NAME} ${SRC_FILES} ${HEAD_FILES}) diff --git a/test/profiler/README.md b/test/profiler/README.md new file mode 100644 index 00000000..d4a6bea1 --- /dev/null +++ b/test/profiler/README.md @@ -0,0 +1,11 @@ +# A Quick Introduction to C++ Performance Tuning +(From: https://github.com/adah1972/cpp_summit_2020.git) + +This repository contains the presentation file and example code for my +presentation at the C++ Summit 2020 held in Shenzhen, China on 4–5 December +2020. + +The presentation content is shared under a [Creative Commons Attribution-Share +Alike 2.5 Licence](http://creativecommons.org/licenses/by-sa/2.5/). The code +is put in the public domain (i.e. do whatever you like with it), though an +acknowledgement will be appreciated (but not required). diff --git a/test/profiler/profiler.cpp b/test/profiler/profiler.cpp new file mode 100644 index 00000000..d8fd7bcf --- /dev/null +++ b/test/profiler/profiler.cpp @@ -0,0 +1,77 @@ +#include "profiler.h" +#include +#include +#include + +namespace { + +struct profiling_data { + int number; + int call_count{}; + uint64_t call_duration{}; +}; + +class profiler { +public: + profiler(); + ~profiler(); + + void add_data(int number, uint64_t duration); + +private: + std::vector data_; +}; + +profiler::profiler() +{ + size_t len = 0; + for (;;) { + if (name_map[len].name == NULL) { + break; + } + ++len; + } + data_.resize(len); + int i = 0; + for (auto& item : data_) { + assert(i == name_map[i].number); + item.number = i; + ++i; + } +} + +profiler::~profiler() +{ +#ifndef NDEBUG + for (auto& item : data_) { + if (item.call_count == 0) { + continue; + } + std::cout << item.number << " " << name_map[item.number].name + << ":\n"; + std::cout << " Call count: " << item.call_count << '\n'; + std::cout << " Call duration: " << item.call_duration << '\n'; + std::cout << " Average duration: " + << item.call_duration * 1.0 / + (item.call_count != 0 ? item.call_count : 1) + << '\n'; + } +#endif +} + +void profiler::add_data(int number, uint64_t duration) +{ + assert(number >= 0 && number < static_cast(data_.size())); + data_[number].call_count++; + data_[number].call_duration += duration; +} + +profiler profiler_instance; + +} // unnamed namespace + +profiling_checker::~profiling_checker() +{ + auto end_time = rdtsc(); + profiler_instance.add_data(number_, end_time - start_time_); +} diff --git a/test/profiler/profiler.h b/test/profiler/profiler.h new file mode 100644 index 00000000..d04264ab --- /dev/null +++ b/test/profiler/profiler.h @@ -0,0 +1,35 @@ +#ifndef PROFILER_H +#define PROFILER_H + +#include "rdtsc.h" + +struct name_mapper { + int number; + const char* name; +}; + +extern name_mapper name_map[]; + +class profiling_checker { +public: + profiling_checker(int number); + ~profiling_checker(); + +private: + int number_; + uint64_t start_time_; +}; + +inline profiling_checker::profiling_checker(int number) + : number_(number) +{ + start_time_ = rdtsc(); +} + +#ifdef NDEBUG +#define PROFILE_CHECK(func_number) (void)0 +#else +#define PROFILE_CHECK(func_number) profiling_checker _checker(func_number) +#endif + +#endif // PROFILER_H diff --git a/test/profiler/rdtsc.h b/test/profiler/rdtsc.h new file mode 100644 index 00000000..80e35c79 --- /dev/null +++ b/test/profiler/rdtsc.h @@ -0,0 +1,52 @@ +#ifndef RDTSC_H +#define RDTSC_H + +#include // uint64_t + +#if defined(_M_X64) || defined(_M_IX86) || defined(__x86_64) || defined(__i386) +# ifdef _WIN32 +# include // __rdtsc +# else +# include // __rdtsc +# endif +# define HAS_HW_RDTSC 1 +#else +# include // std::chrono::high_resolution_clock +# define HAS_HW_RDTSC 0 +#endif + +inline uint64_t rdtsc() +{ +#if HAS_HW_RDTSC + // _mm_lfence() might be used to serialize the instruction stream, + // and it would guarantee that RDTSC will not be reordered with + // other instructions. However, measurements show that the overhead + // may be too big (easily 15 to 30 CPU cycles) for profiling + // purposes: if reordering matters, the overhead matters too! + + // Forbid the compiler from reordering instructions +# ifdef _MSC_VER + _ReadWriteBarrier(); +# else + __asm__ __volatile__("" : : : "memory"); +# endif + + uint64_t result = __rdtsc(); + + // Forbid the compiler from reordering instructions +# ifdef _MSC_VER + _ReadWriteBarrier(); +# else + __asm__ __volatile__("" : : : "memory"); +# endif + + return result; +#else + auto now = std::chrono::high_resolution_clock::now(); + return std::chrono::duration_cast( + now.time_since_epoch()) + .count(); +#endif +} + +#endif // RDTSC_H diff --git a/test/test_ipc.cpp b/test/test_ipc.cpp index 16c076cf..c7311981 100755 --- a/test/test_ipc.cpp +++ b/test/test_ipc.cpp @@ -18,8 +18,9 @@ using namespace ipc; namespace { -constexpr int LoopCount = 10000; -constexpr int MultiMax = 8; +constexpr int LoopCount = 10000; +constexpr int MultiMax = 8; +constexpr int TestBuffMax = 65536; struct msg_head { int id_; @@ -28,7 +29,7 @@ struct msg_head { class rand_buf : public buffer { public: rand_buf() { - int size = capo::random<>{sizeof(msg_head), 65536}(); + int size = capo::random<>{(int)sizeof(msg_head), TestBuffMax}(); *this = buffer(new char[size], size, [](void * p, std::size_t) { delete [] static_cast(p); }); @@ -109,10 +110,10 @@ void test_sr(char const * name, int s_cnt, int r_cnt) { for (int k = 0; k < s_cnt; ++k) { ipc_ut::sender() << [name, &sw, r_cnt, k] { Que que { name, ipc::sender }; - EXPECT_TRUE(que.wait_for_recv(r_cnt)); + ASSERT_TRUE(que.wait_for_recv(r_cnt)); sw.start(); for (int i = 0; i < (int)data_set__.get().size(); ++i) { - EXPECT_TRUE(que.send(data_set__.get()[i])); + ASSERT_TRUE(que.send(data_set__.get()[i])); } }; } @@ -132,7 +133,7 @@ void test_sr(char const * name, int s_cnt, int r_cnt) { if (data_set != got) { printf("data_set__.get()[%d] != got, size = %zd/%zd\n", i, data_set.size(), got.size()); - EXPECT_TRUE(false); + ASSERT_TRUE(false); } } }; @@ -140,7 +141,7 @@ void test_sr(char const * name, int s_cnt, int r_cnt) { ipc_ut::sender().wait_for_done(); Que que { name }; - EXPECT_TRUE(que.wait_for_recv(r_cnt)); + ASSERT_TRUE(que.wait_for_recv(r_cnt)); for (int k = 0; k < r_cnt; ++k) { que.send(rand_buf{msg_head{-1}}); } diff --git a/test/test_pthread.cpp b/test/test_pthread.cpp deleted file mode 100755 index 36d0acf8..00000000 --- a/test/test_pthread.cpp +++ /dev/null @@ -1,49 +0,0 @@ - -#include -#include - -#include "test.h" - -#if defined(__linux__) || defined(__linux) -#include -#include - -TEST(PThread, Robust) { - pthread_mutexattr_t ma; - pthread_mutexattr_init(&ma); - pthread_mutexattr_setrobust(&ma, PTHREAD_MUTEX_ROBUST); - pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; - pthread_mutex_init(&mutex, &ma); - - std::thread{[&mutex] { - pthread_mutex_lock(&mutex); - // pthread_mutex_unlock(&mutex); - }}.join(); - - struct timespec tout; - clock_gettime(CLOCK_REALTIME, &tout); - int r = pthread_mutex_timedlock(&mutex, &tout); - EXPECT_EQ(r, EOWNERDEAD); - - pthread_mutex_consistent(&mutex); - pthread_mutex_unlock(&mutex); - pthread_mutex_destroy(&mutex); -} -#elif defined(WIN64) || defined(_WIN64) || defined(__WIN64__) || \ - defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) -#include -#include - -TEST(PThread, Robust) { - HANDLE lock = CreateMutex(NULL, FALSE, _T("test-robust")); - std::thread{[] { - HANDLE lock = CreateMutex(NULL, FALSE, _T("test-robust")); - WaitForSingleObject(lock, 0); - }}.join(); - - DWORD r = WaitForSingleObject(lock, 0); - EXPECT_EQ(r, WAIT_ABANDONED); - - CloseHandle(lock); -} -#endif // !__linux__ \ No newline at end of file diff --git a/test/test_queue.cpp b/test/test_queue.cpp index a59b3a73..e85d39fa 100755 --- a/test/test_queue.cpp +++ b/test/test_queue.cpp @@ -103,7 +103,7 @@ void test_sr(elems_t && elems, int s_cnt, int r_cnt, char const * me queue_t que { &elems }; ASSERT_TRUE(que.connect()); while (pop(que).pid_ >= 0) ; - EXPECT_TRUE(que.disconnect()); + ASSERT_TRUE(que.disconnect()); }; } @@ -133,7 +133,7 @@ TEST(Queue, el_connection) { elems_t el; EXPECT_TRUE(el.connect_sender()); for (std::size_t i = 0; i < 10000; ++i) { - EXPECT_FALSE(el.connect_sender()); + ASSERT_FALSE(el.connect_sender()); } el.disconnect_sender(); EXPECT_TRUE(el.connect_sender()); @@ -141,7 +141,7 @@ TEST(Queue, el_connection) { { elems_t el; for (std::size_t i = 0; i < 10000; ++i) { - EXPECT_TRUE(el.connect_sender()); + ASSERT_TRUE(el.connect_sender()); } } { @@ -149,7 +149,7 @@ TEST(Queue, el_connection) { auto cc = el.connect_receiver(); EXPECT_NE(cc, 0); for (std::size_t i = 0; i < 10000; ++i) { - EXPECT_EQ(el.connect_receiver(), 0); + ASSERT_EQ(el.connect_receiver(), 0); } EXPECT_EQ(el.disconnect_receiver(cc), 0); EXPECT_EQ(el.connect_receiver(), cc); @@ -157,10 +157,10 @@ TEST(Queue, el_connection) { { elems_t el; for (std::size_t i = 0; i < (sizeof(ipc::circ::cc_t) * CHAR_BIT); ++i) { - EXPECT_NE(el.connect_receiver(), 0); + ASSERT_NE(el.connect_receiver(), 0); } for (std::size_t i = 0; i < 10000; ++i) { - EXPECT_EQ(el.connect_receiver(), 0); + ASSERT_EQ(el.connect_receiver(), 0); } } } @@ -171,11 +171,11 @@ TEST(Queue, connection) { queue_t que{&el}; // sending for (std::size_t i = 0; i < 10000; ++i) { - EXPECT_TRUE(que.ready_sending()); + ASSERT_TRUE(que.ready_sending()); } for (std::size_t i = 0; i < 10000; ++i) { queue_t que{&el}; - EXPECT_FALSE(que.ready_sending()); + ASSERT_FALSE(que.ready_sending()); } for (std::size_t i = 0; i < 10000; ++i) { que.shut_sending(); @@ -186,15 +186,15 @@ TEST(Queue, connection) { } // receiving for (std::size_t i = 0; i < 10000; ++i) { - EXPECT_TRUE(que.connect()); + ASSERT_TRUE(que.connect()); } for (std::size_t i = 0; i < 10000; ++i) { queue_t que{&el}; - EXPECT_FALSE(que.connect()); + ASSERT_FALSE(que.connect()); } EXPECT_TRUE(que.disconnect()); for (std::size_t i = 0; i < 10000; ++i) { - EXPECT_FALSE(que.disconnect()); + ASSERT_FALSE(que.disconnect()); } { queue_t que{&el}; @@ -202,7 +202,7 @@ TEST(Queue, connection) { } for (std::size_t i = 0; i < 10000; ++i) { queue_t que{&el}; - EXPECT_FALSE(que.connect()); + ASSERT_FALSE(que.connect()); } } { @@ -210,42 +210,42 @@ TEST(Queue, connection) { queue_t que{&el}; // sending for (std::size_t i = 0; i < 10000; ++i) { - EXPECT_TRUE(que.ready_sending()); + ASSERT_TRUE(que.ready_sending()); } for (std::size_t i = 0; i < 10000; ++i) { queue_t que{&el}; - EXPECT_TRUE(que.ready_sending()); + ASSERT_TRUE(que.ready_sending()); } for (std::size_t i = 0; i < 10000; ++i) { que.shut_sending(); } for (std::size_t i = 0; i < 10000; ++i) { queue_t que{&el}; - EXPECT_TRUE(que.ready_sending()); + ASSERT_TRUE(que.ready_sending()); } // receiving for (std::size_t i = 0; i < 10000; ++i) { - EXPECT_TRUE(que.connect()); + ASSERT_TRUE(que.connect()); } for (std::size_t i = 1; i < (sizeof(ipc::circ::cc_t) * CHAR_BIT); ++i) { queue_t que{&el}; - EXPECT_TRUE(que.connect()); + ASSERT_TRUE(que.connect()); } for (std::size_t i = 0; i < 10000; ++i) { queue_t que{&el}; - EXPECT_FALSE(que.connect()); + ASSERT_FALSE(que.connect()); } - EXPECT_TRUE(que.disconnect()); + ASSERT_TRUE(que.disconnect()); for (std::size_t i = 0; i < 10000; ++i) { - EXPECT_FALSE(que.disconnect()); + ASSERT_FALSE(que.disconnect()); } { queue_t que{&el}; - EXPECT_TRUE(que.connect()); + ASSERT_TRUE(que.connect()); } for (std::size_t i = 0; i < 10000; ++i) { queue_t que{&el}; - EXPECT_FALSE(que.connect()); + ASSERT_FALSE(que.connect()); } } } diff --git a/test/test_sync.cpp b/test/test_sync.cpp new file mode 100644 index 00000000..84ace7b4 --- /dev/null +++ b/test/test_sync.cpp @@ -0,0 +1,195 @@ + +#include +#include +#include +#include +#include +#include +#include + +#include "test.h" + +#include "libipc/platform/detail.h" +#if defined(IPC_OS_LINUX_) +#include +#include + +TEST(PThread, Robust) { + pthread_mutexattr_t ma; + pthread_mutexattr_init(&ma); + pthread_mutexattr_setpshared(&ma, PTHREAD_PROCESS_SHARED); + pthread_mutexattr_setrobust(&ma, PTHREAD_MUTEX_ROBUST); + pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; + pthread_mutex_init(&mutex, &ma); + + std::thread{[&mutex] { + pthread_mutex_lock(&mutex); + // pthread_mutex_unlock(&mutex); + }}.join(); + + struct timespec tout; + clock_gettime(CLOCK_REALTIME, &tout); + int r = pthread_mutex_timedlock(&mutex, &tout); + EXPECT_EQ(r, EOWNERDEAD); + + pthread_mutex_consistent(&mutex); + pthread_mutex_unlock(&mutex); + pthread_mutex_destroy(&mutex); +} +#elif defined(IPC_OS_WINDOWS_) +#include +#include + +TEST(PThread, Robust) { + HANDLE lock = CreateMutex(NULL, FALSE, _T("test-robust")); + std::thread{[] { + HANDLE lock = CreateMutex(NULL, FALSE, _T("test-robust")); + WaitForSingleObject(lock, 0); + }}.join(); + + DWORD r = WaitForSingleObject(lock, 0); + EXPECT_EQ(r, WAIT_ABANDONED); + + CloseHandle(lock); +} +#endif // OS + +#include "libipc/mutex.h" + +TEST(Sync, Mutex) { + ipc::sync::mutex lock; + EXPECT_TRUE(lock.open("test-mutex-robust")); + std::thread{[] { + ipc::sync::mutex lock {"test-mutex-robust"}; + EXPECT_TRUE(lock.valid()); + EXPECT_TRUE(lock.lock()); + }}.join(); + + EXPECT_THROW(lock.try_lock(), std::system_error); + + int i = 0; + EXPECT_TRUE(lock.lock()); + i = 100; + auto t2 = std::thread{[&i] { + ipc::sync::mutex lock {"test-mutex-robust"}; + EXPECT_TRUE(lock.valid()); + EXPECT_FALSE(lock.try_lock()); + EXPECT_TRUE(lock.lock()); + i += i; + EXPECT_TRUE(lock.unlock()); + }}; + std::this_thread::sleep_for(std::chrono::seconds(1)); + EXPECT_EQ(i, 100); + EXPECT_TRUE(lock.unlock()); + t2.join(); + EXPECT_EQ(i, 200); +} + +#include "libipc/semaphore.h" + +TEST(Sync, Semaphore) { + ipc::sync::semaphore sem; + EXPECT_TRUE(sem.open("test-sem")); + std::thread{[] { + ipc::sync::semaphore sem {"test-sem"}; + EXPECT_TRUE(sem.post(1000)); + }}.join(); + + for (int i = 0; i < 1000; ++i) { + EXPECT_TRUE(sem.wait(0)); + } + EXPECT_FALSE(sem.wait(0)); +} + +#include "libipc/condition.h" + +TEST(Sync, Condition) { + ipc::sync::condition cond; + EXPECT_TRUE(cond.open("test-cond")); + ipc::sync::mutex lock; + EXPECT_TRUE(lock.open("test-mutex")); + std::deque que; + + auto job = [&que](int num) { + ipc::sync::condition cond {"test-cond"}; + ipc::sync::mutex lock {"test-mutex"}; + for (;;) { + int val = 0; + { + std::lock_guard guard {lock}; + while (que.empty()) { + ASSERT_TRUE(cond.wait(lock)); + } + val = que.front(); + que.pop_front(); + } + if (val == 0) { + std::printf("test-cond-%d: exit.\n", num); + return; + } + std::printf("test-cond-%d: %d\n", num, val); + } + }; + std::array test_conds; + for (int i = 0; i < (int)test_conds.size(); ++i) { + test_conds[i] = std::thread{job, i}; + } + + for (int i = 1; i < 100; ++i) { + { + std::lock_guard guard {lock}; + que.push_back(i); + } + cond.notify(); + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + } + for (int i = 1; i < 100; ++i) { + { + std::lock_guard guard {lock}; + que.push_back(i); + } + cond.broadcast(); + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + } + { + std::lock_guard guard {lock}; + for (int i = 0; i < (int)test_conds.size(); ++i) { + que.push_back(0); + } + } + cond.broadcast(); + + for (auto &t : test_conds) t.join(); +} + +/** + * https://stackoverflow.com/questions/51730660/is-this-a-bug-in-glibc-pthread +*/ +TEST(Sync, ConditionRobust) { + printf("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX 1\n"); + ipc::sync::condition cond {"test-cond"}; + printf("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX 2\n"); + ipc::sync::mutex lock {"test-mutex"}; + printf("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX 3\n"); + lock.lock(); + std::thread unlock {[] { + printf("WWWWWWWWWWWWWWWWWWWWWWWWWWWWWW 1\n"); + ipc::sync::condition cond {"test-cond"}; + printf("WWWWWWWWWWWWWWWWWWWWWWWWWWWWWW 2\n"); + ipc::sync::mutex lock {"test-mutex"}; + printf("WWWWWWWWWWWWWWWWWWWWWWWWWWWWWW 3\n"); + { + std::lock_guard guard {lock}; + } + std::this_thread::sleep_for(std::chrono::seconds(1)); + printf("WWWWWWWWWWWWWWWWWWWWWWWWWWWWWW 4\n"); + cond.broadcast(); + printf("WWWWWWWWWWWWWWWWWWWWWWWWWWWWWW 5\n"); + }}; + printf("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX 4\n"); + cond.wait(lock); + printf("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX 5\n"); + lock.unlock(); + printf("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX 6\n"); + unlock.join(); +} \ No newline at end of file diff --git a/test/test_waiter.cpp b/test/test_waiter.cpp index 8230ed53..727cca0c 100755 --- a/test/test_waiter.cpp +++ b/test/test_waiter.cpp @@ -1,33 +1,68 @@ #include #include -#include "libipc/platform/waiter_wrapper.h" +#include "libipc/waiter.h" #include "test.h" namespace { TEST(Waiter, broadcast) { - ipc::detail::waiter w; - std::thread ts[10]; - - for (auto& t : ts) { - t = std::thread([&w] { - ipc::detail::waiter_wrapper wp { &w }; - EXPECT_TRUE(wp.open("test-ipc-waiter")); - EXPECT_TRUE(wp.wait_if([] { return true; })); - wp.close(); - }); + for (int i = 0; i < 10; ++i) { + ipc::detail::waiter waiter; + std::thread ts[10]; + + int k = 0; + for (auto& t : ts) { + t = std::thread([&k] { + ipc::detail::waiter waiter {"test-ipc-waiter"}; + EXPECT_TRUE(waiter.valid()); + for (int i = 0; i < 9; ++i) { + while (!waiter.wait_if([&k, &i] { return k == i; })) ; + } + }); + } + + EXPECT_TRUE(waiter.open("test-ipc-waiter")); + std::cout << "waiting for broadcast...\n"; + for (k = 1; k < 10; ++k) { + std::cout << "broadcast: " << k << "\n"; + ASSERT_TRUE(waiter.broadcast()); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + for (auto& t : ts) t.join(); + std::cout << "quit... " << i << "\n"; } +} + +TEST(Waiter, quit_waiting) { + ipc::detail::waiter waiter; + EXPECT_TRUE(waiter.open("test-ipc-waiter")); + + std::thread t1 { + [&waiter] { + EXPECT_TRUE(waiter.wait_if([] { return true; })); + } + }; - ipc::detail::waiter_wrapper wp { &w }; - EXPECT_TRUE(wp.open("test-ipc-waiter")); + bool quit = false; + std::thread t2 { + [&quit] { + ipc::detail::waiter waiter {"test-ipc-waiter"}; + EXPECT_TRUE(waiter.wait_if([&quit] { return !quit; })); + } + }; - std::cout << "waiting for broadcast...\n"; - std::this_thread::sleep_for(std::chrono::seconds(1)); - EXPECT_TRUE(wp.broadcast()); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + EXPECT_TRUE(waiter.quit_waiting()); + t1.join(); + ASSERT_TRUE(t2.joinable()); - for (auto& t : ts) t.join(); - wp.close(); + EXPECT_TRUE(waiter.open("test-ipc-waiter")); + std::cout << "nofify quit...\n"; + quit = true; + EXPECT_TRUE(waiter.notify()); + t2.join(); + std::cout << "quit... \n"; } } // internal-linkage