Do not block indefinitely on the semaphore (backport #1654) by mergify[bot] · Pull Request #1688 · cometbft/cometbft · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Do not block indefinitely on the semaphore (backport #1654) #1688

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,12 +750,13 @@ type MempoolConfig struct {
// XXX: Unused due to https://github.com/tendermint/tendermint/issues/5796
MaxBatchBytes int `mapstructure:"max_batch_bytes"`
// Experimental parameters to limit gossiping txs to up to the specified number of peers.
// We use two independent upper values for persistent peers and for non-persistent peers.
// We use two independent upper values for persistent and non-persistent peers.
// Unconditional peers are not affected by this feature.
// If we are connected to more than the specified number of persistent peers, only send txs to
// the first ExperimentalMaxGossipConnectionsToPersistentPeers of them. If one of those
// persistent peers disconnects, activate another persistent peer. Similarly for non-persistent
// peers, with an upper limit of ExperimentalMaxGossipConnectionsToNonPersistentPeers.
// ExperimentalMaxGossipConnectionsToPersistentPeers of them. If one of those
// persistent peers disconnects, activate another persistent peer.
// Similarly for non-persistent peers, with an upper limit of
// ExperimentalMaxGossipConnectionsToNonPersistentPeers.
// If set to 0, the feature is disabled for the corresponding group of peers, that is, the
// number of active connections to that group of peers is not bounded.
// For non-persistent peers, if enabled, a value of 10 is recommended based on experimental
Expand Down
9 changes: 5 additions & 4 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,12 +390,13 @@ max_tx_bytes = {{ .Mempool.MaxTxBytes }}
max_batch_bytes = {{ .Mempool.MaxBatchBytes }}

# Experimental parameters to limit gossiping txs to up to the specified number of peers.
# We use two independent upper values for persistent peers and for non-persistent peers.
# We use two independent upper values for persistent and non-persistent peers.
# Unconditional peers are not affected by this feature.
# If we are connected to more than the specified number of persistent peers, only send txs to
# the first experimental_max_gossip_connections_to_persistent_peers of them. If one of those
# persistent peers disconnects, activate another persistent peer. Similarly for non-persistent
# peers, with an upper limit of experimental_max_gossip_connections_to_non_persistent_peers.
# ExperimentalMaxGossipConnectionsToPersistentPeers of them. If one of those
# persistent peers disconnects, activate another persistent peer.
# Similarly for non-persistent peers, with an upper limit of
# ExperimentalMaxGossipConnectionsToNonPersistentPeers.
# If set to 0, the feature is disabled for the corresponding group of peers, that is, the
# number of active connections to that group of peers is not bounded.
# For non-persistent peers, if enabled, a value of 10 is recommended based on experimental
Expand Down
41 changes: 23 additions & 18 deletions mempool/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,32 +92,37 @@ func (memR *Reactor) AddPeer(peer p2p.Peer) {
go func() {
// Always forward transactions to unconditional peers.
if !memR.Switch.IsPeerUnconditional(peer.ID()) {
// Depending on the type of peer, we choose a semaphore to limit the gossiping peers.
var peerSemaphore *semaphore.Weighted
if peer.IsPersistent() && memR.config.ExperimentalMaxGossipConnectionsToPersistentPeers > 0 {
// Block sending transactions to peer until one of the connections become
// available in the semaphore.
if err := memR.activePersistentPeersSemaphore.Acquire(context.TODO(), 1); err != nil {
memR.Logger.Error("Failed to acquire semaphore: %v", err)
return
}
// Release semaphore to allow other peer to start sending transactions.
defer memR.activePersistentPeersSemaphore.Release(1)
defer memR.mempool.metrics.ActiveOutboundConnections.Add(-1)
peerSemaphore = memR.activePersistentPeersSemaphore
} else if !peer.IsPersistent() && memR.config.ExperimentalMaxGossipConnectionsToNonPersistentPeers > 0 {
peerSemaphore = memR.activeNonPersistentPeersSemaphore
}

if !peer.IsPersistent() && memR.config.ExperimentalMaxGossipConnectionsToNonPersistentPeers > 0 {
// Block sending transactions to peer until one of the connections become
// available in the semaphore.
if err := memR.activeNonPersistentPeersSemaphore.Acquire(context.TODO(), 1); err != nil {
memR.Logger.Error("Failed to acquire semaphore: %v", err)
return
if peerSemaphore != nil {
for peer.IsRunning() {
// Block on the semaphore until a slot is available to start gossiping with this peer.
// Do not block indefinitely, in case the peer is disconnected before gossiping starts.
ctxTimeout, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
// Block sending transactions to peer until one of the connections becomes
// available in the semaphore.
err := peerSemaphore.Acquire(ctxTimeout, 1)
cancel()

if err != nil {
continue
}

// Release semaphore to allow another peer to start sending transactions.
defer peerSemaphore.Release(1)
break
}
// Release semaphore to allow other peer to start sending transactions.
defer memR.activeNonPersistentPeersSemaphore.Release(1)
defer memR.mempool.metrics.ActiveOutboundConnections.Add(-1)
}
}

memR.mempool.metrics.ActiveOutboundConnections.Add(1)
defer memR.mempool.metrics.ActiveOutboundConnections.Add(-1)
memR.broadcastTxRoutine(peer)
}()
}
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/pkg/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ type Manifest struct {
// Defaults to false (disabled).
Prometheus bool `toml:"prometheus"`

// Maximum number of peers to which the node gossip transactions
// Maximum number of peers to which the node gossips transactions
ExperimentalMaxGossipConnectionsToPersistentPeers uint `toml:"experimental_max_gossip_connections_to_persistent_peers"`
ExperimentalMaxGossipConnectionsToNonPersistentPeers uint `toml:"experimental_max_gossip_connections_to_non_persistent_peers"`
}
Expand Down
0