diff --git a/.github/workflows/ci-emscripten.yml b/.github/workflows/ci-emscripten.yml index 6b98565f..7b09a79f 100644 --- a/.github/workflows/ci-emscripten.yml +++ b/.github/workflows/ci-emscripten.yml @@ -21,7 +21,8 @@ jobs: apt-get install -y \ cmake \ git \ - ninja-build + ninja-build \ + nodejs - name: Checkout uses: actions/checkout@v4 with: @@ -50,4 +51,4 @@ jobs: cd emsdk . ./emsdk_env.sh cd ../Release - node --experimental-wasm-eh ./test/libcoro_test.js + node --experimental-wasm-threads --experimental-wasm-bulk-memory ./test/libcoro_test.js diff --git a/.github/workflows/ci-macos.yml b/.github/workflows/ci-macos.yml index 30662e56..003b1c7b 100644 --- a/.github/workflows/ci-macos.yml +++ b/.github/workflows/ci-macos.yml @@ -1,45 +1,45 @@ -name: ci-macos +# name: ci-macos -on: [pull_request, workflow_dispatch] +# on: [pull_request, workflow_dispatch] -jobs: - macos: - name: macos-12 - runs-on: macos-12 - strategy: - fail-fast: false - matrix: - clang_version: [17] - cxx_standard: [20, 23] - libcoro_feature_networking: [ {enabled: OFF, tls: OFF} ] - libcoro_build_shared_libs: [OFF, ON] - steps: - - name: Install Dependencies - run: | - brew update - brew install llvm@${{ matrix.clang_version }} - brew install ninja - - name: Checkout - uses: actions/checkout@v4 - with: - submodules: recursive - - name: Release - run: | - brew --prefix llvm@17 - mkdir Release - cd Release - cmake \ - -GNinja \ - -DCMAKE_BUILD_TYPE=Release \ - -DCMAKE_C_COMPILER=$(brew --prefix llvm@${{ matrix.clang_version }})/bin/clang-${{ matrix.clang_version }} \ - -DCMAKE_CXX_COMPILER=$(brew --prefix llvm@${{ matrix.clang_version }})/bin/clang-${{ matrix.clang_version }} \ - -DCMAKE_CXX_STANDARD=${{ matrix.cxx_standard }} \ - -DLIBCORO_FEATURE_NETWORKING=${{ matrix.libcoro_feature_networking.enabled }} \ - -DLIBCORO_FEATURE_TLS=${{ matrix.libcoro_feature_networking.tls }} \ - -DLIBCORO_BUILD_SHARED_LIBS=${{ matrix.libcoro_build_shared_libs }} \ - .. - cmake --build . 
--config Release - - name: Test - run: | - cd Release - ctest --build-config Release -VV +# jobs: +# macos: +# name: macos-12 +# runs-on: macos-12 +# strategy: +# fail-fast: false +# matrix: +# clang_version: [17] +# cxx_standard: [20, 23] +# libcoro_feature_networking: [ {enabled: OFF, tls: OFF} ] +# libcoro_build_shared_libs: [OFF, ON] +# steps: +# - name: Install Dependencies +# run: | +# brew update +# brew install llvm@${{ matrix.clang_version }} +# brew install ninja +# - name: Checkout +# uses: actions/checkout@v4 +# with: +# submodules: recursive +# - name: Release +# run: | +# brew --prefix llvm@17 +# mkdir Release +# cd Release +# cmake \ +# -GNinja \ +# -DCMAKE_BUILD_TYPE=Release \ +# -DCMAKE_C_COMPILER=$(brew --prefix llvm@${{ matrix.clang_version }})/bin/clang-${{ matrix.clang_version }} \ +# -DCMAKE_CXX_COMPILER=$(brew --prefix llvm@${{ matrix.clang_version }})/bin/clang-${{ matrix.clang_version }} \ +# -DCMAKE_CXX_STANDARD=${{ matrix.cxx_standard }} \ +# -DLIBCORO_FEATURE_NETWORKING=${{ matrix.libcoro_feature_networking.enabled }} \ +# -DLIBCORO_FEATURE_TLS=${{ matrix.libcoro_feature_networking.tls }} \ +# -DLIBCORO_BUILD_SHARED_LIBS=${{ matrix.libcoro_build_shared_libs }} \ +# .. +# cmake --build . 
--config Release +# - name: Test +# run: | +# cd Release +# ctest --build-config Release -VV diff --git a/CMakeLists.txt b/CMakeLists.txt index 226fd6c8..0cc35d3f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -76,6 +76,7 @@ set(LIBCORO_SOURCE_FILES include/coro/concepts/promise.hpp include/coro/concepts/range_of.hpp + include/coro/detail/task_self_deleting.hpp src/detail/task_self_deleting.cpp include/coro/detail/void_value.hpp include/coro/attribute.hpp diff --git a/examples/coro_task_container.cpp b/examples/coro_task_container.cpp index 7ae29c55..800d1ae4 100644 --- a/examples/coro_task_container.cpp +++ b/examples/coro_task_container.cpp @@ -55,7 +55,7 @@ int main() tc.start(serve_client(std::move(client))); // Wait for all clients to complete before shutting down the tcp::server. - co_await tc.garbage_collect_and_yield_until_empty(); + co_await tc.yield_until_empty(); co_return; }; diff --git a/include/coro/detail/task_self_deleting.hpp b/include/coro/detail/task_self_deleting.hpp new file mode 100644 index 00000000..4c8ead9b --- /dev/null +++ b/include/coro/detail/task_self_deleting.hpp @@ -0,0 +1,66 @@ +#pragma once + +#include +#include +#include + +namespace coro::detail +{ + +class task_self_deleting; + +class promise_self_deleting +{ +public: + promise_self_deleting(); + ~promise_self_deleting(); + + promise_self_deleting(const promise_self_deleting&) = delete; + promise_self_deleting(promise_self_deleting&&); + auto operator=(const promise_self_deleting&) -> promise_self_deleting& = delete; + auto operator=(promise_self_deleting&&) -> promise_self_deleting&; + + auto get_return_object() -> task_self_deleting; + auto initial_suspend() -> std::suspend_always; + auto final_suspend() noexcept -> std::suspend_never; + auto return_void() noexcept -> void; + auto unhandled_exception() -> void; + + auto task_container_size(std::atomic& task_container_size) -> void; +private: + /** + * The coro::task_container m_size member to decrement upon the 
coroutine completing. + */ + std::atomic* m_task_container_size{nullptr}; +}; + +/** + * This task will self delete upon completing. This is useful for use cases where the lifetime of the + * coroutine cannot be determined and it needs to 'self' delete. This is achieved by returning + * std::suspend_never from the promise::final_suspend which then based on the spec tells the + * coroutine to delete itself. This means any classes that use this task cannot have owning + * pointers or relationships to this class and must not use it past its completion. + * + * This class is currently only used by coro::task_container and will decrement its + * m_size internal count when the coroutine completes. + */ +class task_self_deleting +{ +public: + using promise_type = promise_self_deleting; + + explicit task_self_deleting(promise_self_deleting& promise); + ~task_self_deleting(); + + task_self_deleting(const task_self_deleting&) = delete; + task_self_deleting(task_self_deleting&&); + auto operator=(const task_self_deleting&) -> task_self_deleting& = delete; + auto operator=(task_self_deleting&&) -> task_self_deleting&; + + auto promise() -> promise_self_deleting& { return *m_promise; } + auto handle() -> std::coroutine_handle { return std::coroutine_handle::from_promise(*m_promise); } +private: + promise_self_deleting* m_promise{nullptr}; +}; + +} // namespace coro::detail diff --git a/include/coro/io_scheduler.hpp b/include/coro/io_scheduler.hpp index db48b7c7..6f9ca7ad 100644 --- a/include/coro/io_scheduler.hpp +++ b/include/coro/io_scheduler.hpp @@ -177,8 +177,10 @@ class io_scheduler : public std::enable_shared_from_this * longer have control over the scheduled task. * @param task The task to execute on this io_scheduler. It's lifetime ownership will be transferred * to this io_scheduler. + * @return True if the task was successfully scheduled onto the io_scheduler. This can fail if the task + * is already completed or does not contain a valid coroutine anymore. 
*/ - auto schedule(coro::task&& task) -> void; + auto schedule(coro::task&& task) -> bool; /** * Schedules the current task to run after the given amount of time has elapsed. @@ -247,7 +249,7 @@ class io_scheduler : public std::enable_shared_from_this */ auto resume(std::coroutine_handle<> handle) -> bool { - if (handle == nullptr) + if (handle == nullptr || handle.done()) { return false; } @@ -259,6 +261,7 @@ class io_scheduler : public std::enable_shared_from_this if (m_opts.execution_strategy == execution_strategy_t::process_tasks_inline) { + m_size.fetch_add(1, std::memory_order::release); { std::scoped_lock lk{m_scheduled_tasks_mutex}; m_scheduled_tasks.emplace_back(handle); @@ -309,13 +312,6 @@ class io_scheduler : public std::enable_shared_from_this */ auto shutdown() noexcept -> void; - /** - * Scans for completed coroutines and destroys them freeing up resources. This is also done on starting - * new tasks but this allows the user to cleanup resources manually. One usage might be making sure fds - * are cleaned up as soon as possible. - */ - auto garbage_collect() noexcept -> void; - private: /// The configuration options. options m_opts; diff --git a/include/coro/task_container.hpp b/include/coro/task_container.hpp index 373ea900..5af5c460 100644 --- a/include/coro/task_container.hpp +++ b/include/coro/task_container.hpp @@ -2,6 +2,7 @@ #include "coro/attribute.hpp" #include "coro/concepts/executor.hpp" +#include "coro/detail/task_self_deleting.hpp" #include "coro/task.hpp" #include @@ -9,6 +10,7 @@ #include #include #include +#include #include #include @@ -16,34 +18,24 @@ namespace coro { class io_scheduler; + template class task_container { public: - struct options - { - /// The number of task spots to reserve space for upon creating the container. - std::size_t reserve_size{8}; - /// The growth factor for task space in the container when capacity is full. 
- double growth_factor{2}; - }; - /** * @param e Tasks started in the container are scheduled onto this executor. For tasks created * from a coro::io_scheduler, this would usually be that coro::io_scheduler instance. * @param opts Task container options. */ task_container( - std::shared_ptr e, const options opts = options{.reserve_size = 8, .growth_factor = 2}) - : m_growth_factor(opts.growth_factor), - m_executor(std::move(e)) + std::shared_ptr e) + : m_executor(std::move(e)) { if (m_executor == nullptr) { throw std::runtime_error{"task_container cannot have a nullptr executor"}; } - - init(opts.reserve_size); } task_container(const task_container&) = delete; task_container(task_container&&) = delete; @@ -54,86 +46,37 @@ class task_container // This will hang the current thread.. but if tasks are not complete thats also pretty bad. while (!empty()) { - garbage_collect(); + // Sleep a bit so the cpu doesn't totally churn. + std::this_thread::sleep_for(std::chrono::milliseconds{10}); } } - enum class garbage_collect_t - { - /// Execute garbage collection. - yes, - /// Do not execute garbage collection. - no - }; - /** * Stores a user task and starts its execution on the container's thread pool. * @param user_task The scheduled user's task to store in this task container and start its execution. - * @param cleanup Should the task container run garbage collect at the beginning of this store - * call? Calling at regular intervals will reduce memory usage of completed - * tasks and allow for the task container to re-use allocated space. + * @return True if the task was successfully started into the task container. This can fail if the task + * is already completed or does not contain a valid coroutine anymore. 
*/ - auto start(coro::task&& user_task, garbage_collect_t cleanup = garbage_collect_t::yes) -> void + auto start(coro::task&& user_task) -> bool { m_size.fetch_add(1, std::memory_order::relaxed); - std::size_t index{}; - { - std::unique_lock lk{m_mutex}; - - if (cleanup == garbage_collect_t::yes) - { - gc_internal(); - } - - // Only grow if completely full and attempting to add more. - if (m_free_task_indices.empty()) - { - grow(); - } - - // Reserve a free task index - index = m_free_task_indices.front(); - m_free_task_indices.pop(); - } - - // Store the task inside a cleanup task for self deletion. - m_tasks[index] = make_cleanup_task(std::move(user_task), index); - - // Start executing from the cleanup task to schedule the user's task onto the thread pool. - m_tasks[index].resume(); - } - - /** - * Garbage collects any tasks that are marked as deleted. This frees up space to be re-used by - * the task container for newly stored tasks. - * @return The number of tasks that were deleted. - */ - auto garbage_collect() -> std::size_t __ATTRIBUTE__(used) - { - std::scoped_lock lk{m_mutex}; - return gc_internal(); + auto task = make_self_deleting_task(std::move(user_task)); + // Hook the promise to decrement the size upon its self deletion of the coroutine frame. + task.promise().task_container_size(m_size); + return m_executor->resume(task.handle()); } /** * @return The number of active tasks in the container. */ - auto size() const -> std::size_t { return m_size.load(std::memory_order::relaxed); } + auto size() const -> std::size_t { return m_size.load(std::memory_order::acquire); } /** * @return True if there are no active tasks in the container. */ auto empty() const -> bool { return size() == 0; } - /** - * @return The capacity of this task manager before it will need to grow in size. 
- */ - auto capacity() const -> std::size_t - { - std::atomic_thread_fence(std::memory_order::acquire); - return m_tasks.size(); - } - /** * Will continue to garbage collect and yield until all tasks are complete. This method can be * co_await'ed to make it easier to wait for the task container to have all its tasks complete. @@ -141,132 +84,25 @@ class task_container * This does not shut down the task container, but can be used when shutting down, or if your * logic requires all the tasks contained within to complete, it is similar to coro::latch. */ - auto garbage_collect_and_yield_until_empty() -> coro::task + auto yield_until_empty() -> coro::task { while (!empty()) { - garbage_collect(); co_await m_executor->yield(); } } private: - /** - * Grows each task container by the growth factor. - * @return The position of the free index after growing. - */ - auto grow() -> void + auto make_self_deleting_task(task user_task) -> detail::task_self_deleting { - // Save an index at the current last item. - std::size_t new_size = m_tasks.size() * m_growth_factor; - for (std::size_t i = m_tasks.size(); i < new_size; ++i) - { - m_free_task_indices.emplace(i); - } - m_tasks.resize(new_size); - } - - /** - * Internal GC call, expects the public function to lock. - */ - auto gc_internal() -> std::size_t - { - std::size_t deleted{0}; - auto pos = std::begin(m_tasks_to_delete); - while (pos != std::end(m_tasks_to_delete)) - { - // Skip tasks that are still running or have yet to start. - if (!m_tasks[*pos].is_ready()) - { - pos++; - continue; - } - // Destroy the cleanup task. - m_tasks[*pos].destroy(); - // Put the deleted position at the end of the free indexes list. - m_free_task_indices.emplace(*pos); - // Remove index from tasks to delete - m_tasks_to_delete.erase(pos++); - // Indicate a task was deleted. 
- ++deleted; - } - m_size.fetch_sub(deleted, std::memory_order::relaxed); - return deleted; - } - - /** - * Encapsulate the users tasks in a cleanup task which marks itself for deletion upon - * completion. Simply co_await the users task until its completed and then mark the given - * position within the task manager as being deletable. The scheduler's next iteration - * in its event loop will then free that position up to be re-used. - * - * This function will also unconditionally catch all unhandled exceptions by the user's - * task to prevent the scheduler from throwing exceptions. - * @param user_task The user's task. - * @param index The index where the task data will be stored in the task manager. - * @return The user's task wrapped in a self cleanup task. - */ - auto make_cleanup_task(task user_task, std::size_t index) -> coro::task - { - // Immediately move the task onto the executor. - co_await m_executor->schedule(); - - try - { - // Await the users task to complete. - co_await user_task; - } - catch (const std::exception& e) - { - // TODO: what would be a good way to report this to the user...? Catching here is required - // since the co_await will unwrap the unhandled exception on the task. - // The user's task should ideally be wrapped in a catch all and handle it themselves, but - // that cannot be guaranteed. - std::cerr << "coro::task_container user_task had an unhandled exception e.what()= " << e.what() << "\n"; - } - catch (...) - { - // don't crash if they throw something that isn't derived from std::exception - std::cerr << "coro::task_container user_task had unhandle exception, not derived from std::exception.\n"; - } - - // Destroy the user task since it is complete. This is important to do so outside the lock - // since the user could schedule a new task from the destructor (tls::client does this interanlly) - // causing a deadlock. 
- user_task.destroy(); - - { - std::scoped_lock lk{m_mutex}; - m_tasks_to_delete.emplace_back(index); - } - + co_await user_task; co_return; } - /// Mutex for safely mutating the task containers across threads, expected usage is within - /// thread pools for indeterminate lifetime requests. - std::mutex m_mutex{}; /// The number of alive tasks. std::atomic m_size{}; - /// Maintains the lifetime of the tasks until they are completed. - std::vector> m_tasks{}; - /// The full set of free indicies into `m_tasks`. - std::queue m_free_task_indices{}; - /// The set of tasks that have completed and need to be deleted. - std::list m_tasks_to_delete{}; - /// The amount to grow the containers by when all spaces are taken. - double m_growth_factor{}; /// The executor to schedule tasks that have just started. std::shared_ptr m_executor{nullptr}; - - auto init(std::size_t reserve_size) -> void - { - m_tasks.resize(reserve_size); - for (std::size_t i = 0; i < reserve_size; ++i) - { - m_free_task_indices.emplace(i); - } - } }; } // namespace coro diff --git a/include/coro/thread_pool.hpp b/include/coro/thread_pool.hpp index c8e8c0bf..dbc663d5 100644 --- a/include/coro/thread_pool.hpp +++ b/include/coro/thread_pool.hpp @@ -134,7 +134,7 @@ class thread_pool /** * Schedules any coroutine handle that is ready to be resumed. * @param handle The coroutine handle to schedule. - * @return True if the coroutine is resumed, false if its a nullptr. + * @return True if the coroutine is resumed, false if its a nullptr or the coroutine is already done. 
*/ auto resume(std::coroutine_handle<> handle) noexcept -> bool; diff --git a/src/detail/task_self_deleting.cpp b/src/detail/task_self_deleting.cpp new file mode 100644 index 00000000..9b6cb5aa --- /dev/null +++ b/src/detail/task_self_deleting.cpp @@ -0,0 +1,98 @@ +#include "coro/detail/task_self_deleting.hpp" + +#include + +namespace coro::detail +{ + +promise_self_deleting::promise_self_deleting() +{ + (void)m_task_container_size; // make codacy happy +} + +promise_self_deleting::~promise_self_deleting() +{ + +} + +promise_self_deleting::promise_self_deleting(promise_self_deleting&& other) + : m_task_container_size(std::exchange(other.m_task_container_size, nullptr)) +{ + +} + +auto promise_self_deleting::operator=(promise_self_deleting&& other) -> promise_self_deleting& +{ + if (std::addressof(other) != this) + { + m_task_container_size = std::exchange(other.m_task_container_size, nullptr); + } + + return *this; +} + +auto promise_self_deleting::get_return_object() -> task_self_deleting +{ + return task_self_deleting{*this}; +} + +auto promise_self_deleting::initial_suspend() -> std::suspend_always +{ + return std::suspend_always{}; +} + +auto promise_self_deleting::final_suspend() noexcept -> std::suspend_never +{ + // Notify the task_container that this coroutine has completed. + if (m_task_container_size != nullptr) + { + m_task_container_size->fetch_sub(1); + } + + // By not suspending this lets the coroutine destroy itself. + return std::suspend_never{}; +} + +auto promise_self_deleting::return_void() noexcept -> void +{ + // no-op +} + +auto promise_self_deleting::unhandled_exception() -> void +{ + // The user cannot access the promise anyways, ignore the exception. 
+} + +auto promise_self_deleting::task_container_size(std::atomic& task_container_size) -> void +{ + m_task_container_size = &task_container_size; +} + +task_self_deleting::task_self_deleting(promise_self_deleting& promise) + : m_promise(&promise) +{ + +} + +task_self_deleting::~task_self_deleting() +{ + +} + +task_self_deleting::task_self_deleting(task_self_deleting&& other) + : m_promise(other.m_promise) +{ + +} + +auto task_self_deleting::operator=(task_self_deleting&& other) -> task_self_deleting& +{ + if (std::addressof(other) != this) + { + m_promise = other.m_promise; + } + + return *this; +} + +} // namespace coro::detail diff --git a/src/io_scheduler.cpp b/src/io_scheduler.cpp index 7b7f3b9a..7c1cb370 100644 --- a/src/io_scheduler.cpp +++ b/src/io_scheduler.cpp @@ -95,10 +95,10 @@ auto io_scheduler::process_events(std::chrono::milliseconds timeout) -> std::siz return size(); } -auto io_scheduler::schedule(coro::task&& task) -> void +auto io_scheduler::schedule(coro::task&& task) -> bool { auto* ptr = static_cast*>(m_owned_tasks); - ptr->start(std::move(task)); + return ptr->start(std::move(task)); } auto io_scheduler::schedule_after(std::chrono::milliseconds amount) -> coro::task @@ -219,12 +219,6 @@ auto io_scheduler::shutdown() noexcept -> void } } -auto io_scheduler::garbage_collect() noexcept -> void -{ - auto* ptr = static_cast*>(m_owned_tasks); - ptr->garbage_collect(); -} - auto io_scheduler::process_events_manual(std::chrono::milliseconds timeout) -> void { bool expected{false}; diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp index 3c33b0fe..1e6dbe57 100644 --- a/src/thread_pool.cpp +++ b/src/thread_pool.cpp @@ -1,7 +1,5 @@ #include "coro/thread_pool.hpp" -#include - namespace coro { thread_pool::operation::operation(thread_pool& tp) noexcept : m_thread_pool(tp) @@ -35,28 +33,32 @@ thread_pool::~thread_pool() auto thread_pool::schedule() -> operation { + m_size.fetch_add(1, std::memory_order::release); if 
(!m_shutdown_requested.load(std::memory_order::acquire)) { - m_size.fetch_add(1, std::memory_order::release); return operation{*this}; } - - throw std::runtime_error("coro::thread_pool is shutting down, unable to schedule new tasks."); + else + { + m_size.fetch_sub(1, std::memory_order::release); + throw std::runtime_error("coro::thread_pool is shutting down, unable to schedule new tasks."); + } } auto thread_pool::resume(std::coroutine_handle<> handle) noexcept -> bool { - if (handle == nullptr) + if (handle == nullptr || handle.done()) { return false; } + m_size.fetch_add(1, std::memory_order::release); if (m_shutdown_requested.load(std::memory_order::acquire)) { + m_size.fetch_sub(1, std::memory_order::release); return false; } - m_size.fetch_add(1, std::memory_order::release); schedule_impl(handle); return true; } @@ -138,7 +140,7 @@ auto thread_pool::executor(std::size_t idx) -> void auto thread_pool::schedule_impl(std::coroutine_handle<> handle) noexcept -> void { - if (handle == nullptr) + if (handle == nullptr || handle.done()) { return; } diff --git a/test/bench.cpp b/test/bench.cpp index f102fe26..7c85a74d 100644 --- a/test/bench.cpp +++ b/test/bench.cpp @@ -430,6 +430,7 @@ TEST_CASE("benchmark tcp::server echo server thread pool", "[benchmark]") std::cerr << "server co_await wait_for_clients\n"; co_await wait_for_clients; + std::cerr << "server co_return\n"; co_return; }; @@ -580,8 +581,10 @@ TEST_CASE("benchmark tcp::server echo server inline", "[benchmark]") } s.live_clients--; + std::cerr << "s.live_clients=" << s.live_clients << std::endl; if (s.live_clients == 0) { + std::cerr << "s.wait_for_clients.set()" << std::endl; s.wait_for_clients.set(); } co_return; @@ -611,7 +614,9 @@ TEST_CASE("benchmark tcp::server echo server inline", "[benchmark]") } } + std::cerr << "co_await s.wait_for_clients\n"; co_await s.wait_for_clients; + std::cerr << "make_server_task co_return\n"; co_return; }; @@ -671,8 +676,11 @@ TEST_CASE("benchmark tcp::server echo server 
inline", "[benchmark]") { server s{}; s.id = server_id++; + std::cerr << "coro::sync_wait(make_server_task(s));\n"; coro::sync_wait(make_server_task(s)); + std::cerr << "server.scheduler->shutdown()\n"; s.scheduler->shutdown(); + std::cerr << "server thread exiting\n"; }}); } @@ -695,8 +703,11 @@ TEST_CASE("benchmark tcp::server echo server inline", "[benchmark]") { c.tasks.emplace_back(make_client_task(c)); } + std::cerr << "coro::sync_wait(coro::when_all(std::move(c.tasks)));\n"; coro::sync_wait(coro::when_all(std::move(c.tasks))); + std::cerr << "client.scheduler->shutdown()\n"; c.scheduler->shutdown(); + std::cerr << "client thread exiting\n"; }}); } diff --git a/test/test_io_scheduler.cpp b/test/test_io_scheduler.cpp index 9e5d73cd..ff519bdf 100644 --- a/test/test_io_scheduler.cpp +++ b/test/test_io_scheduler.cpp @@ -752,4 +752,28 @@ TEST_CASE("io_scheduler task throws after resume", "[io_scheduler]") }; REQUIRE_THROWS(coro::sync_wait(make_thrower())); -} \ No newline at end of file +} + +TEST_CASE("issue-287", "[io_scheduler]") +{ + const int ITERATIONS = 200000; + + std::atomic g_count = 0; + auto scheduler = coro::io_scheduler::make_shared( + coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}); + + auto task = [](std::atomic& count) -> coro::task { + count++; + co_return; + }; + + for (int i = 0; i < ITERATIONS; ++i) + { + REQUIRE(scheduler->schedule(task(g_count))); + } + + scheduler->shutdown(); + + std::cerr << "g_count = \t" << g_count.load() << std::endl; + REQUIRE(g_count.load() == ITERATIONS); +} diff --git a/test/test_thread_pool.cpp b/test/test_thread_pool.cpp index d0ebf2b1..cf72be44 100644 --- a/test/test_thread_pool.cpp +++ b/test/test_thread_pool.cpp @@ -231,4 +231,30 @@ TEST_CASE("thread_pool high cpu usage when threadcount is greater than the numbe coro::thread_pool pool{coro::thread_pool::options{.thread_count = 3}}; coro::sync_wait( coro::when_all(wait_for_task(pool, std::chrono::seconds{1}), 
wait_for_task(pool, std::chrono::seconds{3}))); -} \ No newline at end of file +} + +TEST_CASE("issue-287", "[thread_pool]") +{ + const int ITERATIONS = 200000; + + std::atomic g_count = 0; + auto thread_pool = std::make_shared( + coro::thread_pool::options{.thread_count = 1} + ); + auto task_container = coro::task_container{thread_pool}; + + auto task = [](std::atomic& count) -> coro::task { + count++; + co_return; + }; + + for (int i = 0; i < ITERATIONS; ++i) + { + REQUIRE(task_container.start(task(g_count))); + } + + thread_pool->shutdown(); + + std::cerr << "g_count = \t" << g_count.load() << std::endl; + REQUIRE(g_count.load() == ITERATIONS); +} diff --git a/test/test_when_all.cpp b/test/test_when_all.cpp index 6ead3221..9a5caea4 100644 --- a/test/test_when_all.cpp +++ b/test/test_when_all.cpp @@ -170,3 +170,38 @@ TEST_CASE("when_all use std::ranges::view", "[when_all]") auto result = coro::sync_wait(make_runner_task()); REQUIRE(result == (1 + 2 + 3)); } + +TEST_CASE("when_all each task throws", "[when_all]") +{ + coro::thread_pool tp{}; + + auto make_task = [&](uint64_t i) -> coro::task + { + co_await tp.schedule(); + if (i % 2 == 0) + { + throw std::runtime_error{std::to_string(i)}; + } + co_return i; + }; + + std::vector> tasks; + for (auto i = 1; i <= 4; ++i) + { + tasks.emplace_back(make_task(i)); + } + + auto output_tasks = coro::sync_wait(coro::when_all(std::move(tasks))); + for (auto i = 1; i <= 4; ++i) + { + auto& task = output_tasks.at(i - 1); + if (i % 2 == 0) + { + REQUIRE_THROWS(task.return_value()); + } + else + { + REQUIRE((int)task.return_value() == i); + } + } +}