diff --git a/.githooks/pre-commit b/.githooks/pre-commit index 888ec379..5fac140b 100755 --- a/.githooks/pre-commit +++ b/.githooks/pre-commit @@ -34,6 +34,12 @@ done # template_contents=$(cat '.githooks/readme-template.md') cp .githooks/readme-template.md README.md +# Disable patsub_replacement +# +# If the patsub_replacement shell option is enabled using shopt, +# any unquoted instances of ‘&’ in string are replaced with the matching portion of pattern +shopt -u patsub_replacement + template_contents=$(cat 'README.md') example_contents=$(cat 'examples/coro_task.cpp') echo "${template_contents/\$\{EXAMPLE_CORO_TASK_CPP\}/$example_contents}" > README.md @@ -70,6 +76,10 @@ template_contents=$(cat 'README.md') example_contents=$(cat 'examples/coro_ring_buffer.cpp') echo "${template_contents/\$\{EXAMPLE_CORO_RING_BUFFER_CPP\}/$example_contents}" > README.md +template_contents=$(cat 'README.md') +example_contents=$(cat 'examples/coro_queue.cpp') +echo "${template_contents/\$\{EXAMPLE_CORO_QUEUE_CPP\}/$example_contents}" > README.md + template_contents=$(cat 'README.md') example_contents=$(cat 'examples/coro_shared_mutex.cpp') echo "${template_contents/\$\{EXAMPLE_CORO_SHARED_MUTEX_CPP\}/$example_contents}" > README.md diff --git a/.githooks/readme-template.md b/.githooks/readme-template.md index 8a2bb39a..ac1f086f 100644 --- a/.githooks/readme-template.md +++ b/.githooks/readme-template.md @@ -25,6 +25,7 @@ - [coro::shared_mutex](#shared_mutex) - [coro::semaphore](#semaphore) - [coro::ring_buffer](#ring_buffer) + - [coro::queue](#queue) * Schedulers - [coro::thread_pool](#thread_pool) for coroutine cooperative multitasking - [coro::io_scheduler](#io_scheduler) for driving i/o events @@ -266,6 +267,43 @@ consumer 2 shutting down, stop signal received consumer 3 shutting down, stop signal received ``` +### queue +The `coro::queue` is thread safe async multi-producer multi-consumer queue. 
Producing into the queue is not an asynchronous operation, it will either immediately use a consumer that is awaiting on `pop()` to process the element, or if no consumer is available place the element into the queue. All consume waiters on the queue are resumed in a LIFO manner when an element becomes available to consume. + +```C++ +${EXAMPLE_CORO_QUEUE_CPP} +``` + +Expected output: +```bash +$ ./examples/coro_queue +consumed 0 +consumed 1 +consumed 0 +consumed 2 +consumed 3 +consumed 4 +consumed 1 +consumed 0 +consumed 0 +consumed 0 +consumed 1 +consumed 1 +consumed 2 +consumed 2 +consumed 3 +consumed 4 +consumed 3 +consumed 4 +consumed 2 +consumed 3 +consumed 4 +consumed 1 +consumed 2 +consumed 3 +consumed 4 +``` + ### thread_pool `coro::thread_pool` is a statically sized pool of worker threads to execute scheduled coroutines from a FIFO queue. One way to schedule a coroutine on a thread pool is to use the pool's `schedule()` function which should be `co_awaited` inside the coroutine to transfer the execution from the current thread to a thread pool worker thread. Its important to note that scheduling will first place the coroutine into the FIFO queue and will be picked up by the first available thread in the pool, e.g. there could be a delay if there is a lot of work queued up. @@ -318,6 +356,7 @@ The `coro::io_scheduler` can use a dedicated spawned thread for processing event * `coro::io_scheduler::schedule()` Use `co_await` on this method inside a coroutine to transfer the tasks execution to the `coro::io_scheduler`. * `coro::io_scheduler::spawn(coro::task)` Spawns the task to be detached and owned by the `coro::io_scheduler`, use this if you want to fire and forget the task, the `coro::io_scheduler` will maintain the task's lifetime. * `coro::io_scheduler::schedule(coro::task task) -> coro::task` schedules the task on the `coro::io_scheduler` and then returns the result in a task that must be awaited. 
This is useful if you want to schedule work on the `coro::io_scheduler` and want to wait for the result. +* `coro::io_scheduler::schedule(std::stop_source st, coro::task task, std::chrono::duration timeout) -> coro::expected` schedules the task on the `coro::io_scheduler` and then returns the result in a task that must be awaited. That task will then either return the completed task's value if it completes before the timeout, or a return value denoting that the task timed out. If the task times out the `std::stop_source.request_stop()` will be invoked so the task can check for it and stop executing. This must be done by the user, the `coro::io_scheduler` cannot stop the execution of the task but it is able through the `std::stop_source` to signal to the task it should stop executing. * `coro::io_scheduler::scheduler_after(std::chrono::milliseconds amount)` schedules the current task to be rescheduled after a specified amount of time has passed. * `coro::io_scheduler::schedule_at(std::chrono::steady_clock::time_point time)` schedules the current task to be rescheduled at the specified timepoint. * `coro::io_scheduler::yield()` will yield execution of the current task and resume after other tasks have had a chance to execute. This effectively places the task at the back of the queue of waiting tasks. 
diff --git a/.github/workflows/ci-macos.yml b/.github/workflows/ci-macos.yml index 003b1c7b..84ac7719 100644 --- a/.github/workflows/ci-macos.yml +++ b/.github/workflows/ci-macos.yml @@ -1,45 +1,46 @@ -# name: ci-macos +name: ci-macos -# on: [pull_request, workflow_dispatch] +on: [pull_request, workflow_dispatch] -# jobs: -# macos: -# name: macos-12 -# runs-on: macos-12 -# strategy: -# fail-fast: false -# matrix: -# clang_version: [17] -# cxx_standard: [20, 23] -# libcoro_feature_networking: [ {enabled: OFF, tls: OFF} ] -# libcoro_build_shared_libs: [OFF, ON] -# steps: -# - name: Install Dependencies -# run: | -# brew update -# brew install llvm@${{ matrix.clang_version }} -# brew install ninja -# - name: Checkout -# uses: actions/checkout@v4 -# with: -# submodules: recursive -# - name: Release -# run: | -# brew --prefix llvm@17 -# mkdir Release -# cd Release -# cmake \ -# -GNinja \ -# -DCMAKE_BUILD_TYPE=Release \ -# -DCMAKE_C_COMPILER=$(brew --prefix llvm@${{ matrix.clang_version }})/bin/clang-${{ matrix.clang_version }} \ -# -DCMAKE_CXX_COMPILER=$(brew --prefix llvm@${{ matrix.clang_version }})/bin/clang-${{ matrix.clang_version }} \ -# -DCMAKE_CXX_STANDARD=${{ matrix.cxx_standard }} \ -# -DLIBCORO_FEATURE_NETWORKING=${{ matrix.libcoro_feature_networking.enabled }} \ -# -DLIBCORO_FEATURE_TLS=${{ matrix.libcoro_feature_networking.tls }} \ -# -DLIBCORO_BUILD_SHARED_LIBS=${{ matrix.libcoro_build_shared_libs }} \ -# .. -# cmake --build . 
--config Release -# - name: Test -# run: | -# cd Release -# ctest --build-config Release -VV +jobs: + macos: + name: macos-15 + runs-on: macos-15 + strategy: + fail-fast: false + matrix: + clang_version: [20] + cxx_standard: [20, 23] + libcoro_feature_networking: [{ enabled: OFF, tls: OFF }] + libcoro_build_shared_libs: [OFF, ON] + steps: + - name: Install Dependencies + run: | + brew update + brew install llvm@${{ matrix.clang_version }} + brew install ninja + - name: Checkout + uses: actions/checkout@v4 + with: + submodules: recursive + - name: Release + run: | + brew --prefix llvm@${{ matrix.clang_version }} + ls $(brew --prefix llvm@${{ matrix.clang_version }})/bin + mkdir Release + cd Release + cmake \ + -GNinja \ + -DCMAKE_BUILD_TYPE=Release \ + -DCMAKE_C_COMPILER=$(brew --prefix llvm@${{ matrix.clang_version }})/bin/clang-${{ matrix.clang_version }} \ + -DCMAKE_CXX_COMPILER=$(brew --prefix llvm@${{ matrix.clang_version }})/bin/clang-${{ matrix.clang_version }} \ + -DCMAKE_CXX_STANDARD=${{ matrix.cxx_standard }} \ + -DLIBCORO_FEATURE_NETWORKING=${{ matrix.libcoro_feature_networking.enabled }} \ + -DLIBCORO_FEATURE_TLS=${{ matrix.libcoro_feature_networking.tls }} \ + -DLIBCORO_BUILD_SHARED_LIBS=${{ matrix.libcoro_build_shared_libs }} \ + .. + cmake --build . 
--config Release + - name: Test + run: | + cd Release + ctest --build-config Release -VV diff --git a/CMakeLists.txt b/CMakeLists.txt index bdf9df8c..343c6c01 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -21,6 +21,10 @@ if (NOT "$ENV{version}" STREQUAL "") set(PROJECT_VERSION "$ENV{version}" CACHE INTERNAL "Copied from environment variable") endif() +option(LIBCORO_ENABLE_ASAN "Build with address sanitizer") +option(LIBCORO_ENABLE_MSAN "Build with memory sanitizer") +option(LIBCORO_ENABLE_TSAN "Build with thread sanitizer") +option(LIBCORO_ENABLE_USAN "Build with undefined sanitizer") option(LIBCORO_EXTERNAL_DEPENDENCIES "Use Cmake find_package to resolve dependencies instead of embedded libraries, Default=OFF." OFF) option(LIBCORO_BUILD_TESTS "Build the tests, Default=ON." ON) option(LIBCORO_CODE_COVERAGE "Enable code coverage, tests must also be enabled, Default=OFF" OFF) @@ -40,6 +44,10 @@ endif() cmake_dependent_option(LIBCORO_FEATURE_NETWORKING "Include networking features, Default=ON." ON "NOT EMSCRIPTEN; NOT MSVC" OFF) cmake_dependent_option(LIBCORO_FEATURE_TLS "Include TLS encryption features, Default=ON." 
ON "NOT EMSCRIPTEN; NOT MSVC" OFF) +message("${PROJECT_NAME} LIBCORO_ENABLE_ASAN = ${LIBCORO_ENABLE_ASAN}") +message("${PROJECT_NAME} LIBCORO_ENABLE_MSAN = ${LIBCORO_ENABLE_MSAN}") +message("${PROJECT_NAME} LIBCORO_ENABLE_TSAN = ${LIBCORO_ENABLE_TSAN}") +message("${PROJECT_NAME} LIBCORO_ENABLE_USAN = ${LIBCORO_ENABLE_USAN}") message("${PROJECT_NAME} LIBCORO_EXTERNAL_DEPENDENCIES = ${LIBCORO_EXTERNAL_DEPENDENCIES}") message("${PROJECT_NAME} LIBCORO_BUILD_TESTS = ${LIBCORO_BUILD_TESTS}") message("${PROJECT_NAME} LIBCORO_CODE_COVERAGE = ${LIBCORO_CODE_COVERAGE}") @@ -82,9 +90,11 @@ set(LIBCORO_SOURCE_FILES include/coro/attribute.hpp include/coro/coro.hpp include/coro/event.hpp src/event.cpp + include/coro/default_executor.hpp src/default_executor.cpp include/coro/generator.hpp include/coro/latch.hpp include/coro/mutex.hpp src/mutex.cpp + include/coro/queue.hpp include/coro/ring_buffer.hpp include/coro/semaphore.hpp src/semaphore.cpp include/coro/shared_mutex.hpp @@ -145,6 +155,27 @@ endif() add_library(${PROJECT_NAME} ${LIBCORO_SOURCE_FILES}) set_target_properties(${PROJECT_NAME} PROPERTIES LINKER_LANGUAGE CXX PREFIX "" VERSION ${PROJECT_VERSION} SOVERSION ${PROJECT_VERSION_MAJOR}) + +if(LIBCORO_ENABLE_ASAN) + add_compile_options(-g -O0 -fno-omit-frame-pointer -fsanitize=address) + add_link_options( -fsanitize=address) +endif() + +if(LIBCORO_ENABLE_MSAN) + add_compile_options(-g -O0 -fno-omit-frame-pointer -fsanitize=memory) + add_link_options( -fsanitize=memory) +endif() + +if(LIBCORO_ENABLE_TSAN) + add_compile_options(-g -O0 -fno-omit-frame-pointer -fsanitize=thread) + add_link_options(-fsanitize=thread) +endif() + +if(LIBCORO_ENABLE_USAN) + add_compile_options(-g -O0 -fno-omit-frame-pointer -fsanitize=undefined) + add_link_options(-fsanitize=undefined) +endif() + target_compile_features(${PROJECT_NAME} PUBLIC cxx_std_20) target_include_directories(${PROJECT_NAME} PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include PUBLIC ${CMAKE_CURRENT_BINARY_DIR}/include) 
generate_export_header(${PROJECT_NAME} BASE_NAME CORO EXPORT_FILE_NAME include/coro/export.hpp) diff --git a/README.md b/README.md index 3d988823..ebb9364f 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,7 @@ - [coro::shared_mutex](#shared_mutex) - [coro::semaphore](#semaphore) - [coro::ring_buffer](#ring_buffer) + - [coro::queue](#queue) * Schedulers - [coro::thread_pool](#thread_pool) for coroutine cooperative multitasking - [coro::io_scheduler](#io_scheduler) for driving i/o events @@ -453,7 +454,7 @@ int main() // This task does 'work' and counts down on the latch when completed. The final child task to // complete will end up resuming the latch task when the latch's count reaches zero. - auto make_worker_task = [](std::shared_ptr& tp, coro::latch& l, int64_t i) -> coro::task + auto make_worker_task = [](std::shared_ptr tp, coro::latch& l, int64_t i) -> coro::task { // Schedule the worker task onto the thread pool. co_await tp->schedule(); @@ -525,7 +526,7 @@ int main() // lock() function returns a coro::scoped_lock that holds the mutex and automatically // unlocks the mutex upon destruction. This behaves just like std::scoped_lock. { - auto scoped_lock = co_await mutex.lock(); + auto scoped_lock = co_await mutex.scoped_lock(); output.emplace_back(i); } // <-- scoped lock unlocks the mutex here. co_return; @@ -741,7 +742,7 @@ int main() // Now that the ring buffer is empty signal to all the consumers its time to stop. Note that // the stop signal works on producers as well, but this example only uses 1 producer. 
{ - auto scoped_lock = co_await m.lock(); + auto scoped_lock = co_await m.scoped_lock(); std::cerr << "\nproducer is sending stop signal"; } rb.notify_waiters(); @@ -756,7 +757,7 @@ int main() while (true) { auto expected = co_await rb.consume(); - auto scoped_lock = co_await m.lock(); // just for synchronizing std::cout/cerr + auto scoped_lock = co_await m.scoped_lock(); // just for synchronizing std::cout/cerr if (!expected) { std::cerr << "\nconsumer " << id << " shutting down, stop signal received"; @@ -799,6 +800,111 @@ consumer 2 shutting down, stop signal received consumer 3 shutting down, stop signal received ``` +### queue +The `coro::queue` is thread safe async multi-producer multi-consumer queue. Producing into the queue is not an asynchronous operation, it will either immediately use a consumer that is awaiting on `pop()` to process the element, or if no consumer is available place the element into the queue. All consume waiters on the queue are resumed in a LIFO manner when an element becomes available to consume. + +```C++ +#include +#include + +int main() +{ + const size_t iterations = 5; + const size_t producers_count = 5; + const size_t consumers_count = 2; + + coro::thread_pool tp{}; + coro::queue q{}; + coro::latch producers_done{producers_count}; + coro::mutex m{}; /// Just for making the console prints look nice. + + auto make_producer_task = + [iterations](coro::thread_pool& tp, coro::queue& q, coro::latch& pd) -> coro::task + { + co_await tp.schedule(); + + for (size_t i = 0; i < iterations; ++i) + { + co_await q.push(i); + } + + pd.count_down(); // Notify the shutdown task this producer is complete. + co_return; + }; + + auto make_shutdown_task = [](coro::thread_pool& tp, coro::queue& q, coro::latch& pd) -> coro::task + { + // This task will wait for all the producers to complete and then for the + // entire queue to be drained before shutting it down. 
+ co_await tp.schedule(); + co_await pd; + co_await q.shutdown_notify_waiters_drain(tp); + co_return; + }; + + auto make_consumer_task = [](coro::thread_pool& tp, coro::queue& q, coro::mutex& m) -> coro::task + { + co_await tp.schedule(); + + while (true) + { + auto expected = co_await q.pop(); + if (!expected) + { + break; // coro::queue is shutting down + } + + auto scoped_lock = co_await m.scoped_lock(); // Only used to make the output look nice. + std::cout << "consumed " << *expected << "\n"; + } + }; + + std::vector> tasks{}; + + for (size_t i = 0; i < producers_count; ++i) + { + tasks.push_back(make_producer_task(tp, q, producers_done)); + } + for (size_t i = 0; i < consumers_count; ++i) + { + tasks.push_back(make_consumer_task(tp, q, m)); + } + tasks.push_back(make_shutdown_task(tp, q, producers_done)); + + coro::sync_wait(coro::when_all(std::move(tasks))); +} +``` + +Expected output: +```bash +$ ./examples/coro_queue +consumed 0 +consumed 1 +consumed 0 +consumed 2 +consumed 3 +consumed 4 +consumed 1 +consumed 0 +consumed 0 +consumed 0 +consumed 1 +consumed 1 +consumed 2 +consumed 2 +consumed 3 +consumed 4 +consumed 3 +consumed 4 +consumed 2 +consumed 3 +consumed 4 +consumed 1 +consumed 2 +consumed 3 +consumed 4 +``` + ### thread_pool `coro::thread_pool` is a statically sized pool of worker threads to execute scheduled coroutines from a FIFO queue. One way to schedule a coroutine on a thread pool is to use the pool's `schedule()` function which should be `co_awaited` inside the coroutine to transfer the execution from the current thread to a thread pool worker thread. Its important to note that scheduling will first place the coroutine into the FIFO queue and will be picked up by the first available thread in the pool, e.g. there could be a delay if there is a lot of work queued up. 
@@ -928,6 +1034,7 @@ The `coro::io_scheduler` can use a dedicated spawned thread for processing event * `coro::io_scheduler::schedule()` Use `co_await` on this method inside a coroutine to transfer the tasks execution to the `coro::io_scheduler`. * `coro::io_scheduler::spawn(coro::task)` Spawns the task to be detached and owned by the `coro::io_scheduler`, use this if you want to fire and forget the task, the `coro::io_scheduler` will maintain the task's lifetime. * `coro::io_scheduler::schedule(coro::task task) -> coro::task` schedules the task on the `coro::io_scheduler` and then returns the result in a task that must be awaited. This is useful if you want to schedule work on the `coro::io_scheduler` and want to wait for the result. +* `coro::io_scheduler::schedule(std::stop_source st, coro::task task, std::chrono::duration timeout) -> coro::expected` schedules the task on the `coro::io_scheduler` and then returns the result in a task that must be awaited. That task will then either return the completed task's value if it completes before the timeout, or a return value denoting that the task timed out. If the task times out the `std::stop_source.request_stop()` will be invoked so the task can check for it and stop executing. This must be done by the user, the `coro::io_scheduler` cannot stop the execution of the task but it is able through the `std::stop_source` to signal to the task it should stop executing. * `coro::io_scheduler::scheduler_after(std::chrono::milliseconds amount)` schedules the current task to be rescheduled after a specified amount of time has passed. * `coro::io_scheduler::schedule_at(std::chrono::steady_clock::time_point time)` schedules the current task to be rescheduled at the specified timepoint. * `coro::io_scheduler::yield()` will yield execution of the current task and resume after other tasks have had a chance to execute. This effectively places the task at the back of the queue of waiting tasks. 
diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 03502791..7b3b6ec3 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -49,6 +49,10 @@ add_executable(coro_shared_mutex coro_shared_mutex.cpp) target_link_libraries(coro_shared_mutex PUBLIC libcoro) target_compile_options(coro_shared_mutex PUBLIC ${LIBCORO_EXAMPLE_OPTIONS}) +add_executable(coro_queue coro_queue.cpp) +target_link_libraries(coro_queue PUBLIC libcoro) +target_compile_options(coro_queue PUBLIC ${LIBCORO_EXAMPLE_OPTIONS}) + add_executable(coro_sync_wait coro_sync_wait.cpp) target_link_libraries(coro_sync_wait PUBLIC libcoro) target_compile_options(coro_sync_wait PUBLIC ${LIBCORO_EXAMPLE_OPTIONS}) diff --git a/examples/coro_latch.cpp b/examples/coro_latch.cpp index 75777ccc..d7fe7724 100644 --- a/examples/coro_latch.cpp +++ b/examples/coro_latch.cpp @@ -27,7 +27,7 @@ int main() // This task does 'work' and counts down on the latch when completed. The final child task to // complete will end up resuming the latch task when the latch's count reaches zero. - auto make_worker_task = [](std::shared_ptr& tp, coro::latch& l, int64_t i) -> coro::task + auto make_worker_task = [](std::shared_ptr tp, coro::latch& l, int64_t i) -> coro::task { // Schedule the worker task onto the thread pool. co_await tp->schedule(); diff --git a/examples/coro_mutex.cpp b/examples/coro_mutex.cpp index 80283c70..b3fd7745 100644 --- a/examples/coro_mutex.cpp +++ b/examples/coro_mutex.cpp @@ -15,7 +15,7 @@ int main() // lock() function returns a coro::scoped_lock that holds the mutex and automatically // unlocks the mutex upon destruction. This behaves just like std::scoped_lock. { - auto scoped_lock = co_await mutex.lock(); + auto scoped_lock = co_await mutex.scoped_lock(); output.emplace_back(i); } // <-- scoped lock unlocks the mutex here. 
co_return; diff --git a/examples/coro_queue.cpp b/examples/coro_queue.cpp new file mode 100644 index 00000000..c799008d --- /dev/null +++ b/examples/coro_queue.cpp @@ -0,0 +1,69 @@ +#include +#include + +int main() +{ + const size_t iterations = 5; + const size_t producers_count = 5; + const size_t consumers_count = 2; + + coro::thread_pool tp{}; + coro::queue q{}; + coro::latch producers_done{producers_count}; + coro::mutex m{}; /// Just for making the console prints look nice. + + auto make_producer_task = + [iterations](coro::thread_pool& tp, coro::queue& q, coro::latch& pd) -> coro::task + { + co_await tp.schedule(); + + for (size_t i = 0; i < iterations; ++i) + { + co_await q.push(i); + } + + pd.count_down(); // Notify the shutdown task this producer is complete. + co_return; + }; + + auto make_shutdown_task = [](coro::thread_pool& tp, coro::queue& q, coro::latch& pd) -> coro::task + { + // This task will wait for all the producers to complete and then for the + // entire queue to be drained before shutting it down. + co_await tp.schedule(); + co_await pd; + co_await q.shutdown_notify_waiters_drain(tp); + co_return; + }; + + auto make_consumer_task = [](coro::thread_pool& tp, coro::queue& q, coro::mutex& m) -> coro::task + { + co_await tp.schedule(); + + while (true) + { + auto expected = co_await q.pop(); + if (!expected) + { + break; // coro::queue is shutting down + } + + auto scoped_lock = co_await m.scoped_lock(); // Only used to make the output look nice. 
+ std::cout << "consumed " << *expected << "\n"; + } + }; + + std::vector> tasks{}; + + for (size_t i = 0; i < producers_count; ++i) + { + tasks.push_back(make_producer_task(tp, q, producers_done)); + } + for (size_t i = 0; i < consumers_count; ++i) + { + tasks.push_back(make_consumer_task(tp, q, m)); + } + tasks.push_back(make_shutdown_task(tp, q, producers_done)); + + coro::sync_wait(coro::when_all(std::move(tasks))); +} diff --git a/examples/coro_ring_buffer.cpp b/examples/coro_ring_buffer.cpp index 7957ba52..9cb4e593 100644 --- a/examples/coro_ring_buffer.cpp +++ b/examples/coro_ring_buffer.cpp @@ -30,7 +30,7 @@ int main() // Now that the ring buffer is empty signal to all the consumers its time to stop. Note that // the stop signal works on producers as well, but this example only uses 1 producer. { - auto scoped_lock = co_await m.lock(); + auto scoped_lock = co_await m.scoped_lock(); std::cerr << "\nproducer is sending stop signal"; } rb.notify_waiters(); @@ -45,7 +45,7 @@ int main() while (true) { auto expected = co_await rb.consume(); - auto scoped_lock = co_await m.lock(); // just for synchronizing std::cout/cerr + auto scoped_lock = co_await m.scoped_lock(); // just for synchronizing std::cout/cerr if (!expected) { std::cerr << "\nconsumer " << id << " shutting down, stop signal received"; diff --git a/include/coro/coro.hpp b/include/coro/coro.hpp index 82dc75d1..4b554a7d 100644 --- a/include/coro/coro.hpp +++ b/include/coro/coro.hpp @@ -30,9 +30,11 @@ #endif #include "coro/event.hpp" +#include "coro/default_executor.hpp" #include "coro/generator.hpp" #include "coro/latch.hpp" #include "coro/mutex.hpp" +#include "coro/queue.hpp" #include "coro/ring_buffer.hpp" #include "coro/semaphore.hpp" #include "coro/shared_mutex.hpp" diff --git a/include/coro/default_executor.hpp b/include/coro/default_executor.hpp new file mode 100644 index 00000000..b271acad --- /dev/null +++ b/include/coro/default_executor.hpp @@ -0,0 +1,55 @@ +#pragma once + +#ifdef 
LIBCORO_FEATURE_NETWORKING + #include "coro/io_scheduler.hpp" +#else + #include "coro/thread_pool.hpp" +#endif + +namespace coro::default_executor +{ + +/** + * Set up default coro::thread_pool::options before constructing a single instance of coro::thread_pool in + * coro::default_executor::executor() + * @param thread_pool_options thread_pool options + */ +void set_executor_options(thread_pool::options thread_pool_options); + +/** + * Get default coro::thread_pool + */ +std::shared_ptr executor(); + +#ifdef LIBCORO_FEATURE_NETWORKING +/** + * Set up default coro::io_scheduler::options before constructing a single instance of coro::io_scheduler in + * coro::default_executor::io_executor() + * @param io_scheduler_options io_scheduler options + */ +void set_io_executor_options(io_scheduler::options io_scheduler_options); + +/** + * Get default coro::io_scheduler + */ +std::shared_ptr io_executor(); +#endif + +/** + * Get the perfect default executor + * + * This executor is ideal as a default argument in a library, + * in a place where thread_pool functionality is sufficient, + * but you don't want to have two executor instances per application for the same thing, + * one thread_pool and one io_scheduler. 
+ */ +inline auto perfect() +{ +#ifdef LIBCORO_FEATURE_NETWORKING + return io_executor(); +#else + return executor(); +#endif +} + +} // namespace coro::default_executor diff --git a/include/coro/io_scheduler.hpp b/include/coro/io_scheduler.hpp index 4cdae2d3..771789f3 100644 --- a/include/coro/io_scheduler.hpp +++ b/include/coro/io_scheduler.hpp @@ -1,6 +1,7 @@ #pragma once #include "coro/detail/poll_info.hpp" +#include "coro/expected.hpp" #include "coro/fd.hpp" #include "coro/poll.hpp" #include "coro/thread_pool.hpp" @@ -20,6 +21,12 @@ namespace coro { +enum timeout_status +{ + no_timeout, + timeout, +}; + class io_scheduler : public std::enable_shared_from_this { using timed_events = detail::poll_info::timed_events; @@ -70,7 +77,7 @@ class io_scheduler : public std::enable_shared_from_this /// If inline task processing is enabled then the io worker will resume tasks on its thread /// rather than scheduling them to be picked up by the thread pool. - const execution_strategy_t execution_strategy{execution_strategy_t::process_tasks_on_thread_pool}; + execution_strategy_t execution_strategy{execution_strategy_t::process_tasks_on_thread_pool}; }; /** @@ -195,6 +202,77 @@ class io_scheduler : public std::enable_shared_from_this co_return co_await task; } + /** + * Schedules a task on the io_scheduler that must complete within the given timeout. + * NOTE: This version of schedule does *NOT* cancel the given task, it will continue executing even if it times out. + * It is absolutely recommended to use the version of this schedule() function that takes an std::stop_token + * and have the scheduled task check to see if its been cancelled due to timeout to not waste resources. + * @tparam return_type The return value of the task. + * @param task The task to schedule on the io_scheduler with the given timeout. + * @param timeout How long should this task be given to complete before it times out? + * @return The task to await for the input task to complete. 
+ */ + template + [[nodiscard]] auto schedule(coro::task task, std::chrono::duration timeout) + -> coro::task> + { + using namespace std::chrono_literals; + + // If negative or 0 timeout, just schedule the task as normal. + auto timeout_ms = std::max(std::chrono::duration_cast(timeout), 0ms); + if (timeout_ms == 0ms) + { + co_return coro::expected(co_await schedule(std::move(task))); + } + + auto result = co_await when_any(std::move(task), make_timeout_task(timeout_ms)); + if (!std::holds_alternative(result)) + { + co_return coro::expected(std::move(std::get<0>(result))); + } + else + { + co_return coro::unexpected(std::move(std::get<1>(result))); + } + } + +#ifndef EMSCRIPTEN + /** + * Schedules a task on the io_scheduler that must complete within the given timeout. + * NOTE: This version of the task will have the stop_source.request_stop() be called if the timeout triggers. + * It is up to you to check in the scheduled task if the stop has been requested to actually stop executing + * the task. + * @tparam return_type The return value of the task. + * @param task The task to schedule on the io_scheduler with the given timeout. + * @param timeout How long should this task be given to complete before it times out? + * @return The task to await for the input task to complete. + */ + template + [[nodiscard]] auto + schedule(std::stop_source stop_source, coro::task task, std::chrono::duration timeout) + -> coro::task> + { + using namespace std::chrono_literals; + + // If negative or 0 timeout, just schedule the task as normal. 
+ auto timeout_ms = std::max(std::chrono::duration_cast(timeout), 0ms); + if (timeout_ms == 0ms) + { + co_return coro::expected(co_await schedule(std::move(task))); + } + + auto result = co_await when_any(std::move(stop_source), std::move(task), make_timeout_task(timeout_ms)); + if (!std::holds_alternative(result)) + { + co_return coro::expected(std::move(std::get<0>(result))); + } + else + { + co_return coro::unexpected(std::move(std::get<1>(result))); + } + } +#endif + /** * Schedules the current task to run after the given amount of time has elapsed. * @param amount The amount of time to wait before resuming execution of this task. @@ -212,7 +290,10 @@ class io_scheduler : public std::enable_shared_from_this /** * Yields the current task to the end of the queue of waiting tasks. */ - [[nodiscard]] auto yield() -> schedule_operation { return schedule_operation{*this}; }; + [[nodiscard]] auto yield() -> schedule_operation + { + return schedule_operation{*this}; + }; /** * Yields the current task for the given amount of time. 
@@ -386,6 +467,12 @@ class io_scheduler : public std::enable_shared_from_this auto add_timer_token(time_point tp, detail::poll_info& pi) -> timed_events::iterator; auto remove_timer_token(timed_events::iterator pos) -> void; auto update_timeout(time_point now) -> void; + + auto make_timeout_task(std::chrono::milliseconds timeout) -> coro::task + { + co_await schedule_after(timeout); + co_return timeout_status::timeout; + } }; } // namespace coro diff --git a/include/coro/mutex.hpp b/include/coro/mutex.hpp index c701071e..e799e4b5 100644 --- a/include/coro/mutex.hpp +++ b/include/coro/mutex.hpp @@ -1,5 +1,7 @@ #pragma once +#include "coro/task.hpp" + #include #include #include @@ -8,15 +10,66 @@ namespace coro { class mutex; +class scoped_lock; +class condition_variable; + +namespace detail +{ + +struct lock_operation_base +{ + explicit lock_operation_base(coro::mutex& m) : m_mutex(m) {} + virtual ~lock_operation_base() = default; + + lock_operation_base(const lock_operation_base&) = delete; + lock_operation_base(lock_operation_base&&) = delete; + auto operator=(const lock_operation_base&) -> lock_operation_base& = delete; + auto operator=(lock_operation_base&&) -> lock_operation_base& = delete; + + auto await_ready() const noexcept -> bool; + auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool; + +protected: + friend class coro::mutex; + + coro::mutex& m_mutex; + std::coroutine_handle<> m_awaiting_coroutine; + lock_operation_base* m_next{nullptr}; +}; + +template +struct lock_operation : public lock_operation_base +{ + explicit lock_operation(coro::mutex& m) : lock_operation_base(m) {} + ~lock_operation() override = default; + + lock_operation(const lock_operation&) = delete; + lock_operation(lock_operation&&) = delete; + auto operator=(const lock_operation&) -> lock_operation& = delete; + auto operator=(lock_operation&&) -> lock_operation& = delete; + + auto await_resume() noexcept -> return_type + { + if constexpr (std::is_same_v) + { 
+ return scoped_lock{m_mutex}; + } + else + { + return; + } + } +}; + +} // namespace detail /** - * A scoped RAII lock holder, just like std::lock_guard or std::scoped_lock in that the coro::mutex - * is always unlocked unpon this coro::scoped_lock going out of scope. It is possible to unlock the - * coro::mutex prior to the end of its current scope by manually calling the unlock() function. + * A scoped RAII lock holder similar to std::unique_lock. */ class scoped_lock { - friend class mutex; + friend class coro::mutex; + friend class coro::condition_variable; // cv.wait() functions need to be able do unlock and re-lock public: enum class lock_strategy @@ -25,7 +78,7 @@ class scoped_lock adopt }; - explicit scoped_lock(mutex& m, lock_strategy strategy = lock_strategy::adopt) : m_mutex(&m) + explicit scoped_lock(class coro::mutex& m, lock_strategy strategy = lock_strategy::adopt) : m_mutex(&m) { // Future -> support acquiring the lock? Not sure how to do that without being able to // co_await in the constructor. @@ -38,7 +91,8 @@ class scoped_lock ~scoped_lock(); scoped_lock(const scoped_lock&) = delete; - scoped_lock(scoped_lock&& other) : m_mutex(std::exchange(other.m_mutex, nullptr)) {} + scoped_lock(scoped_lock&& other) + : m_mutex(std::exchange(other.m_mutex, nullptr)) {} auto operator=(const scoped_lock&) -> scoped_lock& = delete; auto operator=(scoped_lock&& other) noexcept -> scoped_lock& { @@ -50,13 +104,12 @@ class scoped_lock } /** - * Unlocks the scoped lock prior to it going out of scope. Calling this multiple times has no - * additional affect after the first call. + * Unlocks the scoped lock prior to it going out of scope. 
*/ auto unlock() -> void; private: - mutex* m_mutex{nullptr}; + class coro::mutex* m_mutex{nullptr}; }; class mutex @@ -70,34 +123,25 @@ class mutex auto operator=(const mutex&) -> mutex& = delete; auto operator=(mutex&&) -> mutex& = delete; - struct lock_operation - { - explicit lock_operation(mutex& m) : m_mutex(m) {} - - auto await_ready() const noexcept -> bool; - auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool; - auto await_resume() noexcept -> scoped_lock { return scoped_lock{m_mutex}; } - - private: - friend class mutex; - - mutex& m_mutex; - std::coroutine_handle<> m_awaiting_coroutine; - lock_operation* m_next{nullptr}; - }; - /** - * To acquire the mutex's lock co_await this function. Upon acquiring the lock it returns - * a coro::scoped_lock which will hold the mutex until the coro::scoped_lock destructs. + * @brief To acquire the mutex's lock co_await this function. Upon acquiring the lock it returns a coro::scoped_lock + * which will hold the mutex until the coro::scoped_lock destructs. * @return A co_await'able operation to acquire the mutex. */ - [[nodiscard]] auto lock() -> lock_operation { return lock_operation{*this}; }; + [[nodiscard]] auto scoped_lock() -> detail::lock_operation { return detail::lock_operation{*this}; } + + /** + * @brief Locks the mutex. + * + * @return detail::lock_operation + */ + [[nodiscard]] auto lock() -> detail::lock_operation { return detail::lock_operation{*this}; } /** * Attempts to lock the mutex. * @return True if the mutex lock was acquired, otherwise false. */ - auto try_lock() -> bool; + [[nodiscard]] auto try_lock() -> bool; /** * Releases the mutex's lock. 
@@ -105,7 +149,7 @@ class mutex auto unlock() -> void; private: - friend struct lock_operation; + friend struct detail::lock_operation_base; /// unlocked -> state == unlocked_value() /// locked but empty waiter list == nullptr @@ -113,7 +157,7 @@ class mutex std::atomic m_state; /// A list of grabbed internal waiters that are only accessed by the unlock()'er. - lock_operation* m_internal_waiters{nullptr}; + detail::lock_operation_base* m_internal_waiters{nullptr}; /// Inactive value, this cannot be nullptr since we want nullptr to signify that the mutex /// is locked but there are zero waiters, this makes it easy to CAS new waiters into the diff --git a/include/coro/queue.hpp b/include/coro/queue.hpp new file mode 100644 index 00000000..52cc1663 --- /dev/null +++ b/include/coro/queue.hpp @@ -0,0 +1,363 @@ +#pragma once + +#include "coro/concepts/executor.hpp" +#include "coro/expected.hpp" +#include "coro/sync_wait.hpp" + +#include + +namespace coro +{ + +enum class queue_produce_result +{ + /** + * @brief The item was successfully produced. + */ + produced, + /** + * @brief The queue is shutting down or stopped, no more items are allowed to be produced. + */ + queue_stopped +}; + +enum class queue_consume_result +{ + /** + * @brief The queue has shut down/stopped and the user should stop calling pop(). + */ + queue_stopped +}; + +/** + * @brief An unbounded queue. If the queue is empty and there are waiters to consume then + * there are no allocations and the coroutine context will simply be passed to the + * waiter. If there are no waiters the item being produced will be placed into the + * queue. + * + * @tparam element_type The type of items being produced and consumed. + */ +template +class queue +{ +public: + struct awaiter + { + explicit awaiter(queue& q) noexcept : m_queue(q) {} + + /** + * @brief Acquires the coro::queue lock. 
+ * + * @return coro::task + */ + auto make_acquire_lock_task() -> coro::task { co_return co_await m_queue.m_mutex.scoped_lock(); } + + auto await_ready() noexcept -> bool + { + // This awaiter is ready when it has actually acquired an element or it is shutting down. + if (m_queue.m_stopped.load(std::memory_order::acquire)) + { + return false; + } + + auto lock = coro::sync_wait(make_acquire_lock_task()); + if (!m_queue.empty()) + { + if constexpr (std::is_move_constructible_v) + { + m_element = std::move(m_queue.m_elements.front()); + } + else + { + m_element = m_queue.m_elements.front(); + } + + m_queue.m_elements.pop(); + return true; + } + + return false; + } + + auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool + { + // Don't suspend if the stop signal has been set. + if (m_queue.m_stopped.load(std::memory_order::acquire)) + { + return false; + } + + auto lock = coro::sync_wait(make_acquire_lock_task()); + if (!m_queue.empty()) + { + if constexpr (std::is_move_constructible_v) + { + m_element = std::move(m_queue.m_elements.front()); + } + else + { + m_element = m_queue.m_elements.front(); + } + + m_queue.m_elements.pop(); + return false; + } + + // No element is ready, put ourselves on the waiter list and suspend. + this->m_next = m_queue.m_waiters; + m_queue.m_waiters = this; + m_awaiting_coroutine = awaiting_coroutine; + + return true; + } + + [[nodiscard]] auto await_resume() noexcept -> expected + { + if (m_element.has_value()) + { + if constexpr (std::is_move_constructible_v) + { + return std::move(m_element.value()); + } + else + { + return m_element.value(); + } + } + else + { + // If we don't have an item the queue has stopped, the prior functions will have checked the state. 
+ return unexpected(queue_consume_result::queue_stopped); + } + } + + std::optional m_element{std::nullopt}; + queue& m_queue; + std::coroutine_handle<> m_awaiting_coroutine{nullptr}; + /// The next awaiter in line for this queue, nullptr if this is the end. + awaiter* m_next{nullptr}; + }; + + queue() {} + ~queue() {} + + queue(const queue&) = delete; + queue(queue&& other) = delete; + + auto operator=(const queue&) -> queue& = delete; + auto operator=(queue&& other) -> queue& = delete; + + /** + * @brief Determines if the queue is empty. + * + * @return true If the queue is empty. + * @return false If the queue is not empty. + */ + auto empty() const -> bool { return size() == 0; } + + /** + * @brief Gets the number of elements in the queue. + * + * @return std::size_t The number of elements in the queue. + */ + auto size() const -> std::size_t + { + std::atomic_thread_fence(std::memory_order::acquire); + return m_elements.size(); + } + + /** + * @brief Pushes the element into the queue. If the queue is empty and there are waiters + * then the element will be processed immediately by transfering the coroutine task + * context to the waiter. + * + * @param element The element being produced. + * @return coro::task + */ + auto push(const element_type& element) -> coro::task + { + if (m_shutting_down.load(std::memory_order::acquire)) + { + co_return queue_produce_result::queue_stopped; + } + + // The general idea is to see if anyone is waiting, and if so directly transfer the element + // to that waiter. If there is nobody waiting then move the element into the queue. + auto lock = co_await m_mutex.scoped_lock(); + + if (m_waiters != nullptr) + { + awaiter* waiter = m_waiters; + m_waiters = m_waiters->m_next; + lock.unlock(); + + // Transfer the element directly to the awaiter. 
+ waiter->m_element = element; + waiter->m_awaiting_coroutine.resume(); + } + else + { + m_elements.push(element); + } + + co_return queue_produce_result::produced; + } + + /** + * @brief Pushes the element into the queue. If the queue is empty and there are waiters + * then the element will be processed immediately by transfering the coroutine task + * context to the waiter. + * + * @param element The element being produced. + * @return coro::task + */ + auto push(element_type&& element) -> coro::task + { + if (m_shutting_down.load(std::memory_order::acquire)) + { + co_return queue_produce_result::queue_stopped; + } + + auto lock = co_await m_mutex.scoped_lock(); + + if (m_waiters != nullptr) + { + awaiter* waiter = m_waiters; + m_waiters = m_waiters->m_next; + lock.unlock(); + + // Transfer the element directly to the awaiter. + waiter->m_element = std::move(element); + waiter->m_awaiting_coroutine.resume(); + } + else + { + m_elements.push(std::move(element)); + } + + co_return queue_produce_result::produced; + } + + /** + * @brief Emplaces an element into the queue. Has the same behavior as push if the queue + * is empty and has waiters. + * + * @param args The element's constructor argument types and values. + * @return coro::task + */ + template + auto emplace(args_type&&... args) -> coro::task + { + if (m_shutting_down.load(std::memory_order::acquire)) + { + co_return queue_produce_result::queue_stopped; + } + + auto lock = co_await m_mutex.scoped_lock(); + + if (m_waiters != nullptr) + { + awaiter* waiter = m_waiters; + m_waiters = m_waiters->m_next; + lock.unlock(); + + waiter->m_element.emplace(std::forward(args)...); + waiter->m_awaiting_coroutine.resume(); + } + else + { + m_elements.emplace(std::forward(args)...); + } + + co_return queue_produce_result::produced; + } + + /** + * @brief Pops the head element of the queue if available, or waits for one to be available. 
+ * + * @return awaiter A waiter task that upon co_await complete returns an element or the queue + * status that it is shut down. + */ + [[nodiscard]] auto pop() -> awaiter { return awaiter{*this}; } + + /** + * @brief Shuts down the queue immediately discarding any elements that haven't been processed. + * + * @return coro::task + */ + auto shutdown_notify_waiters() -> coro::task + { + auto expected = false; + if (!m_shutting_down.compare_exchange_strong( + expected, true, std::memory_order::acq_rel, std::memory_order::relaxed)) + { + co_return; + } + + // Since this isn't draining just let the awaiters know we're stopped. + m_stopped.exchange(true, std::memory_order::release); + + while (m_waiters != nullptr) + { + co_await m_mutex.lock(); + auto* to_resume = m_waiters; + m_waiters = m_waiters->m_next; + m_mutex.unlock(); + + to_resume->m_awaiting_coroutine.resume(); + } + } + + /** + * @brief Shuts down the queue but waits for it to be drained so all elements are processed. + * Will yield on the given executor between checking if the queue is empty so the tasks + * can be processed. + * + * @tparam executor_t The executor type. + * @param e The executor to yield this task to wait for elements to be processed. + * @return coro::task + */ + template + auto shutdown_notify_waiters_drain(executor_t& e) -> coro::task + { + auto expected = false; + if (!m_shutting_down.compare_exchange_strong( + expected, true, std::memory_order::acq_rel, std::memory_order::relaxed)) + { + co_return; + } + + while (!empty()) + { + co_await e.yield(); + } + + // Now that the queue is drained let all the awaiters know that we're stopped. + m_stopped.exchange(true, std::memory_order::release); + + while (m_waiters != nullptr) + { + co_await m_mutex.lock(); + auto* to_resume = m_waiters; + m_waiters = m_waiters->m_next; + m_mutex.unlock(); + + to_resume->m_awaiting_coroutine.resume(); + } + } + +private: + friend awaiter; + /// The list of pop() awaiters. 
+ awaiter* m_waiters{nullptr}; + /// Mutex for properly maintaining the queue. + coro::mutex m_mutex{}; + /// The underlying queue data structure. + std::queue m_elements{}; + /// Has the shutdown process begun? + std::atomic m_shutting_down{false}; + /// Has this queue been shutdown? + std::atomic m_stopped{false}; +}; + +} // namespace coro diff --git a/include/coro/ring_buffer.hpp b/include/coro/ring_buffer.hpp index 3e5149dc..16657960 100644 --- a/include/coro/ring_buffer.hpp +++ b/include/coro/ring_buffer.hpp @@ -1,6 +1,6 @@ #pragma once -#include +#include "coro/expected.hpp" #include #include @@ -74,7 +74,6 @@ class ring_buffer // Don't suspend if the stop signal has been set. if (m_rb.m_stopped.load(std::memory_order::acquire)) { - m_stopped = true; return false; } @@ -89,7 +88,8 @@ class ring_buffer */ auto await_resume() -> rb::produce_result { - return !m_stopped ? rb::produce_result::produced : rb::produce_result::ring_buffer_stopped; + return !m_rb.m_stopped.load(std::memory_order::acquire) ? rb::produce_result::produced + : rb::produce_result::ring_buffer_stopped; } private: @@ -104,8 +104,6 @@ class ring_buffer produce_operation* m_next{nullptr}; /// The element this produce operation is producing into the ring buffer. element m_e; - /// Was the operation stopped? - bool m_stopped{false}; }; struct consume_operation @@ -131,7 +129,6 @@ class ring_buffer // Don't suspend if the stop signal has been set. if (m_rb.m_stopped.load(std::memory_order::acquire)) { - m_stopped = true; return false; } m_awaiting_coroutine = awaiting_coroutine; @@ -145,7 +142,7 @@ class ring_buffer */ auto await_resume() -> expected { - if (m_stopped) + if (m_rb.m_stopped.load(std::memory_order::acquire)) { return unexpected(rb::consume_result::ring_buffer_stopped); } @@ -165,8 +162,6 @@ class ring_buffer consume_operation* m_next{nullptr}; /// The element this consume operation will consume. element m_e; - /// Was the operation stopped? 
- bool m_stopped{false}; }; /** @@ -202,20 +197,19 @@ class ring_buffer */ auto notify_waiters() -> void { - std::unique_lock lk{m_mutex}; - // Only wake up waiters once. - if (m_stopped.load(std::memory_order::acquire)) + auto expected = false; + if (!m_stopped.compare_exchange_strong(expected, true, std::memory_order::acq_rel, std::memory_order::relaxed)) { + // Only wake up waiters once. return; } - m_stopped.exchange(true, std::memory_order::release); + std::unique_lock lk{m_mutex}; while (m_produce_waiters != nullptr) { - auto* to_resume = m_produce_waiters; - to_resume->m_stopped = true; - m_produce_waiters = m_produce_waiters->m_next; + auto* to_resume = m_produce_waiters; + m_produce_waiters = m_produce_waiters->m_next; lk.unlock(); to_resume->m_awaiting_coroutine.resume(); @@ -224,9 +218,8 @@ class ring_buffer while (m_consume_waiters != nullptr) { - auto* to_resume = m_consume_waiters; - to_resume->m_stopped = true; - m_consume_waiters = m_consume_waiters->m_next; + auto* to_resume = m_consume_waiters; + m_consume_waiters = m_consume_waiters->m_next; lk.unlock(); to_resume->m_awaiting_coroutine.resume(); diff --git a/include/coro/sync_wait.hpp b/include/coro/sync_wait.hpp index b403e953..36c93ea6 100644 --- a/include/coro/sync_wait.hpp +++ b/include/coro/sync_wait.hpp @@ -7,6 +7,7 @@ #include #include #include +#include #include namespace coro @@ -67,9 +68,9 @@ class sync_wait_task_promise : public sync_wait_task_promise_base static constexpr bool return_type_is_reference = std::is_reference_v; using stored_type = std::conditional_t< - return_type_is_reference, - std::remove_reference_t*, - std::remove_const_t>; + return_type_is_reference, + std::remove_reference_t*, + std::remove_const_t>; using variant_type = std::variant; sync_wait_task_promise() noexcept = default; @@ -88,9 +89,9 @@ class sync_wait_task_promise : public sync_wait_task_promise_base auto get_return_object() noexcept { return coroutine_type::from_promise(*this); } template - 
requires(return_type_is_reference and std::is_constructible_v) or - (not return_type_is_reference and - std::is_constructible_v) auto return_value(value_type&& value) -> void + requires(return_type_is_reference and std::is_constructible_v) or + (not return_type_is_reference and std::is_constructible_v) + auto return_value(value_type&& value) -> void { if constexpr (return_type_is_reference) { @@ -103,7 +104,8 @@ class sync_wait_task_promise : public sync_wait_task_promise_base } } - auto return_value(stored_type value) -> void requires(not return_type_is_reference) + auto return_value(stored_type value) -> void + requires(not return_type_is_reference) { if constexpr (std::is_move_constructible_v) { diff --git a/include/coro/task.hpp b/include/coro/task.hpp index d2cd6968..6a736a1c 100644 --- a/include/coro/task.hpp +++ b/include/coro/task.hpp @@ -102,7 +102,7 @@ struct promise final : public promise_base } } - auto return_value(stored_type value) -> void requires(not return_type_is_reference) + auto return_value(stored_type&& value) -> void requires(not return_type_is_reference) { if constexpr (std::is_move_constructible_v) { diff --git a/include/coro/thread_pool.hpp b/include/coro/thread_pool.hpp index 77113930..b730317f 100644 --- a/include/coro/thread_pool.hpp +++ b/include/coro/thread_pool.hpp @@ -25,7 +25,7 @@ namespace coro * the thread pool will stop accepting new tasks but will complete all tasks that were scheduled * prior to the shutdown request. 
*/ -class thread_pool +class thread_pool : public std::enable_shared_from_this { public: /** diff --git a/include/coro/when_any.hpp b/include/coro/when_any.hpp index cd35c032..a084d246 100644 --- a/include/coro/when_any.hpp +++ b/include/coro/when_any.hpp @@ -6,13 +6,16 @@ #include "coro/concepts/awaitable.hpp" #include "coro/detail/task_self_deleting.hpp" #include "coro/event.hpp" + #include "coro/expected.hpp" #include "coro/mutex.hpp" #include "coro/task.hpp" + #include "coro/when_all.hpp" #include #include #include #include + #include #include #include @@ -22,21 +25,51 @@ namespace coro namespace detail { +template +auto make_when_any_tuple_task( + std::atomic& first_completed, coro::event& notify, std::optional& return_value, awaitable a) + -> coro::task +{ + auto expected = false; + auto result = co_await static_cast(a); + if (first_completed.compare_exchange_strong(expected, true, std::memory_order::acq_rel, std::memory_order::relaxed)) + { + return_value = std::move(result); + notify.set(); + } + + co_return; +} + +template +[[nodiscard]] auto make_when_any_tuple_controller_task( + coro::event& notify, std::optional& return_value, awaitable_type... awaitables) + -> coro::detail::task_self_deleting +{ + std::atomic first_completed{false}; + co_await when_all(make_when_any_tuple_task(first_completed, notify, return_value, std::move(awaitables))...); + co_return; +} + +template +static auto make_when_any_task_return_void(awaitable a, coro::event& notify) -> coro::task +{ + co_await static_cast(a); + notify.set(); // This will trigger the controller task to wake up exactly once. 
+ co_return; +} + template static auto make_when_any_task( - awaitable a, - coro::mutex& m, - std::atomic& return_value_set, - coro::event& notify, - std::optional& return_value) -> coro::task + awaitable a, std::atomic& first_completed, coro::event& notify, std::optional& return_value) + -> coro::task { - auto result = co_await static_cast(a); - co_await m.lock(); + auto expected = false; + auto result = co_await static_cast(a); // Its important to only touch return_value and notify once since their lifetimes will be destroyed - // after being set ane notified the first time. - if (return_value_set.load(std::memory_order::acquire) == false) + // after being set and notified the first time. + if (first_completed.compare_exchange_strong(expected, true, std::memory_order::acq_rel, std::memory_order::relaxed)) { - return_value_set.store(true, std::memory_order::release); return_value = std::move(result); notify.set(); } @@ -44,6 +77,26 @@ static auto make_when_any_task( co_return; } +template> +static auto make_when_any_controller_task_return_void(range_type awaitables, coro::event& notify) + -> coro::detail::task_self_deleting +{ + std::vector> tasks{}; + + if constexpr (std::ranges::sized_range) + { + tasks.reserve(std::size(awaitables)); + } + + for (auto&& a : awaitables) + { + tasks.emplace_back(make_when_any_task_return_void(std::move(a), notify)); + } + + co_await coro::when_all(std::move(tasks)); + co_return; +} + template< std::ranges::range range_type, concepts::awaitable awaitable_type = std::ranges::range_value_t, @@ -53,11 +106,10 @@ static auto make_when_any_controller_task( range_type awaitables, coro::event& notify, std::optional& return_value) -> coro::detail::task_self_deleting { - // These must live for as long as the longest running when_any task since each task tries to see + // This must live for as long as the longest running when_any task since each task tries to see // if it was the first to complete. 
Only the very first task to complete will set the return_value // and notify. - coro::mutex m{}; - std::atomic return_value_set{false}; + std::atomic first_completed{false}; // This detatched task will maintain the lifetime of all the when_any tasks. std::vector> tasks{}; @@ -69,8 +121,8 @@ static auto make_when_any_controller_task( for (auto&& a : awaitables) { - tasks.emplace_back(make_when_any_task( - std::move(a), m, return_value_set, notify, return_value)); + tasks.emplace_back( + make_when_any_task(std::move(a), first_completed, notify, return_value)); } co_await coro::when_all(std::move(tasks)); @@ -79,6 +131,41 @@ static auto make_when_any_controller_task( } // namespace detail +template +[[nodiscard]] auto when_any(std::stop_source stop_source, awaitable_type... awaitables) -> coro::task< + std::variant::awaiter_return_type>...>> +{ + using return_type = std::variant< + std::remove_reference_t::awaiter_return_type>...>; + + coro::event notify{}; + std::optional return_value{std::nullopt}; + auto controller_task = + detail::make_when_any_tuple_controller_task(notify, return_value, std::forward(awaitables)...); + controller_task.handle().resume(); + + co_await notify; + stop_source.request_stop(); + co_return std::move(return_value.value()); +} + +template +[[nodiscard]] auto when_any(awaitable_type... 
awaitables) -> coro::task< + std::variant::awaiter_return_type>...>> +{ + using return_type = std::variant< + std::remove_reference_t::awaiter_return_type>...>; + + coro::event notify{}; + std::optional return_value{std::nullopt}; + auto controller_task = + detail::make_when_any_tuple_controller_task(notify, return_value, std::forward(awaitables)...); + controller_task.handle().resume(); + + co_await notify; + co_return std::move(return_value.value()); +} + template< std::ranges::range range_type, concepts::awaitable awaitable_type = std::ranges::range_value_t, @@ -86,17 +173,32 @@ template< typename return_type_base = std::remove_reference_t> [[nodiscard]] auto when_any(std::stop_source stop_source, range_type awaitables) -> coro::task { - // Using an std::optional to prevent the need to default construct the type on the stack. - std::optional return_value{std::nullopt}; - coro::event notify{}; + coro::event notify{}; - auto controller_task = - detail::make_when_any_controller_task(std::forward(awaitables), notify, return_value); - controller_task.handle().resume(); + if constexpr (std::is_void_v) + { + auto controller_task = + detail::make_when_any_controller_task_return_void(std::forward(awaitables), notify); + controller_task.handle().resume(); - co_await notify; - stop_source.request_stop(); - co_return std::move(return_value.value()); + co_await notify; + stop_source.request_stop(); + co_return; + } + else + { + // Using an std::optional to prevent the need to default construct the type on the stack. 
+ std::optional return_value{std::nullopt}; + + auto controller_task = + detail::make_when_any_controller_task(std::forward(awaitables), notify, return_value); + controller_task.handle().resume(); + + co_await notify; + stop_source.request_stop(); + + co_return std::move(return_value.value()); + } } template< @@ -106,15 +208,29 @@ template< typename return_type_base = std::remove_reference_t> [[nodiscard]] auto when_any(range_type awaitables) -> coro::task { - std::optional return_value{std::nullopt}; - coro::event notify{}; + coro::event notify{}; - auto controller_task = - detail::make_when_any_controller_task(std::forward(awaitables), notify, return_value); - controller_task.handle().resume(); + if constexpr (std::is_void_v) + { + auto controller_task = + detail::make_when_any_controller_task_return_void(std::forward(awaitables), notify); + controller_task.handle().resume(); - co_await notify; - co_return std::move(return_value.value()); + co_await notify; + co_return; + } + else + { + std::optional return_value{std::nullopt}; + + auto controller_task = + detail::make_when_any_controller_task(std::forward(awaitables), notify, return_value); + controller_task.handle().resume(); + + co_await notify; + + co_return std::move(return_value.value()); + } } } // namespace coro diff --git a/src/default_executor.cpp b/src/default_executor.cpp new file mode 100644 index 00000000..72d05c16 --- /dev/null +++ b/src/default_executor.cpp @@ -0,0 +1,85 @@ +#include "coro/default_executor.hpp" +#include +#include + +static const auto s_initialization_check_interval = std::chrono::milliseconds(1); + +static coro::thread_pool::options s_default_executor_options; +static std::atomic s_default_executor = {nullptr}; +static std::shared_ptr s_default_executor_shared; +static const auto s_default_executor_initializing = reinterpret_cast(&s_default_executor); + +#ifdef LIBCORO_FEATURE_NETWORKING +static coro::io_scheduler::options s_default_io_executor_options; +static std::atomic 
s_default_io_executor = {nullptr}; +static std::shared_ptr s_default_io_executor_shared; +static const auto s_default_io_executor_initializing = reinterpret_cast(&s_default_io_executor); +#endif + +void coro::default_executor::set_executor_options(thread_pool::options thread_pool_options) +{ + s_default_executor_options = thread_pool_options; +} + +std::shared_ptr coro::default_executor::executor() +{ + do + { + auto result = s_default_executor.load(std::memory_order::acquire); + while (result == s_default_executor_initializing) + { + std::this_thread::sleep_for(s_initialization_check_interval); + result = s_default_executor.load(std::memory_order::acquire); + } + + if (result) + { + return result->shared_from_this(); + } + + if (s_default_executor.compare_exchange_strong( + result, s_default_executor_initializing, std::memory_order::release, std::memory_order::acquire)) + { + break; + } + } while (true); + + s_default_executor_shared = std::make_shared(s_default_executor_options); + s_default_executor.store(s_default_executor_shared.get(), std::memory_order::release); + return s_default_executor_shared; +} + +#ifdef LIBCORO_FEATURE_NETWORKING +void coro::default_executor::set_io_executor_options(io_scheduler::options io_scheduler_options) +{ + s_default_io_executor_options = io_scheduler_options; +} + +std::shared_ptr coro::default_executor::io_executor() +{ + do + { + auto result = s_default_io_executor.load(std::memory_order::acquire); + while (result == s_default_io_executor_initializing) + { + std::this_thread::sleep_for(s_initialization_check_interval); + result = s_default_io_executor.load(std::memory_order::acquire); + } + + if (result) + { + return result->shared_from_this(); + } + + if (s_default_io_executor.compare_exchange_strong( + result, s_default_io_executor_initializing, std::memory_order::release, std::memory_order::acquire)) + { + break; + } + } while (true); + + s_default_io_executor_shared = 
coro::io_scheduler::make_shared(s_default_io_executor_options); + s_default_io_executor.store(s_default_io_executor_shared.get(), std::memory_order::release); + return s_default_io_executor_shared; +} +#endif diff --git a/src/detail/task_self_deleting.cpp b/src/detail/task_self_deleting.cpp index c86155ab..3d4eaf44 100644 --- a/src/detail/task_self_deleting.cpp +++ b/src/detail/task_self_deleting.cpp @@ -20,7 +20,7 @@ promise_self_deleting::promise_self_deleting(promise_self_deleting&& other) auto promise_self_deleting::operator=(promise_self_deleting&& other) -> promise_self_deleting& { - if (std::addressof(other) != nullptr) + if (std::addressof(other) != this) { m_executor_size = std::exchange(other.m_executor_size, nullptr); } diff --git a/src/mutex.cpp b/src/mutex.cpp index b836929c..d12a23b3 100644 --- a/src/mutex.cpp +++ b/src/mutex.cpp @@ -4,23 +4,9 @@ namespace coro { -scoped_lock::~scoped_lock() +namespace detail { - unlock(); -} - -auto scoped_lock::unlock() -> void -{ - if (m_mutex != nullptr) - { - std::atomic_thread_fence(std::memory_order::release); - m_mutex->unlock(); - // Only allow a scoped lock to unlock the mutex a single time. 
- m_mutex = nullptr; - } -} - -auto mutex::lock_operation::await_ready() const noexcept -> bool +auto lock_operation_base::await_ready() const noexcept -> bool { if (m_mutex.try_lock()) { @@ -31,7 +17,7 @@ auto mutex::lock_operation::await_ready() const noexcept -> bool return false; } -auto mutex::lock_operation::await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool +auto lock_operation_base::await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool { m_awaiting_coroutine = awaiting_coroutine; void* current = m_mutex.m_state.load(std::memory_order::acquire); @@ -49,7 +35,7 @@ auto mutex::lock_operation::await_suspend(std::coroutine_handle<> awaiting_corou { // If the current value is a waiting lock operation, or nullptr, set our next to that // lock op and attempt to set ourself as the head of the waiter list. - m_next = static_cast(current); + m_next = static_cast(current); new_value = static_cast(this); } } while (!m_mutex.m_state.compare_exchange_weak(current, new_value, std::memory_order::acq_rel)); @@ -65,6 +51,23 @@ auto mutex::lock_operation::await_suspend(std::coroutine_handle<> awaiting_corou return true; } +} // namespace detail + +scoped_lock::~scoped_lock() +{ + unlock(); +} + +auto scoped_lock::unlock() -> void +{ + if (m_mutex != nullptr) + { + std::atomic_thread_fence(std::memory_order::release); + m_mutex->unlock(); + m_mutex = nullptr; + } +} + auto mutex::try_lock() -> bool { void* expected = const_cast(unlocked_value()); @@ -92,7 +95,7 @@ auto mutex::unlock() -> void } // There are waiters on the atomic list, acquire them and update the state for all others. - m_internal_waiters = static_cast(m_state.exchange(nullptr, std::memory_order::acq_rel)); + m_internal_waiters = static_cast(m_state.exchange(nullptr, std::memory_order::acq_rel)); // Should internal waiters be reversed to allow for true FIFO, or should they be resumed // in this reverse order to maximum throuhgput? 
If this list ever gets 'long' the reversal @@ -105,8 +108,8 @@ auto mutex::unlock() -> void // assert m_internal_waiters != nullptr - lock_operation* to_resume = m_internal_waiters; - m_internal_waiters = m_internal_waiters->m_next; + detail::lock_operation_base* to_resume = m_internal_waiters; + m_internal_waiters = m_internal_waiters->m_next; to_resume->m_awaiting_coroutine.resume(); } diff --git a/src/net/tls/client.cpp b/src/net/tls/client.cpp index ba5388de..1c67fdf7 100644 --- a/src/net/tls/client.cpp +++ b/src/net/tls/client.cpp @@ -58,7 +58,7 @@ client::~client() if (m_tls_info.m_tls_ptr != nullptr && !m_tls_info.m_tls_error) { // Should the shutdown timeout be configurable? - m_io_scheduler->schedule(tls_shutdown_and_free( + m_io_scheduler->spawn(tls_shutdown_and_free( m_io_scheduler, std::move(m_socket), std::move(m_tls_info.m_tls_ptr), std::chrono::seconds{30})); } } diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 854270d7..3024aded 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -7,6 +7,7 @@ set(LIBCORO_TEST_SOURCE_FILES test_latch.cpp test_mutex.cpp test_ring_buffer.cpp + test_queue.cpp test_semaphore.cpp test_shared_mutex.cpp test_sync_wait.cpp diff --git a/test/bench.cpp b/test/bench.cpp index c77c5036..4f01ac3e 100644 --- a/test/bench.cpp +++ b/test/bench.cpp @@ -940,7 +940,7 @@ TEST_CASE("benchmark tls::server echo server thread pool", "[benchmark]") { // std::cerr << "CLIENT: writing histogram\n"; - auto lock = co_await histogram_mutex.lock(); + auto lock = co_await histogram_mutex.scoped_lock(); for (auto [ms, count] : histogram) { g_histogram[ms] += count; diff --git a/test/net/test_dns_resolver.cpp b/test/net/test_dns_resolver.cpp index 9e2cd135..3fac8216 100644 --- a/test/net/test_dns_resolver.cpp +++ b/test/net/test_dns_resolver.cpp @@ -12,7 +12,7 @@ TEST_CASE("dns_resolver basic", "[dns]") coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}); coro::net::dns::resolver 
dns_resolver{scheduler, std::chrono::milliseconds{5000}}; - auto make_host_by_name_task = [](std::shared_ptr& scheduler, + auto make_host_by_name_task = [](std::shared_ptr scheduler, coro::net::dns::resolver& dns_resolver, coro::net::hostname hn) -> coro::task { diff --git a/test/net/test_tcp_server.cpp b/test/net/test_tcp_server.cpp index 44b3097e..95686af5 100644 --- a/test/net/test_tcp_server.cpp +++ b/test/net/test_tcp_server.cpp @@ -14,9 +14,9 @@ TEST_CASE("tcp_server ping server", "[tcp_server]") auto scheduler = coro::io_scheduler::make_shared( coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}); - auto make_client_task = [](std::shared_ptr& scheduler, - const std::string& client_msg, - const std::string& server_msg) -> coro::task + auto make_client_task = [](std::shared_ptr scheduler, + const std::string& client_msg, + const std::string& server_msg) -> coro::task { co_await scheduler->schedule(); coro::net::tcp::client client{scheduler}; @@ -49,7 +49,7 @@ TEST_CASE("tcp_server ping server", "[tcp_server]") co_return; }; - auto make_server_task = [](std::shared_ptr& scheduler, + auto make_server_task = [](std::shared_ptr scheduler, const std::string& client_msg, const std::string& server_msg) -> coro::task { @@ -99,7 +99,7 @@ TEST_CASE("tcp_server concurrent polling on the same socket", "[tcp_server]") auto scheduler = coro::io_scheduler::make_shared(coro::io_scheduler::options{ .execution_strategy = coro::io_scheduler::execution_strategy_t::process_tasks_inline}); - auto make_server_task = [](std::shared_ptr& scheduler) -> coro::task + auto make_server_task = [](std::shared_ptr scheduler) -> coro::task { auto make_read_task = [](coro::net::tcp::client client) -> coro::task { @@ -144,7 +144,7 @@ TEST_CASE("tcp_server concurrent polling on the same socket", "[tcp_server]") co_return data; }; - auto make_client_task = [](std::shared_ptr& scheduler) -> coro::task + auto make_client_task = [](std::shared_ptr scheduler) -> 
coro::task { co_await scheduler->schedule(); coro::net::tcp::client client{scheduler}; diff --git a/test/test_io_scheduler.cpp b/test/test_io_scheduler.cpp index e4248f3e..5d9e9ebf 100644 --- a/test/test_io_scheduler.cpp +++ b/test/test_io_scheduler.cpp @@ -598,10 +598,10 @@ TEST_CASE("io_scheduler self generating coroutine (stack overflow check)", "[io_ std::vector> tasks; tasks.reserve(total); - auto func = [](std::shared_ptr& s, - uint64_t& counter, - auto f, - std::vector>& tasks) -> coro::task + auto func = [](std::shared_ptr s, + uint64_t& counter, + auto f, + std::vector>& tasks) -> coro::task { co_await s->schedule(); ++counter; diff --git a/test/test_mutex.cpp b/test/test_mutex.cpp index 03835eee..b2553a0e 100644 --- a/test/test_mutex.cpp +++ b/test/test_mutex.cpp @@ -16,7 +16,7 @@ TEST_CASE("mutex single waiter not locked", "[mutex]") { std::cerr << "Acquiring lock\n"; { - auto scoped_lock = co_await m.lock(); + auto scoped_lock = co_await m.scoped_lock(); REQUIRE_FALSE(m.try_lock()); std::cerr << "lock acquired, emplacing back 1\n"; output.emplace_back(1); @@ -55,7 +55,7 @@ TEST_CASE("mutex many waiters until event", "[mutex]") { co_await tp.schedule(); std::cerr << "id = " << id << " waiting to acquire the lock\n"; - auto scoped_lock = co_await m.lock(); + auto scoped_lock = co_await m.scoped_lock(); // Should always be locked upon acquiring the locks. 
REQUIRE_FALSE(m.try_lock()); @@ -70,7 +70,7 @@ TEST_CASE("mutex many waiters until event", "[mutex]") { co_await tp.schedule(); std::cerr << "block task acquiring lock\n"; - auto scoped_lock = co_await m.lock(); + auto scoped_lock = co_await m.scoped_lock(); REQUIRE_FALSE(m.try_lock()); std::cerr << "block task acquired lock, waiting on event\n"; co_await e; @@ -108,7 +108,7 @@ TEST_CASE("mutex scoped_lock unlock prior to scope exit", "[mutex]") auto make_task = [](coro::mutex& m) -> coro::task { { - auto lk = co_await m.lock(); + auto lk = co_await m.scoped_lock(); REQUIRE_FALSE(m.try_lock()); lk.unlock(); REQUIRE(m.try_lock()); @@ -118,3 +118,21 @@ TEST_CASE("mutex scoped_lock unlock prior to scope exit", "[mutex]") coro::sync_wait(make_task(m)); } + +TEST_CASE("mutex lock", "[mutex]") +{ + coro::mutex m; + + auto make_task = [](coro::mutex& m) -> coro::task + { + { + co_await m.lock(); + REQUIRE_FALSE(m.try_lock()); + m.unlock(); + REQUIRE(m.try_lock()); + } + co_return; + }; + + coro::sync_wait(make_task(m)); +} diff --git a/test/test_queue.cpp b/test/test_queue.cpp new file mode 100644 index 00000000..0bd8cde8 --- /dev/null +++ b/test/test_queue.cpp @@ -0,0 +1,200 @@ +#include "catch_amalgamated.hpp" + +#include + +TEST_CASE("queue shutdown produce", "[queue]") +{ + coro::queue q{}; + + auto make_consumer_task = [](coro::queue& q) -> coro::task + { + auto expected = co_await q.pop(); + if (!expected) + { + co_return 0; + } + co_return std::move(*expected); + }; + + coro::sync_wait(q.shutdown_notify_waiters()); + coro::sync_wait(q.push(42)); + + auto result = coro::sync_wait(make_consumer_task(q)); + REQUIRE(result == 0); + REQUIRE(q.empty()); +} + +TEST_CASE("queue single produce consume", "[queue]") +{ + coro::queue q{}; + + auto make_consumer_task = [](coro::queue& q) -> coro::task + { + auto expected = co_await q.pop(); + if (!expected) + { + co_return 0; + } + co_return std::move(*expected); + }; + + coro::sync_wait(q.push(42)); + + auto result = 
coro::sync_wait(make_consumer_task(q)); + REQUIRE(result == 42); + REQUIRE(q.empty()); +} + +TEST_CASE("queue multiple produce and consume", "[queue]") +{ + const uint64_t ITERATIONS = 10; + coro::queue q{}; + + auto make_consumer_task = [](coro::queue& q) -> coro::task + { + auto expected = co_await q.pop(); + if (!expected) + { + co_return 0; + } + co_return std::move(*expected); + }; + + std::vector> tasks{}; + for (uint64_t i = 0; i < ITERATIONS; ++i) + { + coro::sync_wait(q.push(i)); + tasks.emplace_back(make_consumer_task(q)); + } + + auto results = coro::sync_wait(coro::when_all(std::move(tasks))); + for (uint64_t i = 0; i < ITERATIONS; ++i) + { + REQUIRE(results[i].return_value() == i); + } +} + +TEST_CASE("queue produce consume direct", "[queue]") +{ + const uint64_t ITERATIONS = 10; + coro::queue q{}; + coro::thread_pool tp{}; + + auto make_producer_task = [&ITERATIONS](coro::thread_pool& tp, coro::queue& q) -> coro::task + { + co_await tp.schedule(); + for (uint64_t i = 0; i < ITERATIONS; ++i) + { + co_await q.push(i); + co_await tp.yield(); + } + + co_await q.shutdown_notify_waiters_drain(tp); + + co_return 0; + }; + + auto make_consumer_task = [&ITERATIONS](coro::thread_pool& tp, coro::queue& q) -> coro::task + { + co_await tp.schedule(); + + uint64_t sum{0}; + + while (true) + { + auto expected = co_await q.pop(); + if (!expected) + { + co_return sum; + } + sum += *expected; + } + }; + + auto results = coro::sync_wait(coro::when_all(make_consumer_task(tp, q), make_producer_task(tp, q))); + REQUIRE(std::get<0>(results).return_value() == 45); + REQUIRE(std::get<1>(results).return_value() == 0); +} + +TEST_CASE("queue multithreaded produce consume", "[queue]") +{ + const uint64_t WORKERS = 3; + const uint64_t ITERATIONS = 100; + coro::queue q{}; + coro::thread_pool tp{}; + std::atomic counter{0}; + coro::latch wait{WORKERS}; + + auto make_producer_task = + [&ITERATIONS](coro::thread_pool& tp, coro::queue& q, coro::latch& w) -> coro::task + { + co_await 
tp.schedule(); + for (uint64_t i = 0; i < ITERATIONS; ++i) + { + co_await q.push(i); + co_await tp.yield(); + } + + w.count_down(); + co_return; + }; + + auto make_shutdown_task = [](coro::thread_pool& tp, coro::queue& q, coro::latch& w) -> coro::task + { + // Wait for all producers to complete. + co_await w; + + // Wake up all waiters. + co_await q.shutdown_notify_waiters_drain(tp); + }; + + auto make_consumer_task = + [&ITERATIONS]( + coro::thread_pool& tp, coro::queue& q, std::atomic& counter) -> coro::task + { + co_await tp.schedule(); + + while (true) + { + auto expected = co_await q.pop(); + if (!expected) + { + co_return; + } + counter += *expected; + } + }; + + std::vector> tasks{}; + for (uint64_t i = 0; i < WORKERS; ++i) + { + tasks.emplace_back(make_producer_task(tp, q, wait)); + tasks.emplace_back(make_consumer_task(tp, q, counter)); + } + tasks.emplace_back(make_shutdown_task(tp, q, wait)); + + coro::sync_wait(coro::when_all(std::move(tasks))); + REQUIRE(counter == 14850); +} + +TEST_CASE("queue stopped", "[queue]") +{ + coro::queue q{}; + + auto make_consumer_task = [](coro::queue& q) -> coro::task + { + auto expected = co_await q.pop(); + if (!expected) + { + co_return 0; + } + co_return std::move(*expected); + }; + + coro::sync_wait(q.push(42)); + coro::sync_wait(q.shutdown_notify_waiters()); + + auto result = coro::sync_wait(make_consumer_task(q)); + REQUIRE(result == 0); + REQUIRE(q.size() == 1); // The item was not consumed due to shutdown. 
+} diff --git a/test/test_ring_buffer.cpp b/test/test_ring_buffer.cpp index d9ed98d3..081d860c 100644 --- a/test/test_ring_buffer.cpp +++ b/test/test_ring_buffer.cpp @@ -55,8 +55,10 @@ TEST_CASE("ring_buffer many elements many producers many consumers", "[ring_buff coro::thread_pool tp{coro::thread_pool::options{.thread_count = 4}}; coro::ring_buffer rb{}; + coro::latch wait{producers}; - auto make_producer_task = [](coro::thread_pool& tp, coro::ring_buffer& rb) -> coro::task + auto make_producer_task = + [](coro::thread_pool& tp, coro::ring_buffer& rb, coro::latch& w) -> coro::task { co_await tp.schedule(); auto to_produce = iterations / producers; @@ -66,6 +68,16 @@ TEST_CASE("ring_buffer many elements many producers many consumers", "[ring_buff co_await rb.produce(i); } + w.count_down(); + co_return; + }; + + auto make_shutdown_task = + [](coro::thread_pool& tp, coro::ring_buffer& rb, coro::latch& w) -> coro::task + { + co_await tp.schedule(); + co_await w; + // Wait for all the values to be consumed prior to shutting down the ring buffer. 
while (!rb.empty()) { @@ -107,8 +119,9 @@ TEST_CASE("ring_buffer many elements many producers many consumers", "[ring_buff } for (size_t i = 0; i < producers; ++i) { - tasks.emplace_back(make_producer_task(tp, rb)); + tasks.emplace_back(make_producer_task(tp, rb, wait)); } + tasks.emplace_back(make_shutdown_task(tp, rb, wait)); coro::sync_wait(coro::when_all(std::move(tasks))); diff --git a/test/test_shared_mutex.cpp b/test/test_shared_mutex.cpp index 2a13cb2c..179d37d6 100644 --- a/test/test_shared_mutex.cpp +++ b/test/test_shared_mutex.cpp @@ -92,7 +92,7 @@ TEST_CASE("mutex many shared and exclusive waiters interleaved", "[shared_mutex] std::atomic read_value{false}; - auto make_exclusive_task = [](std::shared_ptr& s, + auto make_exclusive_task = [](std::shared_ptr s, coro::shared_mutex& m, std::atomic& read_value) -> coro::task { @@ -112,11 +112,11 @@ TEST_CASE("mutex many shared and exclusive waiters interleaved", "[shared_mutex] co_return; }; - auto make_shared_tasks_task = [](std::shared_ptr& s, + auto make_shared_tasks_task = [](std::shared_ptr s, coro::shared_mutex& m, std::atomic& read_value) -> coro::task { - auto make_shared_task = [](std::shared_ptr& s, + auto make_shared_task = [](std::shared_ptr s, coro::shared_mutex& m, std::atomic& read_value) -> coro::task { diff --git a/test/test_sync_wait.cpp b/test/test_sync_wait.cpp index 1a7bcbab..ee469535 100644 --- a/test/test_sync_wait.cpp +++ b/test/test_sync_wait.cpp @@ -108,3 +108,84 @@ TEST_CASE("sync_wait very rarely hangs issue-270", "[sync_wait]") REQUIRE(count > 0); } + +struct Foo +{ + static std::atomic m_copies; + static std::atomic m_moves; + int v; + Foo() { std::cerr << "Foo::Foo()" << std::endl; } + Foo(const Foo& other) : v(other.v) + { + std::cerr << "Foo::Foo(const Foo&)" << std::endl; + m_copies.fetch_add(1); + } + Foo(Foo&& other) : v(std::exchange(other.v, 0)) + { + std::cerr << "Foo::Foo(Foo&&)" << std::endl; + m_moves.fetch_add(1); + } + + auto operator=(const Foo& other) -> Foo& + { 
+ std::cerr << "Foo::operator=(const Foo&) -> Foo&" << std::endl; + m_copies.fetch_add(1); + if (std::addressof(other) != this) + { + this->v = other.v; + } + return *this; + } + auto operator=(Foo&& other) -> Foo& + { + std::cerr << "Foo::operator=(Foo&&) -> Foo&" << std::endl; + m_moves.fetch_add(1); + if (std::addressof(other) != this) + { + this->v = std::exchange(other.v, 0); + } + return *this; + } + + ~Foo() + { + std::cerr << "Foo::~Foo()" + << "v=" << this->v << std::endl; + } +}; + +std::atomic Foo::m_copies = std::atomic{0}; +std::atomic Foo::m_moves = std::atomic{0}; + +TEST_CASE("issue-286", "[sync_wait]") +{ + /** + * The expected output from this should be the follow as of writing this test. + * https://github.com/jbaldwin/libcoro/issues/286 user @baderouaich reported + * that libcoro compared to other coroutine libraries sync_wait equivalent had + * and extra move. + * + * Foo::Foo() + * co_return foo; + * Foo::Foo(Foo &&) + * Foo::~Foo()v=0 + * Foo::Foo(Foo &&) + * Foo::~Foo()v=0 + * 1337 + * Foo::~Foo()v=1337 + */ + + auto getFoo = []() -> coro::task + { + Foo foo{}; + foo.v = 1337; + std::cerr << "co_return foo;" << std::endl; + co_return foo; + }; + + auto foo = coro::sync_wait(getFoo()); + std::cerr << foo.v << std::endl; + REQUIRE(foo.v == 1337); + REQUIRE(foo.m_copies == 0); + REQUIRE(foo.m_moves == 2); +} diff --git a/test/test_thread_pool.cpp b/test/test_thread_pool.cpp index 1ec8da54..448eb120 100644 --- a/test/test_thread_pool.cpp +++ b/test/test_thread_pool.cpp @@ -183,7 +183,7 @@ TEST_CASE("thread_pool high cpu usage when threadcount is greater than the numbe auto wait_for_task = [](coro::thread_pool& pool, std::chrono::seconds delay) -> coro::task<> { - auto sleep_for_task = [](std::chrono::seconds duration) -> coro::task + auto sleep_for_task = [](std::chrono::seconds duration) -> coro::task { std::this_thread::sleep_for(duration); co_return duration.count(); diff --git a/test/test_when_all.cpp b/test/test_when_all.cpp index 
96faf2fe..a450a836 100644 --- a/test/test_when_all.cpp +++ b/test/test_when_all.cpp @@ -204,3 +204,25 @@ TEST_CASE("when_all each task throws", "[when_all]") } } } + +TEST_CASE("when_all return void", "[when_all]") +{ + coro::thread_pool tp{}; + std::atomic counter{0}; + + auto make_task = [](coro::thread_pool& tp, std::atomic& counter, uint64_t i) -> coro::task + { + co_await tp.schedule(); + counter += i; + co_return; + }; + + std::vector> tasks; + for (auto i = 1; i <= 4; ++i) + { + tasks.emplace_back(make_task(tp, counter, i)); + } + + coro::sync_wait(coro::when_all(std::move(tasks))); + REQUIRE(counter == 1 + 2 + 3 + 4); +} diff --git a/test/test_when_any.cpp b/test/test_when_any.cpp index c3a9248b..b5e0deb2 100644 --- a/test/test_when_any.cpp +++ b/test/test_when_any.cpp @@ -1,8 +1,10 @@ #include "catch_amalgamated.hpp" +#include #include #include #include +#include TEST_CASE("when_any two tasks", "[when_any]") { @@ -16,6 +18,88 @@ TEST_CASE("when_any two tasks", "[when_any]") REQUIRE(result == 1); } +TEST_CASE("when_any return void", "[when_any]") +{ + coro::thread_pool tp{}; + std::atomic counter{0}; + + auto make_task = [](coro::thread_pool& tp, std::atomic& counter, uint64_t i) -> coro::task + { + co_await tp.schedule(); + // One thread will win. + uint64_t expected = 0; + counter.compare_exchange_strong(expected, i); + co_return; + }; + + std::vector> tasks; + for (auto i = 1; i <= 4; ++i) + { + tasks.emplace_back(make_task(tp, counter, i)); + } + + coro::sync_wait(coro::when_any(std::move(tasks))); + REQUIRE(counter.load() > 0); +} + +TEST_CASE("when_any tuple return void (monostate)", "[when_any]") +{ + // This test needs to use a mutex to guarantee that the task that sets the counter + // is the first task to complete, otherwise there is a race condition if counter is atomic + // as the other task could complete first (unlikely but happens) and cause the REQUIRE statements + // between what is returned to mismatch from what is executed. 
+ coro::mutex m{}; + coro::thread_pool tp{}; + std::atomic counter{0}; + + auto make_task_return_void = + [](coro::thread_pool& tp, coro::mutex& m, std::atomic& counter, uint64_t i) -> coro::task + { + co_await tp.schedule(); + co_await m.lock(); + if (counter == 0) + { + counter = i; + } + else + { + REQUIRE(counter == 2); + } + co_return std::monostate{}; + }; + + auto make_task = [](coro::thread_pool& tp, coro::mutex& m, std::atomic& counter, uint64_t i) -> coro::task + { + co_await tp.schedule(); + co_await m.lock(); + if (counter == 0) + { + counter = i; + } + else + { + REQUIRE(counter == 1); + } + co_return i; + }; + + auto result = + coro::sync_wait(coro::when_any(make_task_return_void(tp, m, counter, 1), make_task(tp, m, counter, 2))); + // Because of how coro::mutex works.. we need to release it after when_any returns since it symetrically transfers to the other coroutine task + // and can cause a race condition where the result does not equal the counter. This guarantees the task has fully completed before issuing REQUIREs. 
+ m.unlock(); + + if (std::holds_alternative(result)) + { + REQUIRE(counter == 1); + } + else + { + REQUIRE(std::get(result) == 2); + REQUIRE(counter == 2); + } +} + #ifdef LIBCORO_FEATURE_NETWORKING TEST_CASE("when_any two tasks one long running", "[when_any]") @@ -49,8 +133,10 @@ TEST_CASE("when_any two tasks one long running with cancellation", "[when_any]") auto s = coro::io_scheduler::make_shared( coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}); + std::atomic thrown{false}; + auto make_task = - [](std::shared_ptr s, std::stop_token stop_token, uint64_t amount) -> coro::task + [](std::shared_ptr s, std::stop_token stop_token, uint64_t amount, std::atomic& thrown) -> coro::task { co_await s->schedule(); try @@ -74,23 +160,28 @@ TEST_CASE("when_any two tasks one long running with cancellation", "[when_any]") { REQUIRE(amount == 1); REQUIRE(e.what() == std::string{"task was cancelled"}); + thrown = true; } co_return amount; }; std::vector> tasks{}; - tasks.emplace_back(make_task(s, stop_source.get_token(), 1)); - tasks.emplace_back(make_task(s, stop_source.get_token(), 2)); + tasks.emplace_back(make_task(s, stop_source.get_token(), 1, thrown)); + tasks.emplace_back(make_task(s, stop_source.get_token(), 2, thrown)); auto result = coro::sync_wait(coro::when_any(std::move(stop_source), std::move(tasks))); REQUIRE(result == 2); - std::this_thread::sleep_for(std::chrono::milliseconds{250}); + while (!thrown) + { + std::this_thread::sleep_for(std::chrono::milliseconds{250}); + } } TEST_CASE("when_any timeout", "[when_any]") { - auto scheduler = coro::io_scheduler::make_shared(); + auto scheduler = coro::io_scheduler::make_shared( + coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 2}}); auto make_long_running_task = [](std::shared_ptr scheduler, std::chrono::milliseconds execution_time) -> coro::task @@ -100,16 +191,17 @@ TEST_CASE("when_any timeout", "[when_any]") co_return 1; }; - auto make_timeout_task 
= [](std::shared_ptr scheduler) -> coro::task + auto make_timeout_task = [](std::shared_ptr scheduler, + std::chrono::milliseconds timeout) -> coro::task { - co_await scheduler->schedule_after(std::chrono::milliseconds{100}); + co_await scheduler->schedule_after(timeout); co_return -1; }; { std::vector> tasks{}; tasks.emplace_back(make_long_running_task(scheduler, std::chrono::milliseconds{50})); - tasks.emplace_back(make_timeout_task(scheduler)); + tasks.emplace_back(make_timeout_task(scheduler, std::chrono::milliseconds{500})); auto result = coro::sync_wait(coro::when_any(std::move(tasks))); REQUIRE(result == 1); @@ -118,11 +210,128 @@ TEST_CASE("when_any timeout", "[when_any]") { std::vector> tasks{}; tasks.emplace_back(make_long_running_task(scheduler, std::chrono::milliseconds{500})); - tasks.emplace_back(make_timeout_task(scheduler)); + tasks.emplace_back(make_timeout_task(scheduler, std::chrono::milliseconds{50})); auto result = coro::sync_wait(coro::when_any(std::move(tasks))); REQUIRE(result == -1); } } +TEST_CASE("when_any io_scheduler::schedule(task, timeout)", "[when_any]") +{ + auto scheduler = coro::io_scheduler::make_shared( + coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 2}}); + + auto make_task = [](std::shared_ptr scheduler, + std::chrono::milliseconds execution_time) -> coro::task + { + co_await scheduler->yield_for(execution_time); + co_return 1; + }; + + { + auto result = coro::sync_wait( + scheduler->schedule(make_task(scheduler, std::chrono::milliseconds{10}), std::chrono::milliseconds{50})); + REQUIRE(result.has_value()); + REQUIRE(result.value() == 1); + } + + { + auto result = coro::sync_wait( + scheduler->schedule(make_task(scheduler, std::chrono::milliseconds{50}), std::chrono::milliseconds{10})); + REQUIRE_FALSE(result.has_value()); + REQUIRE(result.error() == coro::timeout_status::timeout); + } +} + + #ifndef EMSCRIPTEN +TEST_CASE("when_any io_scheduler::schedule(task, timeout stop_token)", 
"[when_any]") +{ + auto scheduler = coro::io_scheduler::make_shared( + coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 2}}); + + auto make_task = [](std::shared_ptr scheduler, + std::chrono::milliseconds execution_time, + std::stop_token stop_token) -> coro::task + { + co_await scheduler->yield_for(execution_time); + if (stop_token.stop_requested()) + { + co_return -1; + } + co_return 1; + }; + + { + std::stop_source stop_source{}; + auto result = coro::sync_wait(scheduler->schedule( + std::move(stop_source), + make_task(scheduler, std::chrono::milliseconds{10}, stop_source.get_token()), + std::chrono::milliseconds{50})); + REQUIRE(result.has_value()); + REQUIRE(result.value() == 1); + } + + { + std::stop_source stop_source{}; + auto result = coro::sync_wait(scheduler->schedule( + std::move(stop_source), + make_task(scheduler, std::chrono::milliseconds{50}, stop_source.get_token()), + std::chrono::milliseconds{10})); + REQUIRE_FALSE(result.has_value()); + REQUIRE(result.error() == coro::timeout_status::timeout); + } +} + #endif + +TEST_CASE("when_any tuple multiple", "[when_any]") +{ + using namespace std::chrono_literals; + + auto scheduler = coro::io_scheduler::make_shared( + coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 4}}); + + auto make_task1 = [](std::shared_ptr scheduler, + std::chrono::milliseconds execution_time) -> coro::task + { + co_await scheduler->schedule_after(execution_time); + co_return 1; + }; + + auto make_task2 = [](std::shared_ptr scheduler, + std::chrono::milliseconds execution_time) -> coro::task + { + co_await scheduler->schedule_after(execution_time); + co_return 3.14; + }; + + auto make_task3 = [](std::shared_ptr scheduler, + std::chrono::milliseconds execution_time) -> coro::task + { + co_await scheduler->schedule_after(execution_time); + co_return std::string{"hello world"}; + }; + + { + auto result = coro::sync_wait( + coro::when_any(make_task1(scheduler, 10ms), 
make_task2(scheduler, 150ms), make_task3(scheduler, 150ms))); + REQUIRE(result.index() == 0); + REQUIRE(std::get<0>(result) == 1); + } + + { + auto result = coro::sync_wait( + coro::when_any(make_task1(scheduler, 150ms), make_task2(scheduler, 10ms), make_task3(scheduler, 150ms))); + REQUIRE(result.index() == 1); + REQUIRE(std::get<1>(result) == 3.14); + } + + { + auto result = coro::sync_wait( + coro::when_any(make_task1(scheduler, 150ms), make_task2(scheduler, 150ms), make_task3(scheduler, 10ms))); + REQUIRE(result.index() == 2); + REQUIRE(std::get<2>(result) == "hello world"); + } +} + #endif