diff --git a/mempool/clist_iterator_test.go b/mempool/clist_iterator_test.go new file mode 100644 index 00000000000..59ce4bcf8cd --- /dev/null +++ b/mempool/clist_iterator_test.go @@ -0,0 +1,402 @@ +package mempool + +import ( + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + abciclimocks "github.com/cometbft/cometbft/abci/client/mocks" + "github.com/cometbft/cometbft/abci/example/kvstore" + abci "github.com/cometbft/cometbft/abci/types" + "github.com/cometbft/cometbft/internal/test" + "github.com/cometbft/cometbft/proxy" + "github.com/cometbft/cometbft/types" +) + +func TestIteratorNonBlocking(t *testing.T) { + app := kvstore.NewInMemoryApplication() + cc := proxy.NewLocalClientCreator(app) + cfg := test.ResetTestRoot("mempool_test") + mp, cleanup := newMempoolWithAppAndConfig(cc, cfg) + defer cleanup() + + // Add all txs with id up to n. + n := 100 + for i := 0; i < n; i++ { + tx := kvstore.NewTxFromID(i) + rr, err := mp.CheckTx(tx, noSender) + require.NoError(t, err) + rr.Wait() + } + require.Equal(t, n, mp.Size()) + + iter := mp.NewWRRIterator() + expectedOrder := []int{ + 0, 11, 22, 33, 44, 55, 66, // lane 7 + 1, 2, 4, // lane 3 + 3, // lane 1 + 77, 88, 99, + 5, 7, 8, + 6, + 10, 13, 14, + 9, + 16, 17, 19, + 12, + 20, 23, 25, + 15, + } + + var next Entry + counter := 0 + + // Check that txs are picked by the iterator in the expected order. + for _, id := range expectedOrder { + next = iter.Next() + require.NotNil(t, next) + require.Equal(t, types.Tx(kvstore.NewTxFromID(id)), next.Tx(), "id=%v", id) + counter++ + } + + // Check that the rest of the entries are also consumed. + for { + if next = iter.Next(); next == nil { + break + } + counter++ + } + require.Equal(t, n, counter) +} + +func TestIteratorNonBlockingOneLane(t *testing.T) { + app := kvstore.NewInMemoryApplication() + cc := proxy.NewLocalClientCreator(app) + cfg := test.ResetTestRoot("mempool_test") + mp, cleanup := newMempoolWithAppAndConfig(cc, cfg) + defer cleanup() + + // Add all txs with id up to n to one lane. + n := 100 + for i := 0; i < n; i++ { + if i%11 != 0 { + continue + } + tx := kvstore.NewTxFromID(i) + rr, err := mp.CheckTx(tx, noSender) + require.NoError(t, err) + rr.Wait() + } + require.Equal(t, 10, mp.Size()) + + iter := mp.NewWRRIterator() + expectedOrder := []int{0, 11, 22, 33, 44, 55, 66, 77, 88, 99} + + var next Entry + counter := 0 + + // Check that txs are picked by the iterator in the expected order. + for _, id := range expectedOrder { + next = iter.Next() + require.NotNil(t, next) + require.Equal(t, types.Tx(kvstore.NewTxFromID(id)), next.Tx(), "id=%v", id) + counter++ + } + + next = iter.Next() + require.Nil(t, next) +} + +// We have two iterators fetching transactions that +// then get removed. +func TestIteratorRace(t *testing.T) { + mockClient := new(abciclimocks.Client) + mockClient.On("Start").Return(nil) + mockClient.On("SetLogger", mock.Anything) + mockClient.On("Error").Return(nil).Times(100) + mockClient.On("Info", mock.Anything, mock.Anything).Return(&abci.InfoResponse{LanePriorities: []uint32{1, 2, 3}, DefaultLanePriority: 1}, nil) + + mp, cleanup := newMempoolWithAppMock(mockClient) + defer cleanup() + + // Disable rechecking to make sure the recheck logic is not interferint. + mp.config.Recheck = false + + const numLanes = 3 + const numTxs = 100 + + var wg sync.WaitGroup + wg.Add(2) + + var counter atomic.Int64 + go func() { + waitForNumTxsInMempool(numTxs, mp) + + go func() { + defer wg.Done() + + for counter.Load() < int64(numTxs) { + iter := mp.NewBlockingWRRIterator() + entry := <-iter.WaitNextCh() + if entry == nil { + continue + } + tx := entry.Tx() + err := mp.Update(1, []types.Tx{tx}, abciResponses(1, 0), nil, nil) + require.NoError(t, err, tx) + counter.Add(1) + } + }() + + go func() { + defer wg.Done() + + for counter.Load() < int64(numTxs) { + iter := mp.NewBlockingWRRIterator() + entry := <-iter.WaitNextCh() + if entry == nil { + continue + } + tx := entry.Tx() + err := mp.Update(1, []types.Tx{tx}, abciResponses(1, 0), nil, nil) + require.NoError(t, err, tx) + counter.Add(1) + } + }() + }() + + // This was introduced because without a separate function + // we have to sleep to wait for all txs to get into the mempool. + // This way we loop in the function above until it is fool + // without arbitrary timeouts. + go func() { + for i := 1; i <= int(numTxs); i++ { + tx := kvstore.NewTxFromID(i) + + currLane := (i % numLanes) + 1 + reqRes := newReqResWithLanes(tx, abci.CodeTypeOK, abci.CHECK_TX_TYPE_CHECK, uint32(currLane)) + require.NotNil(t, reqRes) + + mockClient.On("CheckTxAsync", mock.Anything, mock.Anything).Return(reqRes, nil).Once() + _, err := mp.CheckTx(tx, "") + require.NoError(t, err, err) + reqRes.InvokeCallback() + } + }() + + wg.Wait() + + require.Equal(t, counter.Load(), int64(numTxs+1)) +} + +func TestIteratorEmptyLanes(t *testing.T) { + app := kvstore.NewInMemoryApplication() + cc := proxy.NewLocalClientCreator(app) + + cfg := test.ResetTestRoot("mempool_empty_test") + mp, cleanup := newMempoolWithAppAndConfig(cc, cfg) + defer cleanup() + + go func() { + iter := mp.NewBlockingWRRIterator() + require.Zero(t, mp.Size()) + entry := <-iter.WaitNextCh() + require.NotNil(t, entry) + require.EqualValues(t, entry.Tx(), kvstore.NewTxFromID(1)) + }() + time.Sleep(100 * time.Millisecond) + + tx := kvstore.NewTxFromID(1) + res := abci.ToCheckTxResponse(&abci.CheckTxResponse{Code: abci.CodeTypeOK}) + mp.handleCheckTxResponse(tx, "")(res) + require.Equal(t, 1, mp.Size(), "pool size mismatch") +} + +// Without lanes transactions should be returned as they were +// submitted - increasing tx IDs. +func TestIteratorNoLanes(t *testing.T) { + app := kvstore.NewInMemoryApplication() + app.SetUseLanes(false) + cc := proxy.NewLocalClientCreator(app) + + cfg := test.ResetTestRoot("mempool_test") + mp, cleanup := newMempoolWithAppAndConfig(cc, cfg) + defer cleanup() + + const n = numTxs + + var wg sync.WaitGroup + wg.Add(1) + + // Spawn a goroutine that iterates on the list until counting n entries. + counter := 0 + go func() { + defer wg.Done() + + iter := mp.NewBlockingWRRIterator() + for counter < n { + entry := <-iter.WaitNextCh() + if entry == nil { + continue + } + require.EqualValues(t, entry.Tx(), kvstore.NewTxFromID(counter)) + counter++ + } + }() + + // Add n transactions with sequential ids. + for i := 0; i < n; i++ { + tx := kvstore.NewTxFromID(i) + rr, err := mp.CheckTx(tx, "") + require.NoError(t, err) + rr.Wait() + } + + wg.Wait() + require.Equal(t, n, counter) +} + +// TODO automate the lane numbers so we can change the number of lanes +// and increase the number of transactions. +func TestIteratorExactOrder(t *testing.T) { + mockClient := new(abciclimocks.Client) + mockClient.On("Start").Return(nil) + mockClient.On("SetLogger", mock.Anything) + mockClient.On("Error").Return(nil).Times(100) + mockClient.On("Info", mock.Anything, mock.Anything).Return(&abci.InfoResponse{LanePriorities: []uint32{1, 2, 3}, DefaultLanePriority: 1}, nil) + + mp, cleanup := newMempoolWithAppMock(mockClient) + defer cleanup() + + // Disable rechecking to make sure the recheck logic is not interferint. + mp.config.Recheck = false + + const numLanes = 3 + const numTxs = 11 + // Transactions are ordered into lanes by their IDs. This is the order in + // which they should appear following WRR + expectedTxIDs := []int{2, 5, 8, 1, 4, 3, 11, 7, 10, 6, 9} + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + waitForNumTxsInMempool(numTxs, mp) + t.Log("Mempool full, starting to pick up transactions", mp.Size()) + + iter := mp.NewBlockingWRRIterator() + for i := 0; i < numTxs; i++ { + entry := <-iter.WaitNextCh() + if entry == nil { + continue + } + require.EqualValues(t, entry.Tx(), kvstore.NewTxFromID(expectedTxIDs[i])) + } + }() + + // This was introduced because without a separate function + // we have to sleep to wait for all txs to get into the mempool. + // This way we loop in the function above until it is fool + // without arbitrary timeouts. + go func() { + for i := 1; i <= numTxs; i++ { + tx := kvstore.NewTxFromID(i) + + currLane := (i % numLanes) + 1 + reqRes := newReqResWithLanes(tx, abci.CodeTypeOK, abci.CHECK_TX_TYPE_CHECK, uint32(currLane)) + require.NotNil(t, reqRes) + + mockClient.On("CheckTxAsync", mock.Anything, mock.Anything).Return(reqRes, nil).Once() + _, err := mp.CheckTx(tx, "") + require.NoError(t, err, err) + reqRes.InvokeCallback() + } + }() + + wg.Wait() +} + +// This only tests that all transactions were submitted. +func TestIteratorCountOnly(t *testing.T) { + app := kvstore.NewInMemoryApplication() + cc := proxy.NewLocalClientCreator(app) + + cfg := test.ResetTestRoot("mempool_test") + mp, cleanup := newMempoolWithAppAndConfig(cc, cfg) + defer cleanup() + + var wg sync.WaitGroup + wg.Add(1) + + const n = numTxs + + // Spawn a goroutine that iterates on the list until counting n entries. + counter := 0 + go func() { + defer wg.Done() + + iter := mp.NewBlockingWRRIterator() + for counter < n { + entry := <-iter.WaitNextCh() + if entry == nil { + continue + } + counter++ + } + }() + + // Add n transactions with sequential ids. + for i := 0; i < n; i++ { + tx := kvstore.NewTxFromID(i) + rr, err := mp.CheckTx(tx, "") + require.NoError(t, err) + rr.Wait() + } + + wg.Wait() + require.Equal(t, n, counter) +} + +func TestReapOrderMatchesGossipOrder(t *testing.T) { + app := kvstore.NewInMemoryApplication() + cc := proxy.NewLocalClientCreator(app) + mp, cleanup := newMempoolWithApp(cc) + defer cleanup() + + const n = 10 + + // Add a bunch of txs. + for i := 1; i <= n; i++ { + tx := kvstore.NewTxFromID(i) + rr, err := mp.CheckTx(tx, "") + require.NoError(t, err, err) + rr.Wait() + } + require.Equal(t, n, mp.Size()) + + gossipIter := mp.NewBlockingWRRIterator() + reapIter := mp.NewWRRIterator() + + // Check that both iterators return the same entry as in the reaped txs. + txs := make([]types.Tx, n) + reapedTxs := mp.ReapMaxTxs(n) + for i, reapedTx := range reapedTxs { + entry := <-gossipIter.WaitNextCh() + // entry can be nil only when an entry is removed concurrently. + require.NotNil(t, entry) + gossipTx := entry.Tx() + + reapTx := reapIter.Next().Tx() + txs[i] = reapTx + + require.EqualValues(t, reapTx, gossipTx) + require.EqualValues(t, reapTx, reapedTx) + } + require.EqualValues(t, txs, reapedTxs) + + err := mp.Update(1, txs, abciResponses(len(txs), abci.CodeTypeOK), nil, nil) + require.NoError(t, err) + require.Zero(t, mp.Size()) +} diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index 4b227f27c20..15f818714d3 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -64,6 +64,8 @@ type CListMempool struct { defaultLane types.Lane sortedLanes []types.Lane // lanes sorted by priority + reapIter *NonBlockingWRRIterator + // Keep a cache of already-seen txs. // This reduces the pressure on the proxyApp. cache TxCache @@ -118,6 +120,8 @@ func NewCListMempool( slices.Reverse(mp.sortedLanes) } + mp.reapIter = mp.NewWRRIterator() + if cfg.CacheSize > 0 { mp.cache = NewLRUTxCache(cfg.CacheSize) } else { @@ -610,31 +614,32 @@ func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs { // size per tx, and set the initial capacity based off of that. // txs := make([]types.Tx, 0, cmtmath.MinInt(mem.Size(), max/mem.avgTxSize)) txs := make([]types.Tx, 0, mem.Size()) - for _, lane := range mem.sortedLanes { - for e := mem.lanes[lane].Front(); e != nil; e = e.Next() { - memTx := e.Value.(*mempoolTx) - - txs = append(txs, memTx.tx) + mem.reapIter.Reset(mem.lanes) + for { + memTx := mem.reapIter.Next() + if memTx == nil { + break + } + txs = append(txs, memTx.Tx()) - dataSize := types.ComputeProtoSizeForTxs([]types.Tx{memTx.tx}) + dataSize := types.ComputeProtoSizeForTxs([]types.Tx{memTx.Tx()}) - // Check total size requirement - if maxBytes > -1 && runningSize+dataSize > maxBytes { - return txs[:len(txs)-1] - } + // Check total size requirement + if maxBytes > -1 && runningSize+dataSize > maxBytes { + return txs[:len(txs)-1] + } - runningSize += dataSize + runningSize += dataSize - // Check total gas requirement. - // If maxGas is negative, skip this check. - // Since newTotalGas < masGas, which - // must be non-negative, it follows that this won't overflow. - newTotalGas := totalGas + memTx.gasWanted - if maxGas > -1 && newTotalGas > maxGas { - return txs[:len(txs)-1] - } - totalGas = newTotalGas + // Check total gas requirement. + // If maxGas is negative, skip this check. + // Since newTotalGas < masGas, which + // must be non-negative, it follows that this won't overflow. + newTotalGas := totalGas + memTx.GasWanted() + if maxGas > -1 && newTotalGas > maxGas { + return txs[:len(txs)-1] } + totalGas = newTotalGas } return txs } @@ -649,11 +654,13 @@ func (mem *CListMempool) ReapMaxTxs(max int) types.Txs { } txs := make([]types.Tx, 0, cmtmath.MinInt(mem.Size(), max)) - for _, lane := range mem.sortedLanes { - for e := mem.lanes[lane].Front(); e != nil && len(txs) <= max; e = e.Next() { - memTx := e.Value.(*mempoolTx) - txs = append(txs, memTx.tx) + mem.reapIter.Reset(mem.lanes) + for len(txs) <= max { + memTx := mem.reapIter.Next() + if memTx == nil { + break } + txs = append(txs, memTx.Tx()) } return txs } @@ -888,20 +895,103 @@ func (rc *recheck) consideredFull() bool { return rc.recheckFull.Load() } -// CListIterator implements an Iterator that traverses the lanes with the classical Weighted Round -// Robin (WRR) algorithm. -type CListIterator struct { - mp *CListMempool // to wait on and retrieve the first entry - currentLaneIndex int // current lane being iterated; index on mp.sortedLanes - counters map[types.Lane]uint32 // counters of accessed entries for WRR algorithm - cursors map[types.Lane]*clist.CElement // last accessed entries on each lane +// WRRIterator is the base struct for implementing iterators that traverse lanes with +// the classical Weighted Round Robin (WRR) algorithm. +type WRRIterator struct { + sortedLanes []types.Lane + laneIndex int // current lane being iterated; index on sortedLanes + counters map[types.Lane]uint // counters of consumed entries, for WRR algorithm + cursors map[types.Lane]*clist.CElement // last accessed entries on each lane +} + +func (iter *WRRIterator) nextLane() types.Lane { + iter.laneIndex = (iter.laneIndex + 1) % len(iter.sortedLanes) + return iter.sortedLanes[iter.laneIndex] +} + +// Non-blocking version of the WRR iterator to be used for reaping and +// rechecking transactions. +// +// Lock must be held on the mempool when iterating: the mempool cannot be +// modified while iterating. +type NonBlockingWRRIterator struct { + WRRIterator +} + +func (mem *CListMempool) NewWRRIterator() *NonBlockingWRRIterator { + baseIter := WRRIterator{ + sortedLanes: mem.sortedLanes, + counters: make(map[types.Lane]uint, len(mem.lanes)), + cursors: make(map[types.Lane]*clist.CElement, len(mem.lanes)), + } + iter := &NonBlockingWRRIterator{ + WRRIterator: baseIter, + } + iter.Reset(mem.lanes) + return iter +} + +// Reset must be called before every use of the iterator. +func (iter *NonBlockingWRRIterator) Reset(lanes map[types.Lane]*clist.CList) { + iter.laneIndex = 0 + for i := range iter.counters { + iter.counters[i] = 0 + } + // Set cursors at the beginning of each lane. + for lane := range lanes { + iter.cursors[lane] = lanes[lane].Front() + } +} + +// Next returns the next element according to the WRR algorithm. +func (iter *NonBlockingWRRIterator) Next() Entry { + lane := iter.sortedLanes[iter.laneIndex] + numEmptyLanes := 0 + for { + // Skip empty lane or if cursor is at end of lane. + if iter.cursors[lane] == nil { + numEmptyLanes++ + if numEmptyLanes >= len(iter.sortedLanes) { + return nil + } + lane = iter.nextLane() + continue + } + // Skip over-consumed lane. + if iter.counters[lane] >= uint(lane) { + iter.counters[lane] = 0 + numEmptyLanes = 0 + lane = iter.nextLane() + continue + } + break + } + elem := iter.cursors[lane] + if elem == nil { + panic(fmt.Errorf("Iterator picked a nil entry on lane %d", lane)) + } + iter.cursors[lane] = iter.cursors[lane].Next() + iter.counters[lane]++ + return elem.Value.(*mempoolTx) } -func (mem *CListMempool) NewIterator() Iterator { - return &CListIterator{ - mp: mem, - counters: make(map[types.Lane]uint32, len(mem.sortedLanes)), - cursors: make(map[types.Lane]*clist.CElement, len(mem.sortedLanes)), +// BlockingWRRIterator implements a blocking version of the WRR iterator, +// meaning that when no transaction is available, it will wait until a new one +// is added to the mempool. +type BlockingWRRIterator struct { + WRRIterator + mp *CListMempool +} + +func (mem *CListMempool) NewBlockingWRRIterator() Iterator { + iter := WRRIterator{ + sortedLanes: mem.sortedLanes, + counters: make(map[types.Lane]uint, len(mem.sortedLanes)), + cursors: make(map[types.Lane]*clist.CElement, len(mem.sortedLanes)), + } + return &BlockingWRRIterator{ + WRRIterator: iter, + mp: mem, } } @@ -910,7 +1000,7 @@ func (mem *CListMempool) NewIterator() Iterator { // the list. // // Unsafe for concurrent use by multiple goroutines. -func (iter *CListIterator) WaitNextCh() <-chan Entry { +func (iter *BlockingWRRIterator) WaitNextCh() <-chan Entry { ch := make(chan Entry) go func() { // Add the next entry to the channel if not nil. @@ -925,14 +1015,15 @@ func (iter *CListIterator) WaitNextCh() <-chan Entry { return ch } -// PickLane returns a _valid_ lane on which to iterate, according to the WRR algorithm. A lane is -// valid if it is not empty or the number of accessed entries in the lane has not yet reached its -// priority value. -func (iter *CListIterator) PickLane() types.Lane { +// PickLane returns a _valid_ lane on which to iterate, according to the WRR +// algorithm. A lane is valid if it is not empty or it is not over-consumed, +// meaning that the number of accessed entries in the lane has not yet reached +// its priority value in the current WRR iteration. +func (iter *BlockingWRRIterator) PickLane() types.Lane { // Loop until finding a valid lanes // If the current lane is not valid, continue with the next lane with lower priority, in a // round robin fashion. - lane := iter.mp.sortedLanes[iter.currentLaneIndex] + lane := iter.sortedLanes[iter.laneIndex] iter.mp.addTxChMtx.RLock() defer iter.mp.addTxChMtx.RUnlock() @@ -943,10 +1034,9 @@ func (iter *CListIterator) PickLane() types.Lane { (iter.cursors[lane] != nil && iter.cursors[lane].Value.(*mempoolTx).seq == iter.mp.addTxLaneSeqs[lane]) { prevLane := lane - iter.currentLaneIndex = (iter.currentLaneIndex + 1) % len(iter.mp.sortedLanes) - lane = iter.mp.sortedLanes[iter.currentLaneIndex] + lane = iter.nextLane() nIter++ - if nIter >= len(iter.mp.sortedLanes) { + if nIter >= len(iter.sortedLanes) { ch := iter.mp.addTxCh iter.mp.addTxChMtx.RUnlock() iter.mp.logger.Info("YYY PickLane, bef block", "lane", lane, "prevLane", prevLane) @@ -959,12 +1049,11 @@ func (iter *CListIterator) PickLane() types.Lane { continue } - if iter.counters[lane] >= uint32(lane) { + if iter.counters[lane] >= uint(lane) { // Reset the counter only when the limit on the lane was reached. iter.counters[lane] = 0 - iter.currentLaneIndex = (iter.currentLaneIndex + 1) % len(iter.mp.sortedLanes) prevLane := lane - lane = iter.mp.sortedLanes[iter.currentLaneIndex] + lane = iter.nextLane() nIter = 0 iter.mp.logger.Info("YYY PickLane, skipped lane 2", "lane", prevLane, "new Lane ", lane) continue @@ -975,7 +1064,7 @@ func (iter *CListIterator) PickLane() types.Lane { } } -// Next implements the classical Weighted Round Robin (WRR) algorithm. +// Next returns the next element according to the WRR algorithm. // // In classical WRR, the iterator cycles over the lanes. When a lane is selected, Next returns an // entry from the selected lane. On subsequent calls, Next will return the next entries from the @@ -984,7 +1073,7 @@ func (iter *CListIterator) PickLane() types.Lane { // // TODO: Note that this code does not block waiting for an available entry on a CList or a CElement, as // was the case on the original code. Is this the best way to do it? -func (iter *CListIterator) Next() *clist.CElement { +func (iter *BlockingWRRIterator) Next() *clist.CElement { lane := iter.PickLane() // Load the last accessed entry in the lane and set the next one. var next *clist.CElement diff --git a/mempool/clist_mempool_test.go b/mempool/clist_mempool_test.go index 9f60f5a7863..86826f3e149 100644 --- a/mempool/clist_mempool_test.go +++ b/mempool/clist_mempool_test.go @@ -1,7 +1,6 @@ package mempool import ( - "bytes" "context" "encoding/binary" "errors" @@ -10,7 +9,6 @@ import ( "os" "strconv" "sync" - "sync/atomic" "testing" "time" @@ -173,8 +171,8 @@ func TestReapMaxBytesMaxGas(t *testing.T) { // Ensure gas calculation behaves as expected checkTxs(t, mp, 1) - iter := mp.NewIterator() - tx0 := <-iter.WaitNextCh() + iter := mp.NewWRRIterator() + tx0 := iter.Next() require.NotNil(t, tx0) require.Equal(t, tx0.GasWanted(), int64(1), "transactions gas was set incorrectly") // ensure each tx is 20 bytes long @@ -838,126 +836,6 @@ func TestMempoolConcurrentUpdateAndReceiveCheckTxResponse(t *testing.T) { } } -// We have two iterators fetching transactions that -// then get removed. -func TestMempoolIteratorRace(t *testing.T) { - mockClient := new(abciclimocks.Client) - mockClient.On("Start").Return(nil) - mockClient.On("SetLogger", mock.Anything) - mockClient.On("Error").Return(nil).Times(100) - mockClient.On("Info", mock.Anything, mock.Anything).Return(&abci.InfoResponse{LanePriorities: []uint32{1, 2, 3}, DefaultLanePriority: 1}, nil) - - mp, cleanup := newMempoolWithAppMock(mockClient) - defer cleanup() - - // Disable rechecking to make sure the recheck logic is not interferint. - mp.config.Recheck = false - - numLanes := 3 - n := int64(100) // Number of transactions - - var wg sync.WaitGroup - - wg.Add(2) - var counter atomic.Int64 - go func() { - // Wait for at least some transactions to get into the mempool - for mp.Size() < int(n) { - time.Sleep(time.Second) - } - fmt.Println("mempool height ", mp.height.Load()) - - go func() { - defer wg.Done() - - for counter.Load() < n { - iter := mp.NewIterator() - entry := <-iter.WaitNextCh() - if entry == nil { - continue - } - tx := entry.Tx() - - txs := []types.Tx{tx} - - resp := abciResponses(1, 0) - err := mp.Update(1, txs, resp, nil, nil) - - require.NoError(t, err, tx) - counter.Add(1) - } - }() - - go func() { - defer wg.Done() - - for counter.Load() < n { - iter := mp.NewIterator() - entry := <-iter.WaitNextCh() - if entry == nil { - continue - } - tx := entry.Tx() - - txs := []types.Tx{tx} - resp := abciResponses(1, 0) - err := mp.Update(1, txs, resp, nil, nil) - - require.NoError(t, err) - counter.Add(1) - } - }() - }() - - // This was introduced because without a separate function - // we have to sleep to wait for all txs to get into the mempool. - // This way we loop in the function above until it is fool - // without arbitrary timeouts. - go func() { - for i := 1; i <= int(n); i++ { - tx := kvstore.NewTxFromID(i) - - currLane := (i % numLanes) + 1 - reqRes := newReqResWithLanes(tx, abci.CodeTypeOK, abci.CHECK_TX_TYPE_CHECK, uint32(currLane)) - require.NotNil(t, reqRes) - - mockClient.On("CheckTxAsync", mock.Anything, mock.Anything).Return(reqRes, nil).Once() - _, err := mp.CheckTx(tx, "") - require.NoError(t, err, err) - reqRes.InvokeCallback() - } - }() - - wg.Wait() - - require.Equal(t, counter.Load(), n+1) -} - -func TestMempoolEmptyLanes(t *testing.T) { - app := kvstore.NewInMemoryApplication() - cc := proxy.NewLocalClientCreator(app) - - cfg := test.ResetTestRoot("mempool_empty_test") - mp, cleanup := newMempoolWithAppAndConfig(cc, cfg) - defer cleanup() - - go func() { - iter := mp.NewIterator() - require.Equal(t, 0, mp.Size()) - entry := <-iter.WaitNextCh() - - require.NotNil(t, entry) - tx := entry.Tx() - - require.True(t, bytes.Equal(tx, kvstore.NewTxFromID(1))) - }() - time.Sleep(time.Second * 2) - tx := kvstore.NewTxFromID(1) - res := abci.ToCheckTxResponse(&abci.CheckTxResponse{Code: abci.CodeTypeOK}) - mp.handleCheckTxResponse(tx, "")(res) - require.Equal(t, 1, mp.Size(), "pool size mismatch") -} - func TestMempoolNotifyTxsAvailable(t *testing.T) { app := kvstore.NewInMemoryApplication() cc := proxy.NewLocalClientCreator(app) @@ -1200,161 +1078,6 @@ func TestMempoolConcurrentCheckTxAndUpdate(t *testing.T) { require.Zero(t, mp.Size()) } -// TODO automate the lane numbers so we can change the number of lanes -// and increase the number of transactions. -func TestMempoolIteratorExactOrder(t *testing.T) { - mockClient := new(abciclimocks.Client) - mockClient.On("Start").Return(nil) - mockClient.On("SetLogger", mock.Anything) - mockClient.On("Error").Return(nil).Times(100) - mockClient.On("Info", mock.Anything, mock.Anything).Return(&abci.InfoResponse{LanePriorities: []uint32{1, 2, 3}, DefaultLanePriority: 1}, nil) - - mp, cleanup := newMempoolWithAppMock(mockClient) - defer cleanup() - - // Disable rechecking to make sure the recheck logic is not interferint. - mp.config.Recheck = false - - numLanes := 3 - n := 11 // Number of transactions - // Transactions are ordered into lanes by their IDs - // This is the order in which they should appear following WRR - localSortedLanes := []int{2, 5, 8, 1, 4, 3, 11, 7, 10, 6, 9} - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - for mp.Size() < n { - time.Sleep(time.Second) - } - t.Log("Mempool full, starting to pick up transactions", mp.Size()) - - iter := mp.NewIterator() - - counter := 0 - for counter < n { - entry := <-iter.WaitNextCh() - if entry == nil { - continue - } - tx := entry.Tx() - - txLocal := kvstore.NewTxFromID(localSortedLanes[counter]) - - require.True(t, bytes.Equal(tx, txLocal)) - - counter++ - } - }() - - // This was introduced because without a separate function - // we have to sleep to wait for all txs to get into the mempool. - // This way we loop in the function above until it is fool - // without arbitrary timeouts. - go func() { - for i := 1; i <= n; i++ { - tx := kvstore.NewTxFromID(i) - - currLane := (i % numLanes) + 1 - reqRes := newReqResWithLanes(tx, abci.CodeTypeOK, abci.CHECK_TX_TYPE_CHECK, uint32(currLane)) - require.NotNil(t, reqRes) - - mockClient.On("CheckTxAsync", mock.Anything, mock.Anything).Return(reqRes, nil).Once() - _, err := mp.CheckTx(tx, "") - require.NoError(t, err, err) - reqRes.InvokeCallback() - } - }() - - wg.Wait() -} - -// This only tests that all transactions were submitted. -func TestMempoolIteratorCountOnly(t *testing.T) { - app := kvstore.NewInMemoryApplication() - cc := proxy.NewLocalClientCreator(app) - - cfg := test.ResetTestRoot("mempool_test") - mp, cleanup := newMempoolWithAppAndConfig(cc, cfg) - defer cleanup() - - var wg sync.WaitGroup - wg.Add(1) - - n := numTxs - - // Spawn a goroutine that iterates on the list until counting n entries. - counter := 0 - go func() { - defer wg.Done() - - iter := mp.NewIterator() - for counter < n { - entry := <-iter.WaitNextCh() - if entry == nil { - continue - } - counter++ - } - }() - - // Add n transactions with sequential ids. - for i := 0; i < n; i++ { - tx := kvstore.NewTxFromID(i) - rr, err := mp.CheckTx(tx, "") - require.NoError(t, err) - rr.Wait() - } - - wg.Wait() - require.Equal(t, n, counter) -} - -// Without lanes transactions should be returned as they were -// submitted - increasing tx IDs. -func TestMempoolIteratorNoLanes(t *testing.T) { - app := kvstore.NewInMemoryApplication() - app.SetUseLanes(false) - cc := proxy.NewLocalClientCreator(app) - - cfg := test.ResetTestRoot("mempool_test") - mp, cleanup := newMempoolWithAppAndConfig(cc, cfg) - defer cleanup() - - var wg sync.WaitGroup - wg.Add(1) - - n := 1000 // numTxs - - // Spawn a goroutine that iterates on the list until counting n entries. - counter := 0 - go func() { - defer wg.Done() - - iter := mp.NewIterator() - for counter < n { - entry := <-iter.WaitNextCh() - if entry == nil { - continue - } - require.True(t, bytes.Equal(kvstore.NewTxFromID(counter), entry.Tx())) - - counter++ - } - }() - - // Add n transactions with sequential ids. - for i := 0; i < n; i++ { - tx := kvstore.NewTxFromID(i) - rr, err := mp.CheckTx(tx, "") - require.NoError(t, err) - rr.Wait() - } - - wg.Wait() - require.Equal(t, n, counter) -} - func newMempoolWithAsyncConnection(tb testing.TB) (*CListMempool, cleanupFunc) { tb.Helper() sockPath := fmt.Sprintf("unix:///tmp/echo_%v.sock", cmtrand.Str(6)) diff --git a/mempool/reactor.go b/mempool/reactor.go index 3a2d03b0870..95c9d09865b 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -217,7 +217,7 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { } } - iter := memR.mempool.NewIterator() + iter := memR.mempool.NewBlockingWRRIterator() var entry Entry for { // In case of both next.NextWaitChan() and peer.Quit() are variable at the same time diff --git a/mempool/reactor_test.go b/mempool/reactor_test.go index 113987e2b78..d3372e6bb62 100644 --- a/mempool/reactor_test.go +++ b/mempool/reactor_test.go @@ -614,6 +614,7 @@ func checkTxsInOrder(t *testing.T, txs types.Txs, reactor *Reactor, reactorIndex // Check that all transactions in the mempool are in the same order as txs. reapedTxs := reactor.mempool.ReapMaxTxs(len(txs)) + require.Equal(t, len(txs), len(reapedTxs)) for i, tx := range txs { assert.Equalf(t, tx, reapedTxs[i], "txs at index %d on reactor %d don't match: %v vs %v", i, reactorIndex, tx, reapedTxs[i])