8000 Handle when a task is co_awaited by multiple awaiters by danvratil · Pull Request #127 · qcoro/qcoro · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Handle when a task is co_awaited by multiple awaiters #127

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 13 additions & 16 deletions qcoro/qcorotask.h
8000
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ class TaskFinalSuspend {
* \param[in] awaitingCoroutine handle of the coroutine that is co_awaiting the current
* coroutine (continuation).
*/
explicit TaskFinalSuspend(std::coroutine_handle<> awaitingCoroutine)
: mAwaitingCoroutine(awaitingCoroutine) {}
explicit TaskFinalSuspend(const std::vector<std::coroutine_handle<>> &awaitingCoroutines)
: mAwaitingCoroutines(awaitingCoroutines) {}

//! Returns whether the just finishing coroutine should do final suspend or not
/*!
Expand All @@ -69,9 +69,10 @@ class TaskFinalSuspend {
void await_suspend(std::coroutine_handle<Promise> finishedCoroutine) noexcept {
auto &promise = finishedCoroutine.promise();

if (promise.mResumeAwaiter.exchange(true, std::memory_order_acq_rel)) {
promise.mAwaitingCoroutine.resume();
for (auto &awaiter : mAwaitingCoroutines) {
awaiter.resume();
}
mAwaitingCoroutines.clear();

// The handle will be destroyed here only if the associated Task has already been destroyed
if (promise.setDestroyHandle()) {
Expand All @@ -87,8 +88,7 @@ class TaskFinalSuspend {
constexpr void await_resume() const noexcept {}

private:
//! Handle of the coroutine co_awaiting the current coroutine.
std::coroutine_handle<> mAwaitingCoroutine;
std::vector<std::coroutine_handle<>> mAwaitingCoroutines;
};

//! Base class for the \c Task<T> promise_type.
Expand Down Expand Up @@ -148,7 +148,7 @@ class TaskPromiseBase {
* This decides what should happen when the coroutine is finished.
*/
auto final_suspend() const noexcept {
return TaskFinalSuspend{mAwaitingCoroutine};
return TaskFinalSuspend{mAwaitingCoroutines};
}

//! Called by co_await to obtain an Awaitable for type \c T.
Expand Down Expand Up @@ -225,13 +225,12 @@ class TaskPromiseBase {
* represented by this promise. When our coroutine finishes, it's
* our job to resume the awaiting coroutine.
*/
bool setAwaitingCoroutine(std::coroutine_handle<> awaitingCoroutine) {
mAwaitingCoroutine = awaitingCoroutine;
return !mResumeAwaiter.exchange(true, std::memory_order_acq_rel);
void addAwaitingCoroutine(std::coroutine_handle<> awaitingCoroutine) {
mAwaitingCoroutines.push_back(awaitingCoroutine);
}

bool hasAwaitingCoroutine() const {
return mAwaitingCoroutine != nullptr;
return !mAwaitingCoroutines.empty();
}

bool setDestroyHandle() noexcept {
Expand All @@ -242,9 +241,7 @@ class TaskPromiseBase {
friend class TaskFinalSuspend;

//! Handle of the coroutine that is currently co_awaiting this Awaitable
std::coroutine_handle<> mAwaitingCoroutine;
//! Indicates whether the awaiter should be resumed when it tries to co_await on us.
std::atomic<bool> mResumeAwaiter{false};
std::vector<std::coroutine_handle<>> mAwaitingCoroutines;

//! Indicates whether we can destroy the coroutine handle
std::atomic<bool> mDestroyHandle{false};
Expand Down Expand Up @@ -406,8 +403,8 @@ class TaskAwaiterBase {
* co_awaited coroutine has finished synchronously and the co_awaiting coroutine doesn't
* have to suspend.
*/
bool await_suspend(std::coroutine_handle<> awaitingCoroutine) noexcept {
return mAwaitedCoroutine.promise().setAwaitingCoroutine(awaitingCoroutine);
void await_suspend(std::coroutine_handle<> awaitingCoroutine) noexcept {
mAwaitedCoroutine.promise().addAwaitingCoroutine(awaitingCoroutine);
}

protected:
Expand Down
31 changes: 31 additions & 0 deletions tests/qcorotask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,35 @@ class QCoroTaskTest : public QCoro::TestObject<QCoroTaskTest>
QCOMPARE(result, QStringLiteral("42"));
}

A0A2 QCoro::Task<> testMultipleAwaiters_coro(QCoro::TestContext) {
auto task = timer(100ms);

bool called = false;
// Internally co_awaits task
task.then([&called]() {
called = true;
});

co_await task;

QCORO_VERIFY(called);
}

QCoro::Task<> testMultipleAwaitersSync_coro(QCoro::TestContext ctx) {
ctx.setShouldNotSuspend();

auto task = []() -> QCoro::Task<> { co_return; }();

bool called = false;
task.then([&called]() {
called = true;
});

co_await task;

QCORO_VERIFY(called);
}

private Q_SLOTS:
addTest(SimpleCoroutine)
addTest(CoroutineValue)
Expand All @@ -355,6 +384,8 @@ private Q_SLOTS:
addTest(ThenError)
addTest(ThenErrorWithValue)
addThenTest(ImplicitArgumentConversion)
addTest(MultipleAwaiters)
addTest(MultipleAwaitersSync)

// See https://github.com/danvratil/qcoro/issues/24
void testEarlyReturn()
Expand Down
0