Adds tests that check for FIFO ordering being broken by gossip by lasarojc · Pull Request #1628 · cometbft/cometbft
Merged · 16 commits · Nov 24, 2023
5 changes: 5 additions & 0 deletions mempool/clist_mempool.go
@@ -250,8 +250,10 @@ func (mem *CListMempool) TxsWaitChan() <-chan struct{} {
// Safe for concurrent use by multiple goroutines.
func (mem *CListMempool) CheckTx(tx types.Tx) (*abcicli.ReqRes, error) {
mem.updateMtx.RLock()
mem.logger.Debug("Locked updateMtx for read", "tx", tx)
// use defer to unlock mutex because application (*local client*) might panic
defer mem.updateMtx.RUnlock()
defer mem.logger.Debug("Released updateMtx for read", "tx", tx)

txSize := len(tx)

@@ -278,12 +280,14 @@ func (mem *CListMempool) CheckTx(tx types.Tx) (*abcicli.ReqRes, error) {
}

if added := mem.addToCache(tx); !added {
mem.logger.Debug("Not cached", "tx", tx)
mem.metrics.AlreadyReceivedTxs.Add(1)
// TODO: consider punishing peer for dups,
// its non-trivial since invalid txs can become valid,
// but they can spam the same tx with little cost to them atm.
return nil, ErrTxInCache
}
mem.logger.Debug("Cached", "tx", tx)

reqRes, err := mem.proxyAppConn.CheckTxAsync(context.TODO(), &abci.RequestCheckTx{Tx: tx})
if err != nil {
@@ -330,6 +334,7 @@ func (mem *CListMempool) addTx(memTx *mempoolTx) {
mem.txsMap.Store(memTx.tx.Key(), e)
atomic.AddInt64(&mem.txsBytes, int64(len(memTx.tx)))
mem.metrics.TxSizeBytes.Observe(float64(len(memTx.tx)))
mem.logger.Debug("Clisted", "tx", memTx.tx)
}

// RemoveTxByKey removes a transaction from the mempool by its TxKey index.
154 changes: 154 additions & 0 deletions mempool/reactor_test.go
@@ -3,6 +3,7 @@ package mempool
import (
"encoding/hex"
"errors"
"os"
"sync"
"testing"
"time"
@@ -333,6 +334,41 @@ func TestReactorTxSendersMultiNode(t *testing.T) {
require.Zero(t, len(firstReactor.txSenders))
}

func TestMempoolFIFOWithParallelCheckTx(t *testing.T) {
if os.Getenv("CI") != "" {
t.Skip("FIFO ordering is not guaranteed; this test only demonstrates one of the cases where it breaks, so we skip it during CI.")
}

config := cfg.TestConfig()
reactors, _ := makeAndConnectReactors(config, 4)
defer func() {
for _, r := range reactors {
if err := r.Stop(); err != nil {
assert.NoError(t, err)
}
}
}()
for _, r := range reactors {
for _, peer := range r.Switch.Peers().List() {
peer.Set(types.PeerStateKey, peerState{1})
}
}

// Deliver the same sequence of transactions from multiple sources, in parallel.
txs := newUniqueTxs(200)
mp := reactors[0].mempool
for i := 0; i < 3; i++ {
go func() {
for _, tx := range txs {
mp.CheckTx(tx) //nolint:errcheck
}
}()
}

// Confirm that FIFO order was respected.
checkTxsInOrder(t, txs, reactors[0], 0)
}
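For readers wondering how the ordering can break: CheckTx takes updateMtx only in read mode (see the clist_mempool.go hunk above), so several callers can run it concurrently, and the cache insertion and the eventual clist append are not atomic as a pair. The following self-contained sketch, with made-up names and no CometBFT dependencies, imitates that "dedupe in a cache, then append to an ordered list" structure under three concurrent senders and will occasionally observe a reordering.

package main

import (
	"fmt"
	"sync"
)

func main() {
	// The same sequence of "transactions" is submitted by several senders.
	txs := make([]int, 200)
	for i := range txs {
		txs[i] = i
	}

	var (
		cacheMtx sync.Mutex
		cache    = map[int]bool{}
		listMtx  sync.Mutex
		list     []int
		wg       sync.WaitGroup
	)

	// checkTx mimics the shape of CListMempool.CheckTx: dedupe in a cache,
	// then append to an ordered list. Each step is individually locked, but
	// the two are not atomic as a pair, so concurrent callers can interleave
	// between them.
	checkTx := func(tx int) {
		cacheMtx.Lock()
		dup := cache[tx]
		if !dup {
			cache[tx] = true
		}
		cacheMtx.Unlock()
		if dup {
			return // analogous to ErrTxInCache
		}
		listMtx.Lock()
		list = append(list, tx)
		listMtx.Unlock()
	}

	// Three senders deliver the same sequence in parallel, as in the test above.
	for s := 0; s < 3; s++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for _, tx := range txs {
				checkTx(tx)
			}
		}()
	}
	wg.Wait()

	for i, tx := range list {
		if tx != txs[i] {
			fmt.Printf("FIFO broken at position %d: got %d, want %d\n", i, tx, txs[i])
			return
		}
	}
	fmt.Println("order preserved this run (the race does not always fire)")
}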

// Test the experimental feature that limits the number of outgoing connections for gossiping
// transactions (only non-persistent peers).
// Note: in this test we know which gossip connections are active or not because of how the p2p
@@ -378,6 +414,103 @@ func TestMempoolReactorMaxActiveOutboundConnections(t *testing.T) {
}
}

// Test the experimental feature that limits the number of outgoing connections for gossiping
// transactions (only non-persistent peers).
// Given the disconnections, no transaction should be received in duplicate.
// Note: in this test we know which gossip connections are active or not because of how the p2p
// functions are currently implemented, which affects the order in which peers are added to the
// mempool reactor.
func TestMempoolReactorMaxActiveOutboundConnectionsNoDuplicate(t *testing.T) {
config := cfg.TestConfig()
config.Mempool.ExperimentalMaxGossipConnectionsToNonPersistentPeers = 1
reactors, _ := makeAndConnectReactors(config, 4)
defer func() {
for _, r := range reactors {
if err := r.Stop(); err != nil {
assert.NoError(t, err)
}
}
}()
for _, r := range reactors {
for _, peer := range r.Switch.Peers().List() {
peer.Set(types.PeerStateKey, peerState{1})
}
}

// Disconnect the second reactor from the third reactor.
pCon1_2 := reactors[1].Switch.Peers().List()[1]
reactors[1].Switch.StopPeerGracefully(pCon1_2)

// Add a bunch of transactions to the first reactor.
txs := newUniqueTxs(100)
callCheckTx(t, reactors[0].mempool, txs)

// Wait for all txs to be in the mempool of the second reactor; the other reactors should not
// receive any tx. (The second reactor only sends transactions to the first reactor.)
checkTxsInOrder(t, txs, reactors[1], 0)
for _, r := range reactors[2:] {
require.Zero(t, r.mempool.Size())
}

// Disconnect the second reactor from the first reactor.
pCon0_1 := reactors[0].Switch.Peers().List()[0]
reactors[0].Switch.StopPeerGracefully(pCon0_1)

// Now the third reactor should start receiving transactions from the first reactor and
// the fourth reactor from the second.
checkTxsInOrder(t, txs, reactors[2], 0)
checkTxsInOrder(t, txs, reactors[3], 0)
}

// Test the experimental feature that limits the number of outgoing connections for gossiping
// transactions (only non-persistent peers) on a star shaped network.
// The star center will need to deliver the transactions to each point.
// Note: in this test we know which gossip connections are active or not because of how the p2p
// functions are currently implemented, which affects the order in which peers are added to the
// mempool reactor.
func TestMempoolReactorMaxActiveOutboundConnectionsStar(t *testing.T) {
config := cfg.TestConfig()
config.Mempool.ExperimentalMaxGossipConnectionsToNonPersistentPeers = 1
reactors, _ := makeAndConnectReactorsStar(config, 0, 4)
defer func() {
for _, r := range reactors {
if err := r.Stop(); err != nil {
assert.NoError(t, err)
}
}
}()
for _, r := range reactors {
for _, peer := range r.Switch.Peers().List() {
peer.Set(types.PeerStateKey, peerState{1})
}
}
// Add a bunch of transactions to the first reactor.
txs := newUniqueTxs(5)
callCheckTx(t, reactors[0].mempool, txs)

// Wait for all txs to be in the mempool of the second reactor; the other points of the star
// should not receive any tx, since the center gossips to only one (non-persistent) peer.
checkTxsInOrder(t, txs, reactors[0], 0)
checkTxsInOrder(t, txs, reactors[1], 0)

for _, r := range reactors[2:] {
require.Zero(t, r.mempool.Size())
}

// Disconnect the second reactor from the first reactor.
firstPeer := reactors[0].Switch.Peers().List()[0]
reactors[0].Switch.StopPeerGracefully(firstPeer)

// Now the third reactor should start receiving transactions from the first reactor; the fourth
// reactor's mempool should still be empty.
checkTxsInOrder(t, txs, reactors[0], 0)
checkTxsInOrder(t, txs, reactors[1], 0)
checkTxsInOrder(t, txs, reactors[2], 0)
for _, r := range reactors[3:] {
require.Zero(t, r.mempool.Size())
}
}

// Check that the mempool has exactly the given list of txs and, if it's not the
// first reactor (reactorIndex == 0), then each tx has a non-empty list of senders.
func checkTxsInMempoolAndSenders(t *testing.T, r *Reactor, txs types.Txs, reactorIndex int) {
@@ -438,6 +571,27 @@ func makeAndConnectReactors(config *cfg.Config, n int) ([]*Reactor, []*p2p.Switch) {
return reactors, switches
}

// makeAndConnectReactorsStar connects n mempool reactors through n switches, arranged as a star centered at switch c.
func makeAndConnectReactorsStar(config *cfg.Config, c, n int) ([]*Reactor, []*p2p.Switch) {
reactors := make([]*Reactor, n)
logger := mempoolLogger()
for i := 0; i < n; i++ {
app := kvstore.NewInMemoryApplication()
cc := proxy.NewLocalClientCreator(app)
mempool, cleanup := newMempoolWithApp(cc)
defer cleanup()

reactors[i] = NewReactor(config.Mempool, mempool, false) // so we don't start the consensus states
reactors[i].SetLogger(logger.With("validator", i))
}

switches := p2p.MakeConnectedSwitches(config.P2P, n, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("MEMPOOL", reactors[i])
return s
}, p2p.ConnectStarSwitches(c))
return reactors, switches
}

func newUniqueTxs(n int) types.Txs {
txs := make(types.Txs, n)
for i := 0; i < n; i++ {
34 changes: 34 additions & 0 deletions p2p/test_util.go
@@ -127,6 +127,40 @@ func Connect2Switches(switches []*Switch, i, j int) {
<-doneCh
}

// ConnectStarSwitches returns a connector that connects switches c and j via net.Pipe(); for pairs where i is not the star center c, it does nothing.
func ConnectStarSwitches(c int) func([]*Switch, int, int) {
// Blocks until a connection is established.
// NOTE: caller ensures i and j are within bounds.
return func(switches []*Switch, i, j int) {
if i != c {
return
}

switchI := switches[i]
switchJ := switches[j]

c1, c2 := conn.NetPipe()

doneCh := make(chan struct{})
go func() {
err := switchI.addPeerWithConnection(c1)
if err != nil {
panic(err)
}
doneCh <- struct{}{}
}()
go func() {
err := switchJ.addPeerWithConnection(c2)
if err != nil {
panic(err)
}
doneCh <- struct{}{}
}()
<-doneCh
<-doneCh
}
}
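For context, the connector returned here is meant to be passed to MakeConnectedSwitches in place of the usual pairwise connector, which is exactly what the new makeAndConnectReactorsStar helper in mempool/reactor_test.go does. A minimal usage sketch follows; it assumes MakeConnectedSwitches invokes the connector once per pair (i, j) with i < j, and that config and reactors are set up as in that helper.

// Connect four switches as a star centered at switch 0: only the pairs
// (0,1), (0,2) and (0,3) get a net.Pipe connection; the points of the star
// are not connected to each other.
switches := p2p.MakeConnectedSwitches(config.P2P, 4, func(i int, s *p2p.Switch) *p2p.Switch {
	s.AddReactor("MEMPOOL", reactors[i])
	return s
}, p2p.ConnectStarSwitches(0))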

func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
pc, err := testInboundPeerConn(conn, sw.config, sw.nodeKey.PrivKey)
if err != nil {