8000 fix: Add sending queue to ng web server by kuznetsss · Pull Request #2273 · XRPLF/clio · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

fix: Add sending queue to ng web server #2273

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: develop
Choose a base branch
from
20 changes: 14 additions & 6 deletions src/web/ng/SubscriptionContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "web/ng/SubscriptionContext.hpp"

#include "util/Assert.hpp"
#include "util/Taggable.hpp"
#include "web/SubscriptionContextInterface.hpp"

Expand Down Expand Up @@ -50,24 +51,31 @@ SubscriptionContext::SubscriptionContext(
{
}

SubscriptionContext::~SubscriptionContext()
{
ASSERT(disconnected_, "SubscriptionContext must be disconnected before destroying");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's UB if the exception is thrown, but I think we should be fine.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We throw exceptions from ASSERT() only in tests. I don't think there is a better way to check this, but I will think how to get rid of exceptions in tests for ASSERT().

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added WithMockAssertNoThrow fixture for this case in tests.

}

void
SubscriptionContext::send(std::shared_ptr<std::string> message)
{
if (disconnected_)
if (disconnected_ or gotError_)
return;

if (maxSendQueueSize_.has_value() and tasksGroup_.size() >= *maxSendQueueSize_) {
tasksGroup_.spawn(yield_, [this](boost::asio::yield_context innerYield) {
connection_.get().close(innerYield);
});
disconnected_ = true;
gotError_ = true;
return;
}

tasksGroup_.spawn(yield_, [this, message = std::move(message)](boost::asio::yield_context innerYield) {
auto const maybeError = connection_.get().sendBuffer(boost::asio::buffer(*message), innerYield);
if (maybeError.has_value() and errorHandler_(*maybeError, connection_))
tasksGroup_.spawn(yield_, [this, message = std::move(message)](boost::asio::yield_context innerYield) mutable {
auto const maybeError = connection_.get().sendShared(std::move(message), innerYield);
if (maybeError.has_value() and errorHandler_(*maybeError, connection_)) {
connection_.get().close(innerYield);
gotError_ = true;
}
});
}

Expand All @@ -92,8 +100,8 @@ SubscriptionContext::apiSubversion() const
void
SubscriptionContext::disconnect(boost::asio::yield_context yield)
{
onDisconnect_(this);
disconnected_ = true;
onDisconnect_(this);
tasksGroup_.asyncWait(yield);
}

Expand Down
3 changes: 3 additions & 0 deletions src/web/ng/SubscriptionContext.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class SubscriptionContext : public web::SubscriptionContextInterface {

boost::signals2::signal<void(SubscriptionContextInterface*)> onDisconnect_;
std::atomic_bool disconnected_{false};
std::atomic_bool gotError_{false};

/**
* @brief The API version of the web stream client.
Expand All @@ -87,6 +88,8 @@ class SubscriptionContext : public web::SubscriptionContextInterface {
ErrorHandler errorHandler
);

~SubscriptionContext() override;

/**
* @brief Send message to the client
* @note This method does nothing after disconnected() was called.
Expand Down
30 changes: 23 additions & 7 deletions src/web/ng/impl/HttpConnection.hpp
6D47
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "web/ng/Request.hpp"
#include "web/ng/Response.hpp"
#include "web/ng/impl/Concepts.hpp"
#include "web/ng/impl/SendingQueue.hpp"
#include "web/ng/impl/WsConnection.hpp"

#include <boost/asio/buffer.hpp>
Expand Down Expand Up @@ -75,6 +76,10 @@
StreamType stream_;
std::optional<boost::beast::http::request<boost::beast::http::string_body>> request_;
std::chrono::steady_clock::duration timeout_{kDEFAULT_TIMEOUT};

using MessageType = boost::beast::http::response<boost::beast::http::string_body>;
SendingQueue<MessageType> sendingQueue_;

bool closed_{false};

public:
Expand All @@ -85,7 +90,12 @@
util::TagDecoratorFactory const& tagDecoratorFactory
)
requires IsTcpStream<StreamType>
: UpgradableConnection(std::move(ip), std::move(buffer), tagDecoratorFactory), stream_{std::move(socket)}
: UpgradableConnection(std::move(ip), std::move(buffer), tagDecoratorFactory)
, stream_{std::move(socket)}
, sendingQueue_([this](MessageType const& message, auto&& yield) {
boost::beast::get_lowest_layer(stream_).expires_after(timeout_);
boost::beast::http::async_write(stream_, message, yield);
})
{
}

Expand All @@ -99,9 +109,20 @@
requires IsSslTcpStream<StreamType>
: UpgradableConnection(std::move(ip), std::move(buffer), tagDecoratorFactory)
, stream_{std::move(socket), sslCtx}
, sendingQueue_([this](MessageType const& message, auto&& yield) {
boost::beast::get_lowest_layer(stream_).expires_after(timeout_);
boost::beast::http::async_write(stream_, message, yield);

Check warning on line 114 in src/web/ng/impl/HttpConnection.hpp

View check run for this annotation

Codecov / codecov/patch

src/web/ng/impl/HttpConnection.hpp#L113-L114

Added lines #L113 - L114 were not covered by tests
})
{
}

HttpConnection(HttpConnection&& other) = delete;
HttpConnection&
operator=(HttpConnection&& other) = delete;
HttpConnection(HttpConnection const& other) = delete;
HttpConnection&
operator=(HttpConnection const& other) = delete;

std::optional<Error>
sslHandshake(boost::asio::yield_context yield)
requires IsSslTcpStream<StreamType>
Expand Down Expand Up @@ -130,12 +151,7 @@
boost::asio::yield_context yield
) override
{
boost::system::error_code error;
boost::beast::get_lowest_layer(stream_).expires_after(timeout_);
boost::beast::http::async_write(stream_, response, yield[error]);
if (error)
return error;
return std::nullopt;
return sendingQueue_.send(std::move(response), yield);
}

void
Expand Down
73 changes: 73 additions & 0 deletions src/web/ng/impl/SendingQueue.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.

Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.

THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================

#pragma once

#include "web/ng/Error.hpp"

#include <boost/asio/any_io_executor.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/system/detail/error_code.hpp>

#include <functional>
#include <optional>
#include <queue>

namespace web::ng::impl {

template <typename T>
class SendingQueue {
public:
using Sender = std::function<void(T const&, boost::asio::basic_yield_context<boost::asio::any_io_executor>)>;

private:
std::queue<T> queue_;
Sender sender_;
Error error_;
bool isSending_{false};

public:
SendingQueue(Sender sender) : sender_{std::move(sender)}
{
}

std::optional<Error>
send(T message, boost::asio::yield_context yield)
{
if (error_)
return error_;

queue_.push(std::move(message));
if (isSending_)
return std::nullopt;

isSending_ = true;
while (not queue_.empty() and not error_) {
auto const responseToSend = std::move(queue_.front());
queue_.pop();
sender_(responseToSend, yield[error_]);
}
isSending_ = false;
if (error_)
return error_;
return std::nullopt;
}
};

} // namespace web::ng::impl
42 changes: 31 additions & 11 deletions src/web/ng/impl/WsConnection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@

#pragma once

#include "util/OverloadSet.hpp"
#include "util/Taggable.hpp"
#include "util/build/Build.hpp"
#include "web/ng/Connection.hpp"
#include "web/ng/Error.hpp"
#include "web/ng/Request.hpp"
#include "web/ng/Response.hpp"
#include "web/ng/impl/Concepts.hpp"
#include "web/ng/impl/SendingQueue.hpp"

#include <boost/asio/buffer.hpp>
#include <boost/asio/ip/tcp.hpp>
Expand All @@ -49,6 +51,7 @@
#include <optional>
#include <string>
#include <utility>
#include <variant>

namespace web::ng::impl {

Expand All @@ -57,13 +60,17 @@ class WsConnectionBase : public Connection {
using Connection::Connection;

virtual std::optional<Error>
sendBuffer(boost::asio::const_buffer buffer, boost::asio::yield_context yield) = 0;
sendShared(std::shared_ptr<std::string> message, boost::asio::yield_context yield) = 0;
};

template <typename StreamType>
class WsConnection : public WsConnectionBase {
boost::beast::websocket::stream<StreamType> stream_;
boost::beast::http::request<boost::beast::http::string_body> initialRequest_;

using MessageType = std::variant<Response, std::shared_ptr<std::string>>;
SendingQueue<MessageType> sendingQueue_;

bool closed_{false};

public:
Expand All @@ -77,10 +84,30 @@ class WsConnection : public WsConnectionBase {
: WsConnectionBase(std::move(ip), std::move(buffer), tagDecoratorFactory)
, stream_(std::move(stream))
, initialRequest_(std::move(initialRequest))
, sendingQueue_{[this](MessageType const& message, auto&& yield) {
boost::asio::const_buffer const buffer = std::visit(
util::OverloadSet{
[](Response const& r) -> boost::asio::const_buffer { return r.asWsResponse(); },
[](std::shared_ptr<std::string> const& m) -> boost::asio::const_buffer {
return boost::asio::buffer(*m);
}
},
message
);
stream_.async_write(buffer, yield);
}}
{
setupWsStream();
}

~WsConnection() override = default;
WsConnection(WsConnection&&) = delete;
WsConnection&
operator=(WsConnection&&) = delete;
WsConnection(WsConnection const&) = delete;
WsConnection&
operator=(WsConnection const&) = delete;

std::optional<Error>
performHandshake(boost::asio::yield_context yield)
{
Expand All @@ -98,16 +125,9 @@ class WsConnection : public WsConnectionBase {
}

std::optional<Error>
sendBuffer(boost::asio::const_buffer buffer, boost::asio::yield_context yield) override
sendShared(std::shared_ptr<std::string> message, boost::asio::yield_context yield) override
{
boost::beast::websocket::stream_base::timeout timeoutOption{};
stream_.get_option(timeoutOption);

boost::system::error_code error;
stream_.async_write(buffer, yield[error]);
if (error)
return error;
return std::nullopt;
return sendingQueue_.send(std::move(message), yield);
}

void
Expand All @@ -123,7 +143,7 @@ class WsConnection : public WsConnectionBase {
std::optional<Error>
send(Response response, boost::asio::yield_context yield) override
{
return sendBuffer(response.asWsResponse(), yield);
return sendingQueue_.send(std::move(response), yield);
}

std::expected<Request, Error>
Expand Down
8 changes: 8 additions & 0 deletions tests/common/util/MockAssert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@

#include "util/Assert.hpp"

#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include <string>
#include <string_view>

Expand All @@ -42,4 +45,9 @@ WithMockAssert::throwOnAssert(std::string_view m)
throw MockAssertException{.message = std::string{m}};
}

WithMockAssertNoThrow::~WithMockAssertNoThrow()
{
::util::impl::OnAssert::resetAction();
}

} // namespace common::util
46 changes: 32 additions & 14 deletions tests/common/util/MockAssert.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

#pragma once

#include "util/Assert.hpp" // IWYU pragma: keep

#include <gmock/gmock.h>
#include <gtest/gtest.h>

Expand All @@ -41,19 +43,35 @@ class WithMockAssert : virtual public testing::Test {
throwOnAssert(std::string_view m);
};

class WithMockAssertNoThrow : virtual public testing::Test {
public:
~WithMockAssertNoThrow() override;
};

} // namespace common::util

#define EXPECT_CLIO_ASSERT_FAIL(statement) EXPECT_THROW(statement, MockAssertException)

#define EXPECT_CLIO_ASSERT_FAIL_WITH_MESSAGE(statement, message_regex) \
EXPECT_THROW( \
{ \
try { \
statement; \
} catch (common::util::WithMockAssert::MockAssertException const& e) { \
EXPECT_THAT(e.message, testing::ContainsRegex(message_regex)); \
throw; \
} \
}, \
common::util::WithMockAssert::MockAssertException \
)
#define EXPECT_CLIO_ASSERT_FAIL_WITH_MESSAGE(statement, message_regex) \
if (dynamic_cast<common::util::WithMockAssert*>(this) != nullptr) { \
EXPECT_THROW( \
{ \
try { \
statement; \
} catch (common::util::WithMockAssert::MockAssertException const& e) { \
EXPECT_THAT(e.message, testing::ContainsRegex(message_regex)); \
throw; \
} \
}, \
common::util::WithMockAssert::MockAssertException \
); \
} else if (dynamic_cast<common::util::WithMockAssertNoThrow*>(this) != nullptr) { \
testing::StrictMock<testing::MockFunction<void(std::string_view)>> callMock; \
::util::impl::OnAssert::setAction([&callMock](std::string_view m) { callMock.Call(m); }); \
EXPECT_CALL(callMock, Call(testing::ContainsRegex(message_regex))); \
statement; \
::util::impl::OnAssert::resetAction(); \
} else { \
std::cerr << "EXPECT_CLIO_ASSERT_FAIL_WITH_MESSAGE() can be used only inside test body" << std::endl; \
std::terminate(); \
}

#define EXPECT_CLIO_ASSERT_FAIL(statement) EXPECT_CLIO_ASSERT_FAIL_WITH_MESSAGE(statement, ".*")
Loading
0