From d5e675267a718ae1125296a861fda090eccf1f11 Mon Sep 17 00:00:00 2001 From: lasarojc Date: Wed, 8 Nov 2023 09:46:03 -0300 Subject: [PATCH 1/8] Ignore persistent peers from limiting of outbound connections --- .../improvements/1558-experimental-gossip-limiting.md | 2 +- config/config.go | 1 + config/toml.go | 1 + mempool/metrics.go | 1 + mempool/reactor.go | 10 +++++----- test/e2e/pkg/manifest.go | 2 +- 6 files changed, 10 insertions(+), 7 deletions(-) diff --git a/.changelog/unreleased/improvements/1558-experimental-gossip-limiting.md b/.changelog/unreleased/improvements/1558-experimental-gossip-limiting.md index a69444171ee..49834c45e50 100644 --- a/.changelog/unreleased/improvements/1558-experimental-gossip-limiting.md +++ b/.changelog/unreleased/improvements/1558-experimental-gossip-limiting.md @@ -1,3 +1,3 @@ - `[mempool]` Only gossip transactions to a subset of the connected peers, -of size `experimental_max_used_outbound_peers` +of size `experimental_max_used_outbound_peers`, not counting persistent peers. ([\#1558](https://github.com/cometbft/cometbft/pull/1558)) diff --git a/config/config.go b/config/config.go index ed0572375a7..f3a348e6563 100644 --- a/config/config.go +++ b/config/config.go @@ -880,6 +880,7 @@ type MempoolConfig struct { // If we are connected to more than this number of peers, only send txs to the first // ExperimentalMaxUsedOutboundPeers of them. If one of those peers disconnects, activate another // peer. + // Persistent peers do not count towards this limit. // If set to 0, this feature is disabled, that is, the number of active connections is not // bounded. 
// If enabled, a value of 10 is recommended based on experimental performance results using the diff --git a/config/toml.go b/config/toml.go index 40b4e55b7e6..e8c93c2f384 100644 --- a/config/toml.go +++ b/config/toml.go @@ -429,6 +429,7 @@ max_batch_bytes = {{ .Mempool.MaxBatchBytes }} # If we are connected to more than this number of peers, only send txs to # the first ExperimentalMaxOutboundPeers of them. If one of those peers goes # offline, activate another peer. +# Persistent peers do not count towards this limit. # Value 0 disables the feature by not limiting the number of active connections. # If you enable this feature, a value of 10 is recommended based on experimental performance results. experimental_max_used_outbound_peers = {{ .Mempool.ExperimentalMaxUsedOutboundPeers }} diff --git a/mempool/metrics.go b/mempool/metrics.go index 47de43fffbf..384de04ffe6 100644 --- a/mempool/metrics.go +++ b/mempool/metrics.go @@ -42,6 +42,7 @@ type Metrics struct { AlreadyReceivedTxs metrics.Counter // Number of connections being actively used for gossiping transactions + // excluding peristent peers. // (experimental feature). ActiveOutboundConnections metrics.Gauge } diff --git a/mempool/reactor.go b/mempool/reactor.go index c840fc71ac8..72e6748e704 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -100,18 +100,18 @@ func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor { func (memR *Reactor) AddPeer(peer p2p.Peer) { if memR.config.Broadcast { go func() { - if memR.config.ExperimentalMaxUsedOutboundPeers > 0 { + //Persistent peers do not count towards this limit. + if !peer.IsPersistent() && memR.config.ExperimentalMaxUsedOutboundPeers > 0 { // Around (MaxOutboundPeers-ExperimentalMaxUsedOutboundPeers) goroutines will be // blocked here waiting for more peers disconnect and free some slots for running. 
if err := memR.activeConnectionsSemaphore.Acquire(context.TODO(), 1); err != nil { memR.Logger.Error("Failed to acquire semaphore: %v", err) return } + defer memR.activeConnectionsSemaphore.Release(1) + memR.mempool.metrics.ActiveOutboundConnections.Add(1) - defer func() { - memR.activeConnectionsSemaphore.Release(1) - memR.mempool.metrics.ActiveOutboundConnections.Add(-1) - }() + defer memR.mempool.metrics.ActiveOutboundConnections.Add(-1) } memR.broadcastTxRoutine(peer) }() diff --git a/test/e2e/pkg/manifest.go b/test/e2e/pkg/manifest.go index b7abcc696c5..2a4637b9c6c 100644 --- a/test/e2e/pkg/manifest.go +++ b/test/e2e/pkg/manifest.go @@ -102,7 +102,7 @@ type Manifest struct { // Upper bound of sleep duration then gossipping votes and block parts PeerGossipIntraloopSleepDuration time.Duration `toml:"peer_gossip_intraloop_sleep_duration"` - // Maximum number of peers to gossip transactions + // Maximum number of non-persistent peers to gossip transactions ExperimentalMaxUsedOutboundPeers uint `toml:"experimental_max_used_outbound_peers"` // Enable or disable e2e tests for CometBFT's expected behavior with respect // to ABCI. From 441ec4f06bad58f27280d4733698c73745a24f7b Mon Sep 17 00:00:00 2001 From: lasaro Date: Wed, 8 Nov 2023 09:49:33 -0300 Subject: [PATCH 2/8] Update 1558-experimental-gossip-limiting.md Update changelog --- .../unreleased/improvements/1558-experimental-gossip-limiting.md | 1 + 1 file changed, 1 insertion(+) diff --git a/.changelog/unreleased/improvements/1558-experimental-gossip-limiting.md b/.changelog/unreleased/improvements/1558-experimental-gossip-limiting.md index 49834c45e50..cbfbeeaae66 100644 --- a/.changelog/unreleased/improvements/1558-experimental-gossip-limiting.md +++ b/.changelog/unreleased/improvements/1558-experimental-gossip-limiting.md @@ -1,3 +1,4 @@ - `[mempool]` Only gossip transactions to a subset of the connected peers, of size `experimental_max_used_outbound_peers`, not counting persistent peers. 
([\#1558](https://github.com/cometbft/cometbft/pull/1558)) + ([\#1584](https://github.com/cometbft/cometbft/pull/1584)) From 21f78e9078e4d9a04849dbbd71608fb1752de1c1 Mon Sep 17 00:00:00 2001 From: lasaro Date: Wed, 8 Nov 2023 09:51:45 -0300 Subject: [PATCH 3/8] Fix typo in mempool/metrics.go --- mempool/metrics.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mempool/metrics.go b/mempool/metrics.go index 384de04ffe6..6f174548301 100644 --- a/mempool/metrics.go +++ b/mempool/metrics.go @@ -42,7 +42,7 @@ type Metrics struct { AlreadyReceivedTxs metrics.Counter // Number of connections being actively used for gossiping transactions - // excluding peristent peers. + // excluding persistent peers. // (experimental feature). ActiveOutboundConnections metrics.Gauge } From 8282f8d25605a4ff58bd733465fd242036137a7d Mon Sep 17 00:00:00 2001 From: hvanz Date: Wed, 8 Nov 2023 22:47:47 +0100 Subject: [PATCH 4/8] Use two independent configs and semaphores for persistent and non-persistent peers --- .../1558-experimental-gossip-limiting.md | 9 +- config/config.go | 41 +++--- config/toml.go | 21 ++-- mempool/reactor.go | 44 +++++-- test/e2e/pkg/manifest.go | 6 +- test/e2e/pkg/testnet.go | 117 +++++++++--------- test/e2e/runner/setup.go | 3 +- 7 files changed, 141 insertions(+), 100 deletions(-) diff --git a/.changelog/unreleased/improvements/1558-experimental-gossip-limiting.md b/.changelog/unreleased/improvements/1558-experimental-gossip-limiting.md index cbfbeeaae66..6931cef8274 100644 --- a/.changelog/unreleased/improvements/1558-experimental-gossip-limiting.md +++ b/.changelog/unreleased/improvements/1558-experimental-gossip-limiting.md @@ -1,4 +1,9 @@ -- `[mempool]` Only gossip transactions to a subset of the connected peers, -of size `experimental_max_used_outbound_peers`, not counting persistent peers. +- `[mempool]` Add experimental feature to limit the number of persistent peers and non-persistent + peers to which the node gossip transactions. 
+ ([\#1558](https://github.com/cometbft/cometbft/pull/1558)) + ([\#1584](https://github.com/cometbft/cometbft/pull/1584)) +- `[config]` Add mempool parameters `experimental_max_gossip_connections_to_persistent_peers` and + `experimental_max_gossip_connections_to_non_persistent_peers` for limiting the number of peers to + which the node gossips transactions. ([\#1558](https://github.com/cometbft/cometbft/pull/1558)) ([\#1584](https://github.com/cometbft/cometbft/pull/1584)) diff --git a/config/config.go b/config/config.go index f3a348e6563..245271d0ec3 100644 --- a/config/config.go +++ b/config/config.go @@ -876,16 +876,19 @@ type MempoolConfig struct { // Including space needed by encoding (one varint per transaction). // XXX: Unused due to https://github.com/tendermint/tendermint/issues/5796 MaxBatchBytes int `mapstructure:"max_batch_bytes"` - // Experimental parameter to limit broadcast of txs to up to this many peers. - // If we are connected to more than this number of peers, only send txs to the first - // ExperimentalMaxUsedOutboundPeers of them. If one of those peers disconnects, activate another - // peer. - // Persistent peers do not count towards this limit. - // If set to 0, this feature is disabled, that is, the number of active connections is not - // bounded. - // If enabled, a value of 10 is recommended based on experimental performance results using the - // default P2P configuration. - ExperimentalMaxUsedOutboundPeers int `mapstructure:"experimental_max_used_outbound_peers"` + // Experimental parameters to limit gossiping txs to up to the specified number of peers. + // We use two independent upper values for persistent peers and for non-persistent peers. + // Unconditional peers are not affected by this feature. + // If we are connected to more than the specified number of persistent peers, only send txs to + // the first ExperimentalMaxGossipConnectionsToPersistentPeers of them. 
If one of those + // persistent peers disconnects, activate another persistent peer. Similarly for non-persistent + // peers, with an upper limit of ExperimentalMaxGossipConnectionsToNonPersistentPeers. + // If set to 0, the feature is disabled for the corresponding group of peers, that is, the + // number of active connections to that group of peers is not bounded. + // For non-persistent peers, if enabled, a value of 10 is recommended based on experimental + // performance results using the default P2P configuration. + ExperimentalMaxGossipConnectionsToPersistentPeers int `mapstructure:"experimental_max_gossip_connections_to_persistent_peers"` + ExperimentalMaxGossipConnectionsToNonPersistentPeers int `mapstructure:"experimental_max_gossip_connections_to_non_persistent_peers"` } // DefaultMempoolConfig returns a default configuration for the CometBFT mempool @@ -896,11 +899,12 @@ func DefaultMempoolConfig() *MempoolConfig { WalPath: "", // Each signature verification takes .5ms, Size reduced until we implement // ABCI Recheck - Size: 5000, - MaxTxsBytes: 1024 * 1024 * 1024, // 1GB - CacheSize: 10000, - MaxTxBytes: 1024 * 1024, // 1MB - ExperimentalMaxUsedOutboundPeers: 0, + Size: 5000, + MaxTxsBytes: 1024 * 1024 * 1024, // 1GB + CacheSize: 10000, + MaxTxBytes: 1024 * 1024, // 1MB + ExperimentalMaxGossipConnectionsToNonPersistentPeers: 0, + ExperimentalMaxGossipConnectionsToPersistentPeers: 0, } } @@ -936,8 +940,11 @@ func (cfg *MempoolConfig) ValidateBasic() error { if cfg.MaxTxBytes < 0 { return cmterrors.ErrNegativeField{Field: "max_tx_bytes"} } - if cfg.ExperimentalMaxUsedOutboundPeers < 0 { - return cmterrors.ErrNegativeField{Field: "experimental_max_used_outbound_peers"} + if cfg.ExperimentalMaxGossipConnectionsToPersistentPeers < 0 { + return cmterrors.ErrNegativeField{Field: "experimental_max_gossip_connections_to_persistent_peers"} + } + if cfg.ExperimentalMaxGossipConnectionsToNonPersistentPeers < 0 { + return cmterrors.ErrNegativeField{Field: 
"experimental_max_gossip_connections_to_non_persistent_peers"} } return nil } diff --git a/config/toml.go b/config/toml.go index e8c93c2f384..37122893265 100644 --- a/config/toml.go +++ b/config/toml.go @@ -425,14 +425,19 @@ max_tx_bytes = {{ .Mempool.MaxTxBytes }} # XXX: Unused due to https://github.com/tendermint/tendermint/issues/5796 max_batch_bytes = {{ .Mempool.MaxBatchBytes }} -# Experimental parameter to limit broadcast of txs to up to this many peers -# If we are connected to more than this number of peers, only send txs to -# the first ExperimentalMaxOutboundPeers of them. If one of those peers goes -# offline, activate another peer. -# Persistent peers do not count towards this limit. -# Value 0 disables the feature by not limiting the number of active connections. -# If you enable this feature, a value of 10 is recommended based on experimental performance results. -experimental_max_used_outbound_peers = {{ .Mempool.ExperimentalMaxUsedOutboundPeers }} +# Experimental parameters to limit gossiping txs to up to the specified number of peers. +# We use two independent upper values for persistent peers and for non-persistent peers. +# Unconditional peers are not affected by this feature. +# If we are connected to more than the specified number of persistent peers, only send txs to +# the first experimental_max_gossip_connections_to_persistent_peers of them. If one of those +# persistent peers disconnects, activate another persistent peer. Similarly for non-persistent +# peers, with an upper limit of experimental_max_gossip_connections_to_non_persistent_peers. +# If set to 0, the feature is disabled for the corresponding group of peers, that is, the +# number of active connections to that group of peers is not bounded. +# For non-persistent peers, if enabled, a value of 10 is recommended based on experimental +# performance results using the default P2P configuration. 
+experimental_max_gossip_connections_to_persistent_peers = {{ .Mempool.ExperimentalMaxGossipConnectionsToPersistentPeers }} +experimental_max_gossip_connections_to_non_persistent_peers = {{ .Mempool.ExperimentalMaxGossipConnectionsToNonPersistentPeers }} ####################################################### ### State Sync Configuration Options ### diff --git a/mempool/reactor.go b/mempool/reactor.go index 72e6748e704..03ae5881d43 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -36,7 +36,11 @@ type Reactor struct { txSenders map[types.TxKey]map[p2p.ID]bool txSendersMtx cmtsync.Mutex - activeConnectionsSemaphore *semaphore.Weighted + // Semaphores to keep track of how many connections to peers are active for broadcasting + // transactions. Each semaphore has a capacity that puts an upper bound on the number of + // connections for different groups of peers. + activePersistentConnectionsSemaphore *semaphore.Weighted + activeNonPersistentConnectionsSemaphore *semaphore.Weighted } // NewReactor returns a new Reactor with the given config and mempool. @@ -53,7 +57,8 @@ func NewReactor(config *cfg.MempoolConfig, mempool *CListMempool, waitSync bool) memR.waitSyncCh = make(chan struct{}) } memR.mempool.SetTxRemovedCallback(func(txKey types.TxKey) { memR.removeSenders(txKey) }) - memR.activeConnectionsSemaphore = semaphore.NewWeighted(int64(memR.config.ExperimentalMaxUsedOutboundPeers)) + memR.activePersistentConnectionsSemaphore = semaphore.NewWeighted(int64(memR.config.ExperimentalMaxGossipConnectionsToPersistentPeers)) + memR.activeNonPersistentConnectionsSemaphore = semaphore.NewWeighted(int64(memR.config.ExperimentalMaxGossipConnectionsToNonPersistentPeers)) return memR } @@ -100,19 +105,34 @@ func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor { func (memR *Reactor) AddPeer(peer p2p.Peer) { if memR.config.Broadcast { go func() { - //Persistent peers do not count towards this limit. 
- if !peer.IsPersistent() && memR.config.ExperimentalMaxUsedOutboundPeers > 0 { - // Around (MaxOutboundPeers-ExperimentalMaxUsedOutboundPeers) goroutines will be - // blocked here waiting for more peers disconnect and free some slots for running. - if err := memR.activeConnectionsSemaphore.Acquire(context.TODO(), 1); err != nil { - memR.Logger.Error("Failed to acquire semaphore: %v", err) - return + // Always forward transactions to unconditional peers. + if !memR.Switch.IsPeerUnconditional(peer.ID()) { + if peer.IsPersistent() && memR.config.ExperimentalMaxGossipConnectionsToPersistentPeers > 0 { + // Block sending transactions to peer until one of the connections become + // available in the semaphore. + if err := memR.activePersistentConnectionsSemaphore.Acquire(context.TODO(), 1); err != nil { + memR.Logger.Error("Failed to acquire semaphore: %v", err) + return + } + // Release semaphore to allow other peer to start sending transactions. + defer memR.activePersistentConnectionsSemaphore.Release(1) + defer memR.mempool.metrics.ActiveOutboundConnections.Add(-1) } - defer memR.activeConnectionsSemaphore.Release(1) - memR.mempool.metrics.ActiveOutboundConnections.Add(1) - defer memR.mempool.metrics.ActiveOutboundConnections.Add(-1) + if !peer.IsPersistent() && memR.config.ExperimentalMaxGossipConnectionsToNonPersistentPeers > 0 { + // Block sending transactions to peer until one of the connections become + // available in the semaphore. + if err := memR.activeNonPersistentConnectionsSemaphore.Acquire(context.TODO(), 1); err != nil { + memR.Logger.Error("Failed to acquire semaphore: %v", err) + return + } + // Release semaphore to allow other peer to start sending transactions. 
+ defer memR.activeNonPersistentConnectionsSemaphore.Release(1) + defer memR.mempool.metrics.ActiveOutboundConnections.Add(-1) + } } + + memR.mempool.metrics.ActiveOutboundConnections.Add(1) memR.broadcastTxRoutine(peer) }() } diff --git a/test/e2e/pkg/manifest.go b/test/e2e/pkg/manifest.go index 2a4637b9c6c..3c8bad1daaf 100644 --- a/test/e2e/pkg/manifest.go +++ b/test/e2e/pkg/manifest.go @@ -102,8 +102,10 @@ type Manifest struct { // Upper bound of sleep duration then gossipping votes and block parts PeerGossipIntraloopSleepDuration time.Duration `toml:"peer_gossip_intraloop_sleep_duration"` - // Maximum number of non-persistent peers to gossip transactions - ExperimentalMaxUsedOutboundPeers uint `toml:"experimental_max_used_outbound_peers"` + // Maximum number of peers to which the node gossips transactions + ExperimentalMaxGossipConnectionsToPersistentPeers uint `toml:"experimental_max_gossip_connections_to_persistent_peers"` + ExperimentalMaxGossipConnectionsToNonPersistentPeers uint `toml:"experimental_max_gossip_connections_to_non_persistent_peers"` + // Enable or disable e2e tests for CometBFT's expected behavior with respect // to ABCI. ABCITestsEnabled bool `toml:"abci_tests_enabled"` diff --git a/test/e2e/pkg/testnet.go b/test/e2e/pkg/testnet.go index 2a611615137..9fe417ac736 100644 --- a/test/e2e/pkg/testnet.go +++ b/test/e2e/pkg/testnet.go @@ -68,66 +68,66 @@ const ( // Testnet represents a single testnet. 
type Testnet struct { - Name string - File string - Dir string - IP *net.IPNet - InitialHeight int64 - InitialState map[string]string - Validators map[*Node]int64 - ValidatorUpdates map[int64]map[*Node]int64 - Nodes []*Node - DisablePexReactor bool - KeyType string - Evidence int - LoadTxSizeBytes int - LoadTxBatchSize int - LoadTxConnections int - ABCIProtocol string - PrepareProposalDelay time.Duration - ProcessProposalDelay time.Duration - CheckTxDelay time.Duration - VoteExtensionDelay time.Duration - FinalizeBlockDelay time.Duration - UpgradeVersion string - Prometheus bool - VoteExtensionsEnableHeight int64 - VoteExtensionSize uint - PeerGossipIntraloopSleepDuration time.Duration - ExperimentalMaxUsedOutboundPeers uint - ABCITestsEnabled bool + Name string + File string + Dir string + IP *net.IPNet + InitialHeight int64 + InitialState map[string]string + Validators map[*Node]int64 + ValidatorUpdates map[int64]map[*Node]int64 + Nodes []*Node + DisablePexReactor bool + KeyType string + Evidence int + LoadTxSizeBytes int + LoadTxBatchSize int + LoadTxConnections int + ABCIProtocol string + PrepareProposalDelay time.Duration + ProcessProposalDelay time.Duration + CheckTxDelay time.Duration + VoteExtensionDelay time.Duration + FinalizeBlockDelay time.Duration + UpgradeVersion string + Prometheus bool + VoteExtensionsEnableHeight int64 + VoteExtensionSize uint + PeerGossipIntraloopSleepDuration time.Duration + ExperimentalMaxGossipConnectionsToPersistentPeers uint + ExperimentalMaxGossipConnectionsToNonPersistentPeers uint + ABCITestsEnabled bool } // Node represents a CometBFT node in a testnet. 
type Node struct { - Name string - Version string - Testnet *Testnet - Mode Mode - PrivvalKey crypto.PrivKey - NodeKey crypto.PrivKey - InternalIP net.IP - ExternalIP net.IP - RPCProxyPort uint32 - GRPCProxyPort uint32 - GRPCPrivilegedProxyPort uint32 - StartAt int64 - BlockSyncVersion string - StateSync bool - Database string - ABCIProtocol Protocol - PrivvalProtocol Protocol - PersistInterval uint64 - SnapshotInterval uint64 - RetainBlocks uint64 - EnableCompanionPruning bool - Seeds []*Node - PersistentPeers []*Node - Perturbations []Perturbation - SendNoLoad bool - Prometheus bool - PrometheusProxyPort uint32 - ExperimentalMaxOutboundPeers uint16 + Name string + Version string + Testnet *Testnet + Mode Mode + PrivvalKey crypto.PrivKey + NodeKey crypto.PrivKey + InternalIP net.IP + ExternalIP net.IP + RPCProxyPort uint32 + GRPCProxyPort uint32 + GRPCPrivilegedProxyPort uint32 + StartAt int64 + BlockSyncVersion string + StateSync bool + Database string + ABCIProtocol Protocol + PrivvalProtocol Protocol + PersistInterval uint64 + SnapshotInterval uint64 + RetainBlocks uint64 + EnableCompanionPruning bool + Seeds []*Node + PersistentPeers []*Node + Perturbations []Perturbation + SendNoLoad bool + Prometheus bool + PrometheusProxyPort uint32 } // LoadTestnet loads a testnet from a manifest file, using the filename to @@ -181,8 +181,9 @@ func NewTestnetFromManifest(manifest Manifest, file string, ifd InfrastructureDa VoteExtensionsEnableHeight: manifest.VoteExtensionsEnableHeight, VoteExtensionSize: manifest.VoteExtensionSize, PeerGossipIntraloopSleepDuration: manifest.PeerGossipIntraloopSleepDuration, - ExperimentalMaxUsedOutboundPeers: manifest.ExperimentalMaxUsedOutboundPeers, - ABCITestsEnabled: manifest.ABCITestsEnabled, + ExperimentalMaxGossipConnectionsToPersistentPeers: manifest.ExperimentalMaxGossipConnectionsToPersistentPeers, + ExperimentalMaxGossipConnectionsToNonPersistentPeers: manifest.ExperimentalMaxGossipConnectionsToNonPersistentPeers, + 
ABCITestsEnabled: manifest.ABCITestsEnabled, } if len(manifest.KeyType) != 0 { testnet.KeyType = manifest.KeyType diff --git a/test/e2e/runner/setup.go b/test/e2e/runner/setup.go index 8c8aa9b30fd..9eec9fe21bb 100644 --- a/test/e2e/runner/setup.go +++ b/test/e2e/runner/setup.go @@ -182,7 +182,8 @@ func MakeConfig(node *e2e.Node) (*config.Config, error) { cfg.StateSync.DiscoveryTime = 5 * time.Second cfg.BlockSync.Version = node.BlockSyncVersion cfg.Consensus.PeerGossipIntraloopSleepDuration = node.Testnet.PeerGossipIntraloopSleepDuration - cfg.Mempool.ExperimentalMaxUsedOutboundPeers = int(node.Testnet.ExperimentalMaxUsedOutboundPeers) + cfg.Mempool.ExperimentalMaxGossipConnectionsToNonPersistentPeers = int(node.Testnet.ExperimentalMaxGossipConnectionsToNonPersistentPeers) + cfg.Mempool.ExperimentalMaxGossipConnectionsToPersistentPeers = int(node.Testnet.ExperimentalMaxGossipConnectionsToPersistentPeers) // Assume that full nodes and validators will have a data companion // attached, which will need access to the privileged gRPC endpoint. From d62d5953a12d39beff13d326bebdd2d1ee169eba Mon Sep 17 00:00:00 2001 From: hvanz Date: Wed, 8 Nov 2023 22:58:12 +0100 Subject: [PATCH 5/8] Forgot to rename in test --- mempool/reactor_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mempool/reactor_test.go b/mempool/reactor_test.go index 43a152e1b68..592e0b95efb 100644 --- a/mempool/reactor_test.go +++ b/mempool/reactor_test.go @@ -337,7 +337,7 @@ func TestReactorTxSendersMultiNode(t *testing.T) { // transactions. 
func TestMempoolReactorMaxActiveOutboundConnections(t *testing.T) { config := cfg.TestConfig() - config.Mempool.ExperimentalMaxUsedOutboundPeers = 1 + config.Mempool.ExperimentalMaxGossipConnectionsToNonPersistentPeers = 1 reactors, _ := makeAndConnectReactors(config, 4) defer func() { for _, r := range reactors { From d1189fdbc892aece072c35207eba94496cd19abb Mon Sep 17 00:00:00 2001 From: hvanz Date: Wed, 8 Nov 2023 23:22:35 +0100 Subject: [PATCH 6/8] Update metric description --- mempool/metrics.go | 1 - 1 file changed, 1 deletion(-) diff --git a/mempool/metrics.go b/mempool/metrics.go index 6f174548301..47de43fffbf 100644 --- a/mempool/metrics.go +++ b/mempool/metrics.go @@ -42,7 +42,6 @@ type Metrics struct { AlreadyReceivedTxs metrics.Counter // Number of connections being actively used for gossiping transactions - // excluding persistent peers. // (experimental feature). ActiveOutboundConnections metrics.Gauge } From 1dc431115e8bb16b302806ca7fbaf46a6d618a3e Mon Sep 17 00:00:00 2001 From: hvanz Date: Wed, 8 Nov 2023 23:24:32 +0100 Subject: [PATCH 7/8] Rename semaphores --- mempool/reactor.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/mempool/reactor.go b/mempool/reactor.go index 03ae5881d43..4fe6085875a 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -39,8 +39,8 @@ type Reactor struct { // Semaphores to keep track of how many connections to peers are active for broadcasting // transactions. Each semaphore has a capacity that puts an upper bound on the number of // connections for different groups of peers. - activePersistentConnectionsSemaphore *semaphore.Weighted - activeNonPersistentConnectionsSemaphore *semaphore.Weighted + activePersistentPeersSemaphore *semaphore.Weighted + activeNonPersistentPeersSemaphore *semaphore.Weighted } // NewReactor returns a new Reactor with the given config and mempool. 
@@ -57,8 +57,8 @@ func NewReactor(config *cfg.MempoolConfig, mempool *CListMempool, waitSync bool) memR.waitSyncCh = make(chan struct{}) } memR.mempool.SetTxRemovedCallback(func(txKey types.TxKey) { memR.removeSenders(txKey) }) - memR.activePersistentConnectionsSemaphore = semaphore.NewWeighted(int64(memR.config.ExperimentalMaxGossipConnectionsToPersistentPeers)) - memR.activeNonPersistentConnectionsSemaphore = semaphore.NewWeighted(int64(memR.config.ExperimentalMaxGossipConnectionsToNonPersistentPeers)) + memR.activePersistentPeersSemaphore = semaphore.NewWeighted(int64(memR.config.ExperimentalMaxGossipConnectionsToPersistentPeers)) + memR.activeNonPersistentPeersSemaphore = semaphore.NewWeighted(int64(memR.config.ExperimentalMaxGossipConnectionsToNonPersistentPeers)) return memR } @@ -110,24 +110,24 @@ func (memR *Reactor) AddPeer(peer p2p.Peer) { if peer.IsPersistent() && memR.config.ExperimentalMaxGossipConnectionsToPersistentPeers > 0 { // Block sending transactions to peer until one of the connections become // available in the semaphore. - if err := memR.activePersistentConnectionsSemaphore.Acquire(context.TODO(), 1); err != nil { + if err := memR.activePersistentPeersSemaphore.Acquire(context.TODO(), 1); err != nil { memR.Logger.Error("Failed to acquire semaphore: %v", err) return } // Release semaphore to allow other peer to start sending transactions. - defer memR.activePersistentConnectionsSemaphore.Release(1) + defer memR.activePersistentPeersSemaphore.Release(1) defer memR.mempool.metrics.ActiveOutboundConnections.Add(-1) } if !peer.IsPersistent() && memR.config.ExperimentalMaxGossipConnectionsToNonPersistentPeers > 0 { // Block sending transactions to peer until one of the connections become // available in the semaphore. 
- if err := memR.activeNonPersistentConnectionsSemaphore.Acquire(context.TODO(), 1); err != nil { + if err := memR.activeNonPersistentPeersSemaphore.Acquire(context.TODO(), 1); err != nil { memR.Logger.Error("Failed to acquire semaphore: %v", err) return } // Release semaphore to allow other peer to start sending transactions. - defer memR.activeNonPersistentConnectionsSemaphore.Release(1) + defer memR.activeNonPersistentPeersSemaphore.Release(1) defer memR.mempool.metrics.ActiveOutboundConnections.Add(-1) } } From 48669e93fae207565746ceeb64e2ef43b92d1abd Mon Sep 17 00:00:00 2001 From: hvanz Date: Thu, 9 Nov 2023 18:19:08 +0100 Subject: [PATCH 8/8] Add comment to unit test --- mempool/reactor_test.go | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/mempool/reactor_test.go b/mempool/reactor_test.go index 592e0b95efb..8bd77e0e1d2 100644 --- a/mempool/reactor_test.go +++ b/mempool/reactor_test.go @@ -334,7 +334,10 @@ func TestReactorTxSendersMultiNode(t *testing.T) { } // Test the experimental feature that limits the number of outgoing connections for gossiping -// transactions. +// transactions (only non-persistent peers). +// Note: in this test we know which gossip connections are active or not because of how the p2p +// functions are currently implemented, which affects the order in which peers are added to the +// mempool reactor. func TestMempoolReactorMaxActiveOutboundConnections(t *testing.T) { config := cfg.TestConfig() config.Mempool.ExperimentalMaxGossipConnectionsToNonPersistentPeers = 1 @@ -356,20 +359,23 @@ func TestMempoolReactorMaxActiveOutboundConnections(t *testing.T) { txs := newUniqueTxs(100) callCheckTx(t, reactors[0].mempool, txs) - // Wait for all txs to be in the mempool of the second reactor; the third and fourth reactor - // should not receive any tx. + // Wait for all txs to be in the mempool of the second reactor; the other reactors should not + // receive any tx. 
(The second reactor only sends transactions to the first reactor.) checkTxsInMempool(t, txs, reactors[1], 0) - require.Zero(t, reactors[2].mempool.Size()) - require.Zero(t, reactors[3].mempool.Size()) + for _, r := range reactors[2:] { + require.Zero(t, r.mempool.Size()) + } - // In the first reactor, disconnect the second reactor; the third reactor should become active. + // Disconnect the second reactor from the first reactor. firstPeer := reactors[0].Switch.Peers().List()[0] reactors[0].Switch.StopPeerGracefully(firstPeer) - // Now the third reactor should receive the transactions; the fourth reactor's mempool should - // still be empty. + // Now the third reactor should start receiving transactions from the first reactor; the fourth + // reactor's mempool should still be empty. checkTxsInMempool(t, txs, reactors[2], 0) - require.Zero(t, reactors[3].mempool.Size()) + for _, r := range reactors[3:] { + require.Zero(t, r.mempool.Size()) + } } // Check that the mempool has exactly the given list of txs and, if it's not the