[Merged by Bors] - Improve eth1 block sync by paulhauner · Pull Request #2008 · sigp/lighthouse · GitHub
[Merged by Bors] - Improve eth1 block sync #2008


Closed · wants to merge 10 commits · changes shown from all commits
47 changes: 38 additions & 9 deletions beacon_node/beacon_chain/src/eth1_chain.rs
@@ -71,23 +71,52 @@ fn get_sync_status<T: EthSpec>(
     latest_cached_block: Option<&Eth1Block>,
     head_block: Option<&Eth1Block>,
     genesis_time: u64,
-    current_slot: Slot,
+    current_slot: Option<Slot>,
     spec: &ChainSpec,
 ) -> Option<Eth1SyncStatusData> {
-    let period = T::SlotsPerEth1VotingPeriod::to_u64();
-    // Since `period` is a "constant", we assume it is set sensibly.
-    let voting_period_start_slot = (current_slot / period) * period;
-    let voting_target_timestamp = {
+    let eth1_follow_distance_seconds = spec
+        .seconds_per_eth1_block
+        .saturating_mul(spec.eth1_follow_distance);
+
+    // The voting target timestamp needs to be special-cased when we're before
+    // genesis (as defined by `current_slot == None`).
+    //
+    // For the sake of this status, when prior to genesis we want to invent some voting periods
+    // that are *before* genesis, so that we can indicate to users that we're actually adequately
+    // cached for where they are in time.
+    let voting_target_timestamp = if let Some(current_slot) = current_slot {
+        let period = T::SlotsPerEth1VotingPeriod::to_u64();
+        let voting_period_start_slot = (current_slot / period) * period;
+
         let period_start = slot_start_seconds::<T>(
             genesis_time,
             spec.milliseconds_per_slot,
             voting_period_start_slot,
         );
-        let eth1_follow_distance_seconds = spec
-            .seconds_per_eth1_block
-            .saturating_mul(spec.eth1_follow_distance);
 
         period_start.saturating_sub(eth1_follow_distance_seconds)
+    } else {
+        // The number of seconds in an eth1 voting period.
+        let voting_period_duration =
+            T::slots_per_eth1_voting_period() as u64 * (spec.milliseconds_per_slot / 1_000);
+
+        let now = SystemTime::now().duration_since(UNIX_EPOCH).ok()?.as_secs();
+
+        // The number of seconds between now and genesis.
+        let seconds_till_genesis = genesis_time.saturating_sub(now);
+
+        // Determine how many voting periods are contained in distance between
+        // now and genesis, rounding up.
+        let voting_periods_past =
+            (seconds_till_genesis + voting_period_duration - 1) / voting_period_duration;
+
+        // Return the start time of the current voting period*.
+        //
+        // *: This voting period doesn't *actually* exist, we're just using it to
+        // give useful logs prior to genesis.
+        genesis_time
+            .saturating_sub(voting_periods_past * voting_period_duration)
+            .saturating_sub(eth1_follow_distance_seconds)
     };
 
     let latest_cached_block_number = latest_cached_block.map(|b| b.number);
@@ -232,7 +261,7 @@ where
     pub fn sync_status(
         &self,
         genesis_time: u64,
-        current_slot: Slot,
+        current_slot: Option<Slot>,
         spec: &ChainSpec,
     ) -> Option<Eth1SyncStatusData> {
         get_sync_status::<E>(
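The new `else` branch is the heart of this file's change: before genesis there is no current slot, so the function invents voting periods counting backwards from genesis. A minimal, self-contained sketch of that arithmetic follows; the constants are illustrative placeholders, not real spec values, which in the diff come from `ChainSpec` and `EthSpec`:

use std::time::{SystemTime, UNIX_EPOCH};

// Sketch only: mirrors the pre-genesis branch above with inlined example values.
fn pre_genesis_voting_target(genesis_time: u64) -> Option<u64> {
    let slots_per_eth1_voting_period: u64 = 32; // illustrative, not a mainnet constant
    let milliseconds_per_slot: u64 = 12_000;
    let seconds_per_eth1_block: u64 = 14;
    let eth1_follow_distance: u64 = 2_048;

    // Seconds in one (invented) eth1 voting period.
    let voting_period_duration = slots_per_eth1_voting_period * (milliseconds_per_slot / 1_000);

    let now = SystemTime::now().duration_since(UNIX_EPOCH).ok()?.as_secs();
    let seconds_till_genesis = genesis_time.saturating_sub(now);

    // Voting periods between now and genesis, rounded up.
    let voting_periods_past =
        (seconds_till_genesis + voting_period_duration - 1) / voting_period_duration;

    // Walk back from genesis by whole periods, then by the follow distance.
    Some(
        genesis_time
            .saturating_sub(voting_periods_past * voting_period_duration)
            .saturating_sub(seconds_per_eth1_block * eth1_follow_distance),
    )
}

fn main() {
    // E.g. genesis 100 seconds from now: one invented 384-second period.
    let genesis_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() + 100;
    println!("voting target: {:?}", pre_genesis_voting_target(genesis_time));
}

With these placeholder numbers, a node started 100 seconds before genesis sees a 384-second voting period, rounds up to one whole period, and reports a voting target 384 + 28,672 seconds before genesis, so the "is the cache deep enough?" comparison still makes sense pre-genesis.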
88 changes: 56 additions & 32 deletions beacon_node/client/src/notifier.rs
@@ -3,7 +3,7 @@ use beacon_chain::{BeaconChain, BeaconChainTypes};
 use eth2_libp2p::NetworkGlobals;
 use futures::prelude::*;
 use parking_lot::Mutex;
-use slog::{debug, error, info, warn};
+use slog::{debug, error, info, warn, Logger};
 use slot_clock::SlotClock;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
@@ -56,6 +56,7 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
                     "peers" => peer_count_pretty(network.connected_peers()),
                     "wait_time" => estimated_time_pretty(Some(next_slot.as_secs() as f64)),
                 );
+                eth1_logging(&beacon_chain, &log);
                 sleep(slot_duration).await;
             }
             _ => break,
@@ -172,37 +173,7 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
                 );
             }
 
-            // Perform some logging about the eth1 chain
-            if let Some(eth1_chain) = beacon_chain.eth1_chain.as_ref() {
-                if let Some(status) =
-                    eth1_chain.sync_status(head_info.genesis_time, current_slot, &beacon_chain.spec)
-                {
-                    debug!(
-                        log,
-                        "Eth1 cache sync status";
-                        "eth1_head_block" => status.head_block_number,
-                        "latest_cached_block_number" => status.latest_cached_block_number,
-                        "latest_cached_timestamp" => status.latest_cached_block_timestamp,
-                        "voting_target_timestamp" => status.voting_target_timestamp,
-                        "ready" => status.lighthouse_is_cached_and_ready
-                    );
-
-                    if !status.lighthouse_is_cached_and_ready {
-                        warn!(
-                            log,
-                            "Syncing eth1 block cache";
-                            "target_timestamp" => status.voting_target_timestamp,
-                            "latest_timestamp" => status.latest_cached_block_timestamp,
-                            "msg" => "block production temporarily impaired"
-                        );
-                    }
-                } else {
-                    error!(
-                        log,
-                        "Unable to determine eth1 sync status";
-                    );
-                }
-            }
+            eth1_logging(&beacon_chain, &log);
         }
         Ok::<(), ()>(())
     };
@@ -213,6 +184,59 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
     Ok(())
 }
 
+fn eth1_logging<T: BeaconChainTypes>(beacon_chain: &BeaconChain<T>, log: &Logger) {
+    let current_slot_opt = beacon_chain.slot().ok();
+
+    if let Ok(head_info) = beacon_chain.head_info() {
+        // Perform some logging about the eth1 chain
+        if let Some(eth1_chain) = beacon_chain.eth1_chain.as_ref() {
+            if let Some(status) =
+                eth1_chain.sync_status(head_info.genesis_time, current_slot_opt, &beacon_chain.spec)
+            {
+                debug!(
+                    log,
+                    "Eth1 cache sync status";
+                    "eth1_head_block" => status.head_block_number,
+                    "latest_cached_block_number" => status.latest_cached_block_number,
+                    "latest_cached_timestamp" => status.latest_cached_block_timestamp,
+                    "voting_target_timestamp" => status.voting_target_timestamp,
+                    "ready" => status.lighthouse_is_cached_and_ready
+                );
+
+                if !status.lighthouse_is_cached_and_ready {
+                    let voting_target_timestamp = status.voting_target_timestamp;
+
+                    let distance = status
+                        .latest_cached_block_timestamp
+                        .map(|latest| {
+                            voting_target_timestamp.saturating_sub(latest)
+                                / beacon_chain.spec.seconds_per_eth1_block
+                        })
+                        .map(|distance| distance.to_string())
+                        .unwrap_or_else(|| "initializing deposits".to_string());
+
+                    warn!(
+                        log,
+                        "Syncing eth1 block cache";
+                        "msg" => "sync can take longer when using remote eth1 nodes",
+                        "est_blocks_remaining" => distance,
+                    );
+                }
+            } else {
+                error!(
+                    log,
+                    "Unable to determine eth1 sync status";
+                );
+            }
+        }
+    } else {
+        error!(
+            log,
+            "Unable to get head info";
+        );
+    }
+}
+
 /// Returns the peer count, returning something helpful if it's `usize::max_value` (effectively a
 /// `None` value).
 fn peer_count_pretty(peer_count: usize) -> String {
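The new `est_blocks_remaining` field replaces the old "block production temporarily impaired" wording with an actual progress estimate: the timestamp gap between the voting target and the newest cached block, divided by the average eth1 block time. A minimal sketch of that calculation, with illustrative names standing in for the real `Eth1SyncStatusData` and `ChainSpec` inputs:

// Sketch only: mirrors the `distance` computation in `eth1_logging` above.
fn estimate_blocks_remaining(
    voting_target_timestamp: u64,
    latest_cached_block_timestamp: Option<u64>,
    seconds_per_eth1_block: u64,
) -> String {
    latest_cached_block_timestamp
        .map(|latest| voting_target_timestamp.saturating_sub(latest) / seconds_per_eth1_block)
        .map(|blocks| blocks.to_string())
        // No block cached yet: the deposit cache is still initializing.
        .unwrap_or_else(|| "initializing deposits".to_string())
}

fn main() {
    assert_eq!(estimate_blocks_remaining(1_000, Some(860), 14), "10");
    assert_eq!(estimate_blocks_remaining(1_000, None, 14), "initializing deposits");
}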
121 changes: 61 additions & 60 deletions beacon_node/eth1/src/service.rs
@@ -4,12 +4,12 @@ use crate::{
     deposit_cache::Error as DepositCacheError,
     http::{
         get_block, get_block_number, get_chain_id, get_deposit_logs_in_range, get_network_id,
-        BlockQuery, Eth1Id, Log,
+        BlockQuery, Eth1Id,
     },
     inner::{DepositUpdater, Inner},
 };
 use fallback::{Fallback, FallbackError};
-use futures::{future::TryFutureExt, stream, stream::TryStreamExt, StreamExt};
+use futures::{future::TryFutureExt, StreamExt};
 use parking_lot::{RwLock, RwLockReadGuard};
 use serde::{Deserialize, Serialize};
 use slog::{crit, debug, error, info, trace, warn, Logger};
@@ -34,7 +34,7 @@ const BLOCK_NUMBER_TIMEOUT_MILLIS: u64 = STANDARD_TIMEOUT_MILLIS;
 /// Timeout when doing an eth_getBlockByNumber call.
 const GET_BLOCK_TIMEOUT_MILLIS: u64 = STANDARD_TIMEOUT_MILLIS;
 /// Timeout when doing an eth_getLogs to read the deposit contract logs.
-const GET_DEPOSIT_LOG_TIMEOUT_MILLIS: u64 = STANDARD_TIMEOUT_MILLIS;
+const GET_DEPOSIT_LOG_TIMEOUT_MILLIS: u64 = 60_000;
 
 const WARNING_MSG: &str = "BLOCK PROPOSALS WILL FAIL WITHOUT VALID, SYNCED ETH1 CONNECTION";
 
@@ -384,8 +384,8 @@ impl Default for Config {
             block_cache_truncation: Some(4_096),
             auto_update_interval_millis: 7_000,
             blocks_per_log_query: 1_000,
-            max_log_requests_per_update: None,
-            max_blocks_per_update: None,
+            max_log_requests_per_update: Some(100),
+            max_blocks_per_update: Some(8_192),
         }
     }
 }
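Together with the larger `eth_getLogs` timeout above (a fixed 60 s rather than the standard timeout), the new `Some(..)` defaults bound how much work a single update cycle may attempt instead of leaving it unbounded. A back-of-envelope check using the diff's own numbers:

fn main() {
    // `Config` defaults from the diff above.
    let blocks_per_log_query: u64 = 1_000;
    let max_log_requests_per_update: u64 = 100;
    let max_blocks_per_update: u64 = 8_192;

    // At most 100 log queries of 1,000 blocks each are issued per update,
    // so deposit-log scanning covers at most 100,000 blocks per cycle.
    assert_eq!(blocks_per_log_query * max_log_requests_per_update, 100_000);

    // Block-cache imports are capped separately, at 8,192 blocks per cycle.
    assert_eq!(max_blocks_per_update, 8_192);
}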
@@ -817,38 +817,40 @@ impl Service {
             Vec::new()
         };
 
+        let mut logs_imported: usize = 0;
         let deposit_contract_address_ref: &str = &deposit_contract_address;
-        let logs: Vec<(Range<u64>, Vec<Log>)> =
-            stream::try_unfold(block_number_chunks.into_iter(), |mut chunks| async {
-                match chunks.next() {
-                    Some(chunk) => {
-                        let chunk_ref = &chunk;
-                        endpoints
-                            .first_success(|e| async move {
-                                get_deposit_logs_in_range(
-                                    e,
-                                    deposit_contract_address_ref,
-                                    chunk_ref.clone(),
-                                    Duration::from_millis(GET_DEPOSIT_LOG_TIMEOUT_MILLIS),
-                                )
-                                .await
-                                .map_err(SingleEndpointError::GetDepositLogsFailed)
-                            })
-                            .await
-                            .map(|logs| Some(((chunk, logs), chunks)))
-                    }
-                    None => Ok(None),
-                }
-            })
-            .try_collect()
-            .await
-            .map_err(Error::FallbackError)?;
+        for block_range in block_number_chunks.into_iter() {
+            if block_range.is_empty() {
+                debug!(
+                    self.log,
+                    "No new blocks to scan for logs";
+                );
+                continue;
+            }
+
+            /*
+             * Step 1. Download logs.
+             */
+            let block_range_ref = &block_range;
+            let logs = endpoints
+                .first_success(|e| async move {
+                    get_deposit_logs_in_range(
+                        e,
+                        &deposit_contract_address_ref,
+                        block_range_ref.clone(),
+                        Duration::from_millis(GET_DEPOSIT_LOG_TIMEOUT_MILLIS),
+                    )
+                    .await
+                    .map_err(SingleEndpointError::GetDepositLogsFailed)
+                })
+                .await
+                .map_err(Error::FallbackError)?;
 
-        let mut logs_imported = 0;
-        for (block_range, log_chunk) in logs.iter() {
+            /*
+             * Step 2. Import logs to cache.
+             */
             let mut cache = self.deposits().write();
-            log_chunk
-                .iter()
+            logs.iter()
                 .map(|raw_log| {
                     raw_log.to_deposit_log(self.inner.spec()).map_err(|error| {
                         Error::FailedToParseDepositLog {
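The loop above replaces a `stream::try_unfold`/`try_collect` pipeline that buffered every chunk's logs before importing any of them. Downloading and importing one range at a time keeps memory bounded by a single chunk and preserves progress if a later chunk fails. A minimal sketch of the pattern, with hypothetical `fetch_logs` and `import_logs` helpers standing in for the real endpoint and cache calls:

use std::ops::Range;

// Hypothetical stand-in for `endpoints.first_success(get_deposit_logs_in_range(..))`.
async fn fetch_logs(range: &Range<u64>) -> Result<Vec<u64>, String> {
    Ok(range.clone().collect())
}

// Hypothetical stand-in for the deposit-cache import.
fn import_logs(_logs: &[u64]) -> Result<(), String> {
    Ok(())
}

// Step 1 then Step 2, chunk by chunk: everything imported before an error is kept.
async fn sync_deposit_logs(chunks: Vec<Range<u64>>) -> Result<usize, String> {
    let mut logs_imported = 0;
    for chunk in chunks {
        if chunk.is_empty() {
            continue; // No new blocks to scan for logs.
        }
        let logs = fetch_logs(&chunk).await?;
        import_logs(&logs)?;
        logs_imported += logs.len();
    }
    Ok(logs_imported)
}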
@@ -881,6 +883,12 @@ impl Service {
                 // node to choose an invalid genesis state or propose an invalid block.
                 .collect::<Result<_, _>>()?;
 
+            debug!(
+                self.log,
+                "Imported deposit logs chunk";
+                "logs" => logs.len(),
+            );
+
             cache.last_processed_block = Some(block_range.end.saturating_sub(1));
 
             metrics::set_gauge(&metrics::DEPOSIT_CACHE_LEN, cache.cache.len() as i64);
@@ -976,8 +984,9 @@ impl Service {
         } else {
             Vec::new()
         };
-        // Download the range of blocks and sequentially import them into the cache.
-        // Last processed block in deposit cache
+
+        // This value is used to prevent the block cache from importing a block that is not yet in
+        // the deposit cache.
         let latest_in_cache = self
             .inner
             .deposit_cache
@@ -990,34 +999,26 @@ impl Service {
             .filter(|x| *x <= latest_in_cache)
             .take(max_blocks_per_update)
             .collect::<Vec<_>>();
 
+        debug!(
+            self.log,
+            "Downloading eth1 blocks";
+            "first" => ?required_block_numbers.first(),
+            "last" => ?required_block_numbers.last(),
+        );
+
-        // Produce a stream from the list of required block numbers and return a future that
-        // consumes the it.
-
-        let eth1_blocks: Vec<Eth1Block> = stream::try_unfold(
-            required_block_numbers.into_iter(),
-            |mut block_numbers| async {
-                match block_numbers.next() {
-                    Some(block_number) => {
-                        match endpoints
-                            .first_success(|e| async move {
-                                download_eth1_block(e, self.inner.clone(), Some(block_number)).await
-                            })
-                            .await
-                        {
-                            Ok(eth1_block) => Ok(Some((eth1_block, block_numbers))),
-                            Err(e) => Err(e),
-                        }
-                    }
-                    None => Ok(None),
-                }
-            },
-        )
-        .try_collect()
-        .await
-        .map_err(Error::FallbackError)?;
-
         let mut blocks_imported = 0;
-        for eth1_block in eth1_blocks {
+        for block_number in required_block_numbers {
+            let eth1_block = endpoints
+                .first_success(|e| async move {
+                    download_eth1_block(e, self.inner.clone(), Some(block_number)).await
+                })
+                .await
+                .map_err(Error::FallbackError)?;
+
             self.inner
                 .block_cache
                 .write()
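Block download gets the same treatment: the `stream::try_unfold` chain becomes a plain loop over `required_block_numbers`, with each block fetched via `first_success` so every configured endpoint is tried before the update fails. A simplified sketch of that fallback behaviour (illustrative only; the real logic is `Fallback::first_success` in the `fallback` crate):

// Try each eth1 endpoint in order; return the first success, or every error.
async fn download_block_with_fallback(
    endpoints: &[String],
    block_number: u64,
) -> Result<u64, Vec<String>> {
    let mut errors = Vec::new();
    for endpoint in endpoints {
        match download_eth1_block(endpoint, block_number).await {
            Ok(block) => return Ok(block),
            Err(e) => errors.push(e),
        }
    }
    Err(errors)
}

// Hypothetical stand-in for the real `download_eth1_block` in the diff.
async fn download_eth1_block(_endpoint: &str, block_number: u64) -> Result<u64, String> {
    Ok(block_number)
}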
6 changes: 2 additions & 4 deletions beacon_node/http_api/src/lib.rs
@@ -2075,9 +2075,7 @@ pub fn serve<T: BeaconChainTypes>(
                 let head_info = chain
                     .head_info()
                     .map_err(warp_utils::reject::beacon_chain_error)?;
-                let current_slot = chain
-                    .slot()
-                    .map_err(warp_utils::reject::beacon_chain_error)?;
+                let current_slot_opt = chain.slot().ok();
 
                 chain
                     .eth1_chain
@@ -2088,7 +2086,7 @@ pub fn serve<T: BeaconChainTypes>(
                     )
                 })
                 .and_then(|eth1| {
-                    eth1.sync_status(head_info.genesis_time, current_slot, &chain.spec)
+                    eth1.sync_status(head_info.genesis_time, current_slot_opt, &chain.spec)
                         .ok_or_else(|| {
                             warp_utils::reject::custom_server_error(
                                 "Unable to determine Eth1 sync status".to_string(),
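The HTTP API change mirrors the notifier: a failed `chain.slot()`, which is expected before genesis, no longer rejects the request with a server error; it degrades to `None` and lets `sync_status` apply its pre-genesis special case. In miniature, with `Slot` stubbed as `u64`:

type Slot = u64;

// Before: a pre-genesis clock error propagated as a rejection.
// After: it becomes `None`, which `sync_status` handles explicitly.
fn current_slot_opt(clock_result: Result<Slot, ()>) -> Option<Slot> {
    clock_result.ok()
}

fn main() {
    assert_eq!(current_slot_opt(Ok(42)), Some(42));
    assert_eq!(current_slot_opt(Err(())), None); // before genesis
}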