From 9ae9597b6d2b1f88ee7f7633bfeee11811962924 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hern=C3=A1n=20Vanzetto?= <15466498+hvanz@users.noreply.github.com> Date: Wed, 31 May 2023 19:01:36 +0200 Subject: [PATCH] mempool: slight refactor for improving readability (#894) * Move mempoolTx to new file; add methods for sender * renaming * Pass TxInfo as argument instead of individual fields * Unexport sender methods * Fix logging; restore comment (cherry picked from commit 1d51250b06f0ca3605a19886bdbe97d3dab96a33) # Conflicts: # mempool/v0/clist_mempool.go # mempool/v0/reactor.go --- mempool/mempoolTx.go | 34 ++++++++++++ mempool/v0/clist_mempool.go | 108 ++++++++++++++++++++++++------------ mempool/v0/reactor.go | 6 ++ 3 files changed, 113 insertions(+), 35 deletions(-) create mode 100644 mempool/mempoolTx.go diff --git a/mempool/mempoolTx.go b/mempool/mempoolTx.go new file mode 100644 index 00000000000..5c160ce2f7f --- /dev/null +++ b/mempool/mempoolTx.go @@ -0,0 +1,34 @@ +package mempool + +import ( + "sync" + "sync/atomic" + + "github.com/cometbft/cometbft/types" +) + +// mempoolTx is an entry in the mempool +type mempoolTx struct { + height int64 // height that this tx had been validated in + gasWanted int64 // amount of gas this tx states it will require + tx types.Tx // validated by the application + + // ids of peers who've sent us this tx (as a map for quick lookups). + // senders: PeerID -> bool + senders sync.Map +} + +// Height returns the height for this transaction +func (memTx *mempoolTx) Height() int64 { + return atomic.LoadInt64(&memTx.height) +} + +func (memTx *mempoolTx) isSender(peerID uint16) bool { + _, ok := memTx.senders.Load(peerID) + return ok +} + +func (memTx *mempoolTx) addSender(senderID uint16) bool { + _, added := memTx.senders.LoadOrStore(senderID, true) + return added +} diff --git a/mempool/v0/clist_mempool.go b/mempool/v0/clist_mempool.go index 9500a3fb46d..6620e02b24b 100644 --- a/mempool/v0/clist_mempool.go +++ b/mempool/v0/clist_mempool.go @@ -12,8 +12,11 @@ import ( "github.com/cometbft/cometbft/libs/log" cmtmath "github.com/cometbft/cometbft/libs/math" cmtsync "github.com/cometbft/cometbft/libs/sync" +<<<<<<< HEAD:mempool/v0/clist_mempool.go "github.com/cometbft/cometbft/mempool" "github.com/cometbft/cometbft/p2p" +======= +>>>>>>> 1d51250b0 (mempool: slight refactor for improving readability (#894)):mempool/clist_mempool.go "github.com/cometbft/cometbft/proxy" "github.com/cometbft/cometbft/types" ) @@ -101,6 +104,32 @@ func NewCListMempool( return mp } +func (mem *CListMempool) getCElement(txKey types.TxKey) (*clist.CElement, bool) { + if e, ok := mem.txsMap.Load(txKey); ok { + return e.(*clist.CElement), true + } + return nil, false +} + +func (mem *CListMempool) getMemTx(txKey types.TxKey) *mempoolTx { + if e, ok := mem.getCElement(txKey); ok { + return e.Value.(*mempoolTx) + } + return nil +} + +func (mem *CListMempool) removeAllTxs() { + for e := mem.txs.Front(); e != nil; e = e.Next() { + mem.txs.Remove(e) + e.DetachPrev() + } + + mem.txsMap.Range(func(key, _ interface{}) bool { + mem.txsMap.Delete(key) + return true + }) +} + // NOTE: not thread safe - should only be called once, on startup func (mem *CListMempool) EnableTxsAvailable() { mem.txsAvailable = make(chan struct{}, 1) @@ -163,15 +192,7 @@ func (mem *CListMempool) Flush() { _ = atomic.SwapInt64(&mem.txsBytes, 0) mem.cache.Reset() - for e := mem.txs.Front(); e != nil; e = e.Next() { - mem.txs.Remove(e) - e.DetachPrev() - } - - mem.txsMap.Range(func(key, _ interface{}) bool { - mem.txsMap.Delete(key) - return true - }) + mem.removeAllTxs() } // TxsFront returns the first transaction in the ordered list for peer @@ -241,9 +262,8 @@ func (mem *CListMempool) CheckTx( // Note it's possible a tx is still in the cache but no longer in the mempool // (eg. after committing a block, txs are removed from mempool but not cache), // so we only record the sender for txs still in the mempool. - if e, ok := mem.txsMap.Load(tx.Key()); ok { - memTx := e.(*clist.CElement).Value.(*mempoolTx) - memTx.senders.LoadOrStore(txInfo.SenderID, true) + if memTx := mem.getMemTx(tx.Key()); memTx != nil { + memTx.addSender(txInfo.SenderID) // TODO: consider punishing peer for dups, // its non-trivial since invalid txs can become valid, // but they can spam the same tx with little cost to them atm. @@ -251,8 +271,16 @@ func (mem *CListMempool) CheckTx( return mempool.ErrTxInCache } +<<<<<<< HEAD:mempool/v0/clist_mempool.go reqRes := mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx}) reqRes.SetCallback(mem.reqResCb(tx, txInfo.SenderID, txInfo.SenderP2PID, cb)) +======= + reqRes, err := mem.proxyAppConn.CheckTxAsync(context.TODO(), &abci.RequestCheckTx{Tx: tx}) + if err != nil { + return err + } + reqRes.SetCallback(mem.reqResCb(tx, txInfo, cb)) +>>>>>>> 1d51250b0 (mempool: slight refactor for improving readability (#894)):mempool/clist_mempool.go return nil } @@ -289,9 +317,14 @@ func (mem *CListMempool) globalCb(req *abci.Request, res *abci.Response) { // Used in CheckTx to record PeerID who sent us the tx. func (mem *CListMempool) reqResCb( tx []byte, +<<<<<<< HEAD:mempool/v0/clist_mempool.go peerID uint16, peerP2PID p2p.ID, externalCb func(*abci.Response), +======= + txInfo TxInfo, + externalCb func(*abci.ResponseCheckTx), +>>>>>>> 1d51250b0 (mempool: slight refactor for improving readability (#894)):mempool/clist_mempool.go ) func(res *abci.Response) { return func(res *abci.Response) { if mem.recheckCursor != nil { @@ -299,7 +332,7 @@ func (mem *CListMempool) reqResCb( panic("recheck cursor is not nil in reqResCb") } - mem.resCbFirstTime(tx, peerID, peerP2PID, res) + mem.resCbFirstTime(tx, txInfo, res) // update metrics mem.metrics.Size.Set(float64(mem.Size())) @@ -320,9 +353,11 @@ func (mem *CListMempool) addTx(memTx *mempoolTx) { mem.metrics.TxSizeBytes.Observe(float64(len(memTx.tx))) } +// RemoveTxByKey removes a transaction from the mempool by its TxKey index. // Called from: // - Update (lock held) if tx was committed // - resCbRecheck (lock not held) if tx was invalidated +<<<<<<< HEAD:mempool/v0/clist_mempool.go func (mem *CListMempool) removeTx(tx types.Tx, elem *clist.CElement, removeFromCache bool) { mem.txs.Remove(elem) elem.DetachPrev() @@ -345,6 +380,18 @@ func (mem *CListMempool) RemoveTxByKey(txKey types.TxKey) error { return errors.New("transaction not found") } return errors.New("invalid transaction found") +======= +func (mem *CListMempool) RemoveTxByKey(txKey types.TxKey) error { + if elem, ok := mem.getCElement(txKey); ok { + mem.txs.Remove(elem) + elem.DetachPrev() + mem.txsMap.Delete(txKey) + tx := elem.Value.(*mempoolTx).tx + atomic.AddInt64(&mem.txsBytes, int64(-len(tx))) + return nil + } + return errors.New("transaction not found in mempool") +>>>>>>> 1d51250b0 (mempool: slight refactor for improving readability (#894)):mempool/clist_mempool.go } func (mem *CListMempool) isFull(txSize int) error { @@ -371,8 +418,7 @@ func (mem *CListMempool) isFull(txSize int) error { // handled by the resCbRecheck callback. func (mem *CListMempool) resCbFirstTime( tx []byte, - peerID uint16, - peerP2PID p2p.ID, + txInfo TxInfo, res *abci.Response, ) { switch r := res.Value.(type) { @@ -396,7 +442,7 @@ func (mem *CListMempool) resCbFirstTime( gasWanted: r.CheckTx.GasWanted, tx: tx, } - memTx.senders.Store(peerID, true) + memTx.addSender(txInfo.SenderID) mem.addTx(memTx) mem.logger.Debug( "added good transaction", @@ -411,7 +457,7 @@ func (mem *CListMempool) resCbFirstTime( mem.logger.Debug( "rejected bad transaction", "tx", types.Tx(tx).Hash(), - "peerID", peerP2PID, + "peerID", txInfo.SenderP2PID, "res", r, "err", postCheckErr, ) @@ -476,8 +522,18 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) { } else { // Tx became invalidated due to newly committed block. mem.logger.Debug("tx is no longer valid", "tx", types.Tx(tx).Hash(), "res", r, "err", postCheckErr) +<<<<<<< HEAD:mempool/v0/clist_mempool.go // NOTE: we remove tx from the cache because it might be good later mem.removeTx(tx, mem.recheckCursor, !mem.config.KeepInvalidTxsInCache) +======= + if err := mem.RemoveTxByKey(memTx.tx.Key()); err != nil { + mem.logger.Debug("Transaction could not be removed from mempool", "err", err) + } + // We remove the invalid tx from the cache because it might be good later + if !mem.config.KeepInvalidTxsInCache { + mem.cache.Remove(tx) + } +>>>>>>> 1d51250b0 (mempool: slight refactor for improving readability (#894)):mempool/clist_mempool.go } if mem.recheckCursor == mem.recheckEnd { mem.recheckCursor = nil @@ -658,21 +714,3 @@ func (mem *CListMempool) recheckTxs() { mem.proxyAppConn.FlushAsync() } - -//-------------------------------------------------------------------------------- - -// mempoolTx is a transaction that successfully ran -type mempoolTx struct { - height int64 // height that this tx had been validated in - gasWanted int64 // amount of gas this tx states it will require - tx types.Tx // - - // ids of peers who've sent us this tx (as a map for quick lookups). - // senders: PeerID -> bool - senders sync.Map -} - -// Height returns the height for this transaction -func (memTx *mempoolTx) Height() int64 { - return atomic.LoadInt64(&memTx.height) -} diff --git a/mempool/v0/reactor.go b/mempool/v0/reactor.go index 508d76666f8..c9ab2699a03 100644 --- a/mempool/v0/reactor.go +++ b/mempool/v0/reactor.go @@ -241,9 +241,15 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { // NOTE: Transaction batching was disabled due to // https://github.com/tendermint/tendermint/issues/5796 +<<<<<<< HEAD:mempool/v0/reactor.go if _, ok := memTx.senders.Load(peerID); !ok { success := peer.SendEnvelope(p2p.Envelope{ ChannelID: mempool.MempoolChannel, +======= + if !memTx.isSender(peerID) { + success := peer.Send(p2p.Envelope{ + ChannelID: MempoolChannel, +>>>>>>> 1d51250b0 (mempool: slight refactor for improving readability (#894)):mempool/reactor.go Message: &protomem.Txs{Txs: [][]byte{memTx.tx}}, }) if !success {