10000 KIP-111 Prune obsolete trie nodes by blukat29 · Pull Request #1871 · klaytn/klaytn · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content
This repository was archived by the owner on Aug 19, 2024. It is now read-only.

KIP-111 Prune obsolete trie nodes #1871

Merged
merged 12 commits into from
Jul 26, 2023
79 changes: 66 additions & 13 deletions blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,10 @@ const (
)

const (
DefaultTriesInMemory = 128
DefaultBlockInterval = 128
MaxPrefetchTxs = 20000
DefaultTriesInMemory = 128
DefaultBlockInterval = 128
DefaultLivePruningRetention = 172800
MaxPrefetchTxs = 20000

// BlockChainVersion ensures that an incompatible database forces a resync from scratch.
// Changelog:
Expand All @@ -127,6 +128,7 @@ type CacheConfig struct {
CacheSize int // Size of in-memory cache of a trie (MiB) to flush matured singleton trie nodes to disk
BlockInterval uint // Block interval to flush the trie. Each interval state trie will be flushed into disk
TriesInMemory uint64 // Maximum number of recent state tries according to its block number
LivePruningRetention uint64 // Number of blocks before trie nodes in pruning marks to be deleted. If zero, obsolete nodes are not deleted.
SenderTxHashIndexing bool // Enables saving senderTxHash to txHash mapping information to database and cache
TrieNodeCacheConfig *statedb.TrieNodeCacheConfig // Configures trie node cache
SnapshotCacheSize int // Memory allowance (MB) to use for caching snapshot entries in memory
Expand Down Expand Up @@ -161,6 +163,7 @@ type BlockChain struct {
snaps *snapshot.Tree // Snapshot tree for fast trie leaf access
triegc *prque.Prque // Priority queue mapping block numbers to tries to gc
chBlock chan gcBlock // chPushBlockGCPrque is a channel for delivering the gc item to gc loop.
chPrune chan uint64 // chPrune is a channel for delivering the current block number for pruning loop.

hc *HeaderChain
rmLogsFeed event.Feed
Expand Down Expand Up @@ -224,13 +227,14 @@ type prefetchTx struct {
func NewBlockChain(db database.DBManager, cacheConfig *CacheConfig, chainConfig *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config) (*BlockChain, error) {
if cacheConfig == nil {
cacheConfig = &CacheConfig{
ArchiveMode: false,
CacheSize: 512,
BlockInterval: DefaultBlockInterval,
TriesInMemory: DefaultTriesInMemory,
TrieNodeCacheConfig: statedb.GetEmptyTrieNodeCacheConfig(),
SnapshotCacheSize: 512,
SnapshotAsyncGen: true,
ArchiveMode: false,
CacheSize: 512,
BlockInterval: DefaultBlockInterval,
TriesInMemory: DefaultTriesInMemory,
LivePruningRetention: DefaultLivePruningRetention,
TrieNodeCacheConfig: statedb.GetEmptyTrieNodeCacheConfig(),
SnapshotCacheSize: 512,
SnapshotAsyncGen: true,
}
}

Expand All @@ -247,7 +251,8 @@ func NewBlockChain(db database.DBManager, cacheConfig *CacheConfig, chainConfig
cacheConfig: cacheConfig,
db: db,
triegc: prque.New(),
chBlock: make(chan gcBlock, 1000),
chBlock: make(chan gcBlock, 2048), // downloader.maxResultsProcess
chPrune: make(chan uint64, 2048), // downloader.maxResultsProcess
stateCache: state.NewDatabaseWithNewCache(db, cacheConfig.TrieNodeCacheConfig),
quit: make(chan struct{}),
futureBlocks: futureBlocks,
Expand Down Expand Up @@ -353,6 +358,7 @@ func NewBlockChain(db database.DBManager, cacheConfig *CacheConfig, chainConfig
// Take ownership of this particular state
go bc.update()
bc.gcCachedNodeLoop()
bc.pruneTrieNodeLoop()
bc.restartStateMigration()

if cacheConfig.TrieNodeCacheConfig.DumpPeriodically() {
Expand Down Expand Up @@ -687,6 +693,18 @@ func (bc *BlockChain) StateAt(root common.Hash) (*state.StateDB, error) {
return state.New(root, bc.stateCache, bc.snaps, nil)
}

// PrunableStateAt returns a new mutable state based on a particular point in time.
//
// If live pruning is enabled on the database and LivePruningRetention is nonzero,
// the state is opened with num as the trie's PruningBlockNumber so that the trie
// can mark obsolete nodes for pruning. Otherwise it behaves exactly like StateAt.
//
// NOTE(review): a zero num is presumably treated by the trie as "pruning not
// requested" — confirm against statedb.TrieOpts.
func (bc *BlockChain) PrunableStateAt(root common.Hash, num uint64) (*state.StateDB, error) {
	// Guard clause: without pruning on both the database and the chain config,
	// fall back to a plain, non-pruning state.
	if !bc.db.ReadPruningEnabled() || bc.cacheConfig.LivePruningRetention == 0 {
		return bc.StateAt(root)
	}
	return state.New(root, bc.stateCache, bc.snaps, &statedb.TrieOpts{
		PruningBlockNumber: num,
	})
}

// StateAtWithPersistent returns a new mutable state based on a particular point in time with persistent trie nodes.
func (bc *BlockChain) StateAtWithPersistent(root common.Hash) (*state.StateDB, error) {
exist := bc.stateCache.TrieDB().DoesExistNodeInPersistent(root.ExtendLegacy())
Expand Down Expand Up @@ -1335,8 +1353,8 @@ func (bc *BlockChain) writeStateTrie(block *types.Block, state *state.StateDB) e
logger.Error("Error from trieDB.Cap by state migration", "err", err)
}
}

bc.lastCommittedBlock = block.NumberU64()
bc.chPrune <- block.NumberU64()
}

bc.chBlock <- gcBlock{root, block.NumberU64()}
Expand Down Expand Up @@ -1397,6 +1415,41 @@ func (bc *BlockChain) gcCachedNodeLoop() {
}()
}

// pruneTrieNodeLoop starts a goroutine, tracked by bc.wg, that reacts to block numbers
// delivered on bc.chPrune and stops once bc.quit is signalled.
//
// A received number is ignored unless pruning is enabled on the database,
// LivePruningRetention is nonzero, and the number is greater than the retention.
// Otherwise the pruning marks between the previous stopping point and
// (number - retention) are read, the trie nodes they refer to are pruned, and the
// marks are deleted.
func (bc *BlockChain) pruneTrieNodeLoop() {
	// ReadPruningMarks(1, limit) is very slow because it iterates over most of MiscDB.
	// Remembering where the previous round stopped keeps every scan to a small range.
	nextStart := uint64(1)

	// pruneOnce processes a single block number taken from chPrune.
	pruneOnce := func(latest uint64) {
		retention := bc.cacheConfig.LivePruningRetention
		if !bc.db.ReadPruningEnabled() || retention == 0 || latest <= retention {
			return
		}
		upTo := latest - retention // Prune [1, latest - retention]

		began := time.Now()
		marks := bc.db.ReadPruningMarks(nextStart, upTo+1)
		bc.db.PruneTrieNodes(marks)
		bc.db.DeletePruningMarks(marks)

		logger.Info("Pruned trie nodes", "number", latest, "start", nextStart, "limit", upTo,
			"count", len(marks), "elapsed", time.Since(began))

		// The next round resumes right after the range handled here.
		nextStart = upTo + 1
	}

	bc.wg.Add(1)
	go func() {
		defer bc.wg.Done()
		for {
			select {
			case <-bc.quit:
				return
			case num := <-bc.chPrune:
				pruneOnce(num)
			}
		}
	}()
}

func isCommitTrieRequired(bc *BlockChain, blockNum uint64) bool {
if bc.prepareStateMigration {
return true
Expand Down Expand Up @@ -1918,7 +1971,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty
return i, events, coalescedLogs, err
}

stateDB, err := bc.StateAt(parent.Root())
stateDB, err := bc.PrunableStateAt(parent.Root(), parent.NumberU64())
if err != nil {
return i, events, coalescedLogs, err
}
Expand Down
79 changes: 79 additions & 0 deletions blockchain/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@ import (
"github.com/klaytn/klaytn/consensus"
"github.com/klaytn/klaytn/consensus/gxhash"
"github.com/klaytn/klaytn/crypto"
"github.com/klaytn/klaytn/log"
"github.com/klaytn/klaytn/params"
"github.com/klaytn/klaytn/rlp"
"github.com/klaytn/klaytn/storage"
"github.com/klaytn/klaytn/storage/database"
"github.com/klaytn/klaytn/storage/statedb"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// So we can deterministically seed different blockchains
Expand Down Expand Up @@ -1262,6 +1264,83 @@ func TestTrieForkGC(t *testing.T) {
}
}

// Tests that State pruning indeed deletes obsolete trie nodes.
//
// The test inserts numBlocks blocks with live pruning enabled, waits for the pruning
// loop, reopens the chain on the same database and then checks which block states
// can still be opened.
func TestStatePruning(t *testing.T) {
	log.EnableLogForTest(log.LvlCrit, log.LvlInfo)
	var (
		db      = database.NewMemoryDBManager()
		key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
		addr1   = crypto.PubkeyToAddress(key1.PublicKey)
		addr2   = common.HexToAddress("0xaaaa")

		gspec = &Genesis{
			Config: params.TestChainConfig,
			Alloc:  GenesisAlloc{addr1: {Balance: big.NewInt(10000000000000)}},
		}
		genesis = gspec.MustCommit(db)
		signer  = types.LatestSignerForChainID(gspec.Config.ChainID)
		engine  = gxhash.NewFaker()

		// Latest `retention` blocks survive.
		// Blocks 1..7 are pruned, blocks 8..10 are kept.
		retention = uint64(3)
		numBlocks = 10
		pruneNum  = uint64(numBlocks) - retention
	)

	db.WritePruningEnabled() // Enable pruning on database by writing the flag at genesis
	cacheConfig := &CacheConfig{
		ArchiveMode:          false,
		CacheSize:            512,
		BlockInterval:        2, // Write frequently to test pruning
		TriesInMemory:        DefaultTriesInMemory,
		LivePruningRetention: retention, // Enable pruning on blockchain by setting it nonzero
		TrieNodeCacheConfig:  statedb.GetEmptyTrieNodeCacheConfig(),
	}
	// Fail fast on a constructor error instead of dereferencing a nil chain below.
	blockchain, err := NewBlockChain(db, cacheConfig, gspec.Config, engine, vm.Config{})
	require.NoError(t, err)

	chain, _ := GenerateChain(gspec.Config, genesis, engine, db, numBlocks, func(i int, gen *BlockGen) {
		tx, _ := types.SignTx(types.NewTransaction(
			gen.TxNonce(addr1), addr2, common.Big1, 21000, common.Big1, nil), signer, key1)
		gen.AddTx(tx)
	})
	if _, err := blockchain.InsertChain(chain); err != nil {
		t.Fatalf("failed to insert chain: %v", err)
	}
	assert.Equal(t, uint64(numBlocks), blockchain.CurrentBlock().NumberU64())

	// Give some time for pruning loop to run
	time.Sleep(100 * time.Millisecond)

	// Note that even if trie nodes are deleted from disk (DiskDB),
	// they may still be cached in memory (TrieDB).
	//
	// Therefore reopen the blockchain from the DiskDB with a clean TrieDB.
	// This simulates the node program restart.
	blockchain.Stop()
	blockchain, err = NewBlockChain(db, cacheConfig, gspec.Config, engine, vm.Config{})
	require.NoError(t, err)

	// Genesis block always survives
	state, err := blockchain.StateAt(genesis.Root())
	assert.Nil(t, err)
	assert.NotZero(t, state.GetBalance(addr1).Uint64())

	// Pruned blocks should be inaccessible.
	for num := uint64(1); num <= pruneNum; num++ {
		_, err := blockchain.StateAt(blockchain.GetBlockByNumber(num).Root())
		assert.IsType(t, &statedb.MissingNodeError{}, err, num)
	}

	// Recent unpruned blocks should be accessible.
	// NOTE(review): the loop stops before numBlocks, so the head block itself is not
	// checked here even though the comment above lists 8..10 — confirm this is intended.
	for num := pruneNum + 1; num < uint64(numBlocks); num++ {
		state, err := blockchain.StateAt(blockchain.GetBlockByNumber(num).Root())
		require.Nil(t, err, num)
		assert.NotZero(t, state.GetBalance(addr1).Uint64())
		assert.NotZero(t, state.GetBalance(addr2).Uint64())
	}
	blockchain.Stop()
}

// TODO-Klaytn-FailedTest Failed test. Enable this later.
/*
// Tests that doing large reorgs works even if the state associated with the
Expand Down
6 changes: 3 additions & 3 deletions blockchain/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"github.com/klaytn/klaytn/crypto"
"github.com/klaytn/klaytn/kerrors"
"github.com/klaytn/klaytn/rlp"
"github.com/klaytn/klaytn/storage/statedb"
)

var emptyCodeHash = crypto.Keccak256(nil)
Expand Down Expand Up @@ -130,7 +129,8 @@ func newObject(db *StateDB, address common.Address, data account.Account) *state

// EncodeRLP implements rlp.Encoder.
func (c *stateObject) EncodeRLP(w io.Writer) error {
serializer := account.NewAccountSerializerWithAccount(c.account)
// State objects are RLP encoded with ExtHash preserved.
serializer := account.NewAccountSerializerExtWithAccount(c.account)
return rlp.Encode(w, serializer)
}

Expand All @@ -157,7 +157,7 @@ func (c *stateObject) touch() {
}

func (c *stateObject) openStorageTrie(hash common.ExtHash, db Database) (Trie, error) {
return db.OpenStorageTrie(hash, &statedb.TrieOpts{Prefetching: c.db.prefetching})
return db.OpenStorageTrie(hash, c.db.trieOpts)
}

func (c *stateObject) getStorageTrie(db Database) Trie {
Expand Down
8 changes: 5 additions & 3 deletions blockchain/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ var (
// StateDBs within the Klaytn protocol are used to cache stateObjects from Merkle Patricia Trie
// and mediate the operations to them.
type StateDB struct {
db Database
trie Trie
db Database
trie Trie
trieOpts *statedb.TrieOpts

snaps *snapshot.Tree
snap snapshot.Snapshot
Expand Down Expand Up @@ -125,6 +126,7 @@ func New(root common.Hash, db Database, snaps *snapshot.Tree, opts *statedb.Trie
sdb := &StateDB{
db: db,
trie: tr,
trieOpts: opts,
snaps: snaps,
stateObjects: make(map[common.Address]*stateObject),
stateObjectsDirtyStorage: make(map[common.Address]struct{}),
Expand Down Expand Up @@ -171,7 +173,7 @@ func (self *StateDB) Error() error {
// Reset clears out all ephemeral state objects from the state db, but keeps
// the underlying state trie to avoid reloading data for the next operations.
func (self *StateDB) Reset(root common.Hash) error {
tr, err := self.db.OpenTrie(root, nil)
tr, err := self.db.OpenTrie(root, self.trieOpts)
if err != nil {
return err
}
Expand Down
61 changes: 61 additions & 0 deletions blockchain/state/statedb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,67 @@ func TestStateObjects(t *testing.T) {
assert.Equal(t, 128, len(stateDB.stateObjects))
}

// TestPruningOptions verifies which combinations of the database pruning flag and a
// requested PruningBlockNumber let a StateDB be opened; invalid combinations must fail.
func TestPruningOptions(t *testing.T) {
	// canOpen reports whether New succeeds on a fresh in-memory database.
	// dbPruning writes the pruning flag to the database first; requestPruning
	// sets a nonzero PruningBlockNumber in the trie options.
	canOpen := func(dbPruning, requestPruning bool) bool {
		dbm := database.NewMemoryDBManager()
		if dbPruning {
			dbm.WritePruningEnabled()
		}
		opts := &statedb.TrieOpts{}
		if requestPruning {
			opts.PruningBlockNumber = 1
		}
		_, err := New(common.Hash{}, NewDatabase(dbm), nil, opts)
		return err == nil
	}

	cases := []struct {
		dbPruning      bool
		requestPruning bool
		wantOpen       bool
	}{
		// DB pruning disabled & not request pruning. Normal non-pruning setup.
		{dbPruning: false, requestPruning: false, wantOpen: true},
		// DB pruning disabled & request pruning must fail.
		{dbPruning: false, requestPruning: true, wantOpen: false},
		// DB pruning enabled & not request pruning. Temporary trie,
		// such as in debug_traceTransaction or eth_call.
		{dbPruning: true, requestPruning: false, wantOpen: true},
		// DB pruning enabled & request pruning. Normal pruning setup,
		// such as in InsertChain.
		{dbPruning: true, requestPruning: true, wantOpen: true},
	}
	for _, tc := range cases {
		assert.Equal(t, tc.wantOpen, canOpen(tc.dbPruning, tc.requestPruning))
	}
}

// TestPruningRoot checks the extension of a contract's storage root (ExtHash)
// with and without pruning enabled on the database.
func TestPruningRoot(t *testing.T) {
	addr := common.HexToAddress("0xaaaa")

	// storageRootIsLegacy builds a fresh in-memory database (optionally flagged for
	// pruning), commits a contract account with a single storage slot, reopens the
	// state from the committed root to read the account as stored, and reports
	// whether its storage root is legacy.
	storageRootIsLegacy := func(pruning bool) bool {
		dbm := database.NewMemoryDBManager()
		if pruning {
			dbm.WritePruningEnabled()
		}
		db := NewDatabase(dbm)

		fresh, _ := New(common.Hash{}, db, nil, nil)
		fresh.CreateSmartContractAccount(addr, params.CodeFormatEVM, params.Rules{})
		fresh.SetState(addr, common.HexToHash("1"), common.HexToHash("2"))
		root, _ := fresh.Commit(false)

		reopened, _ := New(root, db, nil, nil)
		storageRoot, _ := reopened.GetContractStorageRoot(addr)
		return storageRoot.IsLegacy()
	}

	// When pruning is disabled, storage root is zero-extended.
	assert.True(t, storageRootIsLegacy(false))

	// When pruning is enabled, storage root is nonzero-extended.
	assert.False(t, storageRootIsLegacy(true))
}

// A snapshotTest checks that reverting StateDB snapshots properly undoes all changes
// captured by the snapshot. Instances of this test with pseudorandom content are created
// by Generate.
Expand Down
2 changes: 2 additions & 0 deletions cmd/utils/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,8 @@ func (kCfg *KlayConfig) SetKlayConfig(ctx *cli.Context, stack *node.Node) {
common.DefaultCacheType = common.CacheType(ctx.GlobalInt(CacheTypeFlag.Name))
cfg.TrieBlockInterval = ctx.GlobalUint(TrieBlockIntervalFlag.Name)
cfg.TriesInMemory = ctx.GlobalUint64(TriesInMemoryFlag.Name)
cfg.LivePruning = ctx.GlobalBool(LivePruningFlag.Name)
cfg.LivePruningRetention = ctx.GlobalUint64(LivePruningRetentionFlag.Name)

if ctx.GlobalIsSet(CacheScaleFlag.Name) {
common.CacheScale = ctx.GlobalInt(CacheScaleFlag.Name)
Expand Down
2 changes: 2 additions & 0 deletions cmd/utils/flaggroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ var FlagGroups = []FlagGroup{
TrieMemoryCacheSizeFlag,
TrieBlockIntervalFlag,
TriesInMemoryFlag,
LivePruningFlag,
LivePruningRetentionFlag,
},
},
{
Expand Down
Loading
0