net: disconnect inside AttemptToEvictConnection #27912
Conversation
The following sections might be updated with supplementary metadata relevant to reviewers and maintainers.

Code Coverage: For detailed information about the code coverage, see the test coverage report.
Reviews: See the guideline for information on the review process.
Conflicts: Reviewers, this pull request conflicts with the following ones: …

If you consider this pull request important, please also help to review the conflicting pull requests. Ideally, start with the one that should be merged first.
src/net.cpp
Outdated
// Delete disconnected nodes
std::list<CNode*> nodes_disconnected_copy = m_nodes_disconnected;
for (CNode* pnode : nodes_disconnected_copy)
{
    // Destroy the object only after other threads have stopped using it.
    if (pnode->GetRefCount() <= 0) {
        // Prevent two threads trying to delete the same node: set nRefCount to -1 first
I think m_disconnect_mutex already prevents this from happening.
Hmm, you may be right; I might have gone too far with that. What I was trying to protect against was checking the number of references in one instruction and permitting another thread to increase the refCount in the meantime, before we remove the node in the subsequent lines. But it looks like we don't have any functions which would increase the refCount during normal operation (it's only incremented on new connections), so it's probably unnecessary here.
If I'm honest, I kind of prefer this belt-and-suspenders way myself, but would be easily persuaded that it's superfluous or worse than the current behaviour (useless extra locking).
I felt a bit nervous about nesting m_disconnect_mutex inside of m_nodes_mutex, but it seemed slightly preferable to locking m_nodes_mutex for the whole of DisconnectNodes(), as that's used in many other operations. I don't think there is any way StopNodes() and DisconnectNodes() can lock each other here though. Would you agree?
The CNode reference counting is a total nightmare and we should (and have in the past: #10738) try to get rid of it at some point.
permitting another thread to increase the refCount in the meantime before we remove the node in the subsequent lines.

This is actually possible with and without your patch; the way we protect against this is by not calling CNode::AddRef after nodes were moved to the disconnection queue.
if (pnode->nRefCount.compare_exchange_strong(expectedRefCount, -1)) {
    // Theoretically, nothing stops a different thread from calling `AddRef()` while we are here
    m_nodes_disconnected.remove(pnode);
    DeleteNode(pnode);
}
But, it looks like we don't have any functions which would increase the refCount during normal operation (only incremented on new connections), so probably unnecessary here.
Our RAII helper NodesSnapshot does call AddRef during normal operation (e.g. before processing messages), but NodesSnapshot doesn't take a snapshot of m_nodes_disconnected, so it's fine.
I don't think there is any way StopNodes() and DisconnectNodes() can lock each other here though.
Agreed, as long as you always lock in the same order (which afaict you do).
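The "always lock in the same order" invariant being agreed on here can be sketched in isolation. This is a minimal model, not the real CConnman code: plain std::mutex stands in for m_nodes_mutex and m_disconnect_mutex, and the function names are illustrative. Because both paths acquire the two mutexes in the same order, neither thread can hold one lock while waiting for the other's, so no deadlock cycle can form.

```cpp
#include <mutex>
#include <thread>

std::mutex nodes_mutex;      // stand-in for m_nodes_mutex
std::mutex disconnect_mutex; // stand-in for m_disconnect_mutex
int work_done = 0;

void disconnect_path()
{
    std::lock_guard<std::mutex> outer(nodes_mutex);      // always first
    std::lock_guard<std::mutex> inner(disconnect_mutex); // always second
    ++work_done;
}

void stop_path()
{
    std::lock_guard<std::mutex> outer(nodes_mutex);      // same order as above
    std::lock_guard<std::mutex> inner(disconnect_mutex);
    ++work_done;
}

// Run both paths concurrently; with a consistent lock order this
// always terminates, with the opposite order in one path it could hang.
int run_both_paths(int iterations)
{
    std::thread t1([&] { for (int i = 0; i < iterations; ++i) disconnect_path(); });
    std::thread t2([&] { for (int i = 0; i < iterations; ++i) stop_path(); });
    t1.join();
    t2.join();
    return work_done;
}
```

If one path took the locks in the opposite order, two threads could each hold one mutex and wait forever on the other.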
I agree with @dergoegge. Don't think StopNodes() can race with DisconnectNodes() since StopThreads() is called first.
Tested it and this patch does fix the issue.
src/net.cpp
Outdated
{
    // remove from m_nodes
    m_nodes.erase(remove(m_nodes.begin(), m_nodes.end(), pnode), m_nodes.end());
    LOCK(m_disconnect_mutex);
Is it better to lock m_disconnect_mutex for the duration of the node_copy loop, or to move it right before m_nodes_disconnected? Not sure about the performance hit of repeatedly locking+unlocking in the latter case.
I think that's as currently implemented (perhaps the GitHub diff is throwing you off again)?
Lines 1125 to 1143 in 2d33e62
{
    LOCK(m_disconnect_mutex);
    for (CNode* pnode : nodes_copy) {
        if (pnode->fDisconnect) {
            // remove from m_nodes
            m_nodes.erase(remove(m_nodes.begin(), m_nodes.end(), pnode), m_nodes.end());
            // release outbound grant (if any)
            pnode->grantOutbound.Release();
            // close socket and cleanup
            pnode->CloseSocketDisconnect();
            // hold in disconnected pool until all refs are released
            pnode->Release();
            m_nodes_disconnected.push_back(pnode);
        }
    }
}
I meant like rightttt before m_nodes_disconnected.push_back(pnode). But don't think it matters, so feel free to ignore.
src/net.cpp
Outdated
nPrevNodeCount = nodes_size;
if (m_client_interface) {
    m_client_interface->NotifyNumConnectionsChanged(nodes_size);
if (nodes_size != nPrevNodeCount) {
nPrevNodeCount needs a mutex now since it's called from ThreadI2PSocketHandler also.
Ignore this, I was confused by GitHub's indentation.
src/net.cpp
Outdated
    }
}
if (should_notify && m_client_interface) {
    m_client_interface->NotifyNumConnectionsChanged(nodes_size);
Not familiar with the GUI code, but could this call to m_client_interface->NotifyNumConnectionsChanged race?
I am also not familiar with the GUI code, but IIUC I don't think this should race: the lines above lock the mutex and update nPrevNodeCount inside it, so even if two threads called this function, t1 would lock first and update nPrevNodeCount, and when it unlocked (and notified), t2 would find that nodes_size() was equal to nPrevNodeCount and so would exit without calling NotifyNumConnectionsChanged.
But this should still send a notification per connection count change on either thread.
That said, it's easy enough to move this block inside the m_nodes_mutex block to be safer, and I may do just that.
Could change the function to:
void CConnman::NotifyNumConnectionsChanged()
{
    size_t nodes_size;
    bool should_notify{false};
    {
        LOCK(m_nodes_mutex);
        nodes_size = m_nodes.size();
        if (nodes_size != nPrevNodeCount) {
            nPrevNodeCount = nodes_size;
            should_notify = true;
        }
        if (should_notify && m_client_interface) {
            m_client_interface->NotifyNumConnectionsChanged(nodes_size);
        }
    }
}
I think it's possible that:
- t1 locks m_nodes_mutex, checks nodes_size and updates nPrevNodeCount & should_notify
- t1 unlocks the mutex and something else gets scheduled by the CPU
- the connection count is updated again
- t2 locks m_nodes_mutex and sets should_notify
- t2 unlocks the mutex and calls NotifyNumConnectionsChanged
- t1 gets scheduled and calls NotifyNumConnectionsChanged

My concern here was whether it's ok if NotifyNumConnectionsChanged is called simultaneously from two threads. I think it'd be good to give nPrevNodeCount a GUARDED_BY(m_nodes_mutex) annotation.
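A minimal sketch of what such an annotation looks like, assuming the usual Clang thread-safety macro spelling (the class and member names here are illustrative, not the real CConnman ones). Under clang's -Wthread-safety with an annotated mutex type, any access to the guarded member outside the lock is flagged at compile time; on other toolchains the attribute compiles away.

```cpp
#include <mutex>

// Assumed macro spelling, matching Clang's thread-safety attribute.
#if defined(__clang__)
#define GUARDED_BY(x) __attribute__((guarded_by(x)))
#else
#define GUARDED_BY(x)
#endif

class ConnCounter {
public:
    void Update(unsigned int n)
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_prev_node_count = n; // guarded write, under the lock
    }
    unsigned int Get()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_prev_node_count; // guarded read, under the lock
    }

private:
    std::mutex m_mutex;
    // Documents (and, with -Wthread-safety and an annotated mutex
    // type, enforces) that m_mutex must be held for this member.
    unsigned int m_prev_node_count GUARDED_BY(m_mutex){0};
};
```

The annotation costs nothing at runtime; it only feeds the static analysis.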
src/net.cpp
Outdated
// Delete disconnected nodes
std::list<CNode*> nodes_disconnected_copy = m_nodes_disconnected;
for (CNode* pnode : nodes_disconnected_copy)
{
    // Destroy the object only after other threads have stopped using it.
    if (pnode->GetRefCount() <= 0) {
        // Prevent two threads trying to delete the same node: set nRefCount to -1 first
I agree with @dergoegge. Don't think StopNodes() can race with DisconnectNodes() since StopThreads() is called first.
src/net.cpp
Outdated
SetSyscallSandboxPolicy(SyscallSandboxPolicy::NET);
while (!interruptNet) {
    DisconnectNodes();
    NotifyNumConnectionsChanged();
I don't think NotifyNumConnectionsChanged is necessary here, but I could understand the consistency argument.
src/net.cpp
Outdated
@@ -1390,6 +1396,18 @@ void CConnman::ThreadSocketHandler()
    }
}

void CConnman::ThreadI2PSocketHandler()
GitHub won't let me comment on untouched lines, but minor nit: threadI2PAcceptIncoming could be renamed to threadI2PSocketHandler?
src/net.cpp
Outdated
    return;
}

if (!advertising_listen_addr) {
Setting advertising_listen_addr to false each time at the top of the function causes each I2P connection to log an AddLocal(...) line. I think this could be used to perform a disk-fill attack (my debug.log grew several MB in several minutes). Prior to this patch, it was set once before the while (!interrupt) loop, so it wouldn't trigger every time on a new connection.
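The difference can be modelled with a toy loop (names here are hypothetical; advertise_count stands in for the number of AddLocal(...) lines written to debug.log):

```cpp
// Count of simulated "AddLocal(...)" log lines.
int advertise_count = 0;

// Buggy shape: the flag is reset inside the loop, so every accepted
// connection re-advertises (and logs) the listen address.
void accept_loop_flag_inside(int connections)
{
    for (int i = 0; i < connections; ++i) {
        bool advertising_listen_addr = false;
        if (!advertising_listen_addr) {
            ++advertise_count; // one log line per connection
            advertising_listen_addr = true;
        }
    }
}

// Pre-patch shape: the flag is initialised once before the loop, so
// the address is advertised (and logged) only on the first connection.
void accept_loop_flag_outside(int connections)
{
    bool advertising_listen_addr = false;
    for (int i = 0; i < connections; ++i) {
        if (!advertising_listen_addr) {
            ++advertise_count; // one log line total
            advertising_listen_addr = true;
        }
    }
}
```

With the flag inside the loop, an attacker reconnecting in a tight loop makes the log grow linearly with connection attempts.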
Force-pushed from 1d28dfb to 21dae96.
Thanks @Crypt-iQ for the review. I pushed 21dae96, which includes most of the suggested changes.
I acknowledge the problem described in #27843. I did not look deeply into the code in this PR, but I gather it drags some high level logic (cap number of connections) down into lower-level code. What about changing:

--- i/src/net.cpp
+++ w/src/net.cpp
@@ -927,12 +927,13 @@ bool CConnman::AttemptToEvictConnection()
     }
     LOCK(m_nodes_mutex);
     for (CNode* pnode : m_nodes) {
         if (pnode->GetId() == *node_id_to_evict) {
             LogPrint(BCLog::NET, "selected %s connection for eviction peer=%d; disconnecting\n", pnode->ConnectionType
             pnode->fDisconnect = true;
+            DisconnectNodes();
             return true;
         }
     }
     return false;
 }

(modulo trying to avoid the recursive lock of …)
Yes, I agree and like this approach better. I had created a …
Force-pushed from f8da79c to a3903cc.
OK, I've pushed a new set of changes which now disconnect nodes synchronously inside of AttemptToEvictConnection(). @Crypt-iQ I'd be curious if you still see these new changes as resolving the issue in #27843? I haven't gotten your test patch working to my satisfaction yet (or at least, I don't see positive eviction candidate selection during it, so it wouldn't overflow maxconnections).
I am not sure yet what would be the best approach to resolve the issues below.
One way would be to hold m_nodes_disconnected_mutex for the entire iteration of the m_nodes_disconnected list. But this would mean calling DeleteNode(), and thus PeerManagerImpl::FinalizeNode(), under that mutex. The latter locks cs_main :-(
src/net.h
Outdated
@@ -1043,7 +1045,8 @@ class CConnman
     std::vector<std::string> m_added_nodes GUARDED_BY(m_added_nodes_mutex);
     mutable Mutex m_added_nodes_mutex;
     std::vector<CNode*> m_nodes GUARDED_BY(m_nodes_mutex);
     std::list<CNode*> m_nodes_disconnected;
+    mutable RecursiveMutex m_nodes_disconnected_mutex;
This need not be RecursiveMutex?

Suggested change:
- mutable RecursiveMutex m_nodes_disconnected_mutex;
+ mutable Mutex m_nodes_disconnected_mutex;
Ok, I've used a GlobalMutex for this now. I never needed recursive behaviour, but I was getting clang static analysis errors without it. Looking at sync.h, it seems that GlobalMutex should be the correct type for this, I think.
GlobalMutex silences some of the thread safety analysis; it was introduced to overcome some of its limitations and is supposed to be used only for mutexes that are defined globally. I agree it is confusing. I don't like it and have a plan to remove GlobalMutex, but it is stuck at #25390.
[patch] change to Mutex
diff --git i/src/net.cpp w/src/net.cpp
index c2160e945f..c8ee95a3d0 100644
--- i/src/net.cpp
+++ w/src/net.cpp
@@ -886,12 +886,14 @@ size_t CConnman::SocketSendData(CNode& node) const
* to forge. In order to partition a node the attacker must be
* simultaneously better at all of them than honest peers.
* If we find a candidate perform the eviction.
*/
bool CConnman::AttemptToEvictConnection()
{
+ AssertLockNotHeld(m_nodes_disconnected_mutex);
+
std::vector<NodeEvictionCandidate> vEvictionCandidates;
{
LOCK(m_nodes_mutex);
for (const CNode* node : m_nodes) {
if (node->fDisconnect)
@@ -937,12 +939,14 @@ bool CConnman::AttemptToEvictConnection()
return true;
}
return false;
}
void CConnman::AcceptConnection(const ListenSocket& hListenSocket) {
+ AssertLockNotHeld(m_nodes_disconnected_mutex);
+
struct sockaddr_storage sockaddr;
socklen_t len = sizeof(sockaddr);
auto sock = hListenSocket.sock->Accept((struct sockaddr*)&sockaddr, &len);
CAddress addr;
if (!sock) {
@@ -1107,23 +1111,27 @@ bool CConnman::AddConnection(const std::string& address, ConnectionType conn_typ
OpenNetworkConnection(CAddress(), false, &grant, address.c_str(), conn_type);
return true;
}
void CConnman::DeleteDisconnectedNode(CNode* pnode)
{
+ AssertLockNotHeld(m_nodes_disconnected_mutex);
+
// Destroy the object only after other threads have stopped using it.
// Prevent double free by setting nRefCount to -1 before delete.
int expectedRefCount = 0;
if (pnode->nRefCount.compare_exchange_strong(expectedRefCount, -1)) {
WITH_LOCK(m_nodes_disconnected_mutex, m_nodes_disconnected.remove(pnode));
DeleteNode(pnode);
}
}
void CConnman::DisconnectAndReleaseNode(CNode* pnode)
{
+ AssertLockNotHeld(m_nodes_disconnected_mutex);
+
LOCK(m_nodes_mutex);
if (std::find(m_nodes.begin(), m_nodes.end(), pnode) != m_nodes.end()) {
// remove from m_nodes
m_nodes.erase(remove(m_nodes.begin(), m_nodes.end(), pnode), m_nodes.end());
@@ -1138,12 +1146,14 @@ void CConnman::DisconnectAndReleaseNode(CNode* pnode)
WITH_LOCK(m_nodes_disconnected_mutex, m_nodes_disconnected.push_back(pnode));
}
}
void CConnman::DisconnectNodes()
{
+ AssertLockNotHeld(m_nodes_disconnected_mutex);
+
{
LOCK(m_nodes_mutex);
if (!fNetworkActive) {
// Disconnect any connected nodes
for (CNode* pnode : m_nodes) {
@@ -1270,12 +1280,13 @@ Sock::EventsPerSock CConnman::GenerateWaitSockets(Span<CNode* const> nodes)
return events_per_sock;
}
void CConnman::SocketHandler()
{
+ AssertLockNotHeld(m_nodes_disconnected_mutex);
AssertLockNotHeld(m_total_bytes_sent_mutex);
Sock::EventsPerSock events_per_sock;
{
const NodesSnapshot snap{*this, /*shuffle=*/false};
@@ -1381,12 +1392,14 @@ void CConnman::SocketHandlerConnected(const std::vector<CNode*>& nodes,
if (InactivityCheck(*pnode)) pnode->fDisconnect = true;
}
}
void CConnman::SocketHandlerListening(const Sock::EventsPerSock& events_per_sock)
{
+ AssertLockNotHeld(m_nodes_disconnected_mutex);
+
for (const ListenSocket& listen_socket : vhListenSocket) {
if (interruptNet) {
return;
}
const auto it = events_per_sock.find(listen_socket.sock);
if (it != events_per_sock.end() && it->second.occurred & Sock::RECV) {
@@ -1394,12 +1407,13 @@ void CConnman::SocketHandlerListening(const Sock::EventsPerSock& events_per_sock
}
}
}
void CConnman::ThreadSocketHandler()
{
+ AssertLockNotHeld(m_nodes_disconnected_mutex);
AssertLockNotHeld(m_total_bytes_sent_mutex);
while (!interruptNet)
{
DisconnectNodes();
NotifyNumConnectionsChanged();
@@ -2093,12 +2107,14 @@ void CConnman::ThreadMessageHandler()
fMsgProcWake = false;
}
}
void CConnman::ThreadI2PAcceptIncoming()
{
+ AssertLockNotHeld(m_nodes_disconnected_mutex);
+
static constexpr auto err_wait_begin = 1s;
static constexpr auto err_wait_cap = 5min;
auto err_wait = err_wait_begin;
bool advertising_listen_addr = false;
i2p::Connection conn;
@@ -2481,12 +2497,14 @@ void CConnman::StopThreads()
if (threadSocketHandler.joinable())
threadSocketHandler.join();
}
void CConnman::StopNodes()
{
+ AssertLockNotHeld(m_nodes_disconnected_mutex);
+
if (fAddressesInitialized) {
DumpAddresses();
fAddressesInitialized = false;
if (m_use_addrman_outgoing) {
// Anchor connections are only dumped during clean shutdown.
diff --git i/src/net.h w/src/net.h
index d5833d7e2d..d45f3c606f 100644
--- i/src/net.h
+++ w/src/net.h
@@ -760,15 +760,17 @@ public:
~CConnman();
bool Start(CScheduler& scheduler, const Options& options) EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex, !m_added_nodes_mutex, !m_addr_fetches_mutex, !mutexMsgProc);
void StopThreads();
- void StopNodes();
- void Stop()
+ void StopNodes() EXCLUSIVE_LOCKS_REQUIRED(!m_nodes_disconnected_mutex);
+ void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_nodes_disconnected_mutex)
{
+ AssertLockNotHeld(m_nodes_disconnected_mutex);
+
StopThreads();
StopNodes();
};
void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc);
bool GetNetworkActive() const { return fNetworkActive; };
@@ -917,31 +919,31 @@ private:
void ThreadOpenAddedConnections() EXCLUSIVE_LOCKS_REQUIRED(!m_added_nodes_mutex, !m_unused_i2p_sessions_mutex);
void AddAddrFetch(const std::string& strDest) EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex);
void ProcessAddrFetch() EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_unused_i2p_sessions_mutex);
void ThreadOpenConnections(std::vector<std::string> connect) EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_added_nodes_mutex, !m_nodes_mutex, !m_unused_i2p_sessions_mutex);
void ThreadMessageHandler() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc);
- void ThreadI2PAcceptIncoming();
- void AcceptConnection(const ListenSocket& hListenSocket);
+ void ThreadI2PAcceptIncoming() EXCLUSIVE_LOCKS_REQUIRED(!m_nodes_disconnected_mutex);
+ void AcceptConnection(const ListenSocket& hListenSocket) EXCLUSIVE_LOCKS_REQUIRED(!m_nodes_disconnected_mutex);
/**
* Create a `CNode` object from a socket that has just been accepted and add the node to
* the `m_nodes` member.
* @param[in] sock Connected socket to communicate with the peer.
* @param[in] permission_flags The peer's permissions.
* @param[in] addr_bind The address and port at our side of the connection.
* @param[in] addr The address and port at the peer's side of the connection.
*/
void CreateNodeFromAcceptedSocket(std::unique_ptr<Sock>&& sock,
NetPermissionFlags permission_flags,
const CAddress& addr_bind,
- const CAddress& addr);
+ const CAddress& addr) EXCLUSIVE_LOCKS_REQUIRED(!m_nodes_disconnected_mutex);
- void DeleteDisconnectedNode(CNode* pnode);
- void DisconnectAndReleaseNode(CNode* pnode);
- void DisconnectNodes();
+ void DeleteDisconnectedNode(CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(!m_nodes_disconnected_mutex);
+ void DisconnectAndReleaseNode(CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(!m_nodes_disconnected_mutex);
+ void DisconnectNodes() EXCLUSIVE_LOCKS_REQUIRED(!m_nodes_disconnected_mutex);
void NotifyNumConnectionsChanged();
/** Return true if the peer is inactive and should be disconnected. */
bool InactivityCheck(const CNode& node) const;
/**
* Generate a collection of sockets to check for IO readiness.
@@ -950,13 +952,13 @@ private:
*/
Sock::EventsPerSock GenerateWaitSockets(Span<CNode* const> nodes);
/**
* Check connected and listening sockets for IO readiness and process them accordingly.
*/
- void SocketHandler() EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex, !mutexMsgProc);
+ void SocketHandler() EXCLUSIVE_LOCKS_REQUIRED(!m_nodes_disconnected_mutex, !m_total_bytes_sent_mutex, !mutexMsgProc);
/**
* Do the read/write for connected sockets that are ready for IO.
* @param[in] nodes Nodes to process. The socket of each node is checked against `what`.
* @param[in] events_per_sock Sockets that are ready for IO.
*/
@@ -965,15 +967,15 @@ private:
EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex, !mutexMsgProc);
/**
* Accept incoming connections, one from each read-ready listening socket.
* @param[in] events_per_sock Sockets that are ready for IO.
*/
- void SocketHandlerListening(const Sock::EventsPerSock& events_per_sock);
+ void SocketHandlerListening(const Sock::EventsPerSock& events_per_sock) EXCLUSIVE_LOCKS_REQUIRED(!m_nodes_disconnected_mutex);
- void ThreadSocketHandler() EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex, !mutexMsgProc);
+ void ThreadSocketHandler() EXCLUSIVE_LOCKS_REQUIRED(!m_nodes_disconnected_mutex, !m_total_bytes_sent_mutex, !mutexMsgProc);
void ThreadDNSAddressSeed() EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_nodes_mutex);
uint64_t CalculateKeyedNetGroup(const CAddress& ad) const;
CNode* FindNode(const CNetAddr& ip);
CNode* FindNode(const CSubNet& subNet);
@@ -983,13 +985,13 @@ private:
/**
* Determine whether we're already connected to a given address, in order to
* avoid initiating duplicate connections.
*/
bool AlreadyConnectedToAddress(const CAddress& addr);
- bool AttemptToEvictConnection();
+ bool AttemptToEvictConnection() EXCLUSIVE_LOCKS_REQUIRED(!m_nodes_disconnected_mutex);
CNode* ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex);
void AddWhitelistPermissionFlags(NetPermissionFlags& flags, const CNetAddr &addr) const;
void DeleteNode(CNode* pnode);
NodeId GetNewNodeId();
@@ -1042,13 +1044,13 @@ private:
const NetGroupManager& m_netgroupman;
std::deque<std::string> m_addr_fetches GUARDED_BY(m_addr_fetches_mutex);
Mutex m_addr_fetches_mutex;
std::vector<std::string> m_added_nodes GUARDED_BY(m_added_nodes_mutex);
mutable Mutex m_added_nodes_mutex;
std::vector<CNode*> m_nodes GUARDED_BY(m_nodes_mutex);
- GlobalMutex m_nodes_disconnected_mutex;
+ Mutex m_nodes_disconnected_mutex;
std::list<CNode*> m_nodes_disconnected GUARDED_BY(m_nodes_disconnected_mutex);
mutable RecursiveMutex m_nodes_mutex;
std::atomic<NodeId> nLastNodeId{0};
unsigned int nPrevNodeCount{0};
/**
-        // Delete disconnected nodes
-        std::list<CNode*> nodes_disconnected_copy = m_nodes_disconnected;
-        for (CNode* pnode : nodes_disconnected_copy)
-        {
-            // Destroy the object only after other threads have stopped using it.
-            if (pnode->GetRefCount() <= 0) {
-                m_nodes_disconnected.remove(pnode);
-                DeleteNode(pnode);
-            }
-        }
+        WITH_LOCK(m_nodes_disconnected_mutex, disconnected_nodes_copy = m_nodes_disconnected);
+        for (CNode* pnode : disconnected_nodes_copy)
+        {
+            DeleteDisconnectedNode(pnode);
+        }
This can end up with a double-free if two threads concurrently execute it.
- Thread1 makes a copy of m_nodes_disconnected and releases m_nodes_disconnected_mutex
- Thread2 does the same
- Thread1 starts iterating on its own copy and calls DeleteDisconnectedNode() on the first element, which calls DeleteNode(), which calls delete pnode;
- Thread2 does the same on its own copy: a second delete on the same CNode object.
Right, this is what I tried to protect against in an earlier version. In that version I think it was superfluous, but here it would be appropriate, so that only one thread ever actually performs the deletion.
DeleteNode() is now gated behind if (pnode->nRefCount.compare_exchange_strong(expectedRefCount, -1)), so it can only be called once (by a single thread).
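In isolation, the compare-exchange gate looks like this (Node is a stand-in for CNode, not the real class):

```cpp
#include <atomic>

// Minimal stand-in for CNode; only the reference count matters here.
struct Node {
    std::atomic<int> nRefCount{0};
};

// Try to swap the refcount from 0 to -1. compare_exchange_strong is
// atomic, so exactly one caller can win the swap; a concurrent caller
// sees -1 (or a positive count) and backs off. Only the winner may
// then safely perform the delete.
bool try_claim_for_delete(Node& node)
{
    int expected{0};
    return node.nRefCount.compare_exchange_strong(expected, -1);
}
```

A node whose count is still positive is never claimed, which is the "only after other threads have stopped using it" condition.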
I love this lock-free pattern!
However, the variable used as a guard (nRefCount) has to be outside of the object being protected. Otherwise, if two threads execute it concurrently, the first one may swap from 0 to -1 and delete the object; the second thread will then end up reading freed memory when it tries to use nRefCount.
This should be safe:
// Delete disconnected nodes. Call DeleteNode() without holding m_nodes_mutex or m_nodes_disconnected_mutex.
std::vector<CNode*> to_delete;
{
    LOCK(m_nodes_disconnected_mutex);
    for (auto it = m_nodes_disconnected.begin(); it != m_nodes_disconnected.end();) {
        CNode* node = *it;
        if (node->GetRefCount() == 0) {
            it = m_nodes_disconnected.erase(it);
            to_delete.push_back(node);
        } else {
            ++it;
        }
    }
}
for (CNode* node : to_delete) {
    DeleteNode(node);
}
-    for (CNode* pnode : m_nodes_disconnected) {
+    std::list<CNode*> disconnected_nodes{};
+    WITH_LOCK(m_nodes_disconnected_mutex, disconnected_nodes = m_nodes_disconnected);
+    for (CNode* pnode : disconnected_nodes) {
         DeleteNode(pnode);
     }
-    m_nodes_disconnected.clear();
+    WITH_LOCK(m_nodes_disconnected_mutex, m_nodes_disconnected.clear());
Same double-free as above (even though, when this code in StopNodes() is executed, the other threads that could access m_nodes_disconnected should have been exited by StopThreads() already, it's better not to rely on that).
The pattern
- with lock: copy x to temp
- process temp without lock
- with lock: clear x

is unsafe: some element may have been inserted between the two locks, and it will be removed without processing. Better do:

{
    LOCK(m_nodes_disconnected_mutex);
    disconnected_nodes = m_nodes_disconnected;
    m_nodes_disconnected.clear();
}
for (CNode* pnode : disconnected_nodes) {
    DeleteNode(pnode);
}

This will avoid the double-free as well.
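The suggested copy-and-clear-under-one-lock pattern can be sketched with stand-in types (std::string instead of CNode*; drain_and_process is a hypothetical name). Because the copy and the clear happen in the same critical section, nothing inserted by another thread can be cleared without also being taken for processing, and nothing is processed twice:

```cpp
#include <cstddef>
#include <list>
#include <mutex>
#include <string>

std::mutex disconnected_mutex;       // stand-in for m_nodes_disconnected_mutex
std::list<std::string> disconnected; // stand-in for m_nodes_disconnected

// Take ownership of the current contents and empty the shared list in
// one critical section, then do the slow work (DeleteNode in the real
// code) with no lock held. Returns how many entries were processed.
std::size_t drain_and_process()
{
    std::list<std::string> local;
    {
        std::lock_guard<std::mutex> lock(disconnected_mutex);
        local.swap(disconnected); // copy + clear as one atomic step
    }
    std::size_t processed = 0;
    for (const auto& node : local) {
        (void)node; // DeleteNode(node) would run here, lock-free
        ++processed;
    }
    return processed;
}
```

Since each entry leaves the shared list exactly once, only one caller can ever hand a given entry to DeleteNode, which is what rules out the double-free.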
if (evicted_node) {
    DisconnectAndReleaseNode(evicted_node);
    DeleteDisconnectedNode(evicted_node);
This could be called concurrently by two threads for the same CNode.
DisconnectAndReleaseNode() is safe because its body is protected by m_nodes_mutex. But DeleteDisconnectedNode() is not: two threads could race to call delete on the CNode object. It looks ok to iterate the whole m_nodes_disconnected here to avoid that, like in the other place.
Force-pushed from a3903cc to 70f9af5.
This will permit safely removing nodes from multiple threads
Add DeleteDisconnectedNode() and DisconnectAndReleaseNode() functions. Permit disconnection and deletion of a single node.
Disconnect nodes using new helpers both in DisconnectNodes() and as part of AttemptToEvictConnection(). Previously it was possible for multiple new connections to be accepted in one thread (e.g. ThreadI2PAccept) and for evicted nodes not to be processed and removed atomically, instead waiting on ThreadSocketHandler() to run DisconnectNodes(), allowing overflow of maxconnections in the interim. Under the new behaviour, as part of accepting a new connection we disconnect and delete the node inside of AttemptToEvictConnection(), dropping the new connection if this doesn't succeed. This ensures we don't exceed maxconnections.
Force-pushed from 70f9af5 to 912d8e4.
Sorry, I have been a bit short on time recently, but let me know when to test it and I'll do it.
Here is a patch on top of this PR that should address all of the thread-safety concerns above.

[patch] thread safe

diff --git i/src/net.cpp w/src/net.cpp
index c2160e945f..058f615dd5 100644
--- i/src/net.cpp
+++ w/src/net.cpp
@@ -886,12 +886,14 @@ size_t CConnman::SocketSendData(CNode& node) const
* to forge. In order to partition a node the attacker must be
* simultaneously better at all of them than honest peers.
* If we find a candidate perform the eviction.
*/
bool CConnman::AttemptToEvictConnection()
{
+ AssertLockNotHeld(m_nodes_disconnected_mutex);
+
std::vector<NodeEvictionCandidate> vEvictionCandidates;
{
LOCK(m_nodes_mutex);
for (const CNode* node : m_nodes) {
if (node->fDisconnect)
@@ -916,33 +918,37 @@ bool CConnman::AttemptToEvictConnection()
}
}
const std::optional<NodeId> node_id_to_evict = SelectNodeToEvict(std::move(vEvictionCandidates));
if (!node_id_to_evict) {
return false;
}
- CNode* evicted_node{nullptr};
+ bool disconnected{false};
{
LOCK(m_nodes_mutex);
- for (CNode* pnode : m_nodes) {
- if (pnode->GetId() == *node_id_to_evict) {
- LogPrint(BCLog::NET, "selected %s connection for eviction peer=%d; disconnecting\n", pnode->ConnectionTypeAsString(), pnode->GetId());
- pnode->fDisconnect = true;
- evicted_node = pnode;
+ for (auto it = m_nodes.begin(); it != m_nodes.end(); ++it) {
+ CNode* node = *it;
+ if (node->GetId() == *node_id_to_evict) {
+ LogPrint(BCLog::NET, "selected %s connection for eviction peer=%d; disconnecting\n", node->ConnectionTypeAsString(), node->GetId());
+ node->fDisconnect = true;
+ DisconnectAndReleaseNode(node);
+ disconnected = true;
+ m_nodes.erase(it);
break;
}
}
}
- if (evicted_node) {
- DisconnectAndReleaseNode(evicted_node);
- DeleteDisconnectedNode(evicted_node);
+ if (disconnected) {
+ DeleteDisconnectedNodes();
return true;
}
return false;
}
void CConnman::AcceptConnection(const ListenSocket& hListenSocket) {
+ AssertLockNotHeld(m_nodes_disconnected_mutex);
+
struct sockaddr_storage sockaddr;
socklen_t len = sizeof(sockaddr);
auto sock = hListenSocket.sock->Accept((struct sockaddr*)&sockaddr, &len);
CAddress addr;
if (!sock) {
@@ -1105,45 +1111,56 @@ bool CConnman::AddConnection(const std::string& address, ConnectionType conn_typ
if (!grant) return false;
OpenNetworkConnection(CAddress(), false, &grant, address.c_str(), conn_type);
return true;
}
-void CConnman::DeleteDisconnectedNode(CNode* pnode)
+void CConnman::DeleteDisconnectedNodes()
{
- // Destroy the object only after other threads have stopped using it.
- // Prevent double free by setting nRefCount to -1 before delete.
- int expectedRefCount = 0;
- if (pnode->nRefCount.compare_exchange_strong(expectedRefCount, -1)) {
- WITH_LOCK(m_nodes_disconnected_mutex, m_nodes_disconnected.remove(pnode));
- DeleteNode(pnode);
+ AssertLockNotHeld(m_nodes_disconnected_mutex);
+
+ // Delete disconnected nodes. Call DeleteNode() without holding m_nodes_mutex or m_nodes_disconnected_mutex.
+ std::vector<CNode*> to_delete;
+
+ {
+ LOCK(m_nodes_disconnected_mutex);
+ for (auto it = m_nodes_disconnected.begin(); it != m_nodes_disconnected.end();) {
+ CNode* node = *it;
+ if (node->GetRefCount() == 0) {
+ it = m_nodes_disconnected.erase(it);
+ to_delete.push_back(node);
+ } else {
+ ++it;
+ }
+ }
+ }
+
+ for (CNode* node : to_delete) {
+ DeleteNode(node);
}
}
-void CConnman::DisconnectAndReleaseNode(CNode* pnode)
+void CConnman::DisconnectAndReleaseNode(CNode* node)
{
- LOCK(m_nodes_mutex);
- if (std::find(m_nodes.begin(), m_nodes.end(), pnode) != m_nodes.end()) {
-
- // remove from m_nodes
- m_nodes.erase(remove(m_nodes.begin(), m_nodes.end(), pnode), m_nodes.end());
+ AssertLockNotHeld(m_nodes_disconnected_mutex);
- // release outbound grant (if any)
- pnode->grantOutbound.Release();
+ // release outbound grant (if any)
+ node->grantOutbound.Release();
- // close socket and cleanup
- pnode->CloseSocketDisconnect();
+ // close socket and cleanup
+ node->CloseSocketDisconnect();
- // hold in disconnected pool until all refs are released
- pnode->Release();
- WITH_LOCK(m_nodes_disconnected_mutex, m_nodes_disconnected.push_back(pnode));
- }
+ // hold in disconnected pool until all refs are released
+ node->Release();
+ WITH_LOCK(m_nodes_disconnected_mutex, m_nodes_disconnected.push_back(node));
}
void CConnman::DisconnectNodes()
{
+ AssertLockNotHeld(m_nodes_disconnected_mutex);
+
{
LOCK(m_nodes_mutex);
if (!fNetworkActive) {
// Disconnect any connected nodes
for (CNode* pnode : m_nodes) {
@@ -1152,28 +1169,25 @@ void CConnman::DisconnectNodes()
pnode->fDisconnect = true;
}
}
}
// Disconnect unused nodes
- std::vector<CNode*> nodes_copy = m_nodes;
- for (CNode* pnode : nodes_copy)
- {
- if (pnode->fDisconnect)
- {
- DisconnectAndReleaseNode(pnode);
- }
- }
- }
- // Delete disconnected nodes
- std::list<CNode*> disconnected_nodes_copy{};
- WITH_LOCK(m_nodes_disconnected_mutex, disconnected_nodes_copy = m_nodes_disconnected);
- for (CNode* pnode : disconnected_nodes_copy)
- {
- DeleteDisconnectedNode(pnode);
+ m_nodes.erase(std::remove_if(m_nodes.begin(),
+ m_nodes.end(),
+ [this](CNode* node) {
+ if (node->fDisconnect) {
+ DisconnectAndReleaseNode(node);
+ return true;
+ }
+ return false;
+ }),
+ m_nodes.end());
}
+
+ DeleteDisconnectedNodes();
}
void CConnman::NotifyNumConnectionsChanged()
{
size_t nodes_size;
{
@@ -1270,12 +1284,13 @@ Sock::EventsPerSock CConnman::GenerateWaitSockets(Span<CNode* const> nodes)
return events_per_sock;
}
void CConnman::SocketHandler()
{
+ AssertLockNotHeld(m_nodes_disconnected_mutex);
AssertLockNotHeld(m_total_bytes_sent_mutex);
Sock::EventsPerSock events_per_sock;
{
const NodesSnapshot snap{*this, /*shuffle=*/false};
@@ -1381,12 +1396,14 @@ void CConnman::SocketHandlerConnected(const std::vector<CNode*>& nodes,
if (InactivityCheck(*pnode)) pnode->fDisconnect = true;
}
}
void CConnman::SocketHandlerListening(const Sock::EventsPerSock& events_per_sock)
{
+ AssertLockNotHeld(m_nodes_disconnected_mutex);
+
for (const ListenSocket& listen_socket : vhListenSocket) {
if (interruptNet) {
return;
}
const auto it = events_per_sock.find(listen_socket.sock);
if (it != events_per_sock.end() && it->second.occurred & Sock::RECV) {
@@ -1394,12 +1411,13 @@ void CConnman::SocketHandlerListening(const Sock::EventsPerSock& events_per_sock
}
}
}
void CConnman::ThreadSocketHandler()
{
+ AssertLockNotHeld(m_nodes_disconnected_mutex);
AssertLockNotHeld(m_total_bytes_sent_mutex);
while (!interruptNet)
{
DisconnectNodes();
NotifyNumConnectionsChanged();
@@ -2093,12 +2111,14 @@ void CConnman::ThreadMessageHandler()
fMsgProcWake = false;
}
}
void CConnman::ThreadI2PAcceptIncoming()
{
+ AssertLockNotHeld(m_nodes_disconnected_mutex);
+
static constexpr auto err_wait_begin = 1s;
static constexpr auto err_wait_cap = 5min;
auto err_wait = err_wait_begin;
bool advertising_listen_addr = false;
i2p::Connection conn;
@@ -2481,12 +2501,14 @@ void CConnman::StopThreads()
if (threadSocketHandler.joinable())
threadSocketHandler.join();
}
void CConnman::StopNodes()
{
+ AssertLockNotHeld(m_nodes_disconnected_mutex);
+
if (fAddressesInitialized) {
DumpAddresses();
fAddressesInitialized = false;
if (m_use_addrman_outgoing) {
// Anchor connections are only dumped during clean shutdown.
diff --git i/src/net.h w/src/net.h
index d5833d7e2d..6cdf8cb462 100644
--- i/src/net.h
+++ w/src/net.h
@@ -760,15 +760,17 @@ public:
~CConnman();
bool Start(CScheduler& scheduler, const Options& options) EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex, !m_added_nodes_mutex, !m_addr_fetches_mutex, !mutexMsgProc);
void StopThreads();
- void StopNodes();
- void Stop()
+ void StopNodes() EXCLUSIVE_LOCKS_REQUIRED(!m_nodes_disconnected_mutex);
+ void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_nodes_disconnected_mutex)
{
+ AssertLockNotHeld(m_nodes_disconnected_mutex);
+
StopThreads();
StopNodes();
};
void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc);
bool GetNetworkActive() const { return fNetworkActive; };
@@ -917,31 +919,31 @@ private:
void ThreadOpenAddedConnections() EXCLUSIVE_LOCKS_REQUIRED(!m_added_nodes_mutex, !m_unused_i2p_sessions_mutex);
void AddAddrFetch(const std::string& strDest) EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex);
void ProcessAddrFetch() EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_unused_i2p_sessions_mutex);
void ThreadOpenConnections(std::vector<std::string> connect) EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_added_nodes_mutex, !m_nodes_mutex, !m_unused_i2p_sessions_mutex);
void ThreadMessageHandler() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc);
- void ThreadI2PAcceptIncoming();
- void AcceptConnection(const ListenSocket& hListenSocket);
+ void ThreadI2PAcceptIncoming() EXCLUSIVE_LOCKS_REQUIRED(!m_nodes_disconnected_mutex);
+ void AcceptConnection(const ListenSocket& hListenSocket) EXCLUSIVE_LOCKS_REQUIRED(!m_nodes_disconnected_mutex);
/**
* Create a `CNode` object from a socket that has just been accepted and add the node to
* the `m_nodes` member.
* @param[in] sock Connected socket to communicate with the peer.
* @param[in] permission_flags The peer's permissions.
* @param[in] addr_bind The address and port at our side of the connection.
* @param[in] addr The address and port at the peer's side of the connection.
*/
void CreateNodeFromAcceptedSocket(std::unique_ptr<Sock>&& sock,
NetPermissionFlags permission_flags,
const CAddress& addr_bind,
- const CAddress& addr);
+ const CAddress& addr) EXCLUSIVE_LOCKS_REQUIRED(!m_nodes_disconnected_mutex);
- void DeleteDisconnectedNode(CNode* pnode);
- void DisconnectAndReleaseNode(CNode* pnode);
- void DisconnectNodes();
+ void DisconnectAndReleaseNode(CNode* node) EXCLUSIVE_LOCKS_REQUIRED(!m_nodes_disconnected_mutex);
+ void DeleteDisconnectedNodes() EXCLUSIVE_LOCKS_REQUIRED(!m_nodes_disconnected_mutex);
+ void DisconnectNodes() EXCLUSIVE_LOCKS_REQUIRED(!m_nodes_disconnected_mutex);
void NotifyNumConnectionsChanged();
/** Return true if the peer is inactive and should be disconnected. */
bool InactivityCheck(const CNode& node) const;
/**
* Generate a collection of sockets to check for IO readiness.
@@ -950,13 +952,13 @@ private:
*/
Sock::EventsPerSock GenerateWaitSockets(Span<CNode* const> nodes);
/**
* Check connected and listening sockets for IO readiness and process them accordingly.
*/
- void SocketHandler() EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex, !mutexMsgProc);
+ void SocketHandler() EXCLUSIVE_LOCKS_REQUIRED(!m_nodes_disconnected_mutex, !m_total_bytes_sent_mutex, !mutexMsgProc);
/**
* Do the read/write for connected sockets that are ready for IO.
* @param[in] nodes Nodes to process. The socket of each node is checked against `what`.
* @param[in] events_per_sock Sockets that are ready for IO.
*/
@@ -965,15 +967,15 @@ private:
EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex, !mutexMsgProc);
/**
* Accept incoming connections, one from each read-ready listening socket.
* @param[in] events_per_sock Sockets that are ready for IO.
*/
- void SocketHandlerListening(const Sock::EventsPerSock& events_per_sock);
+ void SocketHandlerListening(const Sock::EventsPerSock& events_per_sock) EXCLUSIVE_LOCKS_REQUIRED(!m_nodes_disconnected_mutex);
- void ThreadSocketHandler() EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex, !mutexMsgProc);
+ void ThreadSocketHandler() EXCLUSIVE_LOCKS_REQUIRED(!m_nodes_disconnected_mutex, !m_total_bytes_sent_mutex, !mutexMsgProc);
void ThreadDNSAddressSeed() EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_nodes_mutex);
uint64_t CalculateKeyedNetGroup(const CAddress& ad) const;
CNode* FindNode(const CNetAddr& ip);
CNode* FindNode(const CSubNet& subNet);
@@ -983,13 +985,13 @@ private:
/**
* Determine whether we're already connected to a given address, in order to
* avoid initiating duplicate connections.
*/
bool AlreadyConnectedToAddress(const CAddress& addr);
- bool AttemptToEvictConnection();
+ bool AttemptToEvictConnection() EXCLUSIVE_LOCKS_REQUIRED(!m_nodes_disconnected_mutex);
CNode* ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex);
void AddWhitelistPermissionFlags(NetPermissionFlags& flags, const CNetAddr &addr) const;
void DeleteNode(CNode* pnode);
NodeId GetNewNodeId();
@@ -1042,13 +1044,13 @@ private:
const NetGroupManager& m_netgroupman;
std::deque<std::string> m_addr_fetches GUARDED_BY(m_addr_fetches_mutex);
Mutex m_addr_fetches_mutex;
std::vector<std::string> m_added_nodes GUARDED_BY(m_added_nodes_mutex);
mutable Mutex m_added_nodes_mutex;
std::vector<CNode*> m_nodes GUARDED_BY(m_nodes_mutex);
- GlobalMutex m_nodes_disconnected_mutex;
+ Mutex m_nodes_disconnected_mutex;
std::list<CNode*> m_nodes_disconnected GUARDED_BY(m_nodes_disconnected_mutex);
mutable RecursiveMutex m_nodes_mutex;
std::atomic<NodeId> nLastNodeId{0};
unsigned int nPrevNodeCount{0};
/** |
Thanks for the review and patch @vasild, I will try to review it over the weekend and keep this moving forward. In the meantime I have been considering a totally different approach to thread-safe deletion of nodes from `m_nodes`. If you are also interested in this approach, I'd be curious to know what you thought of it. Although "cleaner", in the end I think the change is too expensive for too little benefit... https://github.com/willcl-ark/bitcoin/tree/2023-07_shared-ptr-cnode |
Yes! I was thinking about the same! We are doing some manual refcounting here which is creating all kinds of headaches and is prone to bugs. There is a neat solution for this already: `std::shared_ptr`. To minimize the size of the initial PR, I guess you can just change the raw `CNode*` to `std::shared_ptr<CNode>`. Later I think we don't even need `m_nodes_disconnected`. PS: 2023-07_shared-ptr-cnode does not look too bad. |
🐙 This pull request conflicts with the target branch and needs rebase. |
There hasn't been much activity lately and the patch still needs rebase. What is the status here?
|
Going to close this for now as I don't have time to finish it. |
Fixes #27843

To avoid overflowing `-maxconnections`, disconnect nodes marked for eviction directly inside of `AttemptToEvictConnection()`. This has the end result that new connections will only be accepted after an existing connection is dropped; otherwise the new connection is dropped.

Previously the number of connected nodes could overflow `nMaxInbound`, as (multiple) new connections could be accepted from `ThreadI2PAccept` -- each marking an existing connection to drop (in the future) -- before `ThreadSocketHandler` looped through to `DisconnectNodes()` and took care of the disconnections.

Node disconnection and deletion are broken out into individual functions which handle a single node so they can be called both from `DisconnectNodes()` and `AttemptToEvictConnection()`. This will result in more un/locking operations to perform mass disconnections, but as this only really happens when the network becomes inactive it should not be a problem.