-
Notifications
You must be signed in to change notification settings - Fork 636
mempool: slight refactor for improving readability #894
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
571596a
Move mempoolTx to new file; add methods for sender
hvanz 73d714c
renaming
hvanz 7f8eca5
Pass TxInfo as argument instead of individual fields
hvanz a2b3b4b
Unexport sender methods
hvanz 0d20a54
Fix logging; restore comment
hvanz File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,7 +13,6 @@ import ( | |
"github.com/cometbft/cometbft/libs/log" | ||
cmtmath "github.com/cometbft/cometbft/libs/math" | ||
cmtsync "github.com/cometbft/cometbft/libs/sync" | ||
"github.com/cometbft/cometbft/p2p" | ||
"github.com/cometbft/cometbft/proxy" | ||
"github.com/cometbft/cometbft/types" | ||
) | ||
|
@@ -100,6 +99,32 @@ func NewCListMempool( | |
return mp | ||
} | ||
|
||
func (mem *CListMempool) getCElement(txKey types.TxKey) (*clist.CElement, bool) { | ||
if e, ok := mem.txsMap.Load(txKey); ok { | ||
return e.(*clist.CElement), true | ||
} | ||
return nil, false | ||
} | ||
|
||
func (mem *CListMempool) getMemTx(txKey types.TxKey) *mempoolTx { | ||
if e, ok := mem.getCElement(txKey); ok { | ||
return e.Value.(*mempoolTx) | ||
} | ||
return nil | ||
} | ||
|
||
func (mem *CListMempool) removeAllTxs() { | ||
for e := mem.txs.Front(); e != nil; e = e.Next() { | ||
mem.txs.Remove(e) | ||
e.DetachPrev() | ||
} | ||
|
||
mem.txsMap.Range(func(key, _ interface{}) bool { | ||
mem.txsMap.Delete(key) | ||
return true | ||
}) | ||
} | ||
|
||
// NOTE: not thread safe - should only be called once, on startup | ||
func (mem *CListMempool) EnableTxsAvailable() { | ||
mem.txsAvailable = make(chan struct{}, 1) | ||
|
@@ -162,15 +187,7 @@ func (mem *CListMempool) Flush() { | |
_ = atomic.SwapInt64(&mem.txsBytes, 0) | ||
mem.cache.Reset() | ||
|
||
for e := mem.txs.Front(); e != nil; e = e.Next() { | ||
mem.txs.Remove(e) | ||
e.DetachPrev() | ||
} | ||
|
||
mem.txsMap.Range(func(key, _ interface{}) bool { | ||
mem.txsMap.Delete(key) | ||
return true | ||
}) | ||
mem.removeAllTxs() | ||
} | ||
|
||
// TxsFront returns the first transaction in the ordered list for peer | ||
|
@@ -240,9 +257,8 @@ func (mem *CListMempool) CheckTx( | |
// Note it's possible a tx is still in the cache but no longer in the mempool | ||
// (eg. after committing a block, txs are removed from mempool but not cache), | ||
// so we only record the sender for txs still in the mempool. | ||
if e, ok := mem.txsMap.Load(tx.Key()); ok { | ||
memTx := e.(*clist.CElement).Value.(*mempoolTx) | ||
memTx.senders.LoadOrStore(txInfo.SenderID, true) | ||
if memTx := mem.getMemTx(tx.Key()); memTx != nil { | ||
memTx.addSender(txInfo.SenderID) | ||
// TODO: consider punishing peer for dups, | ||
// its non-trivial since invalid txs can become valid, | ||
// but they can spam the same tx with little cost to them atm. | ||
|
@@ -254,7 +270,7 @@ func (mem *CListMempool) CheckTx( | |
if err != nil { | ||
return err | ||
} | ||
reqRes.SetCallback(mem.reqResCb(tx, txInfo.SenderID, txInfo.SenderP2PID, cb)) | ||
reqRes.SetCallback(mem.reqResCb(tx, txInfo, cb)) | ||
|
||
return nil | ||
} | ||
|
@@ -291,8 +307,7 @@ func (mem *CListMempool) globalCb(req *abci.Request, res *abci.Response) { | |
// Used in CheckTx to record PeerID who sent us the tx. | ||
func (mem *CListMempool) reqResCb( | ||
tx []byte, | ||
peerID uint16, | ||
peerP2PID p2p.ID, | ||
txInfo TxInfo, | ||
externalCb func(*abci.ResponseCheckTx), | ||
) func(res *abci.Response) { | ||
return func(res *abci.Response) { | ||
|
@@ -301,7 +316,7 @@ func (mem *CListMempool) reqResCb( | |
panic("recheck cursor is not nil in reqResCb") | ||
} | ||
|
||
mem.resCbFirstTime(tx, peerID, peerP2PID, res) | ||
mem.resCbFirstTime(tx, txInfo, res) | ||
|
||
// update metrics | ||
mem.metrics.Size.Set(float64(mem.Size())) | ||
|
@@ -322,27 +337,20 @@ func (mem *CListMempool) addTx(memTx *mempoolTx) { | |
mem.metrics.TxSizeBytes.Observe(float64(len(memTx.tx))) | ||
} | ||
|
||
// RemoveTxByKey removes a transaction from the mempool by its TxKey index. | ||
// Called from: | ||
// - Update (lock held) if tx was committed | ||
// - resCbRecheck (lock not held) if tx was invalidated | ||
func (mem *CListMempool) removeTx(tx types.Tx, elem *clist.CElement) { | ||
mem.txs.Remove(elem) | ||
elem.DetachPrev() | ||
mem.txsMap.Delete(tx.Key()) | ||
atomic.AddInt64(&mem.txsBytes, int64(-len(tx))) | ||
} | ||
|
||
// RemoveTxByKey removes a transaction from the mempool by its TxKey index. | ||
func (mem *CListMempool) RemoveTxByKey(txKey types.TxKey) error { | ||
if e, ok := mem.txsMap.Load(txKey); ok { | ||
memTx := e.(*clist.CElement).Value.(*mempoolTx) | ||
if memTx != nil { | ||
mem.removeTx(memTx.tx, e.(*clist.CElement)) | ||
return nil | ||
} | ||
return errors.New("found empty transaction") | ||
if elem, ok := mem.getCElement(txKey); ok { | ||
mem.txs.Remove(elem) | ||
elem.DetachPrev() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also a missing call to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as above. In all places where |
||
mem.txsMap.Delete(txKey) | ||
tx := elem.Value.(*mempoolTx).tx | ||
atomic.AddInt64(&mem.txsBytes, int64(-len(tx))) | ||
return nil | ||
} | ||
return errors.New("transaction not found") | ||
return errors.New("transaction not found in mempool") | ||
} | ||
|
||
func (mem *CListMempool) isFull(txSize int) error { | ||
|
@@ -369,8 +377,7 @@ func (mem *CListMempool) isFull(txSize int) error { | |
// handled by the resCbRecheck callback. | ||
func (mem *CListMempool) resCbFirstTime( | ||
tx []byte, | ||
peerID uint16, | ||
peerP2PID p2p.ID, | ||
txInfo TxInfo, | ||
res *abci.Response, | ||
) { | ||
switch r := res.Value.(type) { | ||
|
@@ -394,7 +401,7 @@ func (mem *CListMempool) resCbFirstTime( | |
gasWanted: r.CheckTx.GasWanted, | ||
tx: tx, | ||
} | ||
memTx.senders.Store(peerID, true) | ||
memTx.addSender(txInfo.SenderID) | ||
mem.addTx(memTx) | ||
mem.logger.Debug( | ||
"added good transaction", | ||
|
@@ -409,7 +416,7 @@ func (mem *CListMempool) resCbFirstTime( | |
mem.logger.Debug( | ||
"rejected bad transaction", | ||
"tx", types.Tx(tx).Hash(), | ||
"peerID", peerP2PID, | ||
"peerID", txInfo.SenderP2PID, | ||
"res", r, | ||
"err", postCheckErr, | ||
) | ||
|
@@ -472,7 +479,9 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) { | |
if (r.CheckTx.Code != abci.CodeTypeOK) || postCheckErr != nil { | ||
// Tx became invalidated due to newly committed block. | ||
mem.logger.Debug("tx is no longer valid", "tx", types.Tx(tx).Hash(), "res", r, "err", postCheckErr) | ||
mem.removeTx(tx, mem.recheckCursor) | ||
if err := mem.RemoveTxByKey(memTx.tx.Key()); err != nil { | ||
mem.logger.Debug("Transaction could not be removed from mempool", "err", err) | ||
} | ||
// We remove the invalid tx from the cache because it might be good later | ||
if !mem.config.KeepInvalidTxsInCache { | ||
mem.cache.Remove(tx) | ||
|
@@ -665,21 +674,3 @@ func (mem *CListMempool) recheckTxs() { | |
// all pending messages to the app. There doesn't seem to be any need here as the buffer | ||
// will get flushed regularly or when filled. | ||
} | ||
|
||
//-------------------------------------------------------------------------------- | ||
|
||
// mempoolTx is a transaction that successfully ran | ||
type mempoolTx struct { | ||
height int64 // height that this tx had been validated in | ||
gasWanted int64 // amount of gas this tx states it will require | ||
tx types.Tx // | ||
|
||
// ids of peers who've sent us this tx (as a map for quick lookups). | ||
// senders: PeerID -> bool | ||
senders sync.Map | ||
} | ||
|
||
// Height returns the height for this transaction | ||
func (memTx *mempoolTx) Height() int64 { | ||
return atomic.LoadInt64(&memTx.height) | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
package mempool | ||
|
||
import ( | ||
"sync" | ||
"sync/atomic" | ||
|
||
"github.com/cometbft/cometbft/types" | ||
) | ||
|
||
// mempoolTx is an entry in the mempool | ||
type mempoolTx struct { | ||
height int64 // height that this tx had been validated in | ||
gasWanted int64 // amount of gas this tx states it will require | ||
tx types.Tx // validated by the application | ||
|
||
// ids of peers who've sent us this tx (as a map for quick lookups). | ||
// senders: PeerID -> bool | ||
senders sync.Map | ||
} | ||
|
||
// Height returns the height for this transaction | ||
func (memTx *mempoolTx) Height() int64 { | ||
return atomic.LoadInt64(&memTx.height) | ||
} | ||
|
||
func (memTx *mempoolTx) isSender(peerID uint16) bool { | ||
_, ok := memTx.senders.Load(peerID) | ||
return ok | ||
} | ||
|
||
func (memTx *mempoolTx) addSender(senderID uint16) bool { | ||
_, added := memTx.senders.LoadOrStore(senderID, true) | ||
return added | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The contract in
CList
indicates that bothprev
andnext
should be detached to avoid memory leaks. (This was also missing initially.)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The contract says "and/or": "Caller must call e.DetachPrev() and/or e.DetachNext() to avoid memory leaks". I haven't actually checked the code to know how it works. In any case, it's really good that you have checked this stuff.