From 571596a4a1071c07d9e291c6f6d708be8e80b984 Mon Sep 17 00:00:00 2001 From: hvanz Date: Tue, 30 May 2023 12:09:09 +0200 Subject: [PATCH 1/5] Move mempoolTx to new file; add methods for sender --- mempool/clist_mempool.go | 89 ++++++++++++++++++---------------------- mempool/mempoolTx.go | 34 +++++++++++++++ mempool/reactor.go | 2 +- 3 files changed, 76 insertions(+), 49 deletions(-) create mode 100644 mempool/mempoolTx.go diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index c054cae32dc..f1ae40c5ad4 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -100,6 +100,32 @@ func NewCListMempool( return mp } +func (mem *CListMempool) getElement(txKey types.TxKey) (*clist.CElement, bool) { + if e, ok := mem.txsMap.Load(txKey); ok { + return e.(*clist.CElement), true + } + return nil, false +} + +func (mem *CListMempool) getEntry(txKey types.TxKey) *mempoolTx { + if e, ok := mem.getElement(txKey); ok { + return e.Value.(*mempoolTx) + } + return nil +} + +func (mem *CListMempool) removeAllTxs() { + for e := mem.txs.Front(); e != nil; e = e.Next() { + mem.txs.Remove(e) + e.DetachPrev() + } + + mem.txsMap.Range(func(key, _ interface{}) bool { + mem.txsMap.Delete(key) + return true + }) +} + // NOTE: not thread safe - should only be called once, on startup func (mem *CListMempool) EnableTxsAvailable() { mem.txsAvailable = make(chan struct{}, 1) @@ -162,15 +188,7 @@ func (mem *CListMempool) Flush() { _ = atomic.SwapInt64(&mem.txsBytes, 0) mem.cache.Reset() - for e := mem.txs.Front(); e != nil; e = e.Next() { - mem.txs.Remove(e) - e.DetachPrev() - } - - mem.txsMap.Range(func(key, _ interface{}) bool { - mem.txsMap.Delete(key) - return true - }) + mem.removeAllTxs() } // TxsFront returns the first transaction in the ordered list for peer @@ -240,9 +258,8 @@ func (mem *CListMempool) CheckTx( // Note it's possible a tx is still in the cache but no longer in the mempool // (eg. after committing a block, txs are removed from mempool but not cache), // so we only record the sender for txs still in the mempool. - if e, ok := mem.txsMap.Load(tx.Key()); ok { - memTx := e.(*clist.CElement).Value.(*mempoolTx) - memTx.senders.LoadOrStore(txInfo.SenderID, true) + if memTx := mem.getEntry(tx.Key()); memTx != nil { + memTx.AddSender(txInfo.SenderID) // TODO: consider punishing peer for dups, // its non-trivial since invalid txs can become valid, // but they can spam the same tx with little cost to them atm. @@ -325,24 +342,16 @@ func (mem *CListMempool) addTx(memTx *mempoolTx) { // Called from: // - Update (lock held) if tx was committed // - resCbRecheck (lock not held) if tx was invalidated -func (mem *CListMempool) removeTx(tx types.Tx, elem *clist.CElement) { - mem.txs.Remove(elem) - elem.DetachPrev() - mem.txsMap.Delete(tx.Key()) - atomic.AddInt64(&mem.txsBytes, int64(-len(tx))) -} - -// RemoveTxByKey removes a transaction from the mempool by its TxKey index. func (mem *CListMempool) RemoveTxByKey(txKey types.TxKey) error { - if e, ok := mem.txsMap.Load(txKey); ok { - memTx := e.(*clist.CElement).Value.(*mempoolTx) - if memTx != nil { - mem.removeTx(memTx.tx, e.(*clist.CElement)) - return nil - } - return errors.New("found empty transaction") + if elem, ok := mem.getElement(txKey); ok { + mem.txs.Remove(elem) + elem.DetachPrev() + mem.txsMap.Delete(txKey) + tx := elem.Value.(*mempoolTx).tx + atomic.AddInt64(&mem.txsBytes, int64(-len(tx))) + return nil } - return errors.New("transaction not found") + return errors.New("transaction not found in mempool") } func (mem *CListMempool) isFull(txSize int) error { @@ -394,7 +403,7 @@ func (mem *CListMempool) resCbFirstTime( gasWanted: r.CheckTx.GasWanted, tx: tx, } - memTx.senders.Store(peerID, true) + memTx.AddSender(peerID) mem.addTx(memTx) mem.logger.Debug( "added good transaction", @@ -472,7 +481,9 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) { if (r.CheckTx.Code != abci.CodeTypeOK) || postCheckErr != nil { // Tx became invalidated due to newly committed block. mem.logger.Debug("tx is no longer valid", "tx", types.Tx(tx).Hash(), "res", r, "err", postCheckErr) - mem.removeTx(tx, mem.recheckCursor) + if err := mem.RemoveTxByKey(memTx.tx.Key()); err != nil { + mem.logger.Debug("Transaction could not be removed from mempool", err) + } // We remove the invalid tx from the cache because it might be good later if !mem.config.KeepInvalidTxsInCache { mem.cache.Remove(tx) @@ -665,21 +676,3 @@ func (mem *CListMempool) recheckTxs() { // all pending messages to the app. There doesn't seem to be any need here as the buffer // will get flushed regularly or when filled. } - -//-------------------------------------------------------------------------------- - -// mempoolTx is a transaction that successfully ran -type mempoolTx struct { - height int64 // height that this tx had been validated in - gasWanted int64 // amount of gas this tx states it will require - tx types.Tx // - - // ids of peers who've sent us this tx (as a map for quick lookups). - // senders: PeerID -> bool - senders sync.Map -} - -// Height returns the height for this transaction -func (memTx *mempoolTx) Height() int64 { - return atomic.LoadInt64(&memTx.height) -} diff --git a/mempool/mempoolTx.go b/mempool/mempoolTx.go new file mode 100644 index 00000000000..3cb0da0a27d --- /dev/null +++ b/mempool/mempoolTx.go @@ -0,0 +1,34 @@ +package mempool + +import ( + "sync" + "sync/atomic" + + "github.com/cometbft/cometbft/types" +) + +// mempoolTx is an entry in the mempool +type mempoolTx struct { + height int64 // height that this tx had been validated in + gasWanted int64 // amount of gas this tx states it will require + tx types.Tx // validated by the application + + // ids of peers who've sent us this tx (as a map for quick lookups). + // senders: PeerID -> bool + senders sync.Map +} + +// Height returns the height for this transaction +func (memTx *mempoolTx) Height() int64 { + return atomic.LoadInt64(&memTx.height) +} + +func (memTx *mempoolTx) IsSender(peerID uint16) bool { + _, ok := memTx.senders.Load(peerID) + return ok +} + +func (memTx *mempoolTx) AddSender(senderID uint16) bool { + _, added := memTx.senders.LoadOrStore(senderID, true) + return added +} diff --git a/mempool/reactor.go b/mempool/reactor.go index f72b802737c..72f4b2868b9 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -181,7 +181,7 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { // NOTE: Transaction batching was disabled due to // https://github.com/tendermint/tendermint/issues/5796 - if _, ok := memTx.senders.Load(peerID); !ok { + if !memTx.IsSender(peerID) { success := peer.Send(p2p.Envelope{ ChannelID: MempoolChannel, Message: &protomem.Txs{Txs: [][]byte{memTx.tx}}, From 73d714c08c88a46aa353e465ed95fd9adc276f9d Mon Sep 17 00:00:00 2001 From: hvanz Date: Tue, 30 May 2023 23:22:58 +0200 Subject: [PATCH 2/5] renaming --- mempool/clist_mempool.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index f1ae40c5ad4..20800a8057a 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -100,15 +100,15 @@ func NewCListMempool( return mp } -func (mem *CListMempool) getElement(txKey types.TxKey) (*clist.CElement, bool) { +func (mem *CListMempool) getCElement(txKey types.TxKey) (*clist.CElement, bool) { if e, ok := mem.txsMap.Load(txKey); ok { return e.(*clist.CElement), true } return nil, false } -func (mem *CListMempool) getEntry(txKey types.TxKey) *mempoolTx { - if e, ok := mem.getElement(txKey); ok { +func (mem *CListMempool) getMemTx(txKey types.TxKey) *mempoolTx { + if e, ok := mem.getCElement(txKey); ok { return e.Value.(*mempoolTx) } return nil @@ -258,7 +258,7 @@ func (mem *CListMempool) CheckTx( // Note it's possible a tx is still in the cache but no longer in the mempool // (eg. after committing a block, txs are removed from mempool but not cache), // so we only record the sender for txs still in the mempool. - if memTx := mem.getEntry(tx.Key()); memTx != nil { + if memTx := mem.getMemTx(tx.Key()); memTx != nil { memTx.AddSender(txInfo.SenderID) // TODO: consider punishing peer for dups, // its non-trivial since invalid txs can become valid, @@ -343,7 +343,7 @@ func (mem *CListMempool) addTx(memTx *mempoolTx) { // - Update (lock held) if tx was committed // - resCbRecheck (lock not held) if tx was invalidated func (mem *CListMempool) RemoveTxByKey(txKey types.TxKey) error { - if elem, ok := mem.getElement(txKey); ok { + if elem, ok := mem.getCElement(txKey); ok { mem.txs.Remove(elem) elem.DetachPrev() mem.txsMap.Delete(txKey) From 7f8eca5cb3dbd8185356d4d99ee80e584339bc9b Mon Sep 17 00:00:00 2001 From: hvanz Date: Wed, 31 May 2023 09:39:56 +0200 Subject: [PATCH 3/5] Pass TxInfo as argument instead of individual fields --- mempool/clist_mempool.go | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index 20800a8057a..8364c6fd24c 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -13,7 +13,6 @@ import ( "github.com/cometbft/cometbft/libs/log" cmtmath "github.com/cometbft/cometbft/libs/math" cmtsync "github.com/cometbft/cometbft/libs/sync" - "github.com/cometbft/cometbft/p2p" "github.com/cometbft/cometbft/proxy" "github.com/cometbft/cometbft/types" ) @@ -271,7 +270,7 @@ func (mem *CListMempool) CheckTx( if err != nil { return err } - reqRes.SetCallback(mem.reqResCb(tx, txInfo.SenderID, txInfo.SenderP2PID, cb)) + reqRes.SetCallback(mem.reqResCb(tx, txInfo, cb)) return nil } @@ -308,8 +307,7 @@ func (mem *CListMempool) globalCb(req *abci.Request, res *abci.Response) { // Used in CheckTx to record PeerID who sent us the tx. func (mem *CListMempool) reqResCb( tx []byte, - peerID uint16, - peerP2PID p2p.ID, + txInfo TxInfo, externalCb func(*abci.ResponseCheckTx), ) func(res *abci.Response) { return func(res *abci.Response) { @@ -318,7 +316,7 @@ func (mem *CListMempool) reqResCb( panic("recheck cursor is not nil in reqResCb") } - mem.resCbFirstTime(tx, peerID, peerP2PID, res) + mem.resCbFirstTime(tx, txInfo, res) // update metrics mem.metrics.Size.Set(float64(mem.Size())) @@ -378,8 +376,7 @@ func (mem *CListMempool) isFull(txSize int) error { // handled by the resCbRecheck callback. func (mem *CListMempool) resCbFirstTime( tx []byte, - peerID uint16, - peerP2PID p2p.ID, + txInfo TxInfo, res *abci.Response, ) { switch r := res.Value.(type) { @@ -403,7 +400,7 @@ func (mem *CListMempool) resCbFirstTime( gasWanted: r.CheckTx.GasWanted, tx: tx, } - memTx.AddSender(peerID) + memTx.AddSender(txInfo.SenderID) mem.addTx(memTx) mem.logger.Debug( "added good transaction", @@ -418,7 +415,7 @@ func (mem *CListMempool) resCbFirstTime( mem.logger.Debug( "rejected bad transaction", "tx", types.Tx(tx).Hash(), - "peerID", peerP2PID, + "peerID", txInfo.SenderP2PID, "res", r, "err", postCheckErr, ) From a2b3b4b4722eb7d6f976c4cd784b95ec4fe4b75b Mon Sep 17 00:00:00 2001 From: hvanz Date: Wed, 31 May 2023 09:49:04 +0200 Subject: [PATCH 4/5] Unexport sender methods --- mempool/clist_mempool.go | 4 ++-- mempool/mempoolTx.go | 4 ++-- mempool/reactor.go | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index 8364c6fd24c..24541619b93 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -258,7 +258,7 @@ func (mem *CListMempool) CheckTx( // (eg. after committing a block, txs are removed from mempool but not cache), // so we only record the sender for txs still in the mempool. if memTx := mem.getMemTx(tx.Key()); memTx != nil { - memTx.AddSender(txInfo.SenderID) + memTx.addSender(txInfo.SenderID) // TODO: consider punishing peer for dups, // its non-trivial since invalid txs can become valid, // but they can spam the same tx with little cost to them atm. @@ -400,7 +400,7 @@ func (mem *CListMempool) resCbFirstTime( gasWanted: r.CheckTx.GasWanted, tx: tx, } - memTx.AddSender(txInfo.SenderID) + memTx.addSender(txInfo.SenderID) mem.addTx(memTx) mem.logger.Debug( "added good transaction", diff --git a/mempool/mempoolTx.go b/mempool/mempoolTx.go index 3cb0da0a27d..5c160ce2f7f 100644 --- a/mempool/mempoolTx.go +++ b/mempool/mempoolTx.go @@ -23,12 +23,12 @@ func (memTx *mempoolTx) Height() int64 { return atomic.LoadInt64(&memTx.height) } -func (memTx *mempoolTx) IsSender(peerID uint16) bool { +func (memTx *mempoolTx) isSender(peerID uint16) bool { _, ok := memTx.senders.Load(peerID) return ok } -func (memTx *mempoolTx) AddSender(senderID uint16) bool { +func (memTx *mempoolTx) addSender(senderID uint16) bool { _, added := memTx.senders.LoadOrStore(senderID, true) return added } diff --git a/mempool/reactor.go b/mempool/reactor.go index 72f4b2868b9..9a0372e4b85 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -181,7 +181,7 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { // NOTE: Transaction batching was disabled due to // https://github.com/tendermint/tendermint/issues/5796 - if !memTx.IsSender(peerID) { + if !memTx.isSender(peerID) { success := peer.Send(p2p.Envelope{ ChannelID: MempoolChannel, Message: &protomem.Txs{Txs: [][]byte{memTx.tx}}, From 0d20a543e95758b2e6f9cd275d9ed4b392e0fc1d Mon Sep 17 00:00:00 2001 From: hvanz Date: Wed, 31 May 2023 18:47:41 +0200 Subject: [PATCH 5/5] Fix logging; restore comment --- mempool/clist_mempool.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index 24541619b93..d10fa733758 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -337,6 +337,7 @@ func (mem *CListMempool) addTx(memTx *mempoolTx) { mem.metrics.TxSizeBytes.Observe(float64(len(memTx.tx))) } +// RemoveTxByKey removes a transaction from the mempool by its TxKey index. // Called from: // - Update (lock held) if tx was committed // - resCbRecheck (lock not held) if tx was invalidated @@ -479,7 +480,7 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) { // Tx became invalidated due to newly committed block. mem.logger.Debug("tx is no longer valid", "tx", types.Tx(tx).Hash(), "res", r, "err", postCheckErr) if err := mem.RemoveTxByKey(memTx.tx.Key()); err != nil { - mem.logger.Debug("Transaction could not be removed from mempool", err) + mem.logger.Debug("Transaction could not be removed from mempool", "err", err) } // We remove the invalid tx from the cache because it might be good later if !mem.config.KeepInvalidTxsInCache {