From fea54f4fc6941ee09aca79385dbe0a921b8dd2c6 Mon Sep 17 00:00:00 2001
From: Nikolay Kurtov
Date: Fri, 24 Mar 2023 12:11:19 +0100
Subject: [PATCH 1/5] fix: State dump creates dumps at the correct point

The dump needs to be of the state at the beginning of the last block of
the previous epoch.
---
 nearcore/src/metrics.rs               |   9 -
 nearcore/src/state_sync.rs            |  41 +++--
 tools/state-viewer/src/state_parts.rs | 239 +++++++++++++++-----------
 3 files changed, 160 insertions(+), 129 deletions(-)

diff --git a/nearcore/src/metrics.rs b/nearcore/src/metrics.rs
index 4fd4fa910c2..3816b6b9f23 100644
--- a/nearcore/src/metrics.rs
+++ b/nearcore/src/metrics.rs
@@ -50,15 +50,6 @@ pub(crate) static STATE_SYNC_DUMP_PUT_OBJECT_ELAPSED: Lazy<HistogramVec> = Lazy:
     )
     .unwrap()
 });
-pub(crate) static STATE_SYNC_DUMP_OBTAIN_PART_ELAPSED: Lazy<HistogramVec> = Lazy::new(|| {
-    try_create_histogram_vec(
-        "near_state_sync_dump_obtain_part_elapsed_sec",
-        "Time needed to obtain a part",
-        &["shard_id"],
-        Some(exponential_buckets(0.001, 1.6, 25).unwrap()),
-    )
-    .unwrap()
-});
 pub(crate) static STATE_SYNC_DUMP_NUM_PARTS_TOTAL: Lazy<IntGaugeVec> = Lazy::new(|| {
     try_create_int_gauge_vec(
         "near_state_sync_dump_num_parts_total",
diff --git a/nearcore/src/state_sync.rs b/nearcore/src/state_sync.rs
index 458bff4e27e..a4a910896c6 100644
--- a/nearcore/src/state_sync.rs
+++ b/nearcore/src/state_sync.rs
@@ -175,7 +175,7 @@ async fn state_sync_dump(
                     .with_label_values(&[&shard_id.to_string()])
                     .start_timer();
 
-                let state_part = match get_state_part(
+                let state_part = match obtain_and_store_state_part(
                     &runtime,
                     &shard_id,
                     &sync_hash,
@@ -328,7 +328,8 @@ fn set_metrics(
     }
 }
 
-fn get_state_part(
+/// Obtains and then saves the part data.
+fn obtain_and_store_state_part(
     runtime: &Arc<NightshadeRuntime>,
     shard_id: &ShardId,
     sync_hash: &CryptoHash,
@@ -337,19 +338,13 @@ fn get_state_part(
     num_parts: u64,
     chain: &Chain,
 ) -> Result<Vec<u8>, Error> {
-    let state_part = {
-        let _timer = metrics::STATE_SYNC_DUMP_OBTAIN_PART_ELAPSED
-            .with_label_values(&[&shard_id.to_string()])
-            .start_timer();
-        runtime.obtain_state_part(
-            *shard_id,
-            &sync_hash,
-            &state_root,
-            PartId::new(part_id, num_parts),
-        )?
-    };
+    let state_part = runtime.obtain_state_part(
+        *shard_id,
+        &sync_hash,
+        &state_root,
+        PartId::new(part_id, num_parts),
+    )?;
 
-    // Save the part data.
     let key = StatePartKey(*sync_hash, *shard_id, part_id).try_to_vec()?;
     let mut store_update = chain.store().store().store_update();
     store_update.set(DBCol::StateParts, &key, &state_part);
@@ -368,13 +363,17 @@ fn start_dumping(
     let epoch_info = runtime.get_epoch_info(&epoch_id)?;
     let epoch_height = epoch_info.epoch_height();
     let num_shards = runtime.num_shards(&epoch_id)?;
-    let sync_hash_block = chain.get_block(&sync_hash)?;
-    if runtime.cares_about_shard(None, sync_hash_block.header().prev_hash(), shard_id, false) {
-        assert_eq!(num_shards, sync_hash_block.chunks().len() as u64);
-        let state_root = sync_hash_block.chunks()[shard_id as usize].prev_state_root();
-        let state_root_node = runtime.get_state_root_node(shard_id, &sync_hash, &state_root)?;
+    let sync_prev_header = chain.get_block_header(&sync_hash)?;
+    let sync_prev_hash = sync_prev_header.prev_hash();
+    let prev_sync_block = chain.get_block(&sync_prev_hash)?;
+    if runtime.cares_about_shard(None, prev_sync_block.header().prev_hash(), shard_id, false) {
+        assert_eq!(num_shards, prev_sync_block.chunks().len() as u64);
+        let state_root = prev_sync_block.chunks()[shard_id as usize].prev_state_root();
+        // See `get_state_response_header()` for reference.
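+        // The dump must capture the state as of the beginning of the last
+        // block of the previous epoch, which is why the state root is taken
+        // from the chunks of `prev_sync_block` rather than from the sync block.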
+        let state_root_node =
+            runtime.get_state_root_node(shard_id, &sync_prev_hash, &state_root)?;
         let num_parts = get_num_state_parts(state_root_node.memory_usage);
-        tracing::debug!(target: "state_sync_dump", shard_id, ?epoch_id, %sync_hash, %state_root, num_parts, "Initialize dumping state of Epoch");
+        tracing::debug!(target: "state_sync_dump", shard_id, ?epoch_id, %sync_prev_hash, %sync_hash, %state_root, num_parts, "Initialize dumping state of Epoch");
         // Note that first the state of the state machines gets changed to
         // `InProgress` and it starts dumping state after a short interval.
         set_metrics(&shard_id, Some(0), Some(num_parts), Some(epoch_height));
@@ -387,7 +386,7 @@ fn start_dumping(
             num_parts,
         }))
     } else {
-        tracing::debug!(target: "state_sync_dump", shard_id, ?epoch_id, %sync_hash, "Shard is not tracked, skip the epoch");
+        tracing::debug!(target: "state_sync_dump", shard_id, ?epoch_id, %sync_prev_hash, %sync_hash, "Shard is not tracked, skip the epoch");
         Ok(Some(StateSyncDumpProgress::AllDumped { epoch_id, epoch_height, num_parts: Some(0) }))
     }
 }
diff --git a/tools/state-viewer/src/state_parts.rs b/tools/state-viewer/src/state_parts.rs
index a0a15e8414a..bfbd7b99969 100644
--- a/tools/state-viewer/src/state_parts.rs
+++ b/tools/state-viewer/src/state_parts.rs
@@ -1,7 +1,6 @@
 use crate::epoch_info::iterate_and_filter;
-use near_chain::types::RuntimeAdapter;
-use near_chain::{ChainStore, ChainStoreAccess};
-use near_epoch_manager::EpochManager;
+use near_chain::{Chain, ChainGenesis, ChainStoreAccess, DoomslugThresholdMode};
+use near_client::sync::state::StateSync;
 use near_primitives::epoch_manager::epoch_info::EpochInfo;
 use near_primitives::state_part::PartId;
 use near_primitives::syncing::get_num_state_parts;
@@ -49,6 +48,12 @@ pub(crate) enum StatePartsSubCommand {
         #[clap(subcommand)]
         epoch_selection: EpochSelection,
     },
+    /// Read State Header from the DB
+    ReadStateHeader {
+        /// Select an epoch to work on.
+        #[clap(subcommand)]
+        epoch_selection: EpochSelection,
+    },
 }
 
 impl StatePartsSubCommand {
@@ -62,6 +67,17 @@ impl StatePartsSubCommand {
         near_config: NearConfig,
         store: Store,
     ) {
+        let runtime =
+            Arc::new(NightshadeRuntime::from_config(home_dir, store.clone(), &near_config));
+        let chain_genesis = ChainGenesis::new(&near_config.genesis);
+        let mut chain = Chain::new_for_view_client(
+            runtime.clone(),
+            &chain_genesis,
+            DoomslugThresholdMode::TwoThirds,
+            false,
+        )
+        .unwrap();
+        let chain_id = &near_config.genesis.config.chain_id;
         match self {
             StatePartsSubCommand::Apply { dry_run, state_root, part_id, epoch_selection } => {
                 apply_state_parts(
@@ -70,8 +86,8 @@ impl StatePartsSubCommand {
                     part_id,
                     dry_run,
                     state_root,
-                    home_dir,
-                    near_config,
+                    &mut chain,
+                    chain_id,
                     store,
                     Location::new(root_dir, (s3_bucket, s3_region)),
                 );
@@ -82,12 +98,15 @@ impl StatePartsSubCommand {
                     shard_id,
                     part_from,
                     part_to,
-                    home_dir,
-                    near_config,
+                    &chain,
+                    chain_id,
                     store,
                     Location::new(root_dir, (s3_bucket, s3_region)),
                 );
             }
+            StatePartsSubCommand::ReadStateHeader { epoch_selection } => {
+                read_state_header(epoch_selection, shard_id, &chain, store)
+            }
         }
     }
 }
@@ -107,15 +126,10 @@ pub(crate) enum EpochSelection {
 }
 
 impl EpochSelection {
-    fn to_epoch_id(
-        &self,
-        store: Store,
-        chain_store: &ChainStore,
-        epoch_manager: &EpochManager,
-    ) -> EpochId {
+    fn to_epoch_id(&self, store: Store, chain: &Chain) -> EpochId {
         match self {
             EpochSelection::Current => {
-                epoch_manager.get_epoch_id(&chain_store.head().unwrap().last_block_hash).unwrap()
+                chain.runtime_adapter.get_epoch_id(&chain.head().unwrap().last_block_hash).unwrap()
             }
             EpochSelection::EpochId { epoch_id } => {
                 EpochId(CryptoHash::from_str(&epoch_id).unwrap())
@@ -132,12 +146,12 @@ impl EpochSelection {
             }
             EpochSelection::BlockHash { block_hash } => {
                 let block_hash = CryptoHash::from_str(&block_hash).unwrap();
-                epoch_manager.get_epoch_id(&block_hash).unwrap()
+                chain.runtime_adapter.get_epoch_id(&block_hash).unwrap()
             }
             EpochSelection::BlockHeight { block_height } => {
                 // Fetch an epoch containing the given block height.
-                let block_hash = chain_store.get_block_hash_by_height(*block_height).unwrap();
-                epoch_manager.get_epoch_id(&block_hash).unwrap()
+                let block_hash = chain.store().get_block_hash_by_height(*block_height).unwrap();
+                chain.runtime_adapter.get_epoch_id(&block_hash).unwrap()
             }
         }
     }
@@ -172,21 +186,18 @@ impl Location {
     }
 }
 
-/// Returns block hash of the last block of an epoch preceding the given `epoch_info`.
-fn get_prev_hash_of_epoch(
-    epoch_info: &EpochInfo,
-    chain_store: &ChainStore,
-    epoch_manager: &EpochManager,
-) -> CryptoHash {
-    let head = chain_store.head().unwrap();
-    let mut cur_block_info = epoch_manager.get_block_info(&head.last_block_hash).unwrap();
+/// Returns block hash of some block of the given `epoch_info` epoch.
+fn get_any_block_hash_of_epoch(epoch_info: &EpochInfo, chain: &Chain) -> CryptoHash {
+    let head = chain.store().head().unwrap();
+    let mut cur_block_info = chain.runtime_adapter.get_block_info(&head.last_block_hash).unwrap();
     // EpochManager doesn't have an API that maps EpochId to Blocks, and this function works
     // around that limitation by iterating over the epochs.
     // This workaround is acceptable because:
     // 1) Extending EpochManager's API is a major change.
     // 2) This use case is not critical at all.
     loop {
-        let cur_epoch_info = epoch_manager.get_epoch_info(cur_block_info.epoch_id()).unwrap();
+        let cur_epoch_info =
+            chain.runtime_adapter.get_epoch_info(cur_block_info.epoch_id()).unwrap();
         let cur_epoch_height = cur_epoch_info.epoch_height();
         assert!(
             cur_epoch_height >= epoch_info.epoch_height(),
             epoch_info.epoch_height()
         );
         let epoch_first_block_info =
-            epoch_manager.get_block_info(cur_block_info.epoch_first_block()).unwrap();
+            chain.runtime_adapter.get_block_info(cur_block_info.epoch_first_block()).unwrap();
         let prev_epoch_last_block_info =
-            epoch_manager.get_block_info(epoch_first_block_info.prev_hash()).unwrap();
+            chain.runtime_adapter.get_block_info(epoch_first_block_info.prev_hash()).unwrap();
 
         if cur_epoch_height == epoch_info.epoch_height() {
-            return *prev_epoch_last_block_info.hash();
+            return *cur_block_info.hash();
         }
 
         cur_block_info = prev_epoch_last_block_info;
@@ -213,32 +224,36 @@ fn apply_state_parts(
     part_id: Option<u64>,
     dry_run: bool,
     maybe_state_root: Option<StateRoot>,
-    home_dir: &Path,
-    near_config: NearConfig,
+    chain: &mut Chain,
+    chain_id: &str,
     store: Store,
     location: Location,
 ) {
-    let runtime_adapter: Arc<dyn RuntimeAdapter> =
-        Arc::new(NightshadeRuntime::from_config(home_dir, store.clone(), &near_config));
-    let epoch_manager =
-        EpochManager::new_from_genesis_config(store.clone(), &near_config.genesis.config)
-            .expect("Failed to start Epoch Manager");
-    let chain_store = ChainStore::new(
-        store.clone(),
-        near_config.genesis.config.genesis_height,
-        near_config.client_config.save_trie_changes,
-    );
-
-    let epoch_id = epoch_selection.to_epoch_id(store, &chain_store, &epoch_manager);
-    let epoch = epoch_manager.get_epoch_info(&epoch_id).unwrap();
-
-    let (state_root, sync_prev_hash) = if let Some(state_root) = maybe_state_root {
-        (state_root, None)
+    let (state_root, epoch_height, epoch_id, sync_hash, sync_prev_hash) = if let (
+        Some(state_root),
+        EpochSelection::EpochHeight { epoch_height },
+    ) =
+        (maybe_state_root, &epoch_selection)
+    {
+        (state_root, *epoch_height, None, None, None)
     } else {
-        let sync_prev_hash = get_prev_hash_of_epoch(&epoch, &chain_store, &epoch_manager);
-        let sync_prev_block = chain_store.get_block(&sync_prev_hash).unwrap();
-
-        assert!(epoch_manager.is_next_block_epoch_start(&sync_prev_hash).unwrap());
+        let epoch_id = epoch_selection.to_epoch_id(store, &chain);
+        let epoch = chain.runtime_adapter.get_epoch_info(&epoch_id).unwrap();
+
+        let sync_hash = get_any_block_hash_of_epoch(&epoch, &chain);
+        let sync_hash = StateSync::get_epoch_start_sync_hash(&chain, &sync_hash).unwrap();
+        let sync_header = chain.get_block_header(&sync_hash).unwrap();
+        // See `get_state_response_header()`.
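+        // `sync_prev` is the last block of the epoch preceding `sync_hash`'s
+        // epoch; the state roots to apply against come from its chunks.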
+        let sync_prev_block = chain.get_block(sync_header.prev_hash()).unwrap();
+        let sync_prev_hash = sync_prev_block.hash();
+        tracing::info!(
+            target: "state-parts",
+            ?sync_hash,
+            ?sync_prev_hash,
+            height = sync_prev_block.header().height(),
+            state_roots = ?sync_prev_block.chunks().iter().map(|chunk| chunk.prev_state_root()).collect::<Vec<_>>());
+
+        assert!(chain.runtime_adapter.is_next_block_epoch_start(&sync_prev_hash).unwrap());
         assert!(
             shard_id < sync_prev_block.chunks().len() as u64,
             "shard_id: {}, #shards: {}",
@@ -246,26 +261,21 @@ fn apply_state_parts(
             sync_prev_block.chunks().len()
         );
         let state_root = sync_prev_block.chunks()[shard_id as usize].prev_state_root();
-        (state_root, Some(sync_prev_hash))
+        (state_root, epoch.epoch_height(), Some(epoch_id), Some(sync_hash), Some(*sync_prev_hash))
     };
 
-    let part_storage = get_state_part_reader(
-        location,
-        &near_config.client_config.chain_id,
-        epoch.epoch_height(),
-        shard_id,
-    );
+    let part_storage = get_state_part_reader(location, &chain_id, epoch_height, shard_id);
 
     let num_parts = part_storage.num_parts();
     assert_ne!(num_parts, 0, "Too few num_parts: {}", num_parts);
     let part_ids = get_part_ids(part_id, part_id.map(|x| x + 1), num_parts);
     tracing::info!(
         target: "state-parts",
-        epoch_height = epoch.epoch_height(),
-        epoch_id = ?epoch_id.0,
+        epoch_height,
         shard_id,
         num_parts,
         ?sync_prev_hash,
+        ?sync_hash,
         ?part_ids,
         "Applying state as seen at the beginning of the specified epoch.",
     );
 
     let timer = Instant::now();
     for part_id in part_ids {
         let timer = Instant::now();
         assert!(part_id < num_parts, "part_id: {}, num_parts: {}", part_id, num_parts);
         let part = part_storage.read(part_id, num_parts);
 
         if dry_run {
-            assert!(runtime_adapter.validate_state_part(
+            assert!(chain.runtime_adapter.validate_state_part(
                 &state_root,
                 PartId::new(part_id, num_parts),
                 &part
             ));
             tracing::info!(target: "state-parts", part_id, part_length = part.len(), elapsed_sec = timer.elapsed().as_secs_f64(), "Validated a state part");
         } else {
-            runtime_adapter
+            chain
+                .set_state_part(
+                    shard_id,
+                    sync_hash.unwrap(),
+                    PartId::new(part_id, num_parts),
+                    &part,
+                )
+                .unwrap();
+            chain
+                .runtime_adapter
                 .apply_state_part(
                     shard_id,
                     &state_root,
                     PartId::new(part_id, num_parts),
                     &part,
-                    &epoch_id,
+                    epoch_id.as_ref().unwrap(),
                 )
                 .unwrap();
             tracing::info!(target: "state-parts", part_id, part_length = part.len(), elapsed_sec = timer.elapsed().as_secs_f64(), "Applied a state part");
@@ -304,28 +323,21 @@ fn dump_state_parts(
     shard_id: ShardId,
     part_from: Option<u64>,
     part_to: Option<u64>,
-    home_dir: &Path,
-    near_config: NearConfig,
+    chain: &Chain,
+    chain_id: &str,
     store: Store,
     location: Location,
 ) {
-    let runtime_adapter: Arc<dyn RuntimeAdapter> =
-        Arc::new(NightshadeRuntime::from_config(home_dir, store.clone(), &near_config));
-    let epoch_manager =
-        EpochManager::new_from_genesis_config(store.clone(), &near_config.genesis.config)
-            .expect("Failed to start Epoch Manager");
-    let chain_store = ChainStore::new(
-        store.clone(),
-        near_config.genesis.config.genesis_height,
-        near_config.client_config.save_trie_changes,
-    );
-
-    let epoch_id = epoch_selection.to_epoch_id(store, &chain_store, &epoch_manager);
-    let epoch = epoch_manager.get_epoch_info(&epoch_id).unwrap();
-    let sync_prev_hash = get_prev_hash_of_epoch(&epoch, &chain_store, &epoch_manager);
-    let sync_prev_block = chain_store.get_block(&sync_prev_hash).unwrap();
-
-    assert!(epoch_manager.is_next_block_epoch_start(&sync_prev_hash).unwrap());
+    let epoch_id = epoch_selection.to_epoch_id(store, &chain);
+    let epoch = chain.runtime_adapter.get_epoch_info(&epoch_id).unwrap();
+    let sync_hash = get_any_block_hash_of_epoch(&epoch, &chain);
+    let sync_hash = StateSync::get_epoch_start_sync_hash(&chain, &sync_hash).unwrap();
+    let sync_header = chain.get_block_header(&sync_hash).unwrap();
+    // See `get_state_response_header()`.
+    let sync_prev_block = chain.get_block(sync_header.prev_hash()).unwrap();
+    let sync_prev_hash = sync_prev_block.hash();
+
+    assert!(chain.runtime_adapter.is_next_block_epoch_start(&sync_prev_hash).unwrap());
     assert!(
         shard_id < sync_prev_block.chunks().len() as u64,
         "shard_id: {}, #shards: {}",
         shard_id,
         sync_prev_block.chunks().len()
     );
     let state_root = sync_prev_block.chunks()[shard_id as usize].prev_state_root();
     let state_root_node =
-        runtime_adapter.get_state_root_node(shard_id, &sync_prev_hash, &state_root).unwrap();
+        chain.runtime_adapter.get_state_root_node(shard_id, &sync_prev_hash, &state_root).unwrap();
 
     let num_parts = get_num_state_parts(state_root_node.memory_usage);
     let part_ids = get_part_ids(part_from, part_to, num_parts);
@@ -345,29 +357,22 @@ fn dump_state_parts(
         epoch_id = ?epoch_id.0,
         shard_id,
         num_parts,
+        ?sync_hash,
         ?sync_prev_hash,
         ?part_ids,
+        ?state_root,
         "Dumping state as seen at the beginning of the specified epoch.",
     );
 
-    let part_storage = get_state_part_writer(
-        location,
-        &near_config.client_config.chain_id,
-        epoch.epoch_height(),
-        shard_id,
-    );
+    let part_storage = get_state_part_writer(location, chain_id, epoch.epoch_height(), shard_id);
 
     let timer = Instant::now();
     for part_id in part_ids {
         let timer = Instant::now();
         assert!(part_id < num_parts, "part_id: {}, num_parts: {}", part_id, num_parts);
-        let state_part = runtime_adapter
-            .obtain_state_part(
-                shard_id,
-                &sync_prev_hash,
-                &state_root,
-                PartId::new(part_id, num_parts),
-            )
+        let state_part = chain
+            .runtime_adapter
+            .obtain_state_part(shard_id, &sync_hash, &state_root, PartId::new(part_id, num_parts))
             .unwrap();
         part_storage.write(&state_part, part_id, num_parts);
         tracing::info!(target: "state-parts", part_id, part_length = state_part.len(), elapsed_sec = timer.elapsed().as_secs_f64(), "Wrote a state part");
@@ -375,10 +380,28 @@ fn dump_state_parts(
     tracing::info!(target: "state-parts", total_elapsed_sec = timer.elapsed().as_secs_f64(), "Wrote all requested state parts");
 }
 
+/// Reads `StateHeader` stored in the DB.
+fn read_state_header(
+    epoch_selection: EpochSelection,
+    shard_id: ShardId,
+    chain: &Chain,
+    store: Store,
+) {
+    let epoch_id = epoch_selection.to_epoch_id(store, &chain);
+    let epoch = chain.runtime_adapter.get_epoch_info(&epoch_id).unwrap();
+
+    let sync_hash = get_any_block_hash_of_epoch(&epoch, &chain);
+    let sync_hash = StateSync::get_epoch_start_sync_hash(&chain, &sync_hash).unwrap();
+
+    let state_header = chain.store().get_state_header(shard_id, sync_hash);
+    tracing::info!(target: "state-parts", ?epoch_id, ?sync_hash, ?state_header);
+}
+
 fn get_part_ids(part_from: Option<u64>, part_to: Option<u64>, num_parts: u64) -> Range<u64> {
     part_from.unwrap_or(0)..part_to.unwrap_or(num_parts)
 }
 
+// Needs to be in sync with `fn s3_location()`.
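+// For example (illustrative values): "chain_id=testnet/epoch_height=123/shard_id=0".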
 fn location_prefix(chain_id: &str, epoch_height: u64, shard_id: u64) -> String {
     format!("chain_id={}/epoch_height={}/shard_id={}", chain_id, epoch_height, shard_id)
 }
@@ -466,6 +489,7 @@ impl FileSystemStorage {
             tracing::info!(target: "state-parts", ?root_dir, ?prefix, ?state_parts_dir, "Ensuring the directory exists");
             std::fs::create_dir_all(&state_parts_dir).unwrap();
         }
+        tracing::info!(target: "state-parts", ?state_parts_dir, "Initialized FileSystemStorage");
         Self { state_parts_dir }
     }
 
@@ -485,21 +509,38 @@ impl StatePartWriter for FileSystemStorage {
 impl StatePartReader for FileSystemStorage {
     fn read(&self, part_id: u64, num_parts: u64) -> Vec<u8> {
         let filename = self.get_location(part_id, num_parts);
+        tracing::debug!(target: "state-parts", part_id, num_parts, ?filename, "Reading state part file");
         let part = std::fs::read(filename).unwrap();
         part
     }
 
     fn num_parts(&self) -> u64 {
         let paths = std::fs::read_dir(&self.state_parts_dir).unwrap();
-        let num_parts = paths
+        let mut known_num_parts = None;
+        let num_files = paths
            .filter(|path| {
                 let full_path = path.as_ref().unwrap();
                 tracing::debug!(target: "state-parts", ?full_path);
-                is_part_filename(full_path.file_name().to_str().unwrap())
+                let filename = full_path.file_name().to_str().unwrap().to_string();
+                if let Some(num_parts) = get_num_parts_from_filename(&filename) {
+                    if let Some(known_num_parts) = known_num_parts {
+                        assert_eq!(known_num_parts, num_parts);
+                    }
+                    known_num_parts = Some(num_parts);
+                }
+                is_part_filename(&filename)
             })
             .collect::<Vec<std::io::Result<DirEntry>>>()
             .len();
-        num_parts as u64
+        if known_num_parts != Some(num_files as u64) {
+            // This is expected when a user saves time and downloads a few parts instead of all parts.
+            tracing::warn!(target: "state-parts",
+                dir = ?self.state_parts_dir,
+                ?known_num_parts,
+                num_files,
+                "The number of parts expected from the filenames doesn't match the number of part files available");
+        }
+        known_num_parts.unwrap()
     }
 }

From 428234ca412e31ae0accea7a42967538f2623db1 Mon Sep 17 00:00:00 2001
From: Nikolay Kurtov
Date: Fri, 24 Mar 2023 12:13:15 +0100
Subject: [PATCH 2/5] Metrics

---
 nearcore/src/metrics.rs     | 18 ++++++++++++++++++
 nearcore/src/runtime/mod.rs | 18 ++++++++++++++++--
 2 files changed, 34 insertions(+), 2 deletions(-)

diff --git a/nearcore/src/metrics.rs b/nearcore/src/metrics.rs
index 3816b6b9f23..19605ceeca1 100644
--- a/nearcore/src/metrics.rs
+++ b/nearcore/src/metrics.rs
@@ -82,3 +82,21 @@ pub(crate) static STATE_SYNC_DUMP_EPOCH_HEIGHT: Lazy<IntGaugeVec> = Lazy::new(||
     )
     .unwrap()
 });
+pub static STATE_SYNC_APPLY_PART_DELAY: Lazy = Lazy::new(|| {
+    try_create_histogram_vec(
+        "near_state_sync_apply_part_delay_sec",
+        "Latency of applying a state part",
+        &["shard_id"],
+        Some(exponential_buckets(0.001, 2.0, 20).unwrap()),
+    )
+    .unwrap()
+});
+pub static STATE_SYNC_OBTAIN_PART_DELAY: Lazy = Lazy::new(|| {
+    try_create_histogram_vec(
+        "near_state_sync_obtain_part_delay_sec",
+        "Latency of applying a state part",
+        &["shard_id"],
+        Some(exponential_buckets(0.001, 2.0, 20).unwrap()),
+    )
+    .unwrap()
+});
diff --git a/nearcore/src/runtime/mod.rs b/nearcore/src/runtime/mod.rs
index 3375ae8b3ee..5080711ee06 100644
--- a/nearcore/src/runtime/mod.rs
+++ b/nearcore/src/runtime/mod.rs
@@ -1240,6 +1240,10 @@ impl RuntimeAdapter for NightshadeRuntime {
             %block_hash,
             num_parts = part_id.total)
         .entered();
+        let _timer = metrics::STATE_SYNC_OBTAIN_PART_DELAY
+            .with_label_values(&[&shard_id.to_string()])
+            .start_timer();
+
         let epoch_id = self.get_epoch_id(block_hash)?;
         let shard_uid = self.get_shard_uid_from_epoch_id(shard_id, &epoch_id)?;
         let trie = self.tries.get_view_trie_for_shard(shard_uid, *state_root);
@@ -1264,11 +1268,17 @@ impl RuntimeAdapter for NightshadeRuntime {
                 match Trie::validate_trie_nodes_for_part(state_root, part_id, trie_nodes) {
                     Ok(_) => true,
                     // Storage error should not happen
-                    Err(_) => false,
+                    Err(err) => {
+                        tracing::error!(target: "state-parts", ?err, "State part storage error");
+                        false
+                    }
                 }
             }
             // Deserialization error means we've got the data from malicious peer
-            Err(_) => false,
+            Err(err) => {
+                tracing::error!(target: "state-parts", ?err, "State part deserialization error");
+                false
+            }
         }
     }
 
@@ -1365,6 +1375,10 @@ impl RuntimeAdapter for NightshadeRuntime {
         data: &[u8],
         epoch_id: &EpochId,
     ) -> Result<(), Error> {
+        let _timer = metrics::STATE_SYNC_APPLY_PART_DELAY
+            .with_label_values(&[&shard_id.to_string()])
+            .start_timer();
+
         let part = BorshDeserialize::try_from_slice(data)
             .expect("Part was already validated earlier, so could never fail here");
         let ApplyStatePartResult { trie_changes, flat_state_delta, contract_codes } =

From 313d441939cbd341cc375f419e5e2d687b6748d4 Mon Sep 17 00:00:00 2001
From: Nikolay Kurtov
Date: Fri, 24 Mar 2023 12:41:47 +0100
Subject: [PATCH 3/5] fix

---
 tools/state-viewer/src/state_parts.rs | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/tools/state-viewer/src/state_parts.rs b/tools/state-viewer/src/state_parts.rs
index bfbd7b99969..e23ff293502 100644
--- a/tools/state-viewer/src/state_parts.rs
+++ b/tools/state-viewer/src/state_parts.rs
@@ -14,7 +14,6 @@ use std::fs::DirEntry;
 use std::ops::Range;
 use std::path::{Path, PathBuf};
 use std::str::FromStr;
-use std::sync::Arc;
 use std::time::Instant;
 
 #[derive(clap::Subcommand, Debug, Clone)]
@@ -68,7 +67,7 @@ impl StatePartsSubCommand {
         store: Store,
     ) {
         let runtime =
-            Arc::new(NightshadeRuntime::from_config(home_dir, store.clone(), &near_config));
+            NightshadeRuntime::from_config(home_dir, store.clone(), &near_config);
         let chain_genesis = ChainGenesis::new(&near_config.genesis);
         let mut chain = Chain::new_for_view_client(
             runtime.clone(),

From c28321e1a217195577f6f3b7f40d1cf54693492c Mon Sep 17 00:00:00 2001
From: Nikolay Kurtov
Date: Fri, 24 Mar 2023 17:05:35 +0100
Subject: [PATCH 4/5] fmt

---
 tools/state-viewer/src/state_parts.rs | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/tools/state-viewer/src/state_parts.rs b/tools/state-viewer/src/state_parts.rs
index e23ff293502..c44943dede1 100644
--- a/tools/state-viewer/src/state_parts.rs
+++ b/tools/state-viewer/src/state_parts.rs
@@ -66,8 +66,7 @@ impl StatePartsSubCommand {
         near_config: NearConfig,
         store: Store,
     ) {
-        let runtime =
-            NightshadeRuntime::from_config(home_dir, store.clone(), &near_config);
+        let runtime = NightshadeRuntime::from_config(home_dir, store.clone(), &near_config);
         let chain_genesis = ChainGenesis::new(&near_config.genesis);
         let mut chain = Chain::new_for_view_client(

From 76564db1ec77990884b7804770f442773c2d9366 Mon Sep 17 00:00:00 2001
From: Nikolay Kurtov
Date: Fri, 24 Mar 2023 23:34:03 +0100
Subject: [PATCH 5/5] Implemented a suggestion from @marcelo-gonzalez.

Using `get_state_response_header()` instead of reimplementing it.
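
For context, the resulting `get_state_response_header()` follows a plain
get-or-compute-then-cache shape. A minimal standalone sketch of that shape
(hypothetical simplified types, not the real nearcore signatures; the real
code caches a `ShardStateSyncResponseHeader` under a `StateHeaderKey` in
`DBCol::StateHeaders`):

    use std::collections::HashMap;

    // Illustrative stand-in for the DB-backed header cache.
    struct HeaderCache {
        // (shard_id, sync_hash) -> serialized header
        cache: HashMap<(u64, [u8; 32]), Vec<u8>>,
    }

    impl HeaderCache {
        fn get_or_compute(
            &mut self,
            key: (u64, [u8; 32]),
            compute: impl FnOnce() -> Vec<u8>,
        ) -> Vec<u8> {
            // Fast path: the header was computed and stored earlier.
            if let Some(header) = self.cache.get(&key) {
                return header.clone();
            }
            // Slow path: compute once, then cache for later callers.
            let header = compute();
            self.cache.insert(key, header.clone());
            header
        }
    }

Splitting the uncached computation into `compute_state_response_header()`
also lets `dump_state_parts` bypass the cache explicitly, while state sync
and `apply_state_parts` keep the cached path.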
---
 chain/chain/src/chain.rs              |  27 +++--
 core/primitives/src/syncing.rs        |   3 -
 nearcore/src/metrics.rs               |   8 +-
 nearcore/src/runtime/mod.rs           |  14 ++-
 nearcore/src/state_sync.rs            | 138 +++++++++++++-------------
 tools/state-viewer/src/state_parts.rs |  74 ++++----------
 6 files changed, 127 insertions(+), 137 deletions(-)

diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs
index c9bf8a31630..85d43a95a37 100644
--- a/chain/chain/src/chain.rs
+++ b/chain/chain/src/chain.rs
@@ -2699,17 +2699,12 @@ impl Chain {
         )
     }
 
-    pub fn get_state_response_header(
+    /// Computes ShardStateSyncResponseHeader.
+    pub fn compute_state_response_header(
         &self,
         shard_id: ShardId,
         sync_hash: CryptoHash,
     ) -> Result<ShardStateSyncResponseHeader, Error> {
-        // Check cache
-        let key = StateHeaderKey(shard_id, sync_hash).try_to_vec()?;
-        if let Ok(Some(header)) = self.store.store().get_ser(DBCol::StateHeaders, &key) {
-            return Ok(header);
-        }
-
         // Consistency rules:
         // 1. Everything prefixed with `sync_` indicates new epoch, for which we are syncing.
         // 1a. `sync_prev` means the last of the prev epoch.
@@ -2875,6 +2870,24 @@ impl Chain {
                 })
             }
         };
+        Ok(shard_state_header)
+    }
+
+    /// Returns ShardStateSyncResponseHeader for the given epoch and shard.
+    /// If the header is already available in the DB, returns the cached version and doesn't recompute it.
+    /// If the header was computed then it also gets cached in the DB.
+    pub fn get_state_response_header(
+        &self,
+        shard_id: ShardId,
+        sync_hash: CryptoHash,
+    ) -> Result<ShardStateSyncResponseHeader, Error> {
+        // Check cache
+        let key = StateHeaderKey(shard_id, sync_hash).try_to_vec()?;
+        if let Ok(Some(header)) = self.store.store().get_ser(DBCol::StateHeaders, &key) {
+            return Ok(header);
+        }
+
+        let shard_state_header = self.compute_state_response_header(shard_id, sync_hash)?;
 
         // Saving the header data
         let mut store_update = self.store.store().store_update();
diff --git a/core/primitives/src/syncing.rs b/core/primitives/src/syncing.rs
index 4dc68f3b7a6..a1450fceb8d 100644
--- a/core/primitives/src/syncing.rs
+++ b/core/primitives/src/syncing.rs
@@ -249,10 +249,7 @@ pub enum StateSyncDumpProgress {
         /// Block hash of the first block of the epoch.
         /// The dumped state corresponds to the state before applying this block.
         sync_hash: CryptoHash,
-        /// Root of the state being dumped.
-        state_root: StateRoot,
         /// Progress made.
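         /// (i.e. the number of parts of this shard dumped so far).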
         parts_dumped: u64,
-        num_parts: u64,
     },
 }
diff --git a/nearcore/src/metrics.rs b/nearcore/src/metrics.rs
index 19605ceeca1..60363424afd 100644
--- a/nearcore/src/metrics.rs
+++ b/nearcore/src/metrics.rs
@@ -82,19 +82,19 @@ pub(crate) static STATE_SYNC_DUMP_EPOCH_HEIGHT: Lazy<IntGaugeVec> = Lazy::new(||
     )
     .unwrap()
 });
-pub static STATE_SYNC_APPLY_PART_DELAY: Lazy = Lazy::new(|| {
+pub static STATE_SYNC_APPLY_PART_DELAY: Lazy = Lazy::new(|| {
     try_create_histogram_vec(
         "near_state_sync_apply_part_delay_sec",
-        "Latency of applying a state part",
+        "Time needed to apply a state part",
         &["shard_id"],
         Some(exponential_buckets(0.001, 2.0, 20).unwrap()),
     )
     .unwrap()
 });
-pub static STATE_SYNC_OBTAIN_PART_DELAY: Lazy = Lazy::new(|| {
+pub static STATE_SYNC_OBTAIN_PART_DELAY: Lazy = Lazy::new(|| {
     try_create_histogram_vec(
         "near_state_sync_obtain_part_delay_sec",
-        "Latency of applying a state part",
+        "Time needed to obtain a part",
         &["shard_id"],
         Some(exponential_buckets(0.001, 2.0, 20).unwrap()),
     )
     .unwrap()
 });
diff --git a/nearcore/src/runtime/mod.rs b/nearcore/src/runtime/mod.rs
index 7c55c565d48..26f603c56db 100644
--- a/nearcore/src/runtime/mod.rs
+++ b/nearcore/src/runtime/mod.rs
@@ -1275,14 +1275,24 @@ impl RuntimeAdapter for NightshadeRuntime {
                     Ok(_) => true,
                     // Storage error should not happen
                     Err(err) => {
-                        tracing::error!(target: "state-parts", ?err, "State part storage error");
+                        tracing::error!(
+                            target: "state-parts",
+                            ?state_root,
+                            ?part_id,
+                            ?err,
+                            "State part storage error");
                         false
                     }
                 }
             }
             // Deserialization error means we've got the data from malicious peer
             Err(err) => {
-                tracing::error!(target: "state-parts", ?err, "State part deserialization error");
+                tracing::error!(
+                    target: "state-parts",
+                    ?state_root,
+                    ?part_id,
+                    ?err,
+                    "State part deserialization error");
                 false
             }
         }
diff --git a/nearcore/src/state_sync.rs b/nearcore/src/state_sync.rs
index a4a910896c6..0df51e42802 100644
--- a/nearcore/src/state_sync.rs
+++ b/nearcore/src/state_sync.rs
@@ -161,63 +161,75 @@ async fn state_sync_dump(
                 epoch_id,
                 epoch_height,
                 sync_hash,
-                state_root,
                 parts_dumped,
-                num_parts,
             })) => {
-                // The actual dumping of state to S3.
-                tracing::info!(target: "state_sync_dump", shard_id, ?epoch_id, epoch_height, %sync_hash, %state_root, parts_dumped, num_parts, "Creating parts and dumping them");
-                let mut res = None;
-                for part_id in parts_dumped..num_parts {
-                    // Dump parts sequentially synchronously.
-                    // TODO: How to make it possible to dump state more effectively using multiple nodes?
-                    let _timer = metrics::STATE_SYNC_DUMP_ITERATION_ELAPSED
-                        .with_label_values(&[&shard_id.to_string()])
-                        .start_timer();
-
-                    let state_part = match obtain_and_store_state_part(
-                        &runtime,
-                        &shard_id,
-                        &sync_hash,
-                        &state_root,
-                        part_id,
-                        num_parts,
-                        &chain,
-                    ) {
-                        Ok(state_part) => state_part,
-                        Err(err) => {
-                            res = Some(err);
-                            break;
+                let state_header = chain.get_state_response_header(shard_id, sync_hash);
+                match state_header {
+                    Ok(state_header) => {
+                        let state_root = state_header.chunk_prev_state_root();
+                        let num_parts =
+                            get_num_state_parts(state_header.state_root_node().memory_usage);
+
+                        let mut res = None;
+                        // The actual dumping of state to S3.
+                        tracing::info!(target: "state_sync_dump", shard_id, ?epoch_id, epoch_height, %sync_hash, parts_dumped, "Creating parts and dumping them");
+                        for part_id in parts_dumped..num_parts {
+                            // Dump parts sequentially synchronously.
+                            // TODO: How to make it possible to dump state more effectively using multiple nodes?
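+                            // (For now a single node dumps every part of the shard, one after another.)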
+                            let _timer = metrics::STATE_SYNC_DUMP_ITERATION_ELAPSED
+                                .with_label_values(&[&shard_id.to_string()])
+                                .start_timer();
+
+                            let state_part = match obtain_and_store_state_part(
+                                &runtime,
+                                &shard_id,
+                                &sync_hash,
+                                &state_root,
+                                part_id,
+                                num_parts,
+                                &chain,
+                            ) {
+                                Ok(state_part) => state_part,
+                                Err(err) => {
+                                    res = Some(err);
+                                    break;
+                                }
+                            };
+                            let location = s3_location(
+                                &config.chain_id,
+                                epoch_height,
+                                shard_id,
+                                part_id,
+                                num_parts,
+                            );
+                            if let Err(err) =
+                                put_state_part(&location, &state_part, &shard_id, &bucket).await
+                            {
+                                res = Some(err);
+                                break;
+                            }
+                            update_progress(
+                                &shard_id,
+                                &epoch_id,
+                                epoch_height,
+                                &sync_hash,
+                                part_id,
+                                num_parts,
+                                state_part.len(),
+                                &chain,
+                            );
+                        }
+                        if let Some(err) = res {
+                            Err(err)
+                        } else {
+                            Ok(Some(StateSyncDumpProgress::AllDumped {
+                                epoch_id,
+                                epoch_height,
+                                num_parts: Some(num_parts),
+                            }))
+                        }
+                    }
-                    let location =
-                        s3_location(&config.chain_id, epoch_height, shard_id, part_id, num_parts);
-                    if let Err(err) =
-                        put_state_part(&location, &state_part, &shard_id, &bucket).await
-                    {
-                        res = Some(err);
-                        break;
-                    }
-                    update_progress(
-                        &shard_id,
-                        &epoch_id,
-                        epoch_height,
-                        &sync_hash,
-                        &state_root,
-                        part_id,
-                        num_parts,
-                        state_part.len(),
-                        &chain,
-                    );
-                }
-                if let Some(err) = res {
-                    Err(err)
-                } else {
-                    Ok(Some(StateSyncDumpProgress::AllDumped {
-                        epoch_id,
-                        epoch_height,
-                        num_parts: Some(num_parts),
-                    }))
+                    Err(err) => Err(err),
                 }
             }
         };
@@ -268,7 +280,6 @@ fn update_progress(
     epoch_id: &EpochId,
     epoch_height: EpochHeight,
     sync_hash: &CryptoHash,
-    state_root: &StateRoot,
     part_id: u64,
     num_parts: u64,
     part_len: usize,
@@ -282,9 +293,7 @@ fn update_progress(
         epoch_id: epoch_id.clone(),
         epoch_height,
         sync_hash: *sync_hash,
-        state_root: *state_root,
         parts_dumped: part_id + 1,
-        num_parts,
     };
     match chain.store().set_state_sync_dump_progress(*shard_id, Some(next_progress.clone())) {
         Ok(_) => {
@@ -362,19 +371,13 @@ fn start_dumping(
) -> Result<Option<StateSyncDumpProgress>, Error> {
     let epoch_info = runtime.get_epoch_info(&epoch_id)?;
     let epoch_height = epoch_info.epoch_height();
-    let num_shards = runtime.num_shards(&epoch_id)?;
     let sync_prev_header = chain.get_block_header(&sync_hash)?;
-    let sync_prev_hash = sync_prev_header.prev_hash();
-    let prev_sync_block = chain.get_block(&sync_prev_hash)?;
-    if runtime.cares_about_shard(None, prev_sync_block.header().prev_hash(), shard_id, false) {
-        assert_eq!(num_shards, prev_sync_block.chunks().len() as u64);
-        let state_root = prev_sync_block.chunks()[shard_id as usize].prev_state_root();
-        // See `get_state_response_header()` for reference.
-        let state_root_node =
-            runtime.get_state_root_node(shard_id, &sync_prev_hash, &state_root)?;
-        let num_parts = get_num_state_parts(state_root_node.memory_usage);
-        tracing::debug!(target: "state_sync_dump", shard_id, ?epoch_id, %sync_prev_hash, %sync_hash, %state_root, num_parts, "Initialize dumping state of Epoch");
+    let sync_prev_hash = sync_prev_header.hash();
+
+    let state_header = chain.get_state_response_header(shard_id, sync_hash)?;
+    let num_parts = get_num_state_parts(state_header.state_root_node().memory_usage);
+    if runtime.cares_about_shard(None, sync_prev_hash, shard_id, false) {
+        tracing::debug!(target: "state_sync_dump", shard_id, ?epoch_id, %sync_prev_hash, %sync_hash, "Initialize dumping state of Epoch");
         // Note that first the state of the state machines gets changed to
         // `InProgress` and it starts dumping state after a short interval.
         set_metrics(&shard_id, Some(0), Some(num_parts), Some(epoch_height));
@@ -381,9 +385,7 @@ fn start_dumping(
             epoch_id,
             epoch_height,
             sync_hash,
-            state_root,
             parts_dumped: 0,
-            num_parts,
         }))
     } else {
         tracing::debug!(target: "state_sync_dump", shard_id, ?epoch_id, %sync_prev_hash, %sync_hash, "Shard is not tracked, skip the epoch");
         Ok(Some(StateSyncDumpProgress::AllDumped { epoch_id, epoch_height, num_parts: Some(0) }))
     }
 }
diff --git a/tools/state-viewer/src/state_parts.rs b/tools/state-viewer/src/state_parts.rs
index c44943dede1..6bd4afef858 100644
--- a/tools/state-viewer/src/state_parts.rs
+++ b/tools/state-viewer/src/state_parts.rs
@@ -69,7 +69,7 @@ impl StatePartsSubCommand {
         let runtime = NightshadeRuntime::from_config(home_dir, store.clone(), &near_config);
         let chain_genesis = ChainGenesis::new(&near_config.genesis);
         let mut chain = Chain::new_for_view_client(
-            runtime.clone(),
+            runtime,
             &chain_genesis,
             DoomslugThresholdMode::TwoThirds,
             false,
@@ -227,40 +227,23 @@ fn apply_state_parts(
     store: Store,
     location: Location,
 ) {
-    let (state_root, epoch_height, epoch_id, sync_hash, sync_prev_hash) = if let (
-        Some(state_root),
-        EpochSelection::EpochHeight { epoch_height },
-    ) =
-        (maybe_state_root, &epoch_selection)
-    {
-        (state_root, *epoch_height, None, None, None)
-    } else {
-        let epoch_id = epoch_selection.to_epoch_id(store, &chain);
-        let epoch = chain.runtime_adapter.get_epoch_info(&epoch_id).unwrap();
-
-        let sync_hash = get_any_block_hash_of_epoch(&epoch, &chain);
-        let sync_hash = StateSync::get_epoch_start_sync_hash(&chain, &sync_hash).unwrap();
-        let sync_header = chain.get_block_header(&sync_hash).unwrap();
-        // See `get_state_response_header()`.
-        let sync_prev_block = chain.get_block(sync_header.prev_hash()).unwrap();
-        let sync_prev_hash = sync_prev_block.hash();
-        tracing::info!(
-            target: "state-parts",
-            ?sync_hash,
-            ?sync_prev_hash,
-            height = sync_prev_block.header().height(),
-            state_roots = ?sync_prev_block.chunks().iter().map(|chunk| chunk.prev_state_root()).collect::<Vec<_>>());
-
-        assert!(chain.runtime_adapter.is_next_block_epoch_start(&sync_prev_hash).unwrap());
-        assert!(
-            shard_id < sync_prev_block.chunks().len() as u64,
-            "shard_id: {}, #shards: {}",
-            shard_id,
-            sync_prev_block.chunks().len()
-        );
-        let state_root = sync_prev_block.chunks()[shard_id as usize].prev_state_root();
-        (state_root, epoch.epoch_height(), Some(epoch_id), Some(sync_hash), Some(*sync_prev_hash))
-    };
+    let (state_root, epoch_height, epoch_id, sync_hash) =
+        if let (Some(state_root), EpochSelection::EpochHeight { epoch_height }) =
+            (maybe_state_root, &epoch_selection)
+        {
+            (state_root, *epoch_height, None, None)
+        } else {
+            let epoch_id = epoch_selection.to_epoch_id(store, &chain);
+            let epoch = chain.runtime_adapter.get_epoch_info(&epoch_id).unwrap();
+
+            let sync_hash = get_any_block_hash_of_epoch(&epoch, &chain);
+            let sync_hash = StateSync::get_epoch_start_sync_hash(&chain, &sync_hash).unwrap();
+
+            let state_header = chain.get_state_response_header(shard_id, sync_hash).unwrap();
+            let state_root = state_header.chunk_prev_state_root();
+
+            (state_root, epoch.epoch_height(), Some(epoch_id), Some(sync_hash))
+        };
 
     let part_storage = get_state_part_reader(location, &chain_id, epoch_height, shard_id);
@@ -272,7 +255,6 @@ fn apply_state_parts(
         epoch_height,
         shard_id,
         num_parts,
-        ?sync_prev_hash,
         ?sync_hash,
         ?part_ids,
         "Applying state as seen at the beginning of the specified epoch.",
@@ -330,23 +312,10 @@ fn dump_state_parts(
     let epoch_id = epoch_selection.to_epoch_id(store, &chain);
     let epoch = chain.runtime_adapter.get_epoch_info(&epoch_id).unwrap();
     let sync_hash = get_any_block_hash_of_epoch(&epoch, &chain);
     let sync_hash = StateSync::get_epoch_start_sync_hash(&chain, &sync_hash).unwrap();
-    let sync_header = chain.get_block_header(&sync_hash).unwrap();
-    // See `get_state_response_header()`.
-    let sync_prev_block = chain.get_block(sync_header.prev_hash()).unwrap();
-    let sync_prev_hash = sync_prev_block.hash();
-
-    assert!(chain.runtime_adapter.is_next_block_epoch_start(&sync_prev_hash).unwrap());
-    assert!(
-        shard_id < sync_prev_block.chunks().len() as u64,
-        "shard_id: {}, #shards: {}",
-        shard_id,
-        sync_prev_block.chunks().len()
-    );
-    let state_root = sync_prev_block.chunks()[shard_id as usize].prev_state_root();
-    let state_root_node =
-        chain.runtime_adapter.get_state_root_node(shard_id, &sync_prev_hash, &state_root).unwrap();
-    let num_parts = get_num_state_parts(state_root_node.memory_usage);
+    let state_header = chain.compute_state_response_header(shard_id, sync_hash).unwrap();
+    let state_root = state_header.chunk_prev_state_root();
+    let num_parts = get_num_state_parts(state_header.state_root_node().memory_usage);
     let part_ids = get_part_ids(part_from, part_to, num_parts);
 
     tracing::info!(
         target: "state-parts",
         epoch_height = epoch.epoch_height(),
         epoch_id = ?epoch_id.0,
         shard_id,
         num_parts,
         ?sync_hash,
-        ?sync_prev_hash,
         ?part_ids,
         ?state_root,
         "Dumping state as seen at the beginning of the specified epoch.",
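
For reference, the sync-point selection that the series converges on can be
sketched as follows (hypothetical helper and types, illustrative only; the
real logic lives in `StateSync::get_epoch_start_sync_hash()` and `Chain`, and
also handles the genesis edge case):

    // Illustrative stand-ins for block lookup by hash.
    struct BlockInfo {
        hash: [u8; 32],
        prev_hash: [u8; 32],
        epoch_id: u64,
    }

    /// Walks back from any block of an epoch to that epoch's first block;
    /// its hash is the `sync_hash`. The state being dumped or applied is the
    /// one *before* that block, i.e. as of `sync_prev = prev(sync_hash)`.
    fn epoch_start_sync_hash(
        mut block: BlockInfo,
        lookup: impl Fn(&[u8; 32]) -> BlockInfo,
    ) -> [u8; 32] {
        loop {
            let prev = lookup(&block.prev_hash);
            if prev.epoch_id != block.epoch_id {
                // `block` is the first block of its epoch.
                return block.hash;
            }
            block = prev;
        }
    }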