mempool: Limit gossip connections to persistent and non-persistent peers (experimental) by lasarojc · Pull Request #1584 · cometbft/cometbft · GitHub

mempool: Limit gossip connections to persistent and non-persistent peers (experimental) #1584

Merged
merged 9 commits into from
Nov 9, 2023
@@ -1,3 +1,9 @@
- `[mempool]` Only gossip transactions to a subset of the connected peers,
of size `experimental_max_used_outbound_peers`
- `[mempool]` Add experimental feature to limit the number of persistent peers and non-persistent
peers to which the node gossips transactions.
([\#1558](https://github.com/cometbft/cometbft/pull/1558))
([\#1584](https://github.com/cometbft/cometbft/pull/1584))
- `[config]` Add mempool parameters `experimental_max_gossip_connections_to_persistent_peers` and
`experimental_max_gossip_connections_to_non_persistent_peers` for limiting the number of peers to
which the node gossips transactions.
([\#1558](https://github.com/cometbft/cometbft/pull/1558))
([\#1584](https://github.com/cometbft/cometbft/pull/1584))
40 changes: 24 additions & 16 deletions config/config.go
@@ -876,15 +876,19 @@ type MempoolConfig struct {
// Including space needed by encoding (one varint per transaction).
// XXX: Unused due to https://github.com/tendermint/tendermint/issues/5796
MaxBatchBytes int `mapstructure:"max_batch_bytes"`
// Experimental parameter to limit broadcast of txs to up to this many peers.
// If we are connected to more than this number of peers, only send txs to the first
// ExperimentalMaxUsedOutboundPeers of them. If one of those peers disconnects, activate another
// peer.
// If set to 0, this feature is disabled, that is, the number of active connections is not
// bounded.
// If enabled, a value of 10 is recommended based on experimental performance results using the
// default P2P configuration.
ExperimentalMaxUsedOutboundPeers int `mapstructure:"experimental_max_used_outbound_peers"`
// Experimental parameters to limit gossiping txs to up to the specified number of peers.
// We use two independent upper values for persistent peers and for non-persistent peers.
// Unconditional peers are not affected by this feature.
// If we are connected to more than the specified number of persistent peers, only send txs to
// the first ExperimentalMaxGossipConnectionsToPersistentPeers of them. If one of those

Review comment (Contributor): This seems like it should not be the first 10, but instead a random 10 peers, no?

Reply (Contributor Author): Indeed, the first 10 to acquire the semaphore, which will likely be the first 10 connections established, but not necessarily. I will update the comment.

// persistent peers disconnects, activate another persistent peer. Similarly for non-persistent
// peers, with an upper limit of ExperimentalMaxGossipConnectionsToNonPersistentPeers.
// If set to 0, the feature is disabled for the corresponding group of peers, that is, the
// number of active connections to that group of peers is not bounded.
// For non-persistent peers, if enabled, a value of 10 is recommended based on experimental
// performance results using the default P2P configuration.
ExperimentalMaxGossipConnectionsToPersistentPeers int `mapstructure:"experimental_max_gossip_connections_to_persistent_peers"`
ExperimentalMaxGossipConnectionsToNonPersistentPeers int `mapstructure:"experimental_max_gossip_connections_to_non_persistent_peers"`
}

// DefaultMempoolConfig returns a default configuration for the CometBFT mempool
@@ -895,11 +899,12 @@ func DefaultMempoolConfig() *MempoolConfig {
WalPath: "",
// Each signature verification takes .5ms, Size reduced until we implement
// ABCI Recheck
Size: 5000,
MaxTxsBytes: 1024 * 1024 * 1024, // 1GB
CacheSize: 10000,
MaxTxBytes: 1024 * 1024, // 1MB
ExperimentalMaxUsedOutboundPeers: 0,
Size: 5000,
MaxTxsBytes: 1024 * 1024 * 1024, // 1GB
CacheSize: 10000,
MaxTxBytes: 1024 * 1024, // 1MB
ExperimentalMaxGossipConnectionsToNonPersistentPeers: 0,
ExperimentalMaxGossipConnectionsToPersistentPeers: 0,
}
}

@@ -935,8 +940,11 @@ func (cfg *MempoolConfig) ValidateBasic() error {
if cfg.MaxTxBytes < 0 {
return cmterrors.ErrNegativeField{Field: "max_tx_bytes"}
}
if cfg.ExperimentalMaxUsedOutboundPeers < 0 {
return cmterrors.ErrNegativeField{Field: "experimental_max_used_outbound_peers"}
if cfg.ExperimentalMaxGossipConnectionsToPersistentPeers < 0 {
return cmterrors.ErrNegativeField{Field: "experimental_max_gossip_connections_to_persistent_peers"}
}
if cfg.ExperimentalMaxGossipConnectionsToNonPersistentPeers < 0 {
return cmterrors.ErrNegativeField{Field: "experimental_max_gossip_connections_to_non_persistent_peers"}
}
return nil
}
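
The semantics described in the config comments (negative values rejected, 0 meaning "unbounded" for that peer group) can be sketched in isolation. This is a minimal illustration, not the CometBFT code: `gossipLimits`, `validate`, `isLimited`, and `errNegative` are hypothetical names standing in for the two new `MempoolConfig` fields and for `ValidateBasic`.

```go
package main

import (
	"errors"
	"fmt"
)

// gossipLimits is a hypothetical stand-in for the two new MempoolConfig fields.
type gossipLimits struct {
	MaxPersistent    int // experimental_max_gossip_connections_to_persistent_peers
	MaxNonPersistent int // experimental_max_gossip_connections_to_non_persistent_peers
}

// errNegative is an illustrative error; the real code returns cmterrors.ErrNegativeField.
var errNegative = errors.New("negative field")

// validate mirrors the shape of the checks in the diff: negative limits are rejected.
func (l gossipLimits) validate() error {
	if l.MaxPersistent < 0 {
		return fmt.Errorf("%w: experimental_max_gossip_connections_to_persistent_peers", errNegative)
	}
	if l.MaxNonPersistent < 0 {
		return fmt.Errorf("%w: experimental_max_gossip_connections_to_non_persistent_peers", errNegative)
	}
	return nil
}

// isLimited reports whether gossip to the given peer group is bounded.
// A limit of 0 disables the feature for that group only.
func (l gossipLimits) isLimited(persistentPeer bool) bool {
	if persistentPeer {
		return l.MaxPersistent > 0
	}
	return l.MaxNonPersistent > 0
}

func main() {
	l := gossipLimits{MaxPersistent: 0, MaxNonPersistent: 10}
	fmt.Println("valid:", l.validate() == nil)
	fmt.Println("persistent peers limited:", l.isLimited(true))
	fmt.Println("non-persistent peers limited:", l.isLimited(false))
}
```

With these example values, gossip to persistent peers stays unbounded while non-persistent peers are capped at 10, the value the config comment recommends for that group.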
20 changes: 13 additions & 7 deletions config/toml.go
@@ -425,13 +425,19 @@ max_tx_bytes = {{ .Mempool.MaxTxBytes }}
# XXX: Unused due to https://github.com/tendermint/tendermint/issues/5796
max_batch_bytes = {{ .Mempool.MaxBatchBytes }}

# Experimental parameter to limit broadcast of txs to up to this many peers
# If we are connected to more than this number of peers, only send txs to
# the first ExperimentalMaxOutboundPeers of them. If one of those peers goes
# offline, activate another peer.
# Value 0 disables the feature by not limiting the number of active connections.
# If you enable this feature, a value of 10 is recommended based on experimental performance results.
experimental_max_used_outbound_peers = {{ .Mempool.ExperimentalMaxUsedOutboundPeers }}
# Experimental parameters to limit gossiping txs to up to the specified number of peers.
# We use two independent upper values for persistent peers and for non-persistent peers.
# Unconditional peers are not affected by this feature.
# If we are connected to more than the specified number of persistent peers, only send txs to
# the first experimental_max_gossip_connections_to_persistent_peers of them. If one of those
# persistent peers disconnects, activate another persistent peer. Similarly for non-persistent
# peers, with an upper limit of experimental_max_gossip_connections_to_non_persistent_peers.
# If set to 0, the feature is disabled for the corresponding group of peers, that is, the
# number of active connections to that group of peers is not bounded.
# For non-persistent peers, if enabled, a value of 10 is recommended based on experimental
# performance results using the default P2P configuration.
experimental_max_gossip_connections_to_persistent_peers = {{ .Mempool.ExperimentalMaxGossipConnectionsToPersistentPeers }}
experimental_max_gossip_connections_to_non_persistent_peers = {{ .Mempool.ExperimentalMaxGossipConnectionsToNonPersistentPeers }}

#######################################################
### State Sync Configuration Options ###
46 changes: 33 additions & 13 deletions mempool/reactor.go
@@ -36,7 +36,11 @@ type Reactor struct {
txSenders map[types.TxKey]map[p2p.ID]bool
txSendersMtx cmtsync.Mutex

activeConnectionsSemaphore *semaphore.Weighted
// Semaphores to keep track of how many connections to peers are active for broadcasting
// transactions. Each semaphore has a capacity that puts an upper bound on the number of
// connections for different groups of peers.
activePersistentPeersSemaphore *semaphore.Weighted
activeNonPersistentPeersSemaphore *semaphore.Weighted
}

// NewReactor returns a new Reactor with the given config and mempool.
@@ -53,7 +57,8 @@ func NewReactor(config *cfg.MempoolConfig, mempool *CListMempool, waitSync bool)
memR.waitSyncCh = make(chan struct{})
}
memR.mempool.SetTxRemovedCallback(func(txKey types.TxKey) { memR.removeSenders(txKey) })
memR.activeConnectionsSemaphore = semaphore.NewWeighted(int64(memR.config.ExperimentalMaxUsedOutboundPeers))
memR.activePersistentPeersSemaphore = semaphore.NewWeighted(int64(memR.config.ExperimentalMaxGossipConnectionsToPersistentPeers))
memR.activeNonPersistentPeersSemaphore = semaphore.NewWeighted(int64(memR.config.ExperimentalMaxGossipConnectionsToNonPersistentPeers))

return memR
}
@@ -100,19 +105,34 @@ func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
func (memR *Reactor) AddPeer(peer p2p.Peer) {
if memR.config.Broadcast {
go func() {
if memR.config.ExperimentalMaxUsedOutboundPeers > 0 {
// Around (MaxOutboundPeers-ExperimentalMaxUsedOutboundPeers) goroutines will be
// blocked here waiting for more peers disconnect and free some slots for running.
if err := memR.activeConnectionsSemaphore.Acquire(context.TODO(), 1); err != nil {
memR.Logger.Error("Failed to acquire semaphore: %v", err)
return
// Always forward transactions to unconditional peers.
if !memR.Switch.IsPeerUnconditional(peer.ID()) {
if peer.IsPersistent() && memR.config.ExperimentalMaxGossipConnectionsToPersistentPeers > 0 {
// Block sending transactions to peer until one of the connections becomes
// available in the semaphore.
if err := memR.activePersistentPeersSemaphore.Acquire(context.TODO(), 1); err != nil {
memR.Logger.Error("Failed to acquire semaphore: %v", err)
return
}
// Release semaphore to allow other peer to start sending transactions.
defer memR.activePersistentPeersSemaphore.Release(1)
defer memR.mempool.metrics.ActiveOutboundConnections.Add(-1)
}

if !peer.IsPersistent() && memR.config.ExperimentalMaxGossipConnectionsToNonPersistentPeers > 0 {
// Block sending transactions to peer until one of the connections becomes
// available in the semaphore.
if err := memR.activeNonPersistentPeersSemaphore.Acquire(context.TODO(), 1); err != nil {
memR.Logger.Error("Failed to acquire semaphore: %v", err)
return
}
// Release semaphore to allow other peer to start sending transactions.
defer memR.activeNonPersistentPeersSemaphore.Release(1)
defer memR.mempool.metrics.ActiveOutboundConnections.Add(-1)
}
memR.mempool.metrics.ActiveOutboundConnections.Add(1)
defer func() {
memR.activeConnectionsSemaphore.Release(1)
memR.mempool.metrics.ActiveOutboundConnections.Add(-1)
}()
}

memR.mempool.metrics.ActiveOutboundConnections.Add(1)
memR.broadcastTxRoutine(peer)
}()
}
26 changes: 16 additions & 10 deletions mempool/reactor_test.go
@@ -334,10 +334,13 @@ func TestReactorTxSendersMultiNode(t *testing.T) {
}

// Test the experimental feature that limits the number of outgoing connections for gossiping
// transactions.
// transactions (only non-persistent peers).
// Note: in this test we know which gossip connections are active or not because of how the p2p
// functions are currently implemented, which affects the order in which peers are added to the
// mempool reactor.
func TestMempoolReactorMaxActiveOutboundConnections(t *testing.T) {
config := cfg.TestConfig()
config.Mempool.ExperimentalMaxUsedOutboundPeers = 1
config.Mempool.ExperimentalMaxGossipConnectionsToNonPersistentPeers = 1
reactors, _ := makeAndConnectReactors(config, 4)
defer func() {
for _, r := range reactors {
@@ -356,20 +359,23 @@ func TestMempoolReactorMaxActiveOutboundConnections(t *testing.T) {
txs := newUniqueTxs(100)
callCheckTx(t, reactors[0].mempool, txs)

// Wait for all txs to be in the mempool of the second reactor; the third and fourth reactor
// should not receive any tx.
// Wait for all txs to be in the mempool of the second reactor; the other reactors should not
// receive any tx. (The second reactor only sends transactions to the first reactor.)
checkTxsInMempool(t, txs, reactors[1], 0)
require.Zero(t, reactors[2].mempool.Size())
require.Zero(t, reactors[3].mempool.Size())
for _, r := range reactors[2:] {
require.Zero(t, r.mempool.Size())
}

// In the first reactor, disconnect the second reactor; the third reactor should become active.
// Disconnect the second reactor from the first reactor.
firstPeer := reactors[0].Switch.Peers().List()[0]
reactors[0].Switch.StopPeerGracefully(firstPeer)

// Now the third reactor should receive the transactions; the fourth reactor's mempool should
// still be empty.
// Now the third reactor should start receiving transactions from the first reactor; the fourth
// reactor's mempool should still be empty.
checkTxsInMempool(t, txs, reactors[2], 0)
require.Zero(t, reactors[3].mempool.Size())
for _, r := range reactors[3:] {
require.Zero(t, r.mempool.Size())
}
}
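
The hand-over this test exercises (a waiting peer becomes active only once an active one disconnects) can be illustrated with a small simulation. This is a sketch only: `runPeers` is a hypothetical helper, a buffered channel replaces the reactor's semaphore, and `limit` is assumed to be positive. As noted in the review thread, which waiting peer gets the freed slot is not guaranteed, so the sketch checks only the concurrency bound and that every peer is eventually served.

```go
package main

import (
	"fmt"
	"sync"
)

// runPeers starts one goroutine per peer. Each goroutine waits for a gossip
// slot, counts as "active" while it holds the slot, and frees the slot when
// it returns (simulating a disconnect). It returns the highest number of
// simultaneously active peers and the number of peers served.
// limit must be greater than 0.
func runPeers(numPeers, limit int) (maxActive, served int) {
	slots := make(chan struct{}, limit)
	var (
		mu     sync.Mutex
		active int
		wg     sync.WaitGroup
	)
	for i := 0; i < numPeers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			slots <- struct{}{}        // wait for a free slot
			defer func() { <-slots }() // free the slot on "disconnect"

			mu.Lock()
			active++
			if active > maxActive {
				maxActive = active
			}
			served++
			mu.Unlock()

			// Transaction gossip to this peer would run here.

			mu.Lock()
			active--
			mu.Unlock()
		}()
	}
	wg.Wait()
	return maxActive, served
}

func main() {
	maxActive, served := runPeers(4, 1)
	fmt.Println("max simultaneously active peers:", maxActive)
	fmt.Println("peers served in total:", served)
}
```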

// Check that the mempool has exactly the given list of txs and, if it's not the
6 changes: 4 additions & 2 deletions test/e2e/pkg/manifest.go
@@ -102,8 +102,10 @@ type Manifest struct {
// Upper bound of sleep duration when gossiping votes and block parts
PeerGossipIntraloopSleepDuration time.Duration `toml:"peer_gossip_intraloop_sleep_duration"`

// Maximum number of peers to gossip transactions
ExperimentalMaxUsedOutboundPeers uint `toml:"experimental_max_used_outbound_peers"`
// Maximum number of peers to which the node gossips transactions
ExperimentalMaxGossipConnectionsToPersistentPeers uint `toml:"experimental_max_gossip_connections_to_persistent_peers"`
ExperimentalMaxGossipConnectionsToNonPersistentPeers uint `toml:"experimental_max_gossip_connections_to_non_persistent_peers"`

// Enable or disable e2e tests for CometBFT's expected behavior with respect
// to ABCI.
ABCITestsEnabled bool `toml:"abci_tests_enabled"`
117 changes: 59 additions & 58 deletions test/e2e/pkg/testnet.go
@@ -68,66 +68,66 @@ const (

// Testnet represents a single testnet.
type Testnet struct {
Name string
File string
Dir string
IP *net.IPNet
InitialHeight int64
InitialState map[string]string
Validators map[*Node]int64
ValidatorUpdates map[int64]map[*Node]int64
Nodes []*Node
DisablePexReactor bool
KeyType string
Evidence int
LoadTxSizeBytes int
LoadTxBatchSize int
LoadTxConnections int
ABCIProtocol string
PrepareProposalDelay time.Duration
ProcessProposalDelay time.Duration
CheckTxDelay time.Duration
VoteExtensionDelay time.Duration
FinalizeBlockDelay time.Duration
UpgradeVersion string
Prometheus bool
VoteExtensionsEnableHeight int64
VoteExtensionSize uint
PeerGossipIntraloopSleepDuration time.Duration
ExperimentalMaxUsedOutboundPeers uint
ABCITestsEnabled bool
Name string
File string
Dir string
IP *net.IPNet
InitialHeight int64
InitialState map[string]string
Validators map[*Node]int64
ValidatorUpdates map[int64]map[*Node]int64
Nodes []*Node
DisablePexReactor bool
KeyType string
Evidence int
LoadTxSizeBytes int
LoadTxBatchSize int
LoadTxConnections int
ABCIProtocol string
PrepareProposalDelay time.Duration
ProcessProposalDelay time.Duration
CheckTxDelay time.Duration
VoteExtensionDelay time.Duration
FinalizeBlockDelay time.Duration
UpgradeVersion string
Prometheus bool
VoteExtensionsEnableHeight int64
VoteExtensionSize uint
PeerGossipIntraloopSleepDuration time.Duration
ExperimentalMaxGossipConnectionsToPersistentPeers uint
ExperimentalMaxGossipConnectionsToNonPersistentPeers uint
ABCITestsEnabled bool
}

// Node represents a CometBFT node in a testnet.
type Node struct {
Name string
Version string
Testnet *Testnet
Mode Mode
PrivvalKey crypto.PrivKey
NodeKey crypto.PrivKey
InternalIP net.IP
ExternalIP net.IP
RPCProxyPort uint32
GRPCProxyPort uint32
GRPCPrivilegedProxyPort uint32
StartAt int64
BlockSyncVersion string
StateSync bool
Database string
ABCIProtocol Protocol
PrivvalProtocol Protocol
PersistInterval uint64
SnapshotInterval uint64
RetainBlocks uint64
EnableCompanionPruning bool
Seeds []*Node
PersistentPeers []*Node
Perturbations []Perturbation
SendNoLoad bool
Prometheus bool
PrometheusProxyPort uint32
ExperimentalMaxOutboundPeers uint16
Name string
Version string
Testnet *Testnet
Mode Mode
PrivvalKey crypto.PrivKey
NodeKey crypto.PrivKey
InternalIP net.IP
ExternalIP net.IP
RPCProxyPort uint32
GRPCProxyPort uint32
GRPCPrivilegedProxyPort uint32
StartAt int64
BlockSyncVersion string
StateSync bool
Database string
ABCIProtocol Protocol
PrivvalProtocol Protocol
PersistInterval uint64
SnapshotInterval uint64
RetainBlocks uint64
EnableCompanionPruning bool
Seeds []*Node
PersistentPeers []*Node
Perturbations []Perturbation
SendNoLoad bool
Prometheus bool
PrometheusProxyPort uint32
}

// LoadTestnet loads a testnet from a manifest file, using the filename to
@@ -181,8 +181,9 @@ func NewTestnetFromManifest(manifest Manifest, file string, ifd InfrastructureDa
VoteExtensionsEnableHeight: manifest.VoteExtensionsEnableHeight,
VoteExtensionSize: manifest.VoteExtensionSize,
PeerGossipIntraloopSleepDuration: manifest.PeerGossipIntraloopSleepDuration,
ExperimentalMaxUsedOutboundPeers: manifest.ExperimentalMaxUsedOutboundPeers,
ABCITestsEnabled: manifest.ABCITestsEnabled,
ExperimentalMaxGossipConnectionsToPersistentPeers: manifest.ExperimentalMaxGossipConnectionsToPersistentPeers,
ExperimentalMaxGossipConnectionsToNonPersistentPeers: manifest.ExperimentalMaxGossipConnectionsToNonPersistentPeers,
ABCITestsEnabled: manifest.ABCITestsEnabled,
}
if len(manifest.KeyType) != 0 {
testnet.KeyType = manifest.KeyType
3 changes: 2 additions & 1 deletion test/e2e/runner/setup.go
@@ -182,7 +182,8 @@ func MakeConfig(node *e2e.Node) (*config.Config, error) {
cfg.StateSync.DiscoveryTime = 5 * time.Second
cfg.BlockSync.Version = node.BlockSyncVersion
cfg.Consensus.PeerGossipIntraloopSleepDuration = node.Testnet.PeerGossipIntraloopSleepDuration
cfg.Mempool.ExperimentalMaxUsedOutboundPeers = int(node.Testnet.ExperimentalMaxUsedOutboundPeers)
cfg.Mempool.ExperimentalMaxGossipConnectionsToNonPersistentPeers = int(node.Testnet.ExperimentalMaxGossipConnectionsToNonPersistentPeers)
cfg.Mempool.ExperimentalMaxGossipConnectionsToPersistentPeers = int(node.Testnet.ExperimentalMaxGossipConnectionsToPersistentPeers)

// Assume that full nodes and validators will have a data companion
// attached, which will need access to the privileged gRPC endpoint.