From 8ed36c5a57d18a7ec13fa9b668e8efe0d538b63d Mon Sep 17 00:00:00 2001 From: "ollie.j" Date: Fri, 30 Jun 2023 12:45:25 +0900 Subject: [PATCH 01/12] Add DBManager Pruning accessors --- storage/database/db_manager.go | 100 +++++++++++++++++++++++++++- storage/database/db_manager_test.go | 47 +++++++++++++ storage/database/schema.go | 36 +++++++++- 3 files changed, 179 insertions(+), 4 deletions(-) diff --git a/storage/database/db_manager.go b/storage/database/db_manager.go index 106a4f0404..c28231f8ed 100644 --- a/storage/database/db_manager.go +++ b/storage/database/db_manager.go @@ -168,6 +168,16 @@ type DBManager interface { DeleteTrieNode(hash common.ExtHash) WritePreimages(number uint64, preimages map[common.Hash][]byte) + // Trie pruning + ReadPruningEnabled() bool + WritePruningEnabled() + DeletePruningEnabled() + + WritePruningMarks(marks []PruningMark) + ReadPruningMarks(startNumber, endNumber uint64) []PruningMark + DeletePruningMarks(marks []PruningMark) + PruneTrieNodes(marks []PruningMark) + // from accessors_indexes.go ReadTxLookupEntry(hash common.Hash) (common.Hash, uint64, uint64) WriteTxLookupEntries(block *types.Block) @@ -1863,6 +1873,12 @@ func (dbm *databaseManager) PutTrieNodeToBatch(batch Batch, hash common.ExtHash, } } +func (dbm *databaseManager) DeleteTrieNode(hash common.ExtHash) { + if err := dbm.getDatabase(StateTrieDB).Delete(TrieNodeKey(hash)); err != nil { + logger.Crit("Failed to delete trie node", "err", err) + } +} + // WritePreimages writes the provided set of preimages to the database. `number` is the // current block number, and is used for debug messages only. func (dbm *databaseManager) WritePreimages(number uint64, preimages map[common.Hash][]byte) { @@ -1882,9 +1898,87 @@ func (dbm *databaseManager) WritePreimages(number uint64, preimages map[common.H preimageHitCounter.Inc(int64(len(preimages))) } -func (dbm *databaseManager) DeleteTrieNode(hash common.ExtHash) { - if err := dbm.getDatabase(StateTrieDB).Delete(TrieNodeKey(hash)); err != nil { - logger.Crit("Failed to delete trie node", "err", err) +func (dbm *databaseManager) ReadPruningEnabled() bool { + ok, _ := dbm.getDatabase(MiscDB).Has(pruningEnabledKey) + return ok +} + +func (dbm *databaseManager) WritePruningEnabled() { + if err := dbm.getDatabase(MiscDB).Put(pruningEnabledKey, []byte("42")); err != nil { + logger.Crit("Failed to store pruning enabled flag", "err", err) + } +} + +func (dbm *databaseManager) DeletePruningEnabled() { + if err := dbm.getDatabase(MiscDB).Delete(pruningEnabledKey); err != nil { + logger.Crit("Failed to remove pruning enabled flag", "err", err) + } +} + +// WritePruningMarks writes the provided set of pruning marks to the database. +func (dbm *databaseManager) WritePruningMarks(marks []PruningMark) { + batch := dbm.NewBatch(MiscDB) + for _, mark := range marks { + if err := batch.Put(pruningMarkKey(mark), pruningMarkValue); err != nil { + logger.Crit("Failed to store trie pruning mark", "err", err) + } + if _, err := WriteBatchesOverThreshold(batch); err != nil { + logger.Crit("Failed to store trie pruning mark", "err", err) + } + } + if err := batch.Write(); err != nil { + logger.Crit("Failed to batch write pruning mark", "err", err) + } +} + +// ReadPruningMarks reads the pruning marks in the block number range [startNumber, endNumber). 
+func (dbm *databaseManager) ReadPruningMarks(startNumber, endNumber uint64) []PruningMark { + prefix := pruningMarkPrefix + startKey := pruningMarkKey(PruningMark{startNumber, common.ExtHash{}}) + it := dbm.getDatabase(MiscDB).NewIterator(prefix, startKey[len(prefix):]) + + var marks []PruningMark + for it.Next() { + mark := parsePruningMarkKey(it.Key()) + if endNumber != 0 && mark.Number >= endNumber { + break + } + marks = append(marks, mark) + } + return marks +} + +// DeletePruningMarks deletes the provided set of pruning marks from the database. +// Note that trie nodes are not deleted by this function. To prune trie nodes, use +// the PruneTrieNodes or DeleteTrieNode functions. +func (dbm *databaseManager) DeletePruningMarks(marks []PruningMark) { + batch := dbm.NewBatch(MiscDB) + for _, mark := range marks { + if err := batch.Delete(pruningMarkKey(mark)); err != nil { + logger.Crit("Failed to delete trie pruning mark", "err", err) + } + if _, err := WriteBatchesOverThreshold(batch); err != nil { + logger.Crit("Failed to delete trie pruning mark", "err", err) + } + } + if err := batch.Write(); err != nil { + logger.Crit("Failed to batch delete pruning mark", "err", err) + } +} + +// PruneTrieNodes deletes the trie nodes according to the provided set of pruning marks. +func (dbm *databaseManager) PruneTrieNodes(marks []PruningMark) { + batch := dbm.NewBatch(StateTrieDB) + for _, mark := range marks { + if err := batch.Delete(TrieNodeKey(mark.Hash)); err != nil { + logger.Crit("Failed to prune trie node", "err", err) + } + if _, err := WriteBatchesOverThreshold(batch); err != nil { + logger.Crit("Failed to prune trie node", "err", err) + } + } + if err := batch.Write(); err != nil { + logger.Crit("Failed to batch prune trie node", "err", err) } } diff --git a/storage/database/db_manager_test.go b/storage/database/db_manager_test.go index 7281d34ced..c687f33aea 100644 --- a/storage/database/db_manager_test.go +++ b/storage/database/db_manager_test.go @@ -521,6 +521,53 @@ func TestDBManager_TrieNode(t *testing.T) { } } +func TestDBManager_PruningMarks(t *testing.T) { + log.EnableLogForTest(log.LvlCrit, log.LvlTrace) + for _, dbm := range dbManagers { + if dbm.GetMiscDB().Type() == BadgerDB { + continue // badgerDB doesn't support NewIterator, so cannot test ReadPruningMarks. 
+ } + + assert.False(t, dbm.ReadPruningEnabled()) + dbm.WritePruningEnabled() + assert.True(t, dbm.ReadPruningEnabled()) + dbm.DeletePruningEnabled() + assert.False(t, dbm.ReadPruningEnabled()) + + var ( + node1 = hash1.Extend() + node2 = hash2.Extend() + node3 = hash3.Extend() + node4 = hash4.Extend() + value = []byte("value") + ) + + dbm.WriteTrieNode(node1, value) + dbm.WriteTrieNode(node2, value) + dbm.WriteTrieNode(node3, value) + dbm.WriteTrieNode(node4, value) + dbm.WritePruningMarks([]PruningMark{ + {100, node1}, {200, node2}, {300, node3}, {400, node4}, + }) + + marks := dbm.ReadPruningMarks(300, 0) + assert.Equal(t, []PruningMark{{300, node3}, {400, node4}}, marks) + marks = dbm.ReadPruningMarks(0, 300) + assert.Equal(t, []PruningMark{{100, node1}, {200, node2}}, marks) + + dbm.PruneTrieNodes(marks) // delete node1, node2 + has := func(hash common.ExtHash) bool { ok, _ := dbm.HasTrieNode(hash); return ok } + assert.False(t, has(node1)) + assert.False(t, has(node2)) + assert.True(t, has(node3)) + assert.True(t, has(node4)) + + dbm.DeletePruningMarks(marks) + marks = dbm.ReadPruningMarks(0, 0) + assert.Equal(t, []PruningMark{{300, node3}, {400, node4}}, marks) + } +} + // TestDBManager_TxLookupEntry tests read, write and delete operations of TxLookupEntries. func TestDBManager_TxLookupEntry(t *testing.T) { log.EnableLogForTest(log.LvlCrit, log.LvlTrace) diff --git a/storage/database/schema.go b/storage/database/schema.go index 5bbb1f4b9a..7eec58a848 100644 --- a/storage/database/schema.go +++ b/storage/database/schema.go @@ -25,6 +25,7 @@ import ( "encoding/binary" "github.com/klaytn/klaytn/common" + "github.com/klaytn/klaytn/common/hexutil" "github.com/rcrowley/go-metrics" ) @@ -92,6 +93,11 @@ var ( preimagePrefix = []byte("secure-key-") // preimagePrefix + hash -> preimage configPrefix = []byte("klay-config-") // config prefix for the db + pruningEnabledKey = []byte("PruningEnabled") + pruningMarkPrefix = []byte("Pruning-") // KIP-111 pruning markings + pruningMarkValue = []byte{0x01} // A nonempty value to store a pruning mark + pruningMarkKeyLen = len(pruningMarkPrefix) + 8 + common.ExtHashLength // prefix + num (uint64) + node hash + // Chain index prefixes (use `i` + single byte to avoid mixing data types). BloomBitsIndexPrefix = []byte("iB") // BloomBitsIndexPrefix is the data table of a chain indexer to track its progress @@ -232,7 +238,7 @@ func snapshotKey(hash common.Hash) []byte { } func childChainTxHashKey(ccBlockHash common.Hash) []byte { - return append(append(childChainTxHashPrefix, ccBlockHash.Bytes()...)) + return append(childChainTxHashPrefix, ccBlockHash.Bytes()...) } func receiptFromParentChainKey(blockHash common.Hash) []byte { @@ -270,3 +276,31 @@ func TrieNodeKey(hash common.ExtHash) []byte { return hash.Bytes() } } + +type PruningMark struct { + Number uint64 + Hash common.ExtHash +} + +// TriePruningMarkKey = prefix + number + hash +// Block number comes first to sort the entries by block numbers. +// Later we can iterate through the marks and extract any given block number range. +func pruningMarkKey(mark PruningMark) []byte { + bNumber := make([]byte, 8) + binary.BigEndian.PutUint64(bNumber, mark.Number) + bHash := mark.Hash.Bytes() + return append(append(pruningMarkPrefix, bNumber...), bHash...) 
+} + +func parsePruningMarkKey(key []byte) PruningMark { + if len(key) != pruningMarkKeyLen { + logger.Crit("Invalid pruningMarkKey", "key", hexutil.Encode(key)) + } + prefixLen := len(pruningMarkPrefix) + bNumber := key[prefixLen : prefixLen+8] + bHash := key[prefixLen+8:] + return PruningMark{ + Number: binary.BigEndian.Uint64(bNumber), + Hash: common.BytesToExtHash(bHash), + } +} From dfeefb86aeb476b9b2624015e81585383b925d9a Mon Sep 17 00:00:00 2001 From: "ollie.j" Date: Fri, 30 Jun 2023 13:25:16 +0900 Subject: [PATCH 02/12] Flush PruningMarks in statedb.Database --- storage/statedb/database.go | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/storage/statedb/database.go b/storage/statedb/database.go index da412095f4..9fd12896df 100644 --- a/storage/statedb/database.go +++ b/storage/statedb/database.go @@ -91,7 +91,8 @@ type Database struct { oldest common.ExtHash // Oldest tracked node, flush-list head newest common.ExtHash // Newest tracked node, flush-list tail - preimages map[common.Hash][]byte // Preimages of nodes from the secure trie + preimages map[common.Hash][]byte // Preimages of nodes from the secure trie + pruningMarks []database.PruningMark // Trie node pruning marks from the pruning trie gctime time.Duration // Time spent on garbage collection since last commit gcnodes uint64 // Nodes garbage collected since last commit @@ -465,6 +466,10 @@ func (db *Database) insertPreimage(hash common.Hash, preimage []byte) { db.preimagesSize += common.StorageSize(common.HashLength + len(preimage)) } +func (db *Database) insertPruningMark(mark database.PruningMark) { + db.pruningMarks = append(db.pruningMarks, mark) +} + // getCachedNode finds an encoded node in the trie node cache if enabled. func (db *Database) getCachedNode(hash common.ExtHash) []byte { if db.trieNodeCache != nil { @@ -752,11 +757,10 @@ func (db *Database) Cap(limit common.StorageSize) error { // leave for later to deduplicate writes. flushPreimages := db.preimagesSize > 4*1024*1024 if flushPreimages { - if err := db.writeBatchPreimages(); err != nil { - db.lock.RUnlock() - return err - } + db.diskDB.WritePreimages(0, db.preimages) } + db.diskDB.WritePruningMarks(db.pruningMarks) + // Keep committing nodes from the flush-list until we're below allowance oldest := db.oldest batch := db.diskDB.NewBatch(database.StateTrieDB) @@ -795,6 +799,8 @@ func (db *Database) Cap(limit common.StorageSize) error { db.preimages = make(map[common.Hash][]byte) db.preimagesSize = 0 } + db.pruningMarks = []database.PruningMark{} + for db.oldest != oldest { node := db.nodes[db.oldest] delete(db.nodes, db.oldest) @@ -822,11 +828,6 @@ func (db *Database) Cap(limit common.StorageSize) error { return nil } -func (db *Database) writeBatchPreimages() error { - db.diskDB.WritePreimages(0, db.preimages) - return nil -} - // commitResult contains the result from concurrent commit calls. // key and val are nil if the commitResult indicates the end of // concurrentCommit goroutine. 
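A note on the mark encoding these patches rely on: pruningMarkKey packs the prefix, the 8-byte big-endian block number, and the ExtHash in that order, so lexicographic key order in MiscDB equals block-number order and ReadPruningMarks can walk a [startNumber, endNumber) window with a plain iterator. Below is a minimal, self-contained round-trip sketch — the prefix and the 39-byte ExtHash size mirror schema.go above, but the helper names are illustrative stand-ins, not the production functions:

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

var pruningMarkPrefix = []byte("Pruning-")

// makeKey mirrors pruningMarkKey: prefix + big-endian block number + node hash.
// Big-endian matters: byte order then agrees with numeric order.
func makeKey(number uint64, hash []byte) []byte {
	bNumber := make([]byte, 8)
	binary.BigEndian.PutUint64(bNumber, number)
	return append(append(append([]byte{}, pruningMarkPrefix...), bNumber...), hash...)
}

// parseKey mirrors parsePruningMarkKey: split the key back into its two fields.
func parseKey(key []byte) (uint64, []byte) {
	p := len(pruningMarkPrefix)
	return binary.BigEndian.Uint64(key[p : p+8]), key[p+8:]
}

func main() {
	hash := bytes.Repeat([]byte{0xaa}, 39) // common.ExtHashLength: 32-byte hash + 7-byte nonce
	key := makeKey(300, hash)
	number, parsed := parseKey(key)
	fmt.Println(number, bytes.Equal(parsed, hash)) // Output: 300 true
}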
@@ -899,10 +900,8 @@ func (db *Database) Commit(root common.Hash, report bool, blockNum uint64) error db.lock.RLock() commitStart := time.Now() - if err := db.writeBatchPreimages(); err != nil { - db.lock.RUnlock() - return err - } + db.diskDB.WritePreimages(0, db.preimages) + db.diskDB.WritePruningMarks(db.pruningMarks) // Move the trie itself into the batch, flushing if enough data is accumulated numNodes, nodesSize := len(db.nodes), db.nodesSize @@ -919,6 +918,7 @@ func (db *Database) Commit(root common.Hash, report bool, blockNum uint64) error db.preimages = make(map[common.Hash][]byte) db.preimagesSize = 0 + db.pruningMarks = []database.PruningMark{} uncacheStart := time.Now() db.uncache(hash) From b82eea0440b000118813f2d80f6f4fa04e4f785d Mon Sep 17 00:00:00 2001 From: "ollie.j" Date: Fri, 30 Jun 2023 13:25:39 +0900 Subject: [PATCH 03/12] Mark obsolete nodes for pruning --- storage/statedb/errors.go | 1 + storage/statedb/iterator_test.go | 8 ++- storage/statedb/trie.go | 50 +++++++++++++-- storage/statedb/trie_test.go | 102 ++++++++++++++++++++++++++++++- 4 files changed, 151 insertions(+), 10 deletions(-) diff --git a/storage/statedb/errors.go b/storage/statedb/errors.go index ada679db97..a47d9cb294 100644 --- a/storage/statedb/errors.go +++ b/storage/statedb/errors.go @@ -40,3 +40,4 @@ func (err *MissingNodeError) Error() string { } var ErrZeroHashNode = errors.New("cannot retrieve a node which has 0x00 hash value") +var ErrPruningDisabled = errors.New("pruning is disabled on database") diff --git a/storage/statedb/iterator_test.go b/storage/statedb/iterator_test.go index 500839f5f8..a4ae406c68 100644 --- a/storage/statedb/iterator_test.go +++ b/storage/statedb/iterator_test.go @@ -141,15 +141,17 @@ func TestNodeIteratorCoverage(t *testing.T) { // NodeIterator yields exact same result for Trie and StroageTrie func TestNodeIteratorStorageTrie(t *testing.T) { - triedb := NewDatabase(database.NewMemoryDBManager()) + dbm := database.NewMemoryDBManager() + dbm.WritePruningEnabled() + triedb := NewDatabase(dbm) - trie1, _ := NewTrie(common.Hash{}, triedb, &TrieOpts{Pruning: true}) + trie1, _ := NewTrie(common.Hash{}, triedb, nil) hashes1 := make(map[common.Hash]struct{}) for it := trie1.NodeIterator(nil); it.Next(true); { hashes1[it.Hash()] = struct{}{} } - trie2, _ := NewStorageTrie(common.ExtHash{}, triedb, &TrieOpts{Pruning: true}) + trie2, _ := NewStorageTrie(common.ExtHash{}, triedb, nil) hashes2 := make(map[common.Hash]struct{}) for it := trie2.NodeIterator(nil); it.Next(true); { hashes2[it.Hash()] = struct{}{} diff --git a/storage/statedb/trie.go b/storage/statedb/trie.go index f90ec48c72..f4bbc5ab1c 100644 --- a/storage/statedb/trie.go +++ b/storage/statedb/trie.go @@ -27,6 +27,7 @@ import ( "github.com/klaytn/klaytn/common" "github.com/klaytn/klaytn/crypto" + "github.com/klaytn/klaytn/storage/database" ) var ( @@ -41,10 +42,10 @@ var ( type TrieOpts struct { Prefetching bool // If true, certain metric is enabled - // If true, enable live pruning. i.e. node hashes are ExtHash with fresh nonces - // and obsolete nodes are enqueued in the pruning queue. - // TODO-Klaytn-Pruning: Prune obsolete nodes - Pruning bool + // If PruningBlockNumber is nonzero, trie update and delete operations + // will schedule obsolete nodes to be pruned when the given block number becomes obsolete. + // This option is only viable when the pruning is enabled on database. 
+ PruningBlockNumber uint64 } // LeafCallback is a callback type invoked when a trie operation reaches a leaf @@ -73,6 +74,7 @@ type Trie struct { db *Database root node originalRoot common.ExtHash + pruning bool // True if the underlying database has pruning enabled. storage bool // If storage and Pruning are both true, root hash is attached a fresh nonce. } @@ -115,9 +117,13 @@ func newTrie(root common.ExtHash, db *Database, opts *TrieOpts, storage bool) (* trie := &Trie{ TrieOpts: *opts, db: db, + pruning: db.diskDB.ReadPruningEnabled(), originalRoot: root, storage: storage, } + if !trie.pruning && trie.PruningBlockNumber != 0 { + return nil, ErrPruningDisabled + } if !common.EmptyExtHash(root) && root.Unextend() != emptyRoot { rootnode, err := trie.resolveHash(root[:], nil) if err != nil { @@ -330,6 +336,7 @@ func (t *Trie) insert(n node, prefix, key []byte, value node) (bool, node, error if !dirty || err != nil { return false, n, err } + t.markPrunableNode(n) // dirty; something's changed in the child return true, &shortNode{n.Key, nn, t.newFlag()}, nil } // Otherwise branch out at the index where they differ. @@ -343,6 +350,7 @@ func (t *Trie) insert(n node, prefix, key []byte, value node) (bool, node, error if err != nil { return false, nil, err } + t.markPrunableNode(n) // this node has changed // Replace this shortNode with the branch if it occurs at index 0. if matchlen == 0 { return true, branch, nil @@ -355,6 +363,7 @@ func (t *Trie) insert(n node, prefix, key []byte, value node) (bool, node, error if !dirty || err != nil { return false, n, err } + t.markPrunableNode(n) // dirty; something's changed in the child n = n.copy() n.flags = t.newFlag() n.Children[key[0]] = nn @@ -412,6 +421,7 @@ func (t *Trie) delete(n node, prefix, key []byte) (bool, node, error) { return false, n, nil // don't replace n on mismatch } if matchlen == len(key) { + t.markPrunableNode(n) // it's the target leaf return true, nil, nil // remove n entirely for whole matches } // The key is longer than n.Key. Remove the remaining suffix @@ -422,6 +432,7 @@ func (t *Trie) delete(n node, prefix, key []byte) (bool, node, error) { if !dirty || err != nil { return false, n, err } + t.markPrunableNode(n) // dirty; something's changed in the child switch child := child.(type) { case *shortNode: // Deleting from the subtrie reduced it to another @@ -440,6 +451,7 @@ func (t *Trie) delete(n node, prefix, key []byte) (bool, node, error) { if !dirty || err != nil { return false, n, err } + t.markPrunableNode(n) // dirty; something's changed in the child n = n.copy() n.flags = t.newFlag() n.Children[key[0]] = nn @@ -577,7 +589,7 @@ func (t *Trie) hashRoot(db *Database, onleaf LeafCallback) (common.ExtHash, node } h := newHasher(&hasherOpts{ onleaf: onleaf, - pruning: t.Pruning, + pruning: t.pruning, // If database has pruning enabled, nodes must be stored with ExtHash. storageRoot: t.storage, }) defer returnHasherToPool(h) @@ -586,6 +598,34 @@ func (t *Trie) hashRoot(db *Database, onleaf LeafCallback) (common.ExtHash, node return hash, cached } +// Mark the node for later pruning by writing PruningMark to database. +func (t *Trie) markPrunableNode(n node) { + // Mark nodes only if both conditions are met: + // - t.pruning: database has pruning enabled, i.e. nodes are stored with ExtHash + // - t.PruningBlockNumber: requested pruning through state.New -> OpenTrie -> NewTrie. 
+	if !t.pruning || t.PruningBlockNumber == 0 {
+		return
+	}
+
+	if hn, ok := n.(hashNode); ok {
+		// If a node exists as a hashNode, it means the node is either:
+		// (1) lives in the database but is yet to be resolved - subject to pruning,
+		// (2) collapsed by Hash or Commit - may or may not be in the database, add the mark anyway.
+		t.db.insertPruningMark(database.PruningMark{
+			Number: t.PruningBlockNumber,
+			Hash:   common.BytesToExtHash(hn),
+		})
+	} else if hn, _ := n.cache(); hn != nil {
+		// If node.flags.hash is nonempty, it means the node is either:
+		// (1) loaded from the database - subject to pruning,
+		// (2) went through the hasher by Hash or Commit - may or may not be in the database, add the mark anyway.
+		t.db.insertPruningMark(database.PruningMark{
+			Number: t.PruningBlockNumber,
+			Hash:   common.BytesToExtHash(hn),
+		})
+	}
+}
+
 func GetHashAndHexKey(key []byte) ([]byte, []byte) {
 	var hashKeyBuf [common.HashLength]byte
 	h := newHasher(nil)
diff --git a/storage/statedb/trie_test.go b/storage/statedb/trie_test.go
index 38a72abc18..7066d3f4e6 100644
--- a/storage/statedb/trie_test.go
+++ b/storage/statedb/trie_test.go
@@ -324,8 +324,12 @@ func TestLargeValue(t *testing.T) {

 func TestStorageTrie(t *testing.T) {
 	newStorageTrie := func(pruning bool) *Trie {
-		db := NewDatabase(database.NewMemoryDBManager())
-		trie, _ := NewStorageTrie(common.ExtHash{}, db, &TrieOpts{Pruning: pruning})
+		dbm := database.NewMemoryDBManager()
+		if pruning {
+			dbm.WritePruningEnabled()
+		}
+		db := NewDatabase(dbm)
+		trie, _ := NewStorageTrie(common.ExtHash{}, db, nil)
 		updateString(trie, "doe", "reindeer")
 		return trie
 	}
@@ -349,6 +353,100 @@ func TestStorageTrie(t *testing.T) {
 	assert.False(t, root.IsLegacy())
 }

+func TestPruningByUpdate(t *testing.T) {
+	dbm := database.NewMemoryDBManager()
+	dbm.WritePruningEnabled()
+	db := NewDatabase(dbm)
+	hasnode := func(hash common.ExtHash) bool { ok, _ := dbm.HasTrieNode(hash); return ok }
+	common.ResetExtHashCounterForTest(0xccccddddeeee00)
+
+	trie, _ := NewTrie(common.Hash{}, db, &TrieOpts{PruningBlockNumber: 1})
+	nodehash1 := common.HexToExtHash("05ae693aac2107336a79309e0c60b24a7aac6aa3edecaef593921500d33c63c400000000000000")
+	nodehash2 := common.HexToExtHash("f226ef598ed9195f2211546cf5b2860dc27b4da07ff7ab5108ee68107f0c9d00ccccddddeeee01")
+
+	// Test that extension and branch nodes are correctly pruned via Update.
+	// - extension <05ae693aac2107336a79309e0c60b24a7aac6aa3edecaef593921500d33c63c400000000000045>
+	// - branch
+	//   - [5]value "reindeer"
+	//   - [7]value "puppy"
+	// By inserting "dogglesworth", both extension and branch nodes are affected, hence pruning both.
+
+	// Update and commit to store the nodes
+	updateString(trie, "doe", "reindeer")
+	updateString(trie, "dog", "puppy")
+	trie.Commit(nil)
+	db.Cap(0)
+
+	// The nodes still exist
+	assert.True(t, hasnode(nodehash1))
+	assert.True(t, hasnode(nodehash2))
+
+	// Trigger pruning
+	updateString(trie, "dogglesworth", "cat")
+	trie.Commit(nil)
+	db.Cap(0)
+
+	// Those nodes, and only those nodes, are scheduled to be deleted
+	expectedMarks := []database.PruningMark{
+		{Number: 1, Hash: nodehash1},
+		{Number: 1, Hash: nodehash2},
+	}
+	marks := dbm.ReadPruningMarks(0, 0)
+	assert.Equal(t, expectedMarks, marks)
+
+	// The nodes are deleted
+	dbm.PruneTrieNodes(marks)
+	assert.False(t, hasnode(nodehash1))
+	assert.False(t, hasnode(nodehash2))
+}
+
+func TestPruningByDelete(t *testing.T) {
+	dbm := database.NewMemoryDBManager()
+	dbm.WritePruningEnabled()
+	db := NewDatabase(dbm)
+	hasnode := func(hash common.ExtHash) bool { ok, _ := dbm.HasTrieNode(hash); return ok }
+	common.ResetExtHashCounterForTest(0xccccddddeeee00)
+
+	trie, _ := NewTrie(common.Hash{}, db, &TrieOpts{PruningBlockNumber: 1})
+	nodehash1 := common.HexToExtHash("05ae693aac2107336a79309e0c60b24a7aac6aa3edecaef593921500d33c63c400000000000000")
+	nodehash2 := common.HexToExtHash("f226ef598ed9195f2211546cf5b2860dc27b4da07ff7ab5108ee68107f0c9d00ccccddddeeee01")
+
+	// Test that extension and branch nodes are correctly pruned via Delete.
+	// - extension <05ae693aac2107336a79309e0c60b24a7aac6aa3edecaef593921500d33c63c400000000000045>
+	// - branch
+	//   - [5]value "reindeer"
+	//   - [7]value "puppy"
+	// By deleting "doe", both extension and branch nodes are affected, hence pruning both.
+
+	// Update and commit to store the nodes
+	updateString(trie, "doe", "reindeer")
+	updateString(trie, "dog", "puppy")
+	trie.Commit(nil)
+	db.Cap(0)
+
+	// The nodes still exist
+	assert.True(t, hasnode(nodehash1))
+	assert.True(t, hasnode(nodehash2))
+
+	// Trigger pruning
+	deleteString(trie, "doe")
+	trie.Commit(nil)
+	db.Cap(0)
+
+	// Those nodes, and only those nodes, are scheduled to be deleted
+	expectedMarks := []database.PruningMark{
+		{Number: 1, Hash: nodehash1},
+		{Number: 1, Hash: nodehash2},
+	}
+	marks := dbm.ReadPruningMarks(0, 0)
+	assert.Equal(t, expectedMarks, marks)
+
+	// The nodes are deleted
+	dbm.PruneTrieNodes(marks)
+	assert.False(t, hasnode(nodehash1))
+	assert.False(t, hasnode(nodehash2))
+}
+
 type countingDB struct {
 	database.DBManager
 	gets map[string]int

From 0b66648aaa7e5ed5d6f8cd01d34a84e79628288e Mon Sep 17 00:00:00 2001
From: "ollie.j" 
Date: Fri, 30 Jun 2023 13:26:59 +0900
Subject: [PATCH 04/12] Serialize StateObject with ExtHash preserved

---
 blockchain/state/state_object.go | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/blockchain/state/state_object.go b/blockchain/state/state_object.go
index f8046681ed..5c6926960f 100644
--- a/blockchain/state/state_object.go
+++ b/blockchain/state/state_object.go
@@ -130,7 +130,8 @@ func newObject(db *StateDB, address common.Address, data account.Account) *state

 // EncodeRLP implements rlp.Encoder.
 func (c *stateObject) EncodeRLP(w io.Writer) error {
-	serializer := account.NewAccountSerializerWithAccount(c.account)
+	// State objects are RLP encoded with ExtHash preserved.
+ serializer := account.NewAccountSerializerExtWithAccount(c.account) return rlp.Encode(w, serializer) } From 1c3e2ef5c82db59f21e8a916d8127acac3a04320 Mon Sep 17 00:00:00 2001 From: "ollie.j" Date: Fri, 30 Jun 2023 13:27:47 +0900 Subject: [PATCH 05/12] Save TrieOpts in StateDB and use with OpenTrie --- blockchain/state/state_object.go | 3 +- blockchain/state/statedb.go | 8 +++-- blockchain/state/statedb_test.go | 61 ++++++++++++++++++++++++++++++++ 3 files changed, 67 insertions(+), 5 deletions(-) diff --git a/blockchain/state/state_object.go b/blockchain/state/state_object.go index 5c6926960f..b401205ac2 100644 --- a/blockchain/state/state_object.go +++ b/blockchain/state/state_object.go @@ -35,7 +35,6 @@ import ( "github.com/klaytn/klaytn/crypto" "github.com/klaytn/klaytn/kerrors" "github.com/klaytn/klaytn/rlp" - "github.com/klaytn/klaytn/storage/statedb" ) var emptyCodeHash = crypto.Keccak256(nil) @@ -158,7 +157,7 @@ func (c *stateObject) touch() { } func (c *stateObject) openStorageTrie(hash common.ExtHash, db Database) (Trie, error) { - return db.OpenStorageTrie(hash, &statedb.TrieOpts{Prefetching: c.db.prefetching}) + return db.OpenStorageTrie(hash, c.db.trieOpts) } func (c *stateObject) getStorageTrie(db Database) Trie { diff --git a/blockchain/state/statedb.go b/blockchain/state/statedb.go index 12f79ff035..a2ac611ae7 100644 --- a/blockchain/state/statedb.go +++ b/blockchain/state/statedb.go @@ -60,8 +60,9 @@ var ( // StateDBs within the Klaytn protocol are used to cache stateObjects from Merkle Patricia Trie // and mediate the operations to them. type StateDB struct { - db Database - trie Trie + db Database + trie Trie + trieOpts *statedb.TrieOpts snaps *snapshot.Tree snap snapshot.Snapshot @@ -125,6 +126,7 @@ func New(root common.Hash, db Database, snaps *snapshot.Tree, opts *statedb.Trie sdb := &StateDB{ db: db, trie: tr, + trieOpts: opts, snaps: snaps, stateObjects: make(map[common.Address]*stateObject), stateObjectsDirtyStorage: make(map[common.Address]struct{}), @@ -171,7 +173,7 @@ func (self *StateDB) Error() error { // Reset clears out all ephemeral state objects from the state db, but keeps // the underlying state trie to avoid reloading data for the next operations. func (self *StateDB) Reset(root common.Hash) error { - tr, err := self.db.OpenTrie(root, nil) + tr, err := self.db.OpenTrie(root, self.trieOpts) if err != nil { return err } diff --git a/blockchain/state/statedb_test.go b/blockchain/state/statedb_test.go index ebc2ac139b..85e93159f2 100644 --- a/blockchain/state/statedb_test.go +++ b/blockchain/state/statedb_test.go @@ -209,6 +209,67 @@ func TestStateObjects(t *testing.T) { assert.Equal(t, 128, len(stateDB.stateObjects)) } +// Test that invalid pruning options are prohibited. +func TestPruningOptions(t *testing.T) { + opens := func(pruning bool, pruningNum bool) bool { + dbm := database.NewMemoryDBManager() + opts := &statedb.TrieOpts{} + if pruning { + dbm.WritePruningEnabled() + } + if pruningNum { + opts.PruningBlockNumber = 1 + } + _, err := New(common.Hash{}, NewDatabase(dbm), nil, opts) + return err == nil + } + + // DB pruning disabled & not request pruning. Normal non-pruning setup. + assert.True(t, opens(false, false)) + // DB pruning disabled & request pruning must fail. + assert.False(t, opens(false, true)) + + // DB pruning enabled & not request pruning. Temporary trie, + // such as in debug_traceTransaction or eth_call. + assert.True(t, opens(true, false)) + // DB pruning enabled & request pruning. 
+	// Normal pruning setup, such as in InsertChain.
+	assert.True(t, opens(true, true))
+}
+
+// Test that the storage root (ExtHash) has correct extensions
+// under different pruning options.
+func TestPruningRoot(t *testing.T) {
+	addr := common.HexToAddress("0xaaaa")
+
+	makeState := func(db Database) common.Hash {
+		stateDB, _ := New(common.Hash{}, db, nil, nil)
+		stateDB.CreateSmartContractAccount(addr, params.CodeFormatEVM, params.Rules{})
+		stateDB.SetState(addr, common.HexToHash("1"), common.HexToHash("2"))
+		root, _ := stateDB.Commit(false)
+		return root
+	}
+
+	// When pruning is disabled, storage root is zero-extended.
+	dbm := database.NewMemoryDBManager()
+
+	db := NewDatabase(dbm)
+	root := makeState(db)
+	stateDB, _ := New(root, db, nil, nil)
+	storageRoot, _ := stateDB.GetContractStorageRoot(addr)
+	assert.True(t, storageRoot.IsLegacy())
+
+	// When pruning is enabled, storage root is nonzero-extended.
+	dbm = database.NewMemoryDBManager()
+	dbm.WritePruningEnabled()
+
+	db = NewDatabase(dbm)
+	root = makeState(db)
+	stateDB, _ = New(root, db, nil, nil) // Reopen trie to check the account stored on disk.
+	storageRoot, _ = stateDB.GetContractStorageRoot(addr)
+	assert.False(t, storageRoot.IsLegacy())
+}
+
 // A snapshotTest checks that reverting StateDB snapshots properly undoes all changes
 // captured by the snapshot. Instances of this test with pseudorandom content are created
 // by Generate.

From 9a16388d28ebdfb69ce585e8a67547100bdc8aca Mon Sep 17 00:00:00 2001
From: "ollie.j" 
Date: Tue, 4 Jul 2023 18:24:24 +0900
Subject: [PATCH 06/12] Live pruning in BlockChain

---
 blockchain/blockchain.go      | 77 +++++++++++++++++++++++++++++------
 blockchain/blockchain_test.go | 71 ++++++++++++++++++++++++++++++++
 storage/statedb/database.go   |  9 +++-
 work/mocks/blockchain_mock.go | 15 +++++++
 work/work.go                  |  1 +
 work/worker.go                |  2 +-
 6 files changed, 160 insertions(+), 15 deletions(-)

diff --git a/blockchain/blockchain.go b/blockchain/blockchain.go
index 1ab6272e19..e11d1da685 100644
--- a/blockchain/blockchain.go
+++ b/blockchain/blockchain.go
@@ -107,9 +107,10 @@ const (
 )

 const (
-	DefaultTriesInMemory = 128
-	DefaultBlockInterval = 128
-	MaxPrefetchTxs       = 20000
+	DefaultTriesInMemory        = 128
+	DefaultBlockInterval        = 128
+	DefaultLivePruningRetention = 172800
+	MaxPrefetchTxs              = 20000

 	// BlockChainVersion ensures that an incompatible database forces a resync from scratch.
 	// Changelog:
@@ -127,6 +128,7 @@ type CacheConfig struct {
 	CacheSize            int    // Size of in-memory cache of a trie (MiB) to flush matured singleton trie nodes to disk
 	BlockInterval        uint   // Block interval to flush the trie. Each interval state trie will be flushed into disk
 	TriesInMemory        uint64 // Maximum number of recent state tries according to its block number
+	LivePruningRetention uint64 // Number of blocks to keep before trie nodes in pruning marks are deleted. If zero, obsolete nodes are not deleted.
 	SenderTxHashIndexing bool   // Enables saving senderTxHash to txHash mapping information to database and cache
 	TrieNodeCacheConfig  *statedb.TrieNodeCacheConfig // Configures trie node cache
 	SnapshotCacheSize    int    // Memory allowance (MB) to use for caching snapshot entries in memory
@@ -161,6 +163,7 @@ type BlockChain struct {
 	snaps   *snapshot.Tree // Snapshot tree for fast trie leaf access
 	triegc  *prque.Prque   // Priority queue mapping block numbers to tries to gc
 	chBlock chan gcBlock   // chPushBlockGCPrque is a channel for delivering the gc item to gc loop.
+	chPrune chan uint64    // chPrune is a channel for delivering the current block number for pruning loop.

 	hc            *HeaderChain
 	rmLogsFeed    event.Feed
@@ -224,13 +227,14 @@ type prefetchTx struct {
 func NewBlockChain(db database.DBManager, cacheConfig *CacheConfig, chainConfig *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config) (*BlockChain, error) {
 	if cacheConfig == nil {
 		cacheConfig = &CacheConfig{
-			ArchiveMode:         false,
-			CacheSize:           512,
-			BlockInterval:       DefaultBlockInterval,
-			TriesInMemory:       DefaultTriesInMemory,
-			TrieNodeCacheConfig: statedb.GetEmptyTrieNodeCacheConfig(),
-			SnapshotCacheSize:   512,
-			SnapshotAsyncGen:    true,
+			ArchiveMode:          false,
+			CacheSize:            512,
+			BlockInterval:        DefaultBlockInterval,
+			TriesInMemory:        DefaultTriesInMemory,
+			LivePruningRetention: DefaultLivePruningRetention,
+			TrieNodeCacheConfig:  statedb.GetEmptyTrieNodeCacheConfig(),
+			SnapshotCacheSize:    512,
+			SnapshotAsyncGen:     true,
 		}
 	}

@@ -248,6 +252,7 @@
 		db:         db,
 		triegc:     prque.New(),
 		chBlock:    make(chan gcBlock, 1000),
+		chPrune:    make(chan uint64, 1000),
 		stateCache: state.NewDatabaseWithNewCache(db, cacheConfig.TrieNodeCacheConfig),
 		quit:       make(chan struct{}),
 		futureBlocks: futureBlocks,
@@ -353,6 +358,7 @@
 	// Take ownership of this particular state
 	go bc.update()
 	bc.gcCachedNodeLoop()
+	bc.pruneTrieNodeLoop()
 	bc.restartStateMigration()

 	if cacheConfig.TrieNodeCacheConfig.DumpPeriodically() {
@@ -687,6 +693,18 @@ func (bc *BlockChain) StateAt(root common.Hash) (*state.StateDB, error) {
 	return state.New(root, bc.stateCache, bc.snaps, nil)
 }

+// PrunableStateAt returns a new mutable state based on a particular point in time.
+// If live pruning is enabled on the database and num is nonzero, the trie will mark obsolete nodes for pruning.
+func (bc *BlockChain) PrunableStateAt(root common.Hash, num uint64) (*state.StateDB, error) {
+	if bc.db.ReadPruningEnabled() && bc.cacheConfig.LivePruningRetention != 0 {
+		return state.New(root, bc.stateCache, bc.snaps, &statedb.TrieOpts{
+			PruningBlockNumber: num,
+		})
+	} else {
+		return bc.StateAt(root)
+	}
+}
+
 // StateAtWithPersistent returns a new mutable state based on a particular point in time with persistent trie nodes.
 func (bc *BlockChain) StateAtWithPersistent(root common.Hash) (*state.StateDB, error) {
 	exist := bc.stateCache.TrieDB().DoesExistNodeInPersistent(root.ExtendLegacy())
@@ -1335,8 +1353,8 @@ func (bc *BlockChain) writeStateTrie(block *types.Block, state *state.StateDB) e
 				logger.Error("Error from trieDB.Cap by state migration", "err", err)
 			}
 		}
-
 		bc.lastCommittedBlock = block.NumberU64()
+		bc.chPrune <- block.NumberU64()
 	}

 	bc.chBlock <- gcBlock{root, block.NumberU64()}
@@ -1397,6 +1415,41 @@ func (bc *BlockChain) gcCachedNodeLoop() {
 	}()
 }

+func (bc *BlockChain) pruneTrieNodeLoop() {
+	// ReadPruningMarks(1, limit) is very slow because it iterates over most of MiscDB.
+	// ReadPruningMarks(start, limit) is much faster because it only iterates a small range.
+ startNum := uint64(1) + + bc.wg.Add(1) + go func() { + defer bc.wg.Done() + for { + select { + case num := <-bc.chPrune: + if !bc.db.ReadPruningEnabled() || bc.cacheConfig.LivePruningRetention == 0 { + continue + } + if num <= bc.cacheConfig.LivePruningRetention { + continue + } + limit := num - bc.cacheConfig.LivePruningRetention // Prune [1, latest - retention] + + startTime := time.Now() + marks := bc.db.ReadPruningMarks(startNum, limit+1) + bc.db.PruneTrieNodes(marks) + bc.db.DeletePruningMarks(marks) + + logger.Info("Pruned trie nodes", "number", num, "start", startNum, "limit", limit, + "count", len(marks), "elapsed", time.Since(startTime)) + + startNum = limit + 1 + case <-bc.quit: + return + } + } + }() +} + func isCommitTrieRequired(bc *BlockChain, blockNum uint64) bool { if bc.prepareStateMigration { return true @@ -1918,7 +1971,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty return i, events, coalescedLogs, err } - stateDB, err := bc.StateAt(parent.Root()) + stateDB, err := bc.PrunableStateAt(parent.Root(), parent.NumberU64()) if err != nil { return i, events, coalescedLogs, err } diff --git a/blockchain/blockchain_test.go b/blockchain/blockchain_test.go index a2224369f2..01ec5f68b8 100644 --- a/blockchain/blockchain_test.go +++ b/blockchain/blockchain_test.go @@ -41,6 +41,7 @@ import ( "github.com/klaytn/klaytn/consensus" "github.com/klaytn/klaytn/consensus/gxhash" "github.com/klaytn/klaytn/crypto" + "github.com/klaytn/klaytn/log" "github.com/klaytn/klaytn/params" "github.com/klaytn/klaytn/rlp" "github.com/klaytn/klaytn/storage" @@ -48,6 +49,7 @@ import ( "github.com/klaytn/klaytn/storage/statedb" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) // So we can deterministically seed different blockchains @@ -1262,6 +1264,75 @@ func TestTrieForkGC(t *testing.T) { } } +// Tests that State pruning indeed deletes obsolete trie nodes. +func TestStatePruning(t *testing.T) { + log.EnableLogForTest(log.LvlCrit, log.LvlInfo) + var ( + db = database.NewMemoryDBManager() + key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + addr1 = crypto.PubkeyToAddress(key1.PublicKey) + addr2 = common.HexToAddress("0xaaaa") + + gspec = &Genesis{ + Config: params.TestChainConfig, + Alloc: GenesisAlloc{addr1: {Balance: big.NewInt(10000000000000)}}, + } + genesis = gspec.MustCommit(db) + signer = types.LatestSignerForChainID(gspec.Config.ChainID) + engine = gxhash.NewFaker() + + // Latest `retention` blocks survive. + // Blocks 1..7 are pruned, blocks 8..10 are kept. 
+ retention = uint64(3) + numBlocks = 10 + pruneNum = uint64(numBlocks) - retention + ) + + db.WritePruningEnabled() // Enable pruning on database by writing the flag at genesis + cacheConfig := &CacheConfig{ + ArchiveMode: false, + CacheSize: 512, + BlockInterval: 1, // Write frequently to test pruning + TriesInMemory: DefaultTriesInMemory, + LivePruningRetention: retention, // Enable pruning on blockchain by setting it nonzero + TrieNodeCacheConfig: statedb.GetEmptyTrieNodeCacheConfig(), + } + blockchain, _ := NewBlockChain(db, cacheConfig, gspec.Config, engine, vm.Config{}) + defer blockchain.Stop() + + chain, _ := GenerateChain(gspec.Config, genesis, engine, db, numBlocks, func(i int, gen *BlockGen) { + tx, _ := types.SignTx(types.NewTransaction( + gen.TxNonce(addr1), addr2, common.Big1, 21000, common.Big1, nil), signer, key1) + gen.AddTx(tx) + }) + if _, err := blockchain.InsertChain(chain); err != nil { + t.Fatalf("failed to insert chain: %v", err) + } + assert.Equal(t, uint64(numBlocks), blockchain.CurrentBlock().NumberU64()) + + // Give some time for pruning loop to run + time.Sleep(100 * time.Millisecond) + + // Genesis block always survives + state, err := blockchain.StateAt(genesis.Root()) + assert.Nil(t, err) + assert.NotZero(t, state.GetBalance(addr1).Uint64()) + + // Pruned blocks should be inaccessible. + for num := uint64(1); num <= pruneNum; num++ { + _, err := blockchain.StateAt(blockchain.GetBlockByNumber(num).Root()) + assert.IsType(t, &statedb.MissingNodeError{}, err, num) + } + + // Recent unpruned blocks should be accessible. + for num := pruneNum + 1; num < uint64(numBlocks); num++ { + state, err := blockchain.StateAt(blockchain.GetBlockByNumber(num).Root()) + require.Nil(t, err, num) + assert.NotZero(t, state.GetBalance(addr1).Uint64()) + assert.NotZero(t, state.GetBalance(addr2).Uint64()) + } +} + // TODO-Klaytn-FailedTest Failed test. Enable this later. /* // Tests that doing large reorgs works even if the state associated with the diff --git a/storage/statedb/database.go b/storage/statedb/database.go index 9fd12896df..8326d67a87 100644 --- a/storage/statedb/database.go +++ b/storage/statedb/database.go @@ -756,10 +756,13 @@ func (db *Database) Cap(limit common.StorageSize) error { // If the preimage cache got large enough, push to disk. If it's still small // leave for later to deduplicate writes. 
flushPreimages := db.preimagesSize > 4*1024*1024 + numPreimages := 0 if flushPreimages { db.diskDB.WritePreimages(0, db.preimages) + numPreimages = len(db.preimages) } db.diskDB.WritePruningMarks(db.pruningMarks) + numPruningMarks := len(db.pruningMarks) // Keep committing nodes from the flush-list until we're below allowance oldest := db.oldest @@ -824,7 +827,7 @@ func (db *Database) Cap(limit common.StorageSize) error { logger.Info("Persisted nodes from memory database by Cap", "nodes", nodes-len(db.nodes), "size", nodeSize-db.nodesSize, "preimagesSize", preimagesSize-db.preimagesSize, "time", time.Since(start), "flushnodes", db.flushnodes, "flushsize", db.flushsize, "flushtime", db.flushtime, "livenodes", len(db.nodes), - "livesize", db.nodesSize) + "livesize", db.nodesSize, "preimages", numPreimages, "pruningMarks", numPruningMarks) return nil } @@ -902,6 +905,8 @@ func (db *Database) Commit(root common.Hash, report bool, blockNum uint64) error commitStart := time.Now() db.diskDB.WritePreimages(0, db.preimages) db.diskDB.WritePruningMarks(db.pruningMarks) + numPreimages := len(db.preimages) + numPruningMarks := len(db.pruningMarks) // Move the trie itself into the batch, flushing if enough data is accumulated numNodes, nodesSize := len(db.nodes), db.nodesSize @@ -936,7 +941,7 @@ func (db *Database) Commit(root common.Hash, report bool, blockNum uint64) error localLogger("Persisted trie from memory database", "blockNum", blockNum, "updated nodes", numNodes-len(db.nodes), "updated nodes size", nodesSize-db.nodesSize, "time", commitEnd.Sub(commitStart), "gcnodes", db.gcnodes, "gcsize", db.gcsize, "gctime", db.gctime, - "livenodes", len(db.nodes), "livesize", db.nodesSize) + "livenodes", len(db.nodes), "livesize", db.nodesSize, "preimages", numPreimages, "pruningMarks", numPruningMarks) // Reset the garbage collection statistics db.gcnodes, db.gcsize, db.gctime = 0, 0, 0 diff --git a/work/mocks/blockchain_mock.go b/work/mocks/blockchain_mock.go index 680a2e4755..5938cadaf8 100644 --- a/work/mocks/blockchain_mock.go +++ b/work/mocks/blockchain_mock.go @@ -678,6 +678,21 @@ func (mr *MockBlockChainMockRecorder) Processor() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Processor", reflect.TypeOf((*MockBlockChain)(nil).Processor)) } +// PrunableStateAt mocks base method. +func (m *MockBlockChain) PrunableStateAt(arg0 common.Hash, arg1 uint64) (*state.StateDB, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PrunableStateAt", arg0, arg1) + ret0, _ := ret[0].(*state.StateDB) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// PrunableStateAt indicates an expected call of PrunableStateAt. +func (mr *MockBlockChainMockRecorder) PrunableStateAt(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PrunableStateAt", reflect.TypeOf((*MockBlockChain)(nil).PrunableStateAt), arg0, arg1) +} + // ResetWithGenesisBlock mocks base method. 
func (m *MockBlockChain) ResetWithGenesisBlock(arg0 *types.Block) error { m.ctrl.T.Helper() diff --git a/work/work.go b/work/work.go index 6b2ad8b95d..0a7dd69b88 100644 --- a/work/work.go +++ b/work/work.go @@ -271,6 +271,7 @@ type BlockChain interface { Processor() blockchain.Processor BadBlocks() ([]blockchain.BadBlockArgs, error) StateAt(root common.Hash) (*state.StateDB, error) + PrunableStateAt(root common.Hash, num uint64) (*state.StateDB, error) StateAtWithPersistent(root common.Hash) (*state.StateDB, error) StateAtWithGCLock(root common.Hash) (*state.StateDB, error) Export(w io.Writer) error diff --git a/work/worker.go b/work/worker.go index 84240a2b25..44bf043c37 100644 --- a/work/worker.go +++ b/work/worker.go @@ -478,7 +478,7 @@ func (self *worker) push(work *Task) { // makeCurrent creates a new environment for the current cycle. func (self *worker) makeCurrent(parent *types.Block, header *types.Header) error { - stateDB, err := self.chain.StateAt(parent.Root()) + stateDB, err := self.chain.PrunableStateAt(parent.Root(), parent.NumberU64()) if err != nil { return err } From 044cecc33cb7dee9d075d2c13c1763c5e64847bc Mon Sep 17 00:00:00 2001 From: "ollie.j" Date: Tue, 4 Jul 2023 18:26:53 +0900 Subject: [PATCH 07/12] Add state.live-pruning CLI flags --- cmd/utils/config.go | 1 + cmd/utils/flaggroup.go | 2 ++ cmd/utils/flags.go | 11 +++++++++++ cmd/utils/nodecmd/chaincmd.go | 7 +++++++ cmd/utils/nodecmd/nodeflags.go | 1 + node/cn/backend.go | 12 +++++++++--- node/cn/config.go | 20 +++++++++++--------- 7 files changed, 42 insertions(+), 12 deletions(-) diff --git a/cmd/utils/config.go b/cmd/utils/config.go index 6b0103fdd2..1deae45f9b 100644 --- a/cmd/utils/config.go +++ b/cmd/utils/config.go @@ -540,6 +540,7 @@ func (kCfg *KlayConfig) SetKlayConfig(ctx *cli.Context, stack *node.Node) { common.DefaultCacheType = common.CacheType(ctx.GlobalInt(CacheTypeFlag.Name)) cfg.TrieBlockInterval = ctx.GlobalUint(TrieBlockIntervalFlag.Name) cfg.TriesInMemory = ctx.GlobalUint64(TriesInMemoryFlag.Name) + cfg.LivePruningRetention = ctx.GlobalUint64(LivePruningRetentionFlag.Name) if ctx.GlobalIsSet(CacheScaleFlag.Name) { common.CacheScale = ctx.GlobalInt(CacheScaleFlag.Name) diff --git a/cmd/utils/flaggroup.go b/cmd/utils/flaggroup.go index 3210312967..8a7eeb3886 100644 --- a/cmd/utils/flaggroup.go +++ b/cmd/utils/flaggroup.go @@ -176,6 +176,8 @@ var FlagGroups = []FlagGroup{ TrieMemoryCacheSizeFlag, TrieBlockIntervalFlag, TriesInMemoryFlag, + LivePruningFlag, + LivePruningRetentionFlag, }, }, { diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index ea13bdc239..544076b638 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -351,6 +351,17 @@ var ( Value: blockchain.DefaultTriesInMemory, EnvVar: "KLAYTN_STATE_TRIES_IN_MEMORY", } + LivePruningFlag = cli.BoolFlag{ + Name: "state.live-pruning", + Usage: "Enable trie live pruning", + EnvVar: "KLAYTN_STATE_LIVE_PRUNING", + } + LivePruningRetentionFlag = cli.Uint64Flag{ + Name: "state.live-pruning-retention", + Usage: "Number of blocks from the latest block that are not to be pruned", + Value: blockchain.DefaultLivePruningRetention, + EnvVar: "KLAYTN_STATE_LIVE_PRUNING_RETENTION", + } CacheTypeFlag = cli.IntFlag{ Name: "cache.type", Usage: "Cache Type: 0=LRUCache, 1=LRUShardCache, 2=FIFOCache", diff --git a/cmd/utils/nodecmd/chaincmd.go b/cmd/utils/nodecmd/chaincmd.go index b97aa3ceca..a94c139696 100644 --- a/cmd/utils/nodecmd/chaincmd.go +++ b/cmd/utils/nodecmd/chaincmd.go @@ -58,6 +58,7 @@ var ( utils.LevelDBCompressionTypeFlag, 
utils.DataDirFlag, utils.OverwriteGenesisFlag, + utils.LivePruningFlag, }, Category: "BLOCKCHAIN COMMANDS", Description: ` @@ -173,6 +174,12 @@ func initGenesis(ctx *cli.Context) error { logger.Crit("Failed to write governance items", "err", err) } + // Write the live pruning flag to database + if ctx.Bool(utils.LivePruningFlag.Name) { + chainDB.WritePruningEnabled() + logger.Info("Live pruning enabled") + } + logger.Info("Successfully wrote genesis state", "database", name, "hash", hash.String()) chainDB.Close() } diff --git a/cmd/utils/nodecmd/nodeflags.go b/cmd/utils/nodecmd/nodeflags.go index 25836d1b88..63b9bbbf98 100644 --- a/cmd/utils/nodecmd/nodeflags.go +++ b/cmd/utils/nodecmd/nodeflags.go @@ -156,6 +156,7 @@ var CommonNodeFlags = []cli.Flag{ altsrc.NewIntFlag(utils.TrieMemoryCacheSizeFlag), altsrc.NewUintFlag(utils.TrieBlockIntervalFlag), altsrc.NewUint64Flag(utils.TriesInMemoryFlag), + altsrc.NewUint64Flag(utils.LivePruningRetentionFlag), altsrc.NewIntFlag(utils.CacheTypeFlag), altsrc.NewIntFlag(utils.CacheScaleFlag), altsrc.NewStringFlag(utils.CacheUsageLevelFlag), diff --git a/node/cn/backend.go b/node/cn/backend.go index 651bc8f728..b678b2b4e6 100644 --- a/node/cn/backend.go +++ b/node/cn/backend.go @@ -253,9 +253,15 @@ func New(ctx *node.ServiceContext, config *Config) (*CN, error) { var ( vmConfig = config.getVMConfig() cacheConfig = &blockchain.CacheConfig{ - ArchiveMode: config.NoPruning, CacheSize: config.TrieCacheSize, - BlockInterval: config.TrieBlockInterval, TriesInMemory: config.TriesInMemory, - TrieNodeCacheConfig: &config.TrieNodeCacheConfig, SenderTxHashIndexing: config.SenderTxHashIndexing, SnapshotCacheSize: config.SnapshotCacheSize, SnapshotAsyncGen: config.SnapshotAsyncGen, + ArchiveMode: config.NoPruning, + CacheSize: config.TrieCacheSize, + BlockInterval: config.TrieBlockInterval, + TriesInMemory: config.TriesInMemory, + LivePruningRetention: config.LivePruningRetention, + TrieNodeCacheConfig: &config.TrieNodeCacheConfig, + SenderTxHashIndexing: config.SenderTxHashIndexing, + SnapshotCacheSize: config.SnapshotCacheSize, + SnapshotAsyncGen: config.SnapshotAsyncGen, } ) diff --git a/node/cn/config.go b/node/cn/config.go index 7600d8d828..43a473f8b6 100644 --- a/node/cn/config.go +++ b/node/cn/config.go @@ -45,15 +45,16 @@ var logger = log.NewModuleLogger(log.NodeCN) // GetDefaultConfig returns default settings for use on the Klaytn main net. 
func GetDefaultConfig() *Config { return &Config{ - SyncMode: downloader.FullSync, - NetworkId: params.CypressNetworkId, - LevelDBCacheSize: 768, - TrieCacheSize: 512, - TrieTimeout: 5 * time.Minute, - TrieBlockInterval: blockchain.DefaultBlockInterval, - TrieNodeCacheConfig: *statedb.GetEmptyTrieNodeCacheConfig(), - TriesInMemory: blockchain.DefaultTriesInMemory, - GasPrice: big.NewInt(18 * params.Ston), + SyncMode: downloader.FullSync, + NetworkId: params.CypressNetworkId, + LevelDBCacheSize: 768, + TrieCacheSize: 512, + TrieTimeout: 5 * time.Minute, + TrieBlockInterval: blockchain.DefaultBlockInterval, + TrieNodeCacheConfig: *statedb.GetEmptyTrieNodeCacheConfig(), + TriesInMemory: blockchain.DefaultTriesInMemory, + LivePruningRetention: blockchain.DefaultLivePruningRetention, + GasPrice: big.NewInt(18 * params.Ston), TxPool: blockchain.DefaultTxPoolConfig, GPO: gasprice.Config{ @@ -121,6 +122,7 @@ type Config struct { TrieTimeout time.Duration TrieBlockInterval uint TriesInMemory uint64 + LivePruningRetention uint64 SenderTxHashIndexing bool ParallelDBWrite bool TrieNodeCacheConfig statedb.TrieNodeCacheConfig From 1709442da15df276331df60fd6b6f8408493306c Mon Sep 17 00:00:00 2001 From: "ollie.j" Date: Thu, 6 Jul 2023 19:09:27 +0900 Subject: [PATCH 08/12] Allow --state.live-pruning as a global flag --- cmd/utils/config.go | 1 + cmd/utils/nodecmd/chaincmd.go | 5 +++-- cmd/utils/nodecmd/nodeflags.go | 1 + node/cn/backend.go | 14 ++++++++++++++ node/cn/config.go | 1 + 5 files changed, 20 insertions(+), 2 deletions(-) diff --git a/cmd/utils/config.go b/cmd/utils/config.go index 1deae45f9b..990f67a824 100644 --- a/cmd/utils/config.go +++ b/cmd/utils/config.go @@ -540,6 +540,7 @@ func (kCfg *KlayConfig) SetKlayConfig(ctx *cli.Context, stack *node.Node) { common.DefaultCacheType = common.CacheType(ctx.GlobalInt(CacheTypeFlag.Name)) cfg.TrieBlockInterval = ctx.GlobalUint(TrieBlockIntervalFlag.Name) cfg.TriesInMemory = ctx.GlobalUint64(TriesInMemoryFlag.Name) + cfg.LivePruning = ctx.GlobalBool(LivePruningFlag.Name) cfg.LivePruningRetention = ctx.GlobalUint64(LivePruningRetentionFlag.Name) if ctx.GlobalIsSet(CacheScaleFlag.Name) { diff --git a/cmd/utils/nodecmd/chaincmd.go b/cmd/utils/nodecmd/chaincmd.go index a94c139696..4eaf520cf0 100644 --- a/cmd/utils/nodecmd/chaincmd.go +++ b/cmd/utils/nodecmd/chaincmd.go @@ -133,6 +133,7 @@ func initGenesis(ctx *cli.Context) error { singleDB := ctx.GlobalIsSet(utils.SingleDBFlag.Name) numStateTrieShards := ctx.GlobalUint(utils.NumStateTrieShardsFlag.Name) overwriteGenesis := ctx.GlobalBool(utils.OverwriteGenesisFlag.Name) + livePruning := ctx.GlobalBool(utils.LivePruningFlag.Name) dbtype := database.DBType(ctx.GlobalString(utils.DbTypeFlag.Name)).ToValid() if len(dbtype) == 0 { @@ -175,9 +176,9 @@ func initGenesis(ctx *cli.Context) error { } // Write the live pruning flag to database - if ctx.Bool(utils.LivePruningFlag.Name) { + if livePruning { chainDB.WritePruningEnabled() - logger.Info("Live pruning enabled") + logger.Info("Enabling live pruning") } logger.Info("Successfully wrote genesis state", "database", name, "hash", hash.String()) diff --git a/cmd/utils/nodecmd/nodeflags.go b/cmd/utils/nodecmd/nodeflags.go index 63b9bbbf98..a9fa0b8596 100644 --- a/cmd/utils/nodecmd/nodeflags.go +++ b/cmd/utils/nodecmd/nodeflags.go @@ -156,6 +156,7 @@ var CommonNodeFlags = []cli.Flag{ altsrc.NewIntFlag(utils.TrieMemoryCacheSizeFlag), altsrc.NewUintFlag(utils.TrieBlockIntervalFlag), altsrc.NewUint64Flag(utils.TriesInMemoryFlag), + 
altsrc.NewBoolFlag(utils.LivePruningFlag), altsrc.NewUint64Flag(utils.LivePruningRetentionFlag), altsrc.NewIntFlag(utils.CacheTypeFlag), altsrc.NewIntFlag(utils.CacheScaleFlag), diff --git a/node/cn/backend.go b/node/cn/backend.go index b678b2b4e6..f692f3e436 100644 --- a/node/cn/backend.go +++ b/node/cn/backend.go @@ -271,6 +271,20 @@ func New(ctx *node.ServiceContext, config *Config) (*CN, error) { } bc.SetCanonicalBlock(config.StartBlockNumber) + // Write the live pruning flag to database if the node is started for the first time + if config.LivePruning && !chainDB.ReadPruningEnabled() { + if bc.CurrentBlock().NumberU64() > 0 { + return nil, errors.New("cannot enable live pruning after chain has advanced") + } + chainDB.WritePruningEnabled() + logger.Info("Enabling live pruning") + } + // Live pruning is enabled according to the flag in database + // regardless of the command line flag --state.live-pruning + if chainDB.ReadPruningEnabled() { + logger.Info("Live pruning is enabled") + } + cn.blockchain = bc governance.SetBlockchain(cn.blockchain) if err := governance.UpdateParams(cn.blockchain.CurrentBlock().NumberU64()); err != nil { diff --git a/node/cn/config.go b/node/cn/config.go index 43a473f8b6..3e18ffd919 100644 --- a/node/cn/config.go +++ b/node/cn/config.go @@ -122,6 +122,7 @@ type Config struct { TrieTimeout time.Duration TrieBlockInterval uint TriesInMemory uint64 + LivePruning bool LivePruningRetention uint64 SenderTxHashIndexing bool ParallelDBWrite bool From 05a54f1647fc4960f82aee621e458cb2b82d8dd2 Mon Sep 17 00:00:00 2001 From: "ollie.j" Date: Tue, 25 Jul 2023 09:53:04 +0900 Subject: [PATCH 09/12] Fix logging and comments --- node/cn/backend.go | 3 ++- storage/database/db_manager.go | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/node/cn/backend.go b/node/cn/backend.go index f692f3e436..b8d78026cd 100644 --- a/node/cn/backend.go +++ b/node/cn/backend.go @@ -281,7 +281,8 @@ func New(ctx *node.ServiceContext, config *Config) (*CN, error) { } // Live pruning is enabled according to the flag in database // regardless of the command line flag --state.live-pruning - if chainDB.ReadPruningEnabled() { + // But live pruning is disabled when --state.live-pruning-retention 0 + if chainDB.ReadPruningEnabled() && config.LivePruningRetention != 0 { logger.Info("Live pruning is enabled") } diff --git a/storage/database/db_manager.go b/storage/database/db_manager.go index c28231f8ed..117fba84b5 100644 --- a/storage/database/db_manager.go +++ b/storage/database/db_manager.go @@ -1873,6 +1873,7 @@ func (dbm *databaseManager) PutTrieNodeToBatch(batch Batch, hash common.ExtHash, } } +// DeleteTrieNode deletes a trie node having a specific hash. It is used only for testing. 
func (dbm *databaseManager) DeleteTrieNode(hash common.ExtHash) { if err := dbm.getDatabase(StateTrieDB).Delete(TrieNodeKey(hash)); err != nil { logger.Crit("Failed to delete trie node", "err", err) From 60df92ef57ddfdd4d1e1078f6112bbb6c7db6af0 Mon Sep 17 00:00:00 2001 From: "ollie.j" Date: Tue, 25 Jul 2023 10:04:41 +0900 Subject: [PATCH 10/12] Increase blockchain chBlock and chQueue size --- blockchain/blockchain.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/blockchain/blockchain.go b/blockchain/blockchain.go index e11d1da685..faf110fd75 100644 --- a/blockchain/blockchain.go +++ b/blockchain/blockchain.go @@ -251,8 +251,8 @@ func NewBlockChain(db database.DBManager, cacheConfig *CacheConfig, chainConfig cacheConfig: cacheConfig, db: db, triegc: prque.New(), - chBlock: make(chan gcBlock, 1000), - chPrune: make(chan uint64, 1000), + chBlock: make(chan gcBlock, 2048), // downloader.maxResultsProcess + chPrune: make(chan uint64, 2048), // downloader.maxResultsProcess stateCache: state.NewDatabaseWithNewCache(db, cacheConfig.TrieNodeCacheConfig), quit: make(chan struct{}), futureBlocks: futureBlocks, From 0e387cd8164ff32b1bee2ce9b6f3d96fd15c037c Mon Sep 17 00:00:00 2001 From: "ollie.j" Date: Tue, 25 Jul 2023 15:01:48 +0900 Subject: [PATCH 11/12] Fix TestStatePruning to simulate node restart --- blockchain/blockchain_test.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/blockchain/blockchain_test.go b/blockchain/blockchain_test.go index 01ec5f68b8..b55d34a3e4 100644 --- a/blockchain/blockchain_test.go +++ b/blockchain/blockchain_test.go @@ -1292,13 +1292,12 @@ func TestStatePruning(t *testing.T) { cacheConfig := &CacheConfig{ ArchiveMode: false, CacheSize: 512, - BlockInterval: 1, // Write frequently to test pruning + BlockInterval: 2, // Write frequently to test pruning TriesInMemory: DefaultTriesInMemory, LivePruningRetention: retention, // Enable pruning on blockchain by setting it nonzero TrieNodeCacheConfig: statedb.GetEmptyTrieNodeCacheConfig(), } blockchain, _ := NewBlockChain(db, cacheConfig, gspec.Config, engine, vm.Config{}) - defer blockchain.Stop() chain, _ := GenerateChain(gspec.Config, genesis, engine, db, numBlocks, func(i int, gen *BlockGen) { tx, _ := types.SignTx(types.NewTransaction( @@ -1313,6 +1312,14 @@ func TestStatePruning(t *testing.T) { // Give some time for pruning loop to run time.Sleep(100 * time.Millisecond) + // Note that even if trie nodes are deleted from disk (DiskDB), + // they may still be cached in memory (TrieDB). + // + // Therefore reopen the blockchain from the DiskDB with a clean TrieDB. + // This simulates the node program restart. + blockchain.Stop() + blockchain, _ = NewBlockChain(db, cacheConfig, gspec.Config, engine, vm.Config{}) + // Genesis block always survives state, err := blockchain.StateAt(genesis.Root()) assert.Nil(t, err) @@ -1331,6 +1338,7 @@ func TestStatePruning(t *testing.T) { assert.NotZero(t, state.GetBalance(addr1).Uint64()) assert.NotZero(t, state.GetBalance(addr2).Uint64()) } + blockchain.Stop() } // TODO-Klaytn-FailedTest Failed test. Enable this later. 
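Taking stock before the final patch: at runtime, each iteration of the pruneTrieNodeLoop added in PATCH 06 boils down to the following. This is an illustrative condensation — pruneOnce is a hypothetical name, and the real loop additionally carries startNum forward between iterations so each scan covers only a small key range:

// pruneOnce sketches one delivery on bc.chPrune, using only the
// DBManager accessors introduced in PATCH 01.
func pruneOnce(dbm database.DBManager, num, retention uint64) {
	if !dbm.ReadPruningEnabled() || retention == 0 || num <= retention {
		return // pruning disabled, or nothing old enough to prune yet
	}
	limit := num - retention                  // marks in [1, num-retention] are obsolete
	marks := dbm.ReadPruningMarks(1, limit+1) // half-open range: [1, limit+1)
	dbm.PruneTrieNodes(marks)                 // delete the marked trie nodes first,
	dbm.DeletePruningMarks(marks)             // then drop the consumed marks
}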
From 01d7d1f48eb82ad92c3fd1e99f2b2ca2d9f46776 Mon Sep 17 00:00:00 2001
From: "ollie.j" 
Date: Tue, 25 Jul 2023 15:05:31 +0900
Subject: [PATCH 12/12] Acquire db.lock around insertPruningMark

---
 storage/statedb/database.go |  9 ++++++--
 storage/statedb/trie.go     | 42 +++++++++++++++++++++++--------------
 2 files changed, 33 insertions(+), 18 deletions(-)

diff --git a/storage/statedb/database.go b/storage/statedb/database.go
index 8326d67a87..cefe0e2f32 100644
--- a/storage/statedb/database.go
+++ b/storage/statedb/database.go
@@ -466,8 +466,13 @@ func (db *Database) insertPreimage(hash common.Hash, preimage []byte) {
 	db.preimagesSize += common.StorageSize(common.HashLength + len(preimage))
 }

-func (db *Database) insertPruningMark(mark database.PruningMark) {
-	db.pruningMarks = append(db.pruningMarks, mark)
+// insertPruningMark writes a new pruning mark to the memory database.
+// Note, this method assumes that the database's lock is held!
+func (db *Database) insertPruningMark(hash common.ExtHash, blockNum uint64) {
+	db.pruningMarks = append(db.pruningMarks, database.PruningMark{
+		Number: blockNum,
+		Hash:   hash,
+	})
 }

 // getCachedNode finds an encoded node in the trie node cache if enabled.
diff --git a/storage/statedb/trie.go b/storage/statedb/trie.go
index f4bbc5ab1c..3b6654aad3 100644
--- a/storage/statedb/trie.go
+++ b/storage/statedb/trie.go
@@ -27,7 +27,6 @@ import (

 	"github.com/klaytn/klaytn/common"
 	"github.com/klaytn/klaytn/crypto"
-	"github.com/klaytn/klaytn/storage/database"
 )

 var (
@@ -74,8 +73,10 @@ type Trie struct {
 	db           *Database
 	root         node
 	originalRoot common.ExtHash
-	pruning bool // True if the underlying database has pruning enabled.
-	storage bool // If storage and Pruning are both true, root hash is attached a fresh nonce.
+
+	pruning           bool // True if the underlying database has pruning enabled.
+	storage           bool // If storage and Pruning are both true, root hash is attached a fresh nonce.
+	pruningMarksCache map[common.ExtHash]uint64
 }
@@ -115,11 +116,12 @@ func newTrie(root common.ExtHash, db *Database, opts *TrieOpts, storage bool) (*
 	}

 	trie := &Trie{
-		TrieOpts:     *opts,
-		db:           db,
-		pruning:      db.diskDB.ReadPruningEnabled(),
-		originalRoot: root,
-		storage:      storage,
+		TrieOpts:          *opts,
+		db:                db,
+		originalRoot:      root,
+		pruning:           db.diskDB.ReadPruningEnabled(),
+		storage:           storage,
+		pruningMarksCache: make(map[common.ExtHash]uint64),
 	}
@@ -578,6 +580,7 @@ func (t *Trie) CommitExt(onleaf LeafCallback) (root common.ExtHash, err error) {
 	if t.db == nil {
 		panic("commit called on trie with nil database")
 	}
+	t.commitPruningMarks()
 	hash, cached := t.hashRoot(t.db, onleaf)
 	t.root = cached
 	return hash, nil
@@ -611,18 +614,25 @@ func (t *Trie) markPrunableNode(n node) {
 	if hn, ok := n.(hashNode); ok {
 		// If a node exists as a hashNode, it means the node is either:
 		// (1) lives in the database but is yet to be resolved - subject to pruning,
 		// (2) collapsed by Hash or Commit - may or may not be in the database, add the mark anyway.
-		t.db.insertPruningMark(database.PruningMark{
-			Number: t.PruningBlockNumber,
-			Hash:   common.BytesToExtHash(hn),
-		})
+		t.pruningMarksCache[common.BytesToExtHash(hn)] = t.PruningBlockNumber
 	} else if hn, _ := n.cache(); hn != nil {
 		// If node.flags.hash is nonempty, it means the node is either:
 		// (1) loaded from the database - subject to pruning,
 		// (2) went through the hasher by Hash or Commit - may or may not be in the database, add the mark anyway.
-		t.db.insertPruningMark(database.PruningMark{
-			Number: t.PruningBlockNumber,
-			Hash:   common.BytesToExtHash(hn),
-		})
+		t.pruningMarksCache[common.BytesToExtHash(hn)] = t.PruningBlockNumber
+	}
+}
+
+// commitPruningMarks flushes the accumulated pruning marks to the memory database under db.lock.
+func (t *Trie) commitPruningMarks() {
+	if len(t.pruningMarksCache) > 0 {
+		t.db.lock.Lock()
+		for hash, blockNum := range t.pruningMarksCache {
+			t.db.insertPruningMark(hash, blockNum)
+		}
+		t.db.lock.Unlock()
+
+		t.pruningMarksCache = make(map[common.ExtHash]uint64)
+	}
+}
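A closing note on the last patch's design: marks are now buffered per Trie in a map and handed to the shared Database once per Commit inside a single critical section, so the update/delete hot path takes no lock and duplicate marks for the same node hash collapse into one entry. The pattern in isolation, sketched with illustrative types rather than the Klaytn ones:

package main

import (
	"fmt"
	"sync"
)

// store stands in for statedb.Database: a shared mark list guarded by a lock.
type store struct {
	mu    sync.Mutex
	marks []string
}

// writer stands in for a Trie: it buffers marks locally, deduplicating by
// hash the way pruningMarksCache does, and flushes them in one locked pass.
type writer struct {
	s     *store
	local map[string]uint64
}

func (w *writer) mark(hash string, num uint64) {
	w.local[hash] = num // lock-free hot path; the latest block number wins per hash
}

func (w *writer) commit() {
	if len(w.local) == 0 {
		return
	}
	w.s.mu.Lock()
	for h, n := range w.local {
		w.s.marks = append(w.s.marks, fmt.Sprintf("%s@%d", h, n))
	}
	w.s.mu.Unlock()
	w.local = make(map[string]uint64) // start a fresh batch, like commitPruningMarks
}

func main() {
	s := &store{}
	w := &writer{s: s, local: make(map[string]uint64)}
	w.mark("0xaa", 7)
	w.mark("0xaa", 8) // deduplicated: only block 8 survives
	w.commit()
	fmt.Println(s.marks) // Output: [0xaa@8]
}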