From 4c8d5f8495ed88d79f81cdf63555e2d8d57ad2e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hern=C3=A1n=20Vanzetto?= <15466498+hvanz@users.noreply.github.com> Date: Mon, 19 Aug 2024 15:08:11 +0300 Subject: [PATCH 01/14] feat(mempool/lanes): Store lane in `mempoolTx` and remove `txsLanes` (#3756) We don't need to store the lane in a separate map. We can just extend `mempoolTx` with a `lane` field. --- #### PR checklist - [ ] Tests written/updated - [ ] Changelog entry added in `.changelog` (we use [unclog](https://github.com/informalsystems/unclog) to manage our changelog) - [ ] Updated relevant documentation (`docs/` or `spec/`) and code comments --- mempool/clist_mempool.go | 22 +++++++--------------- mempool/errors.go | 3 --- mempool/mempoolTx.go | 1 + 3 files changed, 8 insertions(+), 18 deletions(-) diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index 751ac131fc1..e73ef5cb0ff 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -52,7 +52,6 @@ type CListMempool struct { txsMtx cmtsync.RWMutex lanes map[types.Lane]*clist.CList // each lane is a linked-list of (valid) txs txsMap map[types.TxKey]*clist.CElement // for quick access to the mempool entry of a given tx - txLanes map[types.TxKey]types.Lane // for quick access to the lane of a given tx txsBytes int64 // total size of mempool, in bytes numTxs int64 // total number of txs in the mempool @@ -91,7 +90,6 @@ func NewCListMempool( config: cfg, proxyAppConn: proxyAppConn, txsMap: make(map[types.TxKey]*clist.CElement), - txLanes: make(map[types.TxKey]types.Lane), recheck: &recheck{}, logger: log.NewNopLogger(), metrics: NopMetrics(), @@ -160,7 +158,6 @@ func (mem *CListMempool) removeAllTxs(lane types.Lane) { e.DetachPrev() } mem.txsMap = make(map[types.TxKey]*clist.CElement) - mem.txLanes = make(map[types.TxKey]types.Lane) mem.txsBytes = 0 } @@ -453,12 +450,12 @@ func (mem *CListMempool) addTx(memTx *mempoolTx, sender p2p.ID, lane types.Lane) // Add new transaction. _ = memTx.addSender(sender) + memTx.lane = lane e := txs.PushBack(memTx) mem.addTxLaneSeqs[lane] = mem.addTxSeq // Update auxiliary variables. mem.txsMap[txKey] = e - mem.txLanes[txKey] = lane // Update size variables. mem.txsBytes += int64(len(tx)) @@ -495,29 +492,24 @@ func (mem *CListMempool) RemoveTxByKey(txKey types.TxKey) error { return ErrTxNotFound } - lane, ok := mem.txLanes[txKey] - if !ok { - return ErrLaneNotFound - } + memTx := elem.Value.(*mempoolTx) // Remove tx from lane. - mem.lanes[lane].Remove(elem) + mem.lanes[memTx.lane].Remove(elem) elem.DetachPrev() // Update auxiliary variables. delete(mem.txsMap, txKey) - delete(mem.txLanes, txKey) // Update size variables. - tx := elem.Value.(*mempoolTx).tx - mem.txsBytes -= int64(len(tx)) + mem.txsBytes -= int64(len(memTx.tx)) mem.numTxs-- mem.logger.Debug( "Removed transaction", - "tx", tx.Hash(), - "lane", lane, - "lane size", mem.lanes[lane].Len(), + "tx", memTx.tx.Hash(), + "lane", memTx.lane, + "lane size", mem.lanes[memTx.lane].Len(), "height", mem.height.Load(), "total", mem.numTxs, ) diff --git a/mempool/errors.go b/mempool/errors.go index a384613aae8..4b912bd4765 100644 --- a/mempool/errors.go +++ b/mempool/errors.go @@ -11,9 +11,6 @@ var ErrTxNotFound = errors.New("transaction not found in mempool") // ErrTxInCache is returned to the client if we saw tx earlier. var ErrTxInCache = errors.New("tx already exists in cache") -// ErrLaneNotFound is returned to the client when a lane is not found. 
-var ErrLaneNotFound = errors.New("lane not found in mempool") - // ErrTxAlreadyReceivedFromSender is returned if when processing a tx already // received from the same sender. var ErrTxAlreadyReceivedFromSender = errors.New("tx already received from the same sender") diff --git a/mempool/mempoolTx.go b/mempool/mempoolTx.go index cf6015d45a4..94b466b9b13 100644 --- a/mempool/mempoolTx.go +++ b/mempool/mempoolTx.go @@ -13,6 +13,7 @@ type mempoolTx struct { height int64 // height that this tx had been validated in gasWanted int64 // amount of gas this tx states it will require tx types.Tx // validated by the application + lane types.Lane seq int64 // ids of peers who've sent us this tx (as a map for quick lookups). From 1dc1651136f2bf48fadf926813c9d719eb913f49 Mon Sep 17 00:00:00 2001 From: hvanz Date: Thu, 22 Aug 2024 18:07:54 +0300 Subject: [PATCH 02/14] Add `ReapIterator` --- mempool/clist_iterator_test.go | 105 ++++++++++++++++++++++++++++++++ mempool/clist_mempool.go | 106 ++++++++++++++++++++++++++------- mempool/reactor_test.go | 1 + 3 files changed, 189 insertions(+), 23 deletions(-) create mode 100644 mempool/clist_iterator_test.go diff --git a/mempool/clist_iterator_test.go b/mempool/clist_iterator_test.go new file mode 100644 index 00000000000..827a441a9a8 --- /dev/null +++ b/mempool/clist_iterator_test.go @@ -0,0 +1,105 @@ +package mempool + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/cometbft/cometbft/abci/example/kvstore" + "github.com/cometbft/cometbft/internal/clist" + "github.com/cometbft/cometbft/internal/test" + "github.com/cometbft/cometbft/proxy" + "github.com/cometbft/cometbft/types" +) + +func TestReapIterator(t *testing.T) { + app := kvstore.NewInMemoryApplication() + cc := proxy.NewLocalClientCreator(app) + cfg := test.ResetTestRoot("mempool_test") + mp, cleanup := newMempoolWithAppAndConfig(cc, cfg) + defer cleanup() + + // Add all txs with id up to n. + n := 100 + for i := 0; i < n; i++ { + tx := kvstore.NewTxFromID(i) + rr, err := mp.CheckTx(tx, noSender) + require.NoError(t, err) + rr.Wait() + } + require.Equal(t, n, mp.Size()) + + iter := mp.NewReapIterator() + expectedOrder := []int{ + 0, 11, 22, 33, 44, 55, 66, // lane 7 + 1, 2, 4, // lane 3 + 3, // lane 1 + 77, 88, 99, + 5, 7, 8, + 6, + 10, 13, 14, + 9, + 16, 17, 19, + 12, + 20, 23, 25, + 15, + } + + var next *clist.CElement + counter := 0 + + // Check that txs are picked by the iterator in the expected order. + for _, id := range expectedOrder { + next = iter.Next() + require.NotNil(t, next) + require.Equal(t, types.Tx(kvstore.NewTxFromID(id)), next.Value.(*mempoolTx).Tx(), "id=%v", id) + counter++ + } + + // Check that the rest of the entries are also consumed. + for { + if next = iter.Next(); next == nil { + break + } + counter++ + } + require.Equal(t, n, counter) +} + +func TestReapIteratorOneLane(t *testing.T) { + app := kvstore.NewInMemoryApplication() + cc := proxy.NewLocalClientCreator(app) + cfg := test.ResetTestRoot("mempool_test") + mp, cleanup := newMempoolWithAppAndConfig(cc, cfg) + defer cleanup() + + // Add all txs with id up to n to one lane. 
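+	// Only txs whose ID is a multiple of 11 are submitted; judging from the
+	// expected order in TestReapIterator above, the kvstore app assigns all of
+	// them to the same (highest-priority) lane.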
+ n := 100 + for i := 0; i < n; i++ { + if i%11 != 0 { + continue + } + tx := kvstore.NewTxFromID(i) + rr, err := mp.CheckTx(tx, noSender) + require.NoError(t, err) + rr.Wait() + } + require.Equal(t, 10, mp.Size()) + + iter := mp.NewReapIterator() + expectedOrder := []int{0, 11, 22, 33, 44, 55, 66, 77, 88, 99} + + var next *clist.CElement + counter := 0 + + // Check that txs are picked by the iterator in the expected order. + for _, id := range expectedOrder { + next = iter.Next() + require.NotNil(t, next) + require.Equal(t, types.Tx(kvstore.NewTxFromID(id)), next.Value.(*mempoolTx).Tx(), "id=%v", id) + counter++ + } + + next = iter.Next() + require.Nil(t, next) +} diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index e73ef5cb0ff..77cc1794766 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -611,31 +611,34 @@ func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs { // size per tx, and set the initial capacity based off of that. // txs := make([]types.Tx, 0, cmtmath.MinInt(mem.Size(), max/mem.avgTxSize)) txs := make([]types.Tx, 0, mem.Size()) - for _, lane := range mem.sortedLanes { - for e := mem.lanes[lane].Front(); e != nil; e = e.Next() { - memTx := e.Value.(*mempoolTx) + iter := mem.NewReapIterator() + for { + elem := iter.Next() + if elem == nil { + break + } + memTx := elem.Value.(*mempoolTx) - txs = append(txs, memTx.tx) + txs = append(txs, memTx.tx) - dataSize := types.ComputeProtoSizeForTxs([]types.Tx{memTx.tx}) + dataSize := types.ComputeProtoSizeForTxs([]types.Tx{memTx.tx}) - // Check total size requirement - if maxBytes > -1 && runningSize+dataSize > maxBytes { - return txs[:len(txs)-1] - } + // Check total size requirement + if maxBytes > -1 && runningSize+dataSize > maxBytes { + return txs[:len(txs)-1] + } - runningSize += dataSize + runningSize += dataSize - // Check total gas requirement. - // If maxGas is negative, skip this check. - // Since newTotalGas < masGas, which - // must be non-negative, it follows that this won't overflow. - newTotalGas := totalGas + memTx.gasWanted - if maxGas > -1 && newTotalGas > maxGas { - return txs[:len(txs)-1] - } - totalGas = newTotalGas + // Check total gas requirement. + // If maxGas is negative, skip this check. + // Since newTotalGas < masGas, which + // must be non-negative, it follows that this won't overflow. + newTotalGas := totalGas + memTx.gasWanted + if maxGas > -1 && newTotalGas > maxGas { + return txs[:len(txs)-1] } + totalGas = newTotalGas } return txs } @@ -650,11 +653,14 @@ func (mem *CListMempool) ReapMaxTxs(max int) types.Txs { } txs := make([]types.Tx, 0, cmtmath.MinInt(mem.Size(), max)) - for _, lane := range mem.sortedLanes { - for e := mem.lanes[lane].Front(); e != nil && len(txs) <= max; e = e.Next() { - memTx := e.Value.(*mempoolTx) - txs = append(txs, memTx.tx) + iter := mem.NewReapIterator() + for len(txs) <= max { + elem := iter.Next() + if elem == nil { + break } + memTx := elem.Value.(*mempoolTx) + txs = append(txs, memTx.tx) } return txs } @@ -911,6 +917,60 @@ func (rc *recheck) consideredFull() bool { return rc.recheckFull.Load() } +// Lock must be held on mempool: it cannot be modified while iterating. 
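+//
+// ReapIterator traverses the lanes with a Weighted Round Robin (WRR) scheme:
+// a lane whose priority is p yields at most p entries before the iterator
+// moves on to the next lane, and Next returns nil once all lanes have been
+// exhausted. A rough usage sketch (mirroring ReapMaxBytesMaxGas above):
+//
+//	iter := mem.NewReapIterator()
+//	for {
+//		elem := iter.Next()
+//		if elem == nil {
+//			break // all lanes exhausted
+//		}
+//		memTx := elem.Value.(*mempoolTx)
+//		_ = memTx.tx // the validated transaction
+//	}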
+type ReapIterator struct { + sortedLanes []types.Lane + laneIndex int // current lane being iterated; index on sortedLanes + counters map[types.Lane]int // counters of consumed entries, for WRR algorithm + cursors map[types.Lane]*clist.CElement // last accessed entries on each lane +} + +func (mem *CListMempool) NewReapIterator() *ReapIterator { + // Set cursors at the beginning of each lane. + cursors := make(map[types.Lane]*clist.CElement, len(mem.sortedLanes)) + for _, lane := range mem.sortedLanes { + cursors[lane] = mem.lanes[lane].Front() + } + return &ReapIterator{ + sortedLanes: mem.sortedLanes, + counters: make(map[types.Lane]int, len(mem.sortedLanes)), + cursors: cursors, + } +} + +func (iter *ReapIterator) nextLane() types.Lane { + iter.laneIndex = (iter.laneIndex + 1) % len(iter.sortedLanes) + return iter.sortedLanes[iter.laneIndex] +} + +func (iter *ReapIterator) Next() *clist.CElement { + lane := iter.sortedLanes[iter.laneIndex] + numEmptyLanes := 0 + for { + // Skip empty lane or if cursor is at end of lane + if iter.cursors[lane] == nil { + numEmptyLanes++ + if numEmptyLanes >= len(iter.sortedLanes) { + return nil + } + lane = iter.nextLane() + continue + } + // Skip over-consumed lane. + if iter.counters[lane] >= int(lane) { + iter.counters[lane] = 0 + numEmptyLanes = 0 + lane = iter.nextLane() + continue + } + break + } + elem := iter.cursors[lane] + iter.cursors[lane] = iter.cursors[lane].Next() + iter.counters[lane]++ + return elem +} + // CListIterator implements an Iterator that traverses the lanes with the classical Weighted Round // Robin (WRR) algorithm. type CListIterator struct { diff --git a/mempool/reactor_test.go b/mempool/reactor_test.go index 0f001472b08..d35a9233fa0 100644 --- a/mempool/reactor_test.go +++ b/mempool/reactor_test.go @@ -604,6 +604,7 @@ func checkTxsInOrder(t *testing.T, txs types.Txs, reactor *Reactor, reactorIndex // Check that all transactions in the mempool are in the same order as txs. 
reapedTxs := reactor.mempool.ReapMaxTxs(len(txs)) + require.Equal(t, len(txs), len(reapedTxs)) for i, tx := range txs { assert.Equalf(t, tx, reapedTxs[i], "txs at index %d on reactor %d don't match: %v vs %v", i, reactorIndex, tx, reapedTxs[i]) From d578a658ed519a08c3162f3a53ff31d20c49184a Mon Sep 17 00:00:00 2001 From: hvanz Date: Mon, 26 Aug 2024 10:52:32 +0200 Subject: [PATCH 03/14] Add ; renaming the other iterators --- mempool/clist_iterator_test.go | 8 +-- mempool/clist_mempool.go | 89 +++++++++++++++++++--------------- mempool/clist_mempool_test.go | 14 +++--- mempool/reactor.go | 2 +- 4 files changed, 62 insertions(+), 51 deletions(-) diff --git a/mempool/clist_iterator_test.go b/mempool/clist_iterator_test.go index 827a441a9a8..2ff80221410 100644 --- a/mempool/clist_iterator_test.go +++ b/mempool/clist_iterator_test.go @@ -12,7 +12,7 @@ import ( "github.com/cometbft/cometbft/types" ) -func TestReapIterator(t *testing.T) { +func TestIteratorNonBlocking(t *testing.T) { app := kvstore.NewInMemoryApplication() cc := proxy.NewLocalClientCreator(app) cfg := test.ResetTestRoot("mempool_test") @@ -29,7 +29,7 @@ func TestReapIterator(t *testing.T) { } require.Equal(t, n, mp.Size()) - iter := mp.NewReapIterator() + iter := mp.NewWRRIterator() expectedOrder := []int{ 0, 11, 22, 33, 44, 55, 66, // lane 7 1, 2, 4, // lane 3 @@ -66,7 +66,7 @@ func TestReapIterator(t *testing.T) { require.Equal(t, n, counter) } -func TestReapIteratorOneLane(t *testing.T) { +func TestIteratorNonBlockingOneLane(t *testing.T) { app := kvstore.NewInMemoryApplication() cc := proxy.NewLocalClientCreator(app) cfg := test.ResetTestRoot("mempool_test") @@ -86,7 +86,7 @@ func TestReapIteratorOneLane(t *testing.T) { } require.Equal(t, 10, mp.Size()) - iter := mp.NewReapIterator() + iter := mp.NewWRRIterator() expectedOrder := []int{0, 11, 22, 33, 44, 55, 66, 77, 88, 99} var next *clist.CElement diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index c3f0325908a..1bedf590257 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -610,7 +610,7 @@ func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs { // size per tx, and set the initial capacity based off of that. // txs := make([]types.Tx, 0, cmtmath.MinInt(mem.Size(), max/mem.avgTxSize)) txs := make([]types.Tx, 0, mem.Size()) - iter := mem.NewReapIterator() + iter := mem.NewWRRIterator() for { elem := iter.Next() if elem == nil { @@ -652,7 +652,7 @@ func (mem *CListMempool) ReapMaxTxs(max int) types.Txs { } txs := make([]types.Tx, 0, cmtmath.MinInt(mem.Size(), max)) - iter := mem.NewReapIterator() + iter := mem.NewWRRIterator() for len(txs) <= max { elem := iter.Next() if elem == nil { @@ -894,37 +894,48 @@ func (rc *recheck) consideredFull() bool { return rc.recheckFull.Load() } -// Lock must be held on mempool: it cannot be modified while iterating. -type ReapIterator struct { +// WRRIterator is the base struct for implementing iterators that traverse lanes with +// the classical Weighted Round Robin (WRR) algorithm. 
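+//
+// For example, with lanes of priorities 7, 3, and 1, a full WRR round visits up
+// to 7 entries from the priority-7 lane, then up to 3 from the priority-3 lane,
+// then 1 from the priority-1 lane, skipping lanes that are currently empty.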
+type WRRIterator struct { sortedLanes []types.Lane laneIndex int // current lane being iterated; index on sortedLanes - counters map[types.Lane]int // counters of consumed entries, for WRR algorithm + counters map[types.Lane]uint // counters of consumed entries, for WRR algorithm cursors map[types.Lane]*clist.CElement // last accessed entries on each lane } -func (mem *CListMempool) NewReapIterator() *ReapIterator { +func (iter *WRRIterator) nextLane() types.Lane { + iter.laneIndex = (iter.laneIndex + 1) % len(iter.sortedLanes) + return iter.sortedLanes[iter.laneIndex] +} + +// Non-blocking version of a WRR iterator. +// +// Lock must be held to update mempool: it cannot be modified while iterating. +type NonBlockingWRRIterator struct { + WRRIterator +} + +func (mem *CListMempool) NewWRRIterator() *NonBlockingWRRIterator { // Set cursors at the beginning of each lane. cursors := make(map[types.Lane]*clist.CElement, len(mem.sortedLanes)) for _, lane := range mem.sortedLanes { cursors[lane] = mem.lanes[lane].Front() } - return &ReapIterator{ + iter := WRRIterator{ sortedLanes: mem.sortedLanes, - counters: make(map[types.Lane]int, len(mem.sortedLanes)), + counters: make(map[types.Lane]uint, len(mem.sortedLanes)), cursors: cursors, } + return &NonBlockingWRRIterator{ + WRRIterator: iter, + } } -func (iter *ReapIterator) nextLane() types.Lane { - iter.laneIndex = (iter.laneIndex + 1) % len(iter.sortedLanes) - return iter.sortedLanes[iter.laneIndex] -} - -func (iter *ReapIterator) Next() *clist.CElement { +func (iter *NonBlockingWRRIterator) Next() *clist.CElement { lane := iter.sortedLanes[iter.laneIndex] numEmptyLanes := 0 for { - // Skip empty lane or if cursor is at end of lane + // Skip empty lane or if cursor is at end of lane. if iter.cursors[lane] == nil { numEmptyLanes++ if numEmptyLanes >= len(iter.sortedLanes) { @@ -934,7 +945,7 @@ func (iter *ReapIterator) Next() *clist.CElement { continue } // Skip over-consumed lane. - if iter.counters[lane] >= int(lane) { + if iter.counters[lane] >= uint(lane) { iter.counters[lane] = 0 numEmptyLanes = 0 lane = iter.nextLane() @@ -948,20 +959,22 @@ func (iter *ReapIterator) Next() *clist.CElement { return elem } -// CListIterator implements an Iterator that traverses the lanes with the classical Weighted Round -// Robin (WRR) algorithm. -type CListIterator struct { - mp *CListMempool // to wait on and retrieve the first entry - currentLaneIndex int // current lane being iterated; index on mp.sortedLanes - counters map[types.Lane]uint32 // counters of accessed entries for WRR algorithm - cursors map[types.Lane]*clist.CElement // last accessed entries on each lane +// BlockingWRRIterator implements an blocking version of the WRR iterator. When no +// transactions are available, it waits until a new one is added to the mempool. +type BlockingWRRIterator struct { + WRRIterator + mp *CListMempool } -func (mem *CListMempool) NewIterator() Iterator { - return &CListIterator{ - mp: mem, - counters: make(map[types.Lane]uint32, len(mem.sortedLanes)), - cursors: make(map[types.Lane]*clist.CElement, len(mem.sortedLanes)), +func (mem *CListMempool) NewBlockingWRRIterator() Iterator { + iter := WRRIterator{ + sortedLanes: mem.sortedLanes, + counters: make(map[types.Lane]uint, len(mem.sortedLanes)), + cursors: make(map[types.Lane]*clist.CElement, len(mem.sortedLanes)), + } + return &BlockingWRRIterator{ + WRRIterator: iter, + mp: mem, } } @@ -970,7 +983,7 @@ func (mem *CListMempool) NewIterator() Iterator { // the list. 
// // Unsafe for concurrent use by multiple goroutines. -func (iter *CListIterator) WaitNextCh() <-chan Entry { +func (iter *BlockingWRRIterator) WaitNextCh() <-chan Entry { ch := make(chan Entry) go func() { // Add the next entry to the channel if not nil. @@ -988,11 +1001,11 @@ func (iter *CListIterator) WaitNextCh() <-chan Entry { // PickLane returns a _valid_ lane on which to iterate, according to the WRR algorithm. A lane is // valid if it is not empty or the number of accessed entries in the lane has not yet reached its // priority value. -func (iter *CListIterator) PickLane() types.Lane { +func (iter *BlockingWRRIterator) PickLane() types.Lane { // Loop until finding a valid lanes // If the current lane is not valid, continue with the next lane with lower priority, in a // round robin fashion. - lane := iter.mp.sortedLanes[iter.currentLaneIndex] + lane := iter.sortedLanes[iter.laneIndex] iter.mp.addTxChMtx.RLock() defer iter.mp.addTxChMtx.RUnlock() @@ -1003,10 +1016,9 @@ func (iter *CListIterator) PickLane() types.Lane { (iter.cursors[lane] != nil && iter.cursors[lane].Value.(*mempoolTx).seq == iter.mp.addTxLaneSeqs[lane]) { prevLane := lane - iter.currentLaneIndex = (iter.currentLaneIndex + 1) % len(iter.mp.sortedLanes) - lane = iter.mp.sortedLanes[iter.currentLaneIndex] + lane = iter.nextLane() nIter++ - if nIter >= len(iter.mp.sortedLanes) { + if nIter >= len(iter.sortedLanes) { ch := iter.mp.addTxCh iter.mp.addTxChMtx.RUnlock() iter.mp.logger.Info("YYY PickLane, bef block", "lane", lane, "prevLane", prevLane) @@ -1019,12 +1031,11 @@ func (iter *CListIterator) PickLane() types.Lane { continue } - if iter.counters[lane] >= uint32(lane) { + if iter.counters[lane] >= uint(lane) { // Reset the counter only when the limit on the lane was reached. iter.counters[lane] = 0 - iter.currentLaneIndex = (iter.currentLaneIndex + 1) % len(iter.mp.sortedLanes) prevLane := lane - lane = iter.mp.sortedLanes[iter.currentLaneIndex] + lane = iter.nextLane() nIter = 0 iter.mp.logger.Info("YYY PickLane, skipped lane 2", "lane", prevLane, "new Lane ", lane) continue @@ -1035,7 +1046,7 @@ func (iter *CListIterator) PickLane() types.Lane { } } -// Next implements the classical Weighted Round Robin (WRR) algorithm. +// Next returns the next element according to the WRR algorithm. // // In classical WRR, the iterator cycles over the lanes. When a lane is selected, Next returns an // entry from the selected lane. On subsequent calls, Next will return the next entries from the @@ -1044,7 +1055,7 @@ func (iter *CListIterator) PickLane() types.Lane { // // TODO: Note that this code does not block waiting for an available entry on a CList or a CElement, as // was the case on the original code. Is this the best way to do it? -func (iter *CListIterator) Next() *clist.CElement { +func (iter *BlockingWRRIterator) Next() *clist.CElement { lane := iter.PickLane() // Load the last accessed entry in the lane and set the next one. 
var next *clist.CElement diff --git a/mempool/clist_mempool_test.go b/mempool/clist_mempool_test.go index 9f60f5a7863..c13f2ed8ab3 100644 --- a/mempool/clist_mempool_test.go +++ b/mempool/clist_mempool_test.go @@ -173,7 +173,7 @@ func TestReapMaxBytesMaxGas(t *testing.T) { // Ensure gas calculation behaves as expected checkTxs(t, mp, 1) - iter := mp.NewIterator() + iter := mp.NewBlockingWRRIterator() tx0 := <-iter.WaitNextCh() require.NotNil(t, tx0) require.Equal(t, tx0.GasWanted(), int64(1), "transactions gas was set incorrectly") @@ -871,7 +871,7 @@ func TestMempoolIteratorRace(t *testing.T) { defer wg.Done() for counter.Load() < n { - iter := mp.NewIterator() + iter := mp.NewBlockingWRRIterator() entry := <-iter.WaitNextCh() if entry == nil { continue @@ -892,7 +892,7 @@ func TestMempoolIteratorRace(t *testing.T) { defer wg.Done() for counter.Load() < n { - iter := mp.NewIterator() + iter := mp.NewBlockingWRRIterator() entry := <-iter.WaitNextCh() if entry == nil { continue @@ -942,7 +942,7 @@ func TestMempoolEmptyLanes(t *testing.T) { defer cleanup() go func() { - iter := mp.NewIterator() + iter := mp.NewBlockingWRRIterator() require.Equal(t, 0, mp.Size()) entry := <-iter.WaitNextCh() @@ -1229,7 +1229,7 @@ func TestMempoolIteratorExactOrder(t *testing.T) { } t.Log("Mempool full, starting to pick up transactions", mp.Size()) - iter := mp.NewIterator() + iter := mp.NewBlockingWRRIterator() counter := 0 for counter < n { @@ -1288,7 +1288,7 @@ func TestMempoolIteratorCountOnly(t *testing.T) { go func() { defer wg.Done() - iter := mp.NewIterator() + iter := mp.NewBlockingWRRIterator() for counter < n { entry := <-iter.WaitNextCh() if entry == nil { @@ -1331,7 +1331,7 @@ func TestMempoolIteratorNoLanes(t *testing.T) { go func() { defer wg.Done() - iter := mp.NewIterator() + iter := mp.NewBlockingWRRIterator() for counter < n { entry := <-iter.WaitNextCh() if entry == nil { diff --git a/mempool/reactor.go b/mempool/reactor.go index 3a2d03b0870..95c9d09865b 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -217,7 +217,7 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { } } - iter := memR.mempool.NewIterator() + iter := memR.mempool.NewBlockingWRRIterator() var entry Entry for { // In case of both next.NextWaitChan() and peer.Quit() are variable at the same time From 3e028857ac304c16c65b86764510923be97a8515 Mon Sep 17 00:00:00 2001 From: hvanz Date: Mon, 26 Aug 2024 10:57:04 +0200 Subject: [PATCH 04/14] change returning type of `Next` to `Entry` --- mempool/clist_iterator_test.go | 9 ++++----- mempool/clist_mempool.go | 24 +++++++++++------------- mempool/clist_mempool_test.go | 4 ++-- 3 files changed, 17 insertions(+), 20 deletions(-) diff --git a/mempool/clist_iterator_test.go b/mempool/clist_iterator_test.go index 2ff80221410..7951a344370 100644 --- a/mempool/clist_iterator_test.go +++ b/mempool/clist_iterator_test.go @@ -6,7 +6,6 @@ import ( "github.com/stretchr/testify/require" "github.com/cometbft/cometbft/abci/example/kvstore" - "github.com/cometbft/cometbft/internal/clist" "github.com/cometbft/cometbft/internal/test" "github.com/cometbft/cometbft/proxy" "github.com/cometbft/cometbft/types" @@ -45,14 +44,14 @@ func TestIteratorNonBlocking(t *testing.T) { 15, } - var next *clist.CElement + var next Entry counter := 0 // Check that txs are picked by the iterator in the expected order. 
for _, id := range expectedOrder { next = iter.Next() require.NotNil(t, next) - require.Equal(t, types.Tx(kvstore.NewTxFromID(id)), next.Value.(*mempoolTx).Tx(), "id=%v", id) + require.Equal(t, types.Tx(kvstore.NewTxFromID(id)), next.Tx(), "id=%v", id) counter++ } @@ -89,14 +88,14 @@ func TestIteratorNonBlockingOneLane(t *testing.T) { iter := mp.NewWRRIterator() expectedOrder := []int{0, 11, 22, 33, 44, 55, 66, 77, 88, 99} - var next *clist.CElement + var next Entry counter := 0 // Check that txs are picked by the iterator in the expected order. for _, id := range expectedOrder { next = iter.Next() require.NotNil(t, next) - require.Equal(t, types.Tx(kvstore.NewTxFromID(id)), next.Value.(*mempoolTx).Tx(), "id=%v", id) + require.Equal(t, types.Tx(kvstore.NewTxFromID(id)), next.Tx(), "id=%v", id) counter++ } diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index 1bedf590257..9f969393351 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -612,15 +612,13 @@ func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs { txs := make([]types.Tx, 0, mem.Size()) iter := mem.NewWRRIterator() for { - elem := iter.Next() - if elem == nil { + memTx := iter.Next() + if memTx == nil { break } - memTx := elem.Value.(*mempoolTx) + txs = append(txs, memTx.Tx()) - txs = append(txs, memTx.tx) - - dataSize := types.ComputeProtoSizeForTxs([]types.Tx{memTx.tx}) + dataSize := types.ComputeProtoSizeForTxs([]types.Tx{memTx.Tx()}) // Check total size requirement if maxBytes > -1 && runningSize+dataSize > maxBytes { @@ -633,7 +631,7 @@ func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs { // If maxGas is negative, skip this check. // Since newTotalGas < masGas, which // must be non-negative, it follows that this won't overflow. - newTotalGas := totalGas + memTx.gasWanted + newTotalGas := totalGas + memTx.GasWanted() if maxGas > -1 && newTotalGas > maxGas { return txs[:len(txs)-1] } @@ -654,12 +652,11 @@ func (mem *CListMempool) ReapMaxTxs(max int) types.Txs { txs := make([]types.Tx, 0, cmtmath.MinInt(mem.Size(), max)) iter := mem.NewWRRIterator() for len(txs) <= max { - elem := iter.Next() - if elem == nil { + memTx := iter.Next() + if memTx == nil { break } - memTx := elem.Value.(*mempoolTx) - txs = append(txs, memTx.tx) + txs = append(txs, memTx.Tx()) } return txs } @@ -931,7 +928,7 @@ func (mem *CListMempool) NewWRRIterator() *NonBlockingWRRIterator { } } -func (iter *NonBlockingWRRIterator) Next() *clist.CElement { +func (iter *NonBlockingWRRIterator) Next() Entry { lane := iter.sortedLanes[iter.laneIndex] numEmptyLanes := 0 for { @@ -956,7 +953,8 @@ func (iter *NonBlockingWRRIterator) Next() *clist.CElement { elem := iter.cursors[lane] iter.cursors[lane] = iter.cursors[lane].Next() iter.counters[lane]++ - return elem + // elem is not nil + return elem.Value.(*mempoolTx) } // BlockingWRRIterator implements an blocking version of the WRR iterator. 
When no diff --git a/mempool/clist_mempool_test.go b/mempool/clist_mempool_test.go index c13f2ed8ab3..e4f98251826 100644 --- a/mempool/clist_mempool_test.go +++ b/mempool/clist_mempool_test.go @@ -173,8 +173,8 @@ func TestReapMaxBytesMaxGas(t *testing.T) { // Ensure gas calculation behaves as expected checkTxs(t, mp, 1) - iter := mp.NewBlockingWRRIterator() - tx0 := <-iter.WaitNextCh() + iter := mp.NewWRRIterator() + tx0 := iter.Next() require.NotNil(t, tx0) require.Equal(t, tx0.GasWanted(), int64(1), "transactions gas was set incorrectly") // ensure each tx is 20 bytes long From de03c15e7b2e4d540eb6b591d669032574bd782b Mon Sep 17 00:00:00 2001 From: hvanz Date: Mon, 26 Aug 2024 11:48:31 +0200 Subject: [PATCH 05/14] Add TestReapOrderMatchesGossipOrder --- mempool/clist_iterator_test.go | 43 ++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/mempool/clist_iterator_test.go b/mempool/clist_iterator_test.go index 7951a344370..2652e960f59 100644 --- a/mempool/clist_iterator_test.go +++ b/mempool/clist_iterator_test.go @@ -6,6 +6,7 @@ import ( "github.com/stretchr/testify/require" "github.com/cometbft/cometbft/abci/example/kvstore" + abci "github.com/cometbft/cometbft/abci/types" "github.com/cometbft/cometbft/internal/test" "github.com/cometbft/cometbft/proxy" "github.com/cometbft/cometbft/types" @@ -102,3 +103,45 @@ func TestIteratorNonBlockingOneLane(t *testing.T) { next = iter.Next() require.Nil(t, next) } + +func TestReapOrderMatchesGossipOrder(t *testing.T) { + app := kvstore.NewInMemoryApplication() + cc := proxy.NewLocalClientCreator(app) + mp, cleanup := newMempoolWithApp(cc) + defer cleanup() + + n := 10 + + // Add a bunch of txs. + for i := 1; i <= n; i++ { + tx := kvstore.NewTxFromID(i) + rr, err := mp.CheckTx(tx, "") + require.NoError(t, err, err) + rr.Wait() + } + require.Equal(t, n, mp.Size()) + + gossipIter := mp.NewBlockingWRRIterator() + reapIter := mp.NewWRRIterator() + + // Check that both iterators return the same entry as in the reaped txs. + txs := make([]types.Tx, n) + reapedTxs := mp.ReapMaxTxs(n) + for i, reapedTx := range reapedTxs { + entry := <-gossipIter.WaitNextCh() + // entry can be nil only when an entry is removed concurrently. 
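+		// No entries are removed concurrently in this test, so the entry must
+		// be non-nil here.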
+ require.NotNil(t, entry) + gossipTx := entry.Tx() + + reapTx := reapIter.Next().Tx() + txs[i] = reapTx + + require.EqualValues(t, reapTx, gossipTx) + require.EqualValues(t, reapTx, reapedTx) + } + require.EqualValues(t, txs, reapedTxs) + + err := mp.Update(1, txs, abciResponses(len(txs), abci.CodeTypeOK), nil, nil) + require.NoError(t, err) + require.Zero(t, mp.Size()) +} From 66a162119df78d850787ccb21e53ae9393be3092 Mon Sep 17 00:00:00 2001 From: hvanz Date: Mon, 26 Aug 2024 11:59:08 +0200 Subject: [PATCH 06/14] Move iterator tests to `clist_iterator_tests.go` --- mempool/clist_iterator_test.go | 268 +++++++++++++++++++++++++++++++ mempool/clist_mempool_test.go | 277 --------------------------------- 2 files changed, 268 insertions(+), 277 deletions(-) diff --git a/mempool/clist_iterator_test.go b/mempool/clist_iterator_test.go index 2652e960f59..8a832f9b56a 100644 --- a/mempool/clist_iterator_test.go +++ b/mempool/clist_iterator_test.go @@ -1,10 +1,16 @@ package mempool import ( + "fmt" + "sync" + "sync/atomic" "testing" + "time" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + abciclimocks "github.com/cometbft/cometbft/abci/client/mocks" "github.com/cometbft/cometbft/abci/example/kvstore" abci "github.com/cometbft/cometbft/abci/types" "github.com/cometbft/cometbft/internal/test" @@ -104,6 +110,268 @@ func TestIteratorNonBlockingOneLane(t *testing.T) { require.Nil(t, next) } +// We have two iterators fetching transactions that +// then get removed. +func TestMempoolIteratorRace(t *testing.T) { + mockClient := new(abciclimocks.Client) + mockClient.On("Start").Return(nil) + mockClient.On("SetLogger", mock.Anything) + mockClient.On("Error").Return(nil).Times(100) + mockClient.On("Info", mock.Anything, mock.Anything).Return(&abci.InfoResponse{LanePriorities: []uint32{1, 2, 3}, DefaultLanePriority: 1}, nil) + + mp, cleanup := newMempoolWithAppMock(mockClient) + defer cleanup() + + // Disable rechecking to make sure the recheck logic is not interferint. + mp.config.Recheck = false + + numLanes := 3 + n := int64(100) // Number of transactions + + var wg sync.WaitGroup + + wg.Add(2) + var counter atomic.Int64 + go func() { + // Wait for at least some transactions to get into the mempool + for mp.Size() < int(n) { + time.Sleep(time.Second) + } + fmt.Println("mempool height ", mp.height.Load()) + + go func() { + defer wg.Done() + + for counter.Load() < n { + iter := mp.NewBlockingWRRIterator() + entry := <-iter.WaitNextCh() + if entry == nil { + continue + } + tx := entry.Tx() + + txs := []types.Tx{tx} + + resp := abciResponses(1, 0) + err := mp.Update(1, txs, resp, nil, nil) + + require.NoError(t, err, tx) + counter.Add(1) + } + }() + + go func() { + defer wg.Done() + + for counter.Load() < n { + iter := mp.NewBlockingWRRIterator() + entry := <-iter.WaitNextCh() + if entry == nil { + continue + } + tx := entry.Tx() + + txs := []types.Tx{tx} + resp := abciResponses(1, 0) + err := mp.Update(1, txs, resp, nil, nil) + + require.NoError(t, err) + counter.Add(1) + } + }() + }() + + // This was introduced because without a separate function + // we have to sleep to wait for all txs to get into the mempool. + // This way we loop in the function above until it is fool + // without arbitrary timeouts. 
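+	// The goroutine below submits the txs, while the two readers above keep
+	// iterating (and removing txs via Update) until the expected count is reached.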
+ go func() { + for i := 1; i <= int(n); i++ { + tx := kvstore.NewTxFromID(i) + + currLane := (i % numLanes) + 1 + reqRes := newReqResWithLanes(tx, abci.CodeTypeOK, abci.CHECK_TX_TYPE_CHECK, uint32(currLane)) + require.NotNil(t, reqRes) + + mockClient.On("CheckTxAsync", mock.Anything, mock.Anything).Return(reqRes, nil).Once() + _, err := mp.CheckTx(tx, "") + require.NoError(t, err, err) + reqRes.InvokeCallback() + } + }() + + wg.Wait() + + require.Equal(t, counter.Load(), n+1) +} + +func TestMempoolEmptyLanes(t *testing.T) { + app := kvstore.NewInMemoryApplication() + cc := proxy.NewLocalClientCreator(app) + + cfg := test.ResetTestRoot("mempool_empty_test") + mp, cleanup := newMempoolWithAppAndConfig(cc, cfg) + defer cleanup() + + go func() { + iter := mp.NewBlockingWRRIterator() + require.Zero(t, mp.Size()) + entry := <-iter.WaitNextCh() + require.NotNil(t, entry) + require.EqualValues(t, entry.Tx(), kvstore.NewTxFromID(1)) + }() + time.Sleep(time.Second * 2) + tx := kvstore.NewTxFromID(1) + res := abci.ToCheckTxResponse(&abci.CheckTxResponse{Code: abci.CodeTypeOK}) + mp.handleCheckTxResponse(tx, "")(res) + require.Equal(t, 1, mp.Size(), "pool size mismatch") +} + +// TODO automate the lane numbers so we can change the number of lanes +// and increase the number of transactions. +func TestMempoolIteratorExactOrder(t *testing.T) { + mockClient := new(abciclimocks.Client) + mockClient.On("Start").Return(nil) + mockClient.On("SetLogger", mock.Anything) + mockClient.On("Error").Return(nil).Times(100) + mockClient.On("Info", mock.Anything, mock.Anything).Return(&abci.InfoResponse{LanePriorities: []uint32{1, 2, 3}, DefaultLanePriority: 1}, nil) + + mp, cleanup := newMempoolWithAppMock(mockClient) + defer cleanup() + + // Disable rechecking to make sure the recheck logic is not interferint. + mp.config.Recheck = false + + numLanes := 3 + n := 11 // Number of transactions + // Transactions are ordered into lanes by their IDs. This is the order in + // which they should appear following WRR + expectedTxIDs := []int{2, 5, 8, 1, 4, 3, 11, 7, 10, 6, 9} + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + waitForNumTxsInMempool(n, mp) + t.Log("Mempool full, starting to pick up transactions", mp.Size()) + + iter := mp.NewBlockingWRRIterator() + for i := 0; i < n; i++ { + entry := <-iter.WaitNextCh() + if entry == nil { + continue + } + require.EqualValues(t, entry.Tx(), kvstore.NewTxFromID(expectedTxIDs[i])) + } + }() + + // This was introduced because without a separate function + // we have to sleep to wait for all txs to get into the mempool. + // This way we loop in the function above until it is fool + // without arbitrary timeouts. + go func() { + for i := 1; i <= n; i++ { + tx := kvstore.NewTxFromID(i) + + currLane := (i % numLanes) + 1 + reqRes := newReqResWithLanes(tx, abci.CodeTypeOK, abci.CHECK_TX_TYPE_CHECK, uint32(currLane)) + require.NotNil(t, reqRes) + + mockClient.On("CheckTxAsync", mock.Anything, mock.Anything).Return(reqRes, nil).Once() + _, err := mp.CheckTx(tx, "") + require.NoError(t, err, err) + reqRes.InvokeCallback() + } + }() + + wg.Wait() +} + +// This only tests that all transactions were submitted. 
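+// It makes no assertion about ordering or lane assignment; it only checks that
+// the blocking iterator eventually yields every entry that was submitted.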
+func TestMempoolIteratorCountOnly(t *testing.T) { + app := kvstore.NewInMemoryApplication() + cc := proxy.NewLocalClientCreator(app) + + cfg := test.ResetTestRoot("mempool_test") + mp, cleanup := newMempoolWithAppAndConfig(cc, cfg) + defer cleanup() + + var wg sync.WaitGroup + wg.Add(1) + + n := numTxs + + // Spawn a goroutine that iterates on the list until counting n entries. + counter := 0 + go func() { + defer wg.Done() + + iter := mp.NewBlockingWRRIterator() + for counter < n { + entry := <-iter.WaitNextCh() + if entry == nil { + continue + } + counter++ + } + }() + + // Add n transactions with sequential ids. + for i := 0; i < n; i++ { + tx := kvstore.NewTxFromID(i) + rr, err := mp.CheckTx(tx, "") + require.NoError(t, err) + rr.Wait() + } + + wg.Wait() + require.Equal(t, n, counter) +} + +// Without lanes transactions should be returned as they were +// submitted - increasing tx IDs. +func TestMempoolIteratorNoLanes(t *testing.T) { + app := kvstore.NewInMemoryApplication() + app.SetUseLanes(false) + cc := proxy.NewLocalClientCreator(app) + + cfg := test.ResetTestRoot("mempool_test") + mp, cleanup := newMempoolWithAppAndConfig(cc, cfg) + defer cleanup() + + var wg sync.WaitGroup + wg.Add(1) + + n := 1000 // numTxs + + // Spawn a goroutine that iterates on the list until counting n entries. + counter := 0 + go func() { + defer wg.Done() + + iter := mp.NewBlockingWRRIterator() + for counter < n { + entry := <-iter.WaitNextCh() + if entry == nil { + continue + } + require.EqualValues(t, entry.Tx(), kvstore.NewTxFromID(counter)) + counter++ + } + }() + + // Add n transactions with sequential ids. + for i := 0; i < n; i++ { + tx := kvstore.NewTxFromID(i) + rr, err := mp.CheckTx(tx, "") + require.NoError(t, err) + rr.Wait() + } + + wg.Wait() + require.Equal(t, n, counter) +} + func TestReapOrderMatchesGossipOrder(t *testing.T) { app := kvstore.NewInMemoryApplication() cc := proxy.NewLocalClientCreator(app) diff --git a/mempool/clist_mempool_test.go b/mempool/clist_mempool_test.go index e4f98251826..86826f3e149 100644 --- a/mempool/clist_mempool_test.go +++ b/mempool/clist_mempool_test.go @@ -1,7 +1,6 @@ package mempool import ( - "bytes" "context" "encoding/binary" "errors" @@ -10,7 +9,6 @@ import ( "os" "strconv" "sync" - "sync/atomic" "testing" "time" @@ -838,126 +836,6 @@ func TestMempoolConcurrentUpdateAndReceiveCheckTxResponse(t *testing.T) { } } -// We have two iterators fetching transactions that -// then get removed. -func TestMempoolIteratorRace(t *testing.T) { - mockClient := new(abciclimocks.Client) - mockClient.On("Start").Return(nil) - mockClient.On("SetLogger", mock.Anything) - mockClient.On("Error").Return(nil).Times(100) - mockClient.On("Info", mock.Anything, mock.Anything).Return(&abci.InfoResponse{LanePriorities: []uint32{1, 2, 3}, DefaultLanePriority: 1}, nil) - - mp, cleanup := newMempoolWithAppMock(mockClient) - defer cleanup() - - // Disable rechecking to make sure the recheck logic is not interferint. 
- mp.config.Recheck = false - - numLanes := 3 - n := int64(100) // Number of transactions - - var wg sync.WaitGroup - - wg.Add(2) - var counter atomic.Int64 - go func() { - // Wait for at least some transactions to get into the mempool - for mp.Size() < int(n) { - time.Sleep(time.Second) - } - fmt.Println("mempool height ", mp.height.Load()) - - go func() { - defer wg.Done() - - for counter.Load() < n { - iter := mp.NewBlockingWRRIterator() - entry := <-iter.WaitNextCh() - if entry == nil { - continue - } - tx := entry.Tx() - - txs := []types.Tx{tx} - - resp := abciResponses(1, 0) - err := mp.Update(1, txs, resp, nil, nil) - - require.NoError(t, err, tx) - counter.Add(1) - } - }() - - go func() { - defer wg.Done() - - for counter.Load() < n { - iter := mp.NewBlockingWRRIterator() - entry := <-iter.WaitNextCh() - if entry == nil { - continue - } - tx := entry.Tx() - - txs := []types.Tx{tx} - resp := abciResponses(1, 0) - err := mp.Update(1, txs, resp, nil, nil) - - require.NoError(t, err) - counter.Add(1) - } - }() - }() - - // This was introduced because without a separate function - // we have to sleep to wait for all txs to get into the mempool. - // This way we loop in the function above until it is fool - // without arbitrary timeouts. - go func() { - for i := 1; i <= int(n); i++ { - tx := kvstore.NewTxFromID(i) - - currLane := (i % numLanes) + 1 - reqRes := newReqResWithLanes(tx, abci.CodeTypeOK, abci.CHECK_TX_TYPE_CHECK, uint32(currLane)) - require.NotNil(t, reqRes) - - mockClient.On("CheckTxAsync", mock.Anything, mock.Anything).Return(reqRes, nil).Once() - _, err := mp.CheckTx(tx, "") - require.NoError(t, err, err) - reqRes.InvokeCallback() - } - }() - - wg.Wait() - - require.Equal(t, counter.Load(), n+1) -} - -func TestMempoolEmptyLanes(t *testing.T) { - app := kvstore.NewInMemoryApplication() - cc := proxy.NewLocalClientCreator(app) - - cfg := test.ResetTestRoot("mempool_empty_test") - mp, cleanup := newMempoolWithAppAndConfig(cc, cfg) - defer cleanup() - - go func() { - iter := mp.NewBlockingWRRIterator() - require.Equal(t, 0, mp.Size()) - entry := <-iter.WaitNextCh() - - require.NotNil(t, entry) - tx := entry.Tx() - - require.True(t, bytes.Equal(tx, kvstore.NewTxFromID(1))) - }() - time.Sleep(time.Second * 2) - tx := kvstore.NewTxFromID(1) - res := abci.ToCheckTxResponse(&abci.CheckTxResponse{Code: abci.CodeTypeOK}) - mp.handleCheckTxResponse(tx, "")(res) - require.Equal(t, 1, mp.Size(), "pool size mismatch") -} - func TestMempoolNotifyTxsAvailable(t *testing.T) { app := kvstore.NewInMemoryApplication() cc := proxy.NewLocalClientCreator(app) @@ -1200,161 +1078,6 @@ func TestMempoolConcurrentCheckTxAndUpdate(t *testing.T) { require.Zero(t, mp.Size()) } -// TODO automate the lane numbers so we can change the number of lanes -// and increase the number of transactions. -func TestMempoolIteratorExactOrder(t *testing.T) { - mockClient := new(abciclimocks.Client) - mockClient.On("Start").Return(nil) - mockClient.On("SetLogger", mock.Anything) - mockClient.On("Error").Return(nil).Times(100) - mockClient.On("Info", mock.Anything, mock.Anything).Return(&abci.InfoResponse{LanePriorities: []uint32{1, 2, 3}, DefaultLanePriority: 1}, nil) - - mp, cleanup := newMempoolWithAppMock(mockClient) - defer cleanup() - - // Disable rechecking to make sure the recheck logic is not interferint. 
- mp.config.Recheck = false - - numLanes := 3 - n := 11 // Number of transactions - // Transactions are ordered into lanes by their IDs - // This is the order in which they should appear following WRR - localSortedLanes := []int{2, 5, 8, 1, 4, 3, 11, 7, 10, 6, 9} - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - for mp.Size() < n { - time.Sleep(time.Second) - } - t.Log("Mempool full, starting to pick up transactions", mp.Size()) - - iter := mp.NewBlockingWRRIterator() - - counter := 0 - for counter < n { - entry := <-iter.WaitNextCh() - if entry == nil { - continue - } - tx := entry.Tx() - - txLocal := kvstore.NewTxFromID(localSortedLanes[counter]) - - require.True(t, bytes.Equal(tx, txLocal)) - - counter++ - } - }() - - // This was introduced because without a separate function - // we have to sleep to wait for all txs to get into the mempool. - // This way we loop in the function above until it is fool - // without arbitrary timeouts. - go func() { - for i := 1; i <= n; i++ { - tx := kvstore.NewTxFromID(i) - - currLane := (i % numLanes) + 1 - reqRes := newReqResWithLanes(tx, abci.CodeTypeOK, abci.CHECK_TX_TYPE_CHECK, uint32(currLane)) - require.NotNil(t, reqRes) - - mockClient.On("CheckTxAsync", mock.Anything, mock.Anything).Return(reqRes, nil).Once() - _, err := mp.CheckTx(tx, "") - require.NoError(t, err, err) - reqRes.InvokeCallback() - } - }() - - wg.Wait() -} - -// This only tests that all transactions were submitted. -func TestMempoolIteratorCountOnly(t *testing.T) { - app := kvstore.NewInMemoryApplication() - cc := proxy.NewLocalClientCreator(app) - - cfg := test.ResetTestRoot("mempool_test") - mp, cleanup := newMempoolWithAppAndConfig(cc, cfg) - defer cleanup() - - var wg sync.WaitGroup - wg.Add(1) - - n := numTxs - - // Spawn a goroutine that iterates on the list until counting n entries. - counter := 0 - go func() { - defer wg.Done() - - iter := mp.NewBlockingWRRIterator() - for counter < n { - entry := <-iter.WaitNextCh() - if entry == nil { - continue - } - counter++ - } - }() - - // Add n transactions with sequential ids. - for i := 0; i < n; i++ { - tx := kvstore.NewTxFromID(i) - rr, err := mp.CheckTx(tx, "") - require.NoError(t, err) - rr.Wait() - } - - wg.Wait() - require.Equal(t, n, counter) -} - -// Without lanes transactions should be returned as they were -// submitted - increasing tx IDs. -func TestMempoolIteratorNoLanes(t *testing.T) { - app := kvstore.NewInMemoryApplication() - app.SetUseLanes(false) - cc := proxy.NewLocalClientCreator(app) - - cfg := test.ResetTestRoot("mempool_test") - mp, cleanup := newMempoolWithAppAndConfig(cc, cfg) - defer cleanup() - - var wg sync.WaitGroup - wg.Add(1) - - n := 1000 // numTxs - - // Spawn a goroutine that iterates on the list until counting n entries. - counter := 0 - go func() { - defer wg.Done() - - iter := mp.NewBlockingWRRIterator() - for counter < n { - entry := <-iter.WaitNextCh() - if entry == nil { - continue - } - require.True(t, bytes.Equal(kvstore.NewTxFromID(counter), entry.Tx())) - - counter++ - } - }() - - // Add n transactions with sequential ids. 
- for i := 0; i < n; i++ { - tx := kvstore.NewTxFromID(i) - rr, err := mp.CheckTx(tx, "") - require.NoError(t, err) - rr.Wait() - } - - wg.Wait() - require.Equal(t, n, counter) -} - func newMempoolWithAsyncConnection(tb testing.TB) (*CListMempool, cleanupFunc) { tb.Helper() sockPath := fmt.Sprintf("unix:///tmp/echo_%v.sock", cmtrand.Str(6)) From 832be5fcd4308ab25b8bf8d148d5e962b7f88f77 Mon Sep 17 00:00:00 2001 From: hvanz Date: Mon, 26 Aug 2024 12:02:13 +0200 Subject: [PATCH 07/14] rename tests --- mempool/clist_iterator_test.go | 96 +++++++++++++++++----------------- 1 file changed, 48 insertions(+), 48 deletions(-) diff --git a/mempool/clist_iterator_test.go b/mempool/clist_iterator_test.go index 8a832f9b56a..5d81fe8f370 100644 --- a/mempool/clist_iterator_test.go +++ b/mempool/clist_iterator_test.go @@ -112,7 +112,7 @@ func TestIteratorNonBlockingOneLane(t *testing.T) { // We have two iterators fetching transactions that // then get removed. -func TestMempoolIteratorRace(t *testing.T) { +func TestIteratorRace(t *testing.T) { mockClient := new(abciclimocks.Client) mockClient.On("Start").Return(nil) mockClient.On("SetLogger", mock.Anything) @@ -205,7 +205,7 @@ func TestMempoolIteratorRace(t *testing.T) { require.Equal(t, counter.Load(), n+1) } -func TestMempoolEmptyLanes(t *testing.T) { +func TestIteratorEmptyLanes(t *testing.T) { app := kvstore.NewInMemoryApplication() cc := proxy.NewLocalClientCreator(app) @@ -227,9 +227,53 @@ func TestMempoolEmptyLanes(t *testing.T) { require.Equal(t, 1, mp.Size(), "pool size mismatch") } +// Without lanes transactions should be returned as they were +// submitted - increasing tx IDs. +func TestIteratorNoLanes(t *testing.T) { + app := kvstore.NewInMemoryApplication() + app.SetUseLanes(false) + cc := proxy.NewLocalClientCreator(app) + + cfg := test.ResetTestRoot("mempool_test") + mp, cleanup := newMempoolWithAppAndConfig(cc, cfg) + defer cleanup() + + n := numTxs + + var wg sync.WaitGroup + wg.Add(1) + + // Spawn a goroutine that iterates on the list until counting n entries. + counter := 0 + go func() { + defer wg.Done() + + iter := mp.NewBlockingWRRIterator() + for counter < n { + entry := <-iter.WaitNextCh() + if entry == nil { + continue + } + require.EqualValues(t, entry.Tx(), kvstore.NewTxFromID(counter)) + counter++ + } + }() + + // Add n transactions with sequential ids. + for i := 0; i < n; i++ { + tx := kvstore.NewTxFromID(i) + rr, err := mp.CheckTx(tx, "") + require.NoError(t, err) + rr.Wait() + } + + wg.Wait() + require.Equal(t, n, counter) +} + // TODO automate the lane numbers so we can change the number of lanes // and increase the number of transactions. -func TestMempoolIteratorExactOrder(t *testing.T) { +func TestIteratorExactOrder(t *testing.T) { mockClient := new(abciclimocks.Client) mockClient.On("Start").Return(nil) mockClient.On("SetLogger", mock.Anything) @@ -288,7 +332,7 @@ func TestMempoolIteratorExactOrder(t *testing.T) { } // This only tests that all transactions were submitted. -func TestMempoolIteratorCountOnly(t *testing.T) { +func TestIteratorCountOnly(t *testing.T) { app := kvstore.NewInMemoryApplication() cc := proxy.NewLocalClientCreator(app) @@ -328,50 +372,6 @@ func TestMempoolIteratorCountOnly(t *testing.T) { require.Equal(t, n, counter) } -// Without lanes transactions should be returned as they were -// submitted - increasing tx IDs. 
-func TestMempoolIteratorNoLanes(t *testing.T) { - app := kvstore.NewInMemoryApplication() - app.SetUseLanes(false) - cc := proxy.NewLocalClientCreator(app) - - cfg := test.ResetTestRoot("mempool_test") - mp, cleanup := newMempoolWithAppAndConfig(cc, cfg) - defer cleanup() - - var wg sync.WaitGroup - wg.Add(1) - - n := 1000 // numTxs - - // Spawn a goroutine that iterates on the list until counting n entries. - counter := 0 - go func() { - defer wg.Done() - - iter := mp.NewBlockingWRRIterator() - for counter < n { - entry := <-iter.WaitNextCh() - if entry == nil { - continue - } - require.EqualValues(t, entry.Tx(), kvstore.NewTxFromID(counter)) - counter++ - } - }() - - // Add n transactions with sequential ids. - for i := 0; i < n; i++ { - tx := kvstore.NewTxFromID(i) - rr, err := mp.CheckTx(tx, "") - require.NoError(t, err) - rr.Wait() - } - - wg.Wait() - require.Equal(t, n, counter) -} - func TestReapOrderMatchesGossipOrder(t *testing.T) { app := kvstore.NewInMemoryApplication() cc := proxy.NewLocalClientCreator(app) From 74b11fc7b59bb5f81564f84e8e1567b0716c2260 Mon Sep 17 00:00:00 2001 From: hvanz Date: Mon, 26 Aug 2024 12:19:38 +0200 Subject: [PATCH 08/14] Alloc one iterator for reaping --- mempool/clist_mempool.go | 40 ++++++++++++++++++++++++++-------------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index 9f969393351..8348e876059 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -64,6 +64,8 @@ type CListMempool struct { defaultLane types.Lane sortedLanes []types.Lane // lanes sorted by priority + reapIter *NonBlockingWRRIterator + // Keep a cache of already-seen txs. // This reduces the pressure on the proxyApp. cache TxCache @@ -118,6 +120,8 @@ func NewCListMempool( slices.Reverse(mp.sortedLanes) } + mp.reapIter = mp.NewWRRIterator() + if cfg.CacheSize > 0 { mp.cache = NewLRUTxCache(cfg.CacheSize) } else { @@ -610,9 +614,9 @@ func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs { // size per tx, and set the initial capacity based off of that. // txs := make([]types.Tx, 0, cmtmath.MinInt(mem.Size(), max/mem.avgTxSize)) txs := make([]types.Tx, 0, mem.Size()) - iter := mem.NewWRRIterator() + mem.reapIter.Reset(mem.lanes) for { - memTx := iter.Next() + memTx := mem.reapIter.Next() if memTx == nil { break } @@ -650,9 +654,9 @@ func (mem *CListMempool) ReapMaxTxs(max int) types.Txs { } txs := make([]types.Tx, 0, cmtmath.MinInt(mem.Size(), max)) - iter := mem.NewWRRIterator() + mem.reapIter.Reset(mem.lanes) for len(txs) <= max { - memTx := iter.Next() + memTx := mem.reapIter.Next() if memTx == nil { break } @@ -913,18 +917,26 @@ type NonBlockingWRRIterator struct { } func (mem *CListMempool) NewWRRIterator() *NonBlockingWRRIterator { - // Set cursors at the beginning of each lane. 
- cursors := make(map[types.Lane]*clist.CElement, len(mem.sortedLanes)) - for _, lane := range mem.sortedLanes { - cursors[lane] = mem.lanes[lane].Front() - } - iter := WRRIterator{ + baseIter := WRRIterator{ sortedLanes: mem.sortedLanes, - counters: make(map[types.Lane]uint, len(mem.sortedLanes)), - cursors: cursors, + counters: make(map[types.Lane]uint, len(mem.lanes)), + cursors: make(map[types.Lane]*clist.CElement, len(mem.lanes)), } - return &NonBlockingWRRIterator{ - WRRIterator: iter, + iter := &NonBlockingWRRIterator{ + WRRIterator: baseIter, + } + iter.Reset(mem.lanes) + return iter +} + +func (iter *NonBlockingWRRIterator) Reset(lanes map[types.Lane]*clist.CList) { + iter.laneIndex = 0 + for i := range iter.counters { + iter.counters[i] = 0 + } + // Set cursors at the beginning of each lane. + for lane := range lanes { + iter.cursors[lane] = lanes[lane].Front() } } From 35e1d11e4b475b2b5cf5b563600f32ae4e99f508 Mon Sep 17 00:00:00 2001 From: hvanz Date: Mon, 26 Aug 2024 12:30:07 +0200 Subject: [PATCH 09/14] don't sleep on test --- mempool/clist_iterator_test.go | 53 +++++++++++++--------------------- 1 file changed, 20 insertions(+), 33 deletions(-) diff --git a/mempool/clist_iterator_test.go b/mempool/clist_iterator_test.go index 5d81fe8f370..2bd2e9e81df 100644 --- a/mempool/clist_iterator_test.go +++ b/mempool/clist_iterator_test.go @@ -5,7 +5,6 @@ import ( "sync" "sync/atomic" "testing" - "time" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -125,36 +124,28 @@ func TestIteratorRace(t *testing.T) { // Disable rechecking to make sure the recheck logic is not interferint. mp.config.Recheck = false - numLanes := 3 - n := int64(100) // Number of transactions + const numLanes = 3 + const numTxs = 100 var wg sync.WaitGroup - wg.Add(2) + var counter atomic.Int64 go func() { - // Wait for at least some transactions to get into the mempool - for mp.Size() < int(n) { - time.Sleep(time.Second) - } + waitForNumTxsInMempool(numTxs, mp) fmt.Println("mempool height ", mp.height.Load()) go func() { defer wg.Done() - for counter.Load() < n { + for counter.Load() < int64(numTxs) { iter := mp.NewBlockingWRRIterator() entry := <-iter.WaitNextCh() if entry == nil { continue } tx := entry.Tx() - - txs := []types.Tx{tx} - - resp := abciResponses(1, 0) - err := mp.Update(1, txs, resp, nil, nil) - + err := mp.Update(1, []types.Tx{tx}, abciResponses(1, 0), nil, nil) require.NoError(t, err, tx) counter.Add(1) } @@ -163,19 +154,15 @@ func TestIteratorRace(t *testing.T) { go func() { defer wg.Done() - for counter.Load() < n { + for counter.Load() < int64(numTxs) { iter := mp.NewBlockingWRRIterator() entry := <-iter.WaitNextCh() if entry == nil { continue } tx := entry.Tx() - - txs := []types.Tx{tx} - resp := abciResponses(1, 0) - err := mp.Update(1, txs, resp, nil, nil) - - require.NoError(t, err) + err := mp.Update(1, []types.Tx{tx}, abciResponses(1, 0), nil, nil) + require.NoError(t, err, tx) counter.Add(1) } }() @@ -186,7 +173,7 @@ func TestIteratorRace(t *testing.T) { // This way we loop in the function above until it is fool // without arbitrary timeouts. 
go func() { - for i := 1; i <= int(n); i++ { + for i := 1; i <= int(numTxs); i++ { tx := kvstore.NewTxFromID(i) currLane := (i % numLanes) + 1 @@ -202,7 +189,7 @@ func TestIteratorRace(t *testing.T) { wg.Wait() - require.Equal(t, counter.Load(), n+1) + require.Equal(t, counter.Load(), int64(numTxs+1)) } func TestIteratorEmptyLanes(t *testing.T) { @@ -220,7 +207,7 @@ func TestIteratorEmptyLanes(t *testing.T) { require.NotNil(t, entry) require.EqualValues(t, entry.Tx(), kvstore.NewTxFromID(1)) }() - time.Sleep(time.Second * 2) + tx := kvstore.NewTxFromID(1) res := abci.ToCheckTxResponse(&abci.CheckTxResponse{Code: abci.CodeTypeOK}) mp.handleCheckTxResponse(tx, "")(res) @@ -238,7 +225,7 @@ func TestIteratorNoLanes(t *testing.T) { mp, cleanup := newMempoolWithAppAndConfig(cc, cfg) defer cleanup() - n := numTxs + const n = numTxs var wg sync.WaitGroup wg.Add(1) @@ -286,8 +273,8 @@ func TestIteratorExactOrder(t *testing.T) { // Disable rechecking to make sure the recheck logic is not interferint. mp.config.Recheck = false - numLanes := 3 - n := 11 // Number of transactions + const numLanes = 3 + const numTxs = 11 // Transactions are ordered into lanes by their IDs. This is the order in // which they should appear following WRR expectedTxIDs := []int{2, 5, 8, 1, 4, 3, 11, 7, 10, 6, 9} @@ -296,11 +283,11 @@ func TestIteratorExactOrder(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - waitForNumTxsInMempool(n, mp) + waitForNumTxsInMempool(numTxs, mp) t.Log("Mempool full, starting to pick up transactions", mp.Size()) iter := mp.NewBlockingWRRIterator() - for i := 0; i < n; i++ { + for i := 0; i < numTxs; i++ { entry := <-iter.WaitNextCh() if entry == nil { continue @@ -314,7 +301,7 @@ func TestIteratorExactOrder(t *testing.T) { // This way we loop in the function above until it is fool // without arbitrary timeouts. go func() { - for i := 1; i <= n; i++ { + for i := 1; i <= numTxs; i++ { tx := kvstore.NewTxFromID(i) currLane := (i % numLanes) + 1 @@ -343,7 +330,7 @@ func TestIteratorCountOnly(t *testing.T) { var wg sync.WaitGroup wg.Add(1) - n := numTxs + const n = numTxs // Spawn a goroutine that iterates on the list until counting n entries. counter := 0 @@ -378,7 +365,7 @@ func TestReapOrderMatchesGossipOrder(t *testing.T) { mp, cleanup := newMempoolWithApp(cc) defer cleanup() - n := 10 + const n = 10 // Add a bunch of txs. 
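+	// The kvstore app spreads these txs over different lanes based on their IDs.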
for i := 1; i <= n; i++ { From 3ba7828efe743ca755011fac92d763a6f551025a Mon Sep 17 00:00:00 2001 From: hvanz Date: Mon, 26 Aug 2024 12:47:02 +0200 Subject: [PATCH 10/14] panic on nil entry --- mempool/clist_mempool.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index 8348e876059..aecc7388ed8 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -963,9 +963,11 @@ func (iter *NonBlockingWRRIterator) Next() Entry { break } elem := iter.cursors[lane] + if elem == nil { + panic(fmt.Errorf("Iterator picked a nil entry on lane %d", lane)) + } iter.cursors[lane] = iter.cursors[lane].Next() iter.counters[lane]++ - // elem is not nil return elem.Value.(*mempoolTx) } From b5f614a5cbae6045bd1fe1c7e24fd93ecfe6c05a Mon Sep 17 00:00:00 2001 From: hvanz Date: Mon, 26 Aug 2024 12:49:44 +0200 Subject: [PATCH 11/14] comments --- mempool/clist_mempool.go | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index aecc7388ed8..15f818714d3 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -909,9 +909,11 @@ func (iter *WRRIterator) nextLane() types.Lane { return iter.sortedLanes[iter.laneIndex] } -// Non-blocking version of a WRR iterator. +// Non-blocking version of the WRR iterator to be used for reaping and +// rechecking transactions. // -// Lock must be held to update mempool: it cannot be modified while iterating. +// Lock must be held on the mempool when iterating: the mempool cannot be +// modified while iterating. type NonBlockingWRRIterator struct { WRRIterator } @@ -929,6 +931,7 @@ func (mem *CListMempool) NewWRRIterator() *NonBlockingWRRIterator { return iter } +// Reset must be called before every use of the iterator. func (iter *NonBlockingWRRIterator) Reset(lanes map[types.Lane]*clist.CList) { iter.laneIndex = 0 for i := range iter.counters { @@ -940,6 +943,7 @@ func (iter *NonBlockingWRRIterator) Reset(lanes map[types.Lane]*clist.CList) { } } +// Next returns the next element according to the WRR algorithm. func (iter *NonBlockingWRRIterator) Next() Entry { lane := iter.sortedLanes[iter.laneIndex] numEmptyLanes := 0 @@ -971,8 +975,9 @@ func (iter *NonBlockingWRRIterator) Next() Entry { return elem.Value.(*mempoolTx) } -// BlockingWRRIterator implements an blocking version of the WRR iterator. When no -// transactions are available, it waits until a new one is added to the mempool. +// BlockingWRRIterator implements a blocking version of the WRR iterator, +// meaning that when no transaction is available, it will wait until a new one +// is added to the mempool. type BlockingWRRIterator struct { WRRIterator mp *CListMempool @@ -1010,9 +1015,10 @@ func (iter *BlockingWRRIterator) WaitNextCh() <-chan Entry { return ch } -// PickLane returns a _valid_ lane on which to iterate, according to the WRR algorithm. A lane is -// valid if it is not empty or the number of accessed entries in the lane has not yet reached its -// priority value. +// PickLane returns a _valid_ lane on which to iterate, according to the WRR +// algorithm. A lane is valid if it is not empty or it is not over-consumed, +// meaning that the number of accessed entries in the lane has not yet reached +// its priority value in the current WRR iteration. 
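+//
+// If no lane currently qualifies, PickLane blocks on the mempool's addTxCh
+// until a new transaction is added, then resumes the scan.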
func (iter *BlockingWRRIterator) PickLane() types.Lane { // Loop until finding a valid lanes // If the current lane is not valid, continue with the next lane with lower priority, in a From a365e2aed92c60a801c5f8534793f993913c6b35 Mon Sep 17 00:00:00 2001 From: hvanz Date: Mon, 26 Aug 2024 12:56:45 +0200 Subject: [PATCH 12/14] sleep in TestIteratorEmptyLanes --- mempool/clist_iterator_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/mempool/clist_iterator_test.go b/mempool/clist_iterator_test.go index 2bd2e9e81df..951c5a182e1 100644 --- a/mempool/clist_iterator_test.go +++ b/mempool/clist_iterator_test.go @@ -5,6 +5,7 @@ import ( "sync" "sync/atomic" "testing" + "time" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -207,6 +208,7 @@ func TestIteratorEmptyLanes(t *testing.T) { require.NotNil(t, entry) require.EqualValues(t, entry.Tx(), kvstore.NewTxFromID(1)) }() + time.Sleep(100 * time.Millisecond) tx := kvstore.NewTxFromID(1) res := abci.ToCheckTxResponse(&abci.CheckTxResponse{Code: abci.CodeTypeOK}) From e7c87387229c73dabee17dea4a5c67c07620b13f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hern=C3=A1n=20Vanzetto?= <15466498+hvanz@users.noreply.github.com> Date: Mon, 26 Aug 2024 12:59:29 +0200 Subject: [PATCH 13/14] Update mempool/clist_iterator_test.go Co-authored-by: Jasmina Malicevic --- mempool/clist_iterator_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/mempool/clist_iterator_test.go b/mempool/clist_iterator_test.go index 951c5a182e1..80de4cb6d93 100644 --- a/mempool/clist_iterator_test.go +++ b/mempool/clist_iterator_test.go @@ -134,7 +134,6 @@ func TestIteratorRace(t *testing.T) { var counter atomic.Int64 go func() { waitForNumTxsInMempool(numTxs, mp) - fmt.Println("mempool height ", mp.height.Load()) go func() { defer wg.Done() From 408ca6ab003afbe1228c024b71a2af9d09e36d27 Mon Sep 17 00:00:00 2001 From: hvanz Date: Mon, 26 Aug 2024 13:02:17 +0200 Subject: [PATCH 14/14] unused import --- mempool/clist_iterator_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/mempool/clist_iterator_test.go b/mempool/clist_iterator_test.go index 80de4cb6d93..59ce4bcf8cd 100644 --- a/mempool/clist_iterator_test.go +++ b/mempool/clist_iterator_test.go @@ -1,7 +1,6 @@ package mempool import ( - "fmt" "sync" "sync/atomic" "testing"