From 42b2668868ce347fff04095c0d7be53b43dd1526 Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Wed, 7 Feb 2024 04:55:36 +0000 Subject: [PATCH 1/8] blockstore: atomize slot clearing, relax parent slot meta check clear_unconfirmed_slot can leave blockstore in an irrecoverable state if it panics in the middle. write batch this function, so that any errors can be recovered after restart. additionally relax the constraint that the parent slot meta must exist, as it could have been cleaned up if outdated. --- ledger/src/blockstore.rs | 29 ++------- ledger/src/blockstore/blockstore_purge.rs | 72 +++++++++++++++++++++++ ledger/src/blockstore_db.rs | 2 + 3 files changed, 78 insertions(+), 25 deletions(-) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index c01b1806a8fa27..c038818877d6a9 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -1164,35 +1164,14 @@ impl Blockstore { /// family. pub fn clear_unconfirmed_slot(&self, slot: Slot) { let _lock = self.insert_shreds_lock.lock().unwrap(); - if let Some(mut slot_meta) = self + if let Some(slot_meta) = self .meta(slot) .expect("Couldn't fetch from SlotMeta column family") { - // Clear all slot related information - self.run_purge(slot, slot, PurgeType::Exact) + // Clear all slot related information, and cleanup slot meta by removing + // `slot` from parents `next_slots`, but retaining `slot`'s `next_slots`. + self.run_purge_and_cleanup_slot_meta(slot, slot_meta) .expect("Purge database operations failed"); - - // Clear this slot as a next slot from parent - if let Some(parent_slot) = slot_meta.parent_slot { - let mut parent_slot_meta = self - .meta(parent_slot) - .expect("Couldn't fetch from SlotMeta column family") - .expect("Unconfirmed slot should have had parent slot set"); - // .retain() is a linear scan; however, next_slots should - // only contain several elements so this isn't so bad - parent_slot_meta - .next_slots - .retain(|&next_slot| next_slot != slot); - self.meta_cf - .put(parent_slot, &parent_slot_meta) - .expect("Couldn't insert into SlotMeta column family"); - } - // Reinsert parts of `slot_meta` that are important to retain, like the `next_slots` - // field. - slot_meta.clear_unconfirmed_slot(); - self.meta_cf - .put(slot, &slot_meta) - .expect("Couldn't insert into SlotMeta column family"); } else { error!( "clear_unconfirmed_slot() called on slot {} with no SlotMeta", diff --git a/ledger/src/blockstore/blockstore_purge.rs b/ledger/src/blockstore/blockstore_purge.rs index 4b599a353d569c..33e748dff2d2df 100644 --- a/ledger/src/blockstore/blockstore_purge.rs +++ b/ledger/src/blockstore/blockstore_purge.rs @@ -135,6 +135,7 @@ impl Blockstore { } } + #[cfg(test)] pub(crate) fn run_purge( &self, from_slot: Slot, @@ -155,6 +156,48 @@ impl Blockstore { to_slot: Slot, purge_type: PurgeType, purge_stats: &mut PurgeStats, + ) -> Result { + self.run_purge_with_stats_and_maybe_cleanup_slot_meta( + from_slot, + to_slot, + purge_type, + purge_stats, + None, + ) + } + + pub(crate) fn run_purge_and_cleanup_slot_meta( + &self, + slot: Slot, + slot_meta: SlotMeta, + ) -> Result { + self.run_purge_with_stats_and_maybe_cleanup_slot_meta( + slot, + slot, + PurgeType::Exact, + &mut PurgeStats::default(), + Some(slot_meta), + ) + } + + /// A helper function to `purge_slots` that executes the ledger clean up. + /// The cleanup applies to \[`from_slot`, `to_slot`\]. + /// + /// When `from_slot` is 0, any sst-file with a key-range completely older + /// than `to_slot` will also be deleted. 
+ /// + /// If `slot_meta` is specified for some `child_slot`, we require that + /// `child_slot == from_slot == to_slot` - we are purging only one slot. + /// In this case along with the purge we remove `child_slot` from its + /// `parent_slot_meta.next_slots` as well as reinsert an orphaned `slot_meta` + /// for `child_slot` that only retains the `next_slots` value. + pub(crate) fn run_purge_with_stats_and_maybe_cleanup_slot_meta( + &self, + from_slot: Slot, + to_slot: Slot, + purge_type: PurgeType, + purge_stats: &mut PurgeStats, + slot_meta: Option, ) -> Result { let mut write_batch = self .db @@ -239,6 +282,35 @@ impl Blockstore { } delete_range_timer.stop(); + if let Some(mut slot_meta) = slot_meta { + let child_slot = slot_meta.slot; + if child_slot != from_slot || child_slot != to_slot { + error!("Slot meta parent cleanup was requested for {}, but a range was specified {} {}", child_slot, from_slot, to_slot); + return Err(BlockstoreError::InvalidRangeForSlotMetaCleanup); + } + + if let Some(parent_slot) = slot_meta.parent_slot { + let parent_slot_meta = self.meta(parent_slot)?; + if let Some(mut parent_slot_meta) = parent_slot_meta { + // .retain() is a linear scan; however, next_slots should + // only contain several elements so this isn't so bad + parent_slot_meta + .next_slots + .retain(|&next_slot| next_slot != child_slot); + write_batch.put::(parent_slot, &parent_slot_meta)?; + } else { + error!("Parent slot meta {} for child {} is missing. In the absence of a duplicate block this + likely means a cluster restart was performed and your node contains invalid shreds generated + with the wrong shred version, whose ancestors have been cleaned up. + Falling back to duplicate block handling to remedy the situation", parent_slot, child_slot); + } + } + + // Reinsert parts of `slot_meta` that are important to retain, like the `next_slots` field. + slot_meta.clear_unconfirmed_slot(); + write_batch.put::(child_slot, &slot_meta)?; + } + let mut write_timer = Measure::start("write_batch"); if let Err(e) = self.db.write(write_batch) { error!( diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index 18ba491ea34bd1..00ea80ac05a2a5 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -151,6 +151,8 @@ pub enum BlockstoreError { MissingTransactionMetadata, #[error("transaction-index overflow")] TransactionIndexOverflow, + #[error("invalid purge range for slot meta cleanup")] + InvalidRangeForSlotMetaCleanup, } pub type Result = std::result::Result; From 8c142ba7aa25b2bffc409ad8dd33e393e9c1ccd9 Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Wed, 21 Feb 2024 03:36:41 +0000 Subject: [PATCH 2/8] pr feedback: use PurgeType, don't pass slot_meta --- ledger/src/blockstore.rs | 18 ++--- ledger/src/blockstore/blockstore_purge.rs | 85 ++++++++--------------- 2 files changed, 37 insertions(+), 66 deletions(-) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index c038818877d6a9..f6695a82c3d500 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -1164,19 +1164,15 @@ impl Blockstore { /// family. pub fn clear_unconfirmed_slot(&self, slot: Slot) { let _lock = self.insert_shreds_lock.lock().unwrap(); - if let Some(slot_meta) = self - .meta(slot) - .expect("Couldn't fetch from SlotMeta column family") - { - // Clear all slot related information, and cleanup slot meta by removing - // `slot` from parents `next_slots`, but retaining `slot`'s `next_slots`. 
- self.run_purge_and_cleanup_slot_meta(slot, slot_meta) - .expect("Purge database operations failed"); - } else { - error!( + // Clear all slot related information, and cleanup slot meta by removing + // `slot` from parents `next_slots`, but retaining `slot`'s `next_slots`. + match self.run_purge(slot, slot, PurgeType::ExactAndCleanupChaining) { + Ok(_) => {} + Err(BlockstoreError::SlotUnavailable) => error!( "clear_unconfirmed_slot() called on slot {} with no SlotMeta", slot - ); + ), + Err(e) => panic!("Purge database operations failed {}", e), } } diff --git a/ledger/src/blockstore/blockstore_purge.rs b/ledger/src/blockstore/blockstore_purge.rs index 33e748dff2d2df..ffe8db935d6fe7 100644 --- a/ledger/src/blockstore/blockstore_purge.rs +++ b/ledger/src/blockstore/blockstore_purge.rs @@ -12,7 +12,7 @@ pub struct PurgeStats { delete_files_in_range: u64, } -#[derive(Clone, Copy)] +#[derive(Clone, Copy, PartialEq, Eq)] /// Controls how `blockstore::purge_slots` purges the data. pub enum PurgeType { /// A slower but more accurate way to purge slots by also ensuring higher @@ -21,6 +21,10 @@ pub enum PurgeType { /// The fastest purge mode that relies on the slot-id based TTL /// compaction filter to do the cleanup. CompactionFilter, + /// In this case along with an `Exact` purge we orphan the specified slot by + /// removing it from `parent_slot_meta.next_slots` and reinsert an empty `slot_meta` + /// for the orphan that only retains the `next_slots` value. + ExactAndCleanupChaining, } impl Blockstore { @@ -135,7 +139,6 @@ impl Blockstore { } } - #[cfg(test)] pub(crate) fn run_purge( &self, from_slot: Slot, @@ -150,55 +153,31 @@ impl Blockstore { /// /// When `from_slot` is 0, any sst-file with a key-range completely older /// than `to_slot` will also be deleted. - pub(crate) fn run_purge_with_stats( - &self, - from_slot: Slot, - to_slot: Slot, - purge_type: PurgeType, - purge_stats: &mut PurgeStats, - ) -> Result { - self.run_purge_with_stats_and_maybe_cleanup_slot_meta( - from_slot, - to_slot, - purge_type, - purge_stats, - None, - ) - } - - pub(crate) fn run_purge_and_cleanup_slot_meta( - &self, - slot: Slot, - slot_meta: SlotMeta, - ) -> Result { - self.run_purge_with_stats_and_maybe_cleanup_slot_meta( - slot, - slot, - PurgeType::Exact, - &mut PurgeStats::default(), - Some(slot_meta), - ) - } - - /// A helper function to `purge_slots` that executes the ledger clean up. - /// The cleanup applies to \[`from_slot`, `to_slot`\]. /// - /// When `from_slot` is 0, any sst-file with a key-range completely older - /// than `to_slot` will also be deleted. - /// - /// If `slot_meta` is specified for some `child_slot`, we require that - /// `child_slot == from_slot == to_slot` - we are purging only one slot. - /// In this case along with the purge we remove `child_slot` from its - /// `parent_slot_meta.next_slots` as well as reinsert an orphaned `slot_meta` - /// for `child_slot` that only retains the `next_slots` value. - pub(crate) fn run_purge_with_stats_and_maybe_cleanup_slot_meta( + /// If the `purge_type` is `PurgeType::ExactAndCleanupChaining` we require + /// that `from_slot == to_slot` - we are purging only one slot. 
+ pub(crate) fn run_purge_with_stats( &self, from_slot: Slot, to_slot: Slot, purge_type: PurgeType, purge_stats: &mut PurgeStats, - slot_meta: Option, ) -> Result { + let slot_meta = if purge_type == PurgeType::ExactAndCleanupChaining { + if from_slot != to_slot { + error!( + "Chaining cleanup was requested but a range was specified {} {}", + from_slot, to_slot + ); + return Err(BlockstoreError::InvalidRangeForSlotMetaCleanup); + } + let Some(slot_meta) = self.meta(from_slot)? else { + return Err(BlockstoreError::SlotUnavailable); + }; + Some(slot_meta) + } else { + None + }; let mut write_batch = self .db .batch() @@ -269,7 +248,7 @@ impl Blockstore { .delete_range_cf::(&mut write_batch, from_slot, to_slot) .is_ok(); match purge_type { - PurgeType::Exact => { + PurgeType::Exact | PurgeType::ExactAndCleanupChaining => { self.purge_special_columns_exact(&mut write_batch, from_slot, to_slot)?; } PurgeType::CompactionFilter => { @@ -282,13 +261,9 @@ impl Blockstore { } delete_range_timer.stop(); - if let Some(mut slot_meta) = slot_meta { - let child_slot = slot_meta.slot; - if child_slot != from_slot || child_slot != to_slot { - error!("Slot meta parent cleanup was requested for {}, but a range was specified {} {}", child_slot, from_slot, to_slot); - return Err(BlockstoreError::InvalidRangeForSlotMetaCleanup); - } - + if purge_type == PurgeType::ExactAndCleanupChaining { + let mut slot_meta = slot_meta.expect("Slot meta should be present by this point"); + let slot = from_slot; // also equal to to_slot if let Some(parent_slot) = slot_meta.parent_slot { let parent_slot_meta = self.meta(parent_slot)?; if let Some(mut parent_slot_meta) = parent_slot_meta { @@ -296,19 +271,19 @@ impl Blockstore { // only contain several elements so this isn't so bad parent_slot_meta .next_slots - .retain(|&next_slot| next_slot != child_slot); + .retain(|&next_slot| next_slot != slot); write_batch.put::(parent_slot, &parent_slot_meta)?; } else { error!("Parent slot meta {} for child {} is missing. In the absence of a duplicate block this likely means a cluster restart was performed and your node contains invalid shreds generated with the wrong shred version, whose ancestors have been cleaned up. - Falling back to duplicate block handling to remedy the situation", parent_slot, child_slot); + Falling back to duplicate block handling to remedy the situation", parent_slot, slot); } } // Reinsert parts of `slot_meta` that are important to retain, like the `next_slots` field. 
slot_meta.clear_unconfirmed_slot(); - write_batch.put::(child_slot, &slot_meta)?; + write_batch.put::(slot, &slot_meta)?; } let mut write_timer = Measure::start("write_batch"); From d0e8b7f372de99aacc8a53d2f15ae9c65ab3fe29 Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Wed, 21 Feb 2024 16:28:43 +0000 Subject: [PATCH 3/8] pr feedback: add unit test --- ledger/src/blockstore/blockstore_purge.rs | 67 +++++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/ledger/src/blockstore/blockstore_purge.rs b/ledger/src/blockstore/blockstore_purge.rs index ffe8db935d6fe7..3c3bb5921e5e43 100644 --- a/ledger/src/blockstore/blockstore_purge.rs +++ b/ledger/src/blockstore/blockstore_purge.rs @@ -1150,4 +1150,71 @@ pub mod tests { } assert_eq!(count, 1); } + + #[test] + fn test_purge_exact_and_cleanup_chaining_range() { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Blockstore::open(ledger_path.path()).unwrap(); + + let (shreds, _) = make_many_slot_entries(0, 10, 5); + blockstore.insert_shreds(shreds, None, false).unwrap(); + + assert!(matches!( + blockstore + .run_purge(4, 6, PurgeType::ExactAndCleanupChaining) + .unwrap_err(), + BlockstoreError::InvalidRangeForSlotMetaCleanup + )); + } + + #[test] + fn test_purge_exact_and_cleanup_chaining_missing_slot_meta() { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Blockstore::open(ledger_path.path()).unwrap(); + + let (shreds, _) = make_many_slot_entries(0, 10, 5); + blockstore.insert_shreds(shreds, None, false).unwrap(); + + assert!(matches!( + blockstore + .run_purge(11, 11, PurgeType::ExactAndCleanupChaining) + .unwrap_err(), + BlockstoreError::SlotUnavailable + )); + } + + #[test] + fn test_purge_exact_and_cleanup_chaining() { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Blockstore::open(ledger_path.path()).unwrap(); + + let (shreds, _) = make_many_slot_entries(0, 10, 5); + blockstore.insert_shreds(shreds, None, false).unwrap(); + let (slot_11, _) = make_slot_entries(11, 4, 5, true); + blockstore.insert_shreds(slot_11, None, false).unwrap(); + let (slot_12, _) = make_slot_entries(12, 5, 5, true); + blockstore.insert_shreds(slot_12, None, false).unwrap(); + + blockstore + .run_purge(5, 5, PurgeType::ExactAndCleanupChaining) + .unwrap(); + + let slot_meta = blockstore.meta(5).unwrap().unwrap(); + let expected_slot_meta = SlotMeta { + slot: 5, + // Only the next_slots should be preserved + next_slots: vec![6, 12], + ..SlotMeta::default() + }; + assert_eq!(slot_meta, expected_slot_meta); + + let parent_slot_meta = blockstore.meta(4).unwrap().unwrap(); + assert_eq!(parent_slot_meta.next_slots, vec![11]); + + let child_slot_meta = blockstore.meta(6).unwrap().unwrap(); + assert_eq!(child_slot_meta.parent_slot.unwrap(), 5); + + let child_slot_meta = blockstore.meta(12).unwrap().unwrap(); + assert_eq!(child_slot_meta.parent_slot.unwrap(), 5); + } } From 68a02a4b42793019541d427d3de154bbfa974669 Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Sun, 25 Feb 2024 23:15:20 +0000 Subject: [PATCH 4/8] pr feedback: refactor into separate function --- ledger/src/blockstore.rs | 4 +- ledger/src/blockstore/blockstore_purge.rs | 262 +++++++++++----------- ledger/src/blockstore_db.rs | 2 - 3 files changed, 128 insertions(+), 140 deletions(-) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index f6695a82c3d500..7b7ac8b5e53326 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -1155,7 +1155,7 @@ impl Blockstore { } /// Range-delete all entries 
which prefix matches the specified `slot`, - /// remove `slot` its' parents SlotMeta next_slots list, and + /// remove `slot`'s parent's SlotMeta next_slots list, and /// clear `slot`'s SlotMeta (except for next_slots). /// /// This function currently requires `insert_shreds_lock`, as both @@ -1166,7 +1166,7 @@ impl Blockstore { let _lock = self.insert_shreds_lock.lock().unwrap(); // Clear all slot related information, and cleanup slot meta by removing // `slot` from parents `next_slots`, but retaining `slot`'s `next_slots`. - match self.run_purge(slot, slot, PurgeType::ExactAndCleanupChaining) { + match self.purge_slot_cleanup_chaining(slot) { Ok(_) => {} Err(BlockstoreError::SlotUnavailable) => error!( "clear_unconfirmed_slot() called on slot {} with no SlotMeta", diff --git a/ledger/src/blockstore/blockstore_purge.rs b/ledger/src/blockstore/blockstore_purge.rs index 3c3bb5921e5e43..5eae3f2589c9ff 100644 --- a/ledger/src/blockstore/blockstore_purge.rs +++ b/ledger/src/blockstore/blockstore_purge.rs @@ -12,7 +12,7 @@ pub struct PurgeStats { delete_files_in_range: u64, } -#[derive(Clone, Copy, PartialEq, Eq)] +#[derive(Clone, Copy)] /// Controls how `blockstore::purge_slots` purges the data. pub enum PurgeType { /// A slower but more accurate way to purge slots by also ensuring higher @@ -21,10 +21,6 @@ pub enum PurgeType { /// The fastest purge mode that relies on the slot-id based TTL /// compaction filter to do the cleanup. CompactionFilter, - /// In this case along with an `Exact` purge we orphan the specified slot by - /// removing it from `parent_slot_meta.next_slots` and reinsert an empty `slot_meta` - /// for the orphan that only retains the `next_slots` value. - ExactAndCleanupChaining, } impl Blockstore { @@ -139,6 +135,7 @@ impl Blockstore { } } + #[cfg(test)] pub(crate) fn run_purge( &self, from_slot: Slot, @@ -148,14 +145,64 @@ impl Blockstore { self.run_purge_with_stats(from_slot, to_slot, purge_type, &mut PurgeStats::default()) } + /// Purges all columns relating to `slot`. + /// + /// Additionally we cleanup the parent of `slot`, by clearing `slot` from + /// the parent's `next_slots`. We reinsert an orphaned `slot_meta` for `slot` + /// that preserves `slot`'s `next_slots`. This ensures that `slot`'s fork is + /// replayable upon repair of `slot`. + pub(crate) fn purge_slot_cleanup_chaining(&self, slot: Slot) -> Result { + let Some(mut slot_meta) = self.meta(slot)? else { + return Err(BlockstoreError::SlotUnavailable); + }; + let mut write_batch = self + .db + .batch() + .expect("Database Error: Failed to get write batch"); + + let columns_purged = self.purge_range(&mut write_batch, slot, slot); + self.purge_special_columns_exact(&mut write_batch, slot, slot)?; + + if let Some(parent_slot) = slot_meta.parent_slot { + let parent_slot_meta = self.meta(parent_slot)?; + if let Some(mut parent_slot_meta) = parent_slot_meta { + // .retain() is a linear scan; however, next_slots should + // only contain several elements so this isn't so bad + parent_slot_meta + .next_slots + .retain(|&next_slot| next_slot != slot); + write_batch.put::(parent_slot, &parent_slot_meta)?; + } else { + error!("Parent slot meta {} for child {} is missing. In the absence of a duplicate block this + likely means a cluster restart was performed and your node contains invalid shreds generated + with the wrong shred version, whose ancestors have been cleaned up. 
+ Falling back to duplicate block handling to remedy the situation", parent_slot, slot); + } + } + + // Reinsert parts of `slot_meta` that are important to retain, like the `next_slots` field. + slot_meta.clear_unconfirmed_slot(); + write_batch.put::(slot, &slot_meta)?; + + if let Err(e) = self.db.write(write_batch) { + error!( + "Error: {:?} while submitting write batch for slot {:?} retrying...", + e, slot + ); + return Err(e); + } + Ok(columns_purged) + } + /// A helper function to `purge_slots` that executes the ledger clean up. /// The cleanup applies to \[`from_slot`, `to_slot`\]. /// /// When `from_slot` is 0, any sst-file with a key-range completely older /// than `to_slot` will also be deleted. /// - /// If the `purge_type` is `PurgeType::ExactAndCleanupChaining` we require - /// that `from_slot == to_slot` - we are purging only one slot. + /// Note: slots > `to_slot` that chained to a purged slot are not properly + /// cleaned up. This function is not intended to be used if such slots need + /// to be replayed. pub(crate) fn run_purge_with_stats( &self, from_slot: Slot, @@ -163,92 +210,14 @@ impl Blockstore { purge_type: PurgeType, purge_stats: &mut PurgeStats, ) -> Result { - let slot_meta = if purge_type == PurgeType::ExactAndCleanupChaining { - if from_slot != to_slot { - error!( - "Chaining cleanup was requested but a range was specified {} {}", - from_slot, to_slot - ); - return Err(BlockstoreError::InvalidRangeForSlotMetaCleanup); - } - let Some(slot_meta) = self.meta(from_slot)? else { - return Err(BlockstoreError::SlotUnavailable); - }; - Some(slot_meta) - } else { - None - }; let mut write_batch = self .db .batch() .expect("Database Error: Failed to get write batch"); let mut delete_range_timer = Measure::start("delete_range"); - let columns_purged = self - .db - .delete_range_cf::(&mut write_batch, from_slot, to_slot) - .is_ok() - & self - .db - .delete_range_cf::(&mut write_batch, from_slot, to_slot) - .is_ok() - & self - .db - .delete_range_cf::(&mut write_batch, from_slot, to_slot) - .is_ok() - & self - .db - .delete_range_cf::(&mut write_batch, from_slot, to_slot) - .is_ok() - & self - .db - .delete_range_cf::(&mut write_batch, from_slot, to_slot) - .is_ok() - & self - .db - .delete_range_cf::(&mut write_batch, from_slot, to_slot) - .is_ok() - & self - .db - .delete_range_cf::(&mut write_batch, from_slot, to_slot) - .is_ok() - & self - .db - .delete_range_cf::(&mut write_batch, from_slot, to_slot) - .is_ok() - & self - .db - .delete_range_cf::(&mut write_batch, from_slot, to_slot) - .is_ok() - & self - .db - .delete_range_cf::(&mut write_batch, from_slot, to_slot) - .is_ok() - & self - .db - .delete_range_cf::(&mut write_batch, from_slot, to_slot) - .is_ok() - & self - .db - .delete_range_cf::(&mut write_batch, from_slot, to_slot) - .is_ok() - & self - .db - .delete_range_cf::(&mut write_batch, from_slot, to_slot) - .is_ok() - & self - .db - .delete_range_cf::(&mut write_batch, from_slot, to_slot) - .is_ok() - & self - .db - .delete_range_cf::(&mut write_batch, from_slot, to_slot) - .is_ok() - & self - .db - .delete_range_cf::(&mut write_batch, from_slot, to_slot) - .is_ok(); + let columns_purged = self.purge_range(&mut write_batch, from_slot, to_slot); match purge_type { - PurgeType::Exact | PurgeType::ExactAndCleanupChaining => { + PurgeType::Exact => { self.purge_special_columns_exact(&mut write_batch, from_slot, to_slot)?; } PurgeType::CompactionFilter => { @@ -261,31 +230,6 @@ impl Blockstore { } delete_range_timer.stop(); - if purge_type == 
PurgeType::ExactAndCleanupChaining { - let mut slot_meta = slot_meta.expect("Slot meta should be present by this point"); - let slot = from_slot; // also equal to to_slot - if let Some(parent_slot) = slot_meta.parent_slot { - let parent_slot_meta = self.meta(parent_slot)?; - if let Some(mut parent_slot_meta) = parent_slot_meta { - // .retain() is a linear scan; however, next_slots should - // only contain several elements so this isn't so bad - parent_slot_meta - .next_slots - .retain(|&next_slot| next_slot != slot); - write_batch.put::(parent_slot, &parent_slot_meta)?; - } else { - error!("Parent slot meta {} for child {} is missing. In the absence of a duplicate block this - likely means a cluster restart was performed and your node contains invalid shreds generated - with the wrong shred version, whose ancestors have been cleaned up. - Falling back to duplicate block handling to remedy the situation", parent_slot, slot); - } - } - - // Reinsert parts of `slot_meta` that are important to retain, like the `next_slots` field. - slot_meta.clear_unconfirmed_slot(); - write_batch.put::(slot, &slot_meta)?; - } - let mut write_timer = Measure::start("write_batch"); if let Err(e) = self.db.write(write_batch) { error!( @@ -320,6 +264,72 @@ impl Blockstore { Ok(columns_purged) } + fn purge_range(&self, write_batch: &mut WriteBatch, from_slot: Slot, to_slot: Slot) -> bool { + self.db + .delete_range_cf::(write_batch, from_slot, to_slot) + .is_ok() + & self + .db + .delete_range_cf::(write_batch, from_slot, to_slot) + .is_ok() + & self + .db + .delete_range_cf::(write_batch, from_slot, to_slot) + .is_ok() + & self + .db + .delete_range_cf::(write_batch, from_slot, to_slot) + .is_ok() + & self + .db + .delete_range_cf::(write_batch, from_slot, to_slot) + .is_ok() + & self + .db + .delete_range_cf::(write_batch, from_slot, to_slot) + .is_ok() + & self + .db + .delete_range_cf::(write_batch, from_slot, to_slot) + .is_ok() + & self + .db + .delete_range_cf::(write_batch, from_slot, to_slot) + .is_ok() + & self + .db + .delete_range_cf::(write_batch, from_slot, to_slot) + .is_ok() + & self + .db + .delete_range_cf::(write_batch, from_slot, to_slot) + .is_ok() + & self + .db + .delete_range_cf::(write_batch, from_slot, to_slot) + .is_ok() + & self + .db + .delete_range_cf::(write_batch, from_slot, to_slot) + .is_ok() + & self + .db + .delete_range_cf::(write_batch, from_slot, to_slot) + .is_ok() + & self + .db + .delete_range_cf::(write_batch, from_slot, to_slot) + .is_ok() + & self + .db + .delete_range_cf::(write_batch, from_slot, to_slot) + .is_ok() + & self + .db + .delete_range_cf::(write_batch, from_slot, to_slot) + .is_ok() + } + fn purge_files_in_range(&self, from_slot: Slot, to_slot: Slot) -> bool { self.db .delete_file_in_range_cf::(from_slot, to_slot) @@ -1152,23 +1162,7 @@ pub mod tests { } #[test] - fn test_purge_exact_and_cleanup_chaining_range() { - let ledger_path = get_tmp_ledger_path_auto_delete!(); - let blockstore = Blockstore::open(ledger_path.path()).unwrap(); - - let (shreds, _) = make_many_slot_entries(0, 10, 5); - blockstore.insert_shreds(shreds, None, false).unwrap(); - - assert!(matches!( - blockstore - .run_purge(4, 6, PurgeType::ExactAndCleanupChaining) - .unwrap_err(), - BlockstoreError::InvalidRangeForSlotMetaCleanup - )); - } - - #[test] - fn test_purge_exact_and_cleanup_chaining_missing_slot_meta() { + fn test_purge_slot_cleanup_chaining_missing_slot_meta() { let ledger_path = get_tmp_ledger_path_auto_delete!(); let blockstore = 
Blockstore::open(ledger_path.path()).unwrap(); @@ -1176,15 +1170,13 @@ pub mod tests { blockstore.insert_shreds(shreds, None, false).unwrap(); assert!(matches!( - blockstore - .run_purge(11, 11, PurgeType::ExactAndCleanupChaining) - .unwrap_err(), + blockstore.purge_slot_cleanup_chaining(11).unwrap_err(), BlockstoreError::SlotUnavailable )); } #[test] - fn test_purge_exact_and_cleanup_chaining() { + fn test_purge_slot_cleanup_chaining() { let ledger_path = get_tmp_ledger_path_auto_delete!(); let blockstore = Blockstore::open(ledger_path.path()).unwrap(); @@ -1195,9 +1187,7 @@ pub mod tests { let (slot_12, _) = make_slot_entries(12, 5, 5, true); blockstore.insert_shreds(slot_12, None, false).unwrap(); - blockstore - .run_purge(5, 5, PurgeType::ExactAndCleanupChaining) - .unwrap(); + blockstore.purge_slot_cleanup_chaining(5).unwrap(); let slot_meta = blockstore.meta(5).unwrap().unwrap(); let expected_slot_meta = SlotMeta { diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index 00ea80ac05a2a5..18ba491ea34bd1 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -151,8 +151,6 @@ pub enum BlockstoreError { MissingTransactionMetadata, #[error("transaction-index overflow")] TransactionIndexOverflow, - #[error("invalid purge range for slot meta cleanup")] - InvalidRangeForSlotMetaCleanup, } pub type Result = std::result::Result; From 69431c6c8edae0f39b75122099c11e8405059bfd Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Tue, 27 Feb 2024 16:58:54 +0000 Subject: [PATCH 5/8] pr feedback: add special columns to helper, err msg, comments --- ledger/src/blockstore/blockstore_purge.rs | 54 ++++++++++++++--------- 1 file changed, 32 insertions(+), 22 deletions(-) diff --git a/ledger/src/blockstore/blockstore_purge.rs b/ledger/src/blockstore/blockstore_purge.rs index 5eae3f2589c9ff..4c57eca1cc4cb2 100644 --- a/ledger/src/blockstore/blockstore_purge.rs +++ b/ledger/src/blockstore/blockstore_purge.rs @@ -160,8 +160,7 @@ impl Blockstore { .batch() .expect("Database Error: Failed to get write batch"); - let columns_purged = self.purge_range(&mut write_batch, slot, slot); - self.purge_special_columns_exact(&mut write_batch, slot, slot)?; + let columns_purged = self.purge_range(&mut write_batch, slot, slot, PurgeType::Exact)?; if let Some(parent_slot) = slot_meta.parent_slot { let parent_slot_meta = self.meta(parent_slot)?; @@ -173,10 +172,11 @@ impl Blockstore { .retain(|&next_slot| next_slot != slot); write_batch.put::(parent_slot, &parent_slot_meta)?; } else { - error!("Parent slot meta {} for child {} is missing. In the absence of a duplicate block this - likely means a cluster restart was performed and your node contains invalid shreds generated - with the wrong shred version, whose ancestors have been cleaned up. - Falling back to duplicate block handling to remedy the situation", parent_slot, slot); + error!( + "Parent slot meta {} for child {} is missing or cleaned up. 
+ Falling back to orphan repair to remedy the situation", + parent_slot, slot + ); } } @@ -214,20 +214,9 @@ impl Blockstore { .db .batch() .expect("Database Error: Failed to get write batch"); + let mut delete_range_timer = Measure::start("delete_range"); - let columns_purged = self.purge_range(&mut write_batch, from_slot, to_slot); - match purge_type { - PurgeType::Exact => { - self.purge_special_columns_exact(&mut write_batch, from_slot, to_slot)?; - } - PurgeType::CompactionFilter => { - // No explicit action is required here because this purge type completely and - // indefinitely relies on the proper working of compaction filter for those - // special column families, never toggling the primary index from the current - // one. Overall, this enables well uniformly distributed writes, resulting - // in no spiky periodic huge delete_range for them. - } - } + let columns_purged = self.purge_range(&mut write_batch, from_slot, to_slot, purge_type)?; delete_range_timer.stop(); let mut write_timer = Measure::start("write_batch"); @@ -264,8 +253,15 @@ impl Blockstore { Ok(columns_purged) } - fn purge_range(&self, write_batch: &mut WriteBatch, from_slot: Slot, to_slot: Slot) -> bool { - self.db + fn purge_range( + &self, + write_batch: &mut WriteBatch, + from_slot: Slot, + to_slot: Slot, + purge_type: PurgeType, + ) -> Result { + let columns_purged = self + .db .delete_range_cf::(write_batch, from_slot, to_slot) .is_ok() & self @@ -327,7 +323,21 @@ impl Blockstore { & self .db .delete_range_cf::(write_batch, from_slot, to_slot) - .is_ok() + .is_ok(); + + match purge_type { + PurgeType::Exact => { + self.purge_special_columns_exact(write_batch, from_slot, to_slot)?; + } + PurgeType::CompactionFilter => { + // No explicit action is required here because this purge type completely and + // indefinitely relies on the proper working of compaction filter for those + // special column families, never toggling the primary index from the current + // one. Overall, this enables well uniformly distributed writes, resulting + // in no spiky periodic huge delete_range for them. + } + } + Ok(columns_purged) } fn purge_files_in_range(&self, from_slot: Slot, to_slot: Slot) -> bool { From a6bcc31cf73f694df6b0c11367c60d2b59c2a4e3 Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Tue, 27 Feb 2024 21:58:23 +0000 Subject: [PATCH 6/8] pr feedback: reword comments and write batch error message --- ledger/src/blockstore.rs | 13 ++++++++----- ledger/src/blockstore/blockstore_purge.rs | 20 +++++++++----------- 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 7b7ac8b5e53326..c140147df34d95 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -1154,9 +1154,8 @@ impl Blockstore { self.completed_slots_senders.lock().unwrap().clear(); } - /// Range-delete all entries which prefix matches the specified `slot`, - /// remove `slot`'s parent's SlotMeta next_slots list, and - /// clear `slot`'s SlotMeta (except for next_slots). + /// Clear `slot` from the Blockstore, see ``Blockstore::purge_slot_cleanup_chaining` + /// for more details. /// /// This function currently requires `insert_shreds_lock`, as both /// `clear_unconfirmed_slot()` and `insert_shreds_handle_duplicate()` @@ -1164,8 +1163,12 @@ impl Blockstore { /// family. 
     pub fn clear_unconfirmed_slot(&self, slot: Slot) {
         let _lock = self.insert_shreds_lock.lock().unwrap();
-        // Clear all slot related information, and cleanup slot meta by removing
-        // `slot` from parents `next_slots`, but retaining `slot`'s `next_slots`.
+        // Purge the slot and insert an empty `SlotMeta` with only the `next_slots` field preserved.
+        // Shreds inherently contain the slot of their parent which updates the parent's `next_slots`
+        // when the child is inserted through `Blockstore::handle_chaining()`.
+        // However we are only purging and repairing the parent slot here. Since the child will not be
+        // reinserted the chaining will be lost. In order for bank forks discovery to ingest the child,
+        // we must retain the chain by preserving `next_slots`.
         match self.purge_slot_cleanup_chaining(slot) {
             Ok(_) => {}
             Err(BlockstoreError::SlotUnavailable) => error!(
diff --git a/ledger/src/blockstore/blockstore_purge.rs b/ledger/src/blockstore/blockstore_purge.rs
index 4c57eca1cc4cb2..5d756ca74ae4d1 100644
--- a/ledger/src/blockstore/blockstore_purge.rs
+++ b/ledger/src/blockstore/blockstore_purge.rs
@@ -184,13 +184,12 @@ impl Blockstore {
         slot_meta.clear_unconfirmed_slot();
         write_batch.put::(slot, &slot_meta)?;
 
-        if let Err(e) = self.db.write(write_batch) {
+        self.db.write(write_batch).inspect_err(|e| {
             error!(
-                "Error: {:?} while submitting write batch for slot {:?} retrying...",
+                "Error: {:?} while submitting write batch for slot {:?}",
                 e, slot
-            );
-            return Err(e);
-        }
+            )
+        })?;
         Ok(columns_purged)
     }
 
@@ -220,13 +219,12 @@ impl Blockstore {
         delete_range_timer.stop();
 
         let mut write_timer = Measure::start("write_batch");
-        if let Err(e) = self.db.write(write_batch) {
+        self.db.write(write_batch).inspect_err(|e| {
             error!(
-                "Error: {:?} while submitting write batch for slot {:?} retrying...",
-                e, from_slot
-            );
-            return Err(e);
-        }
+                "Error: {:?} while submitting write batch for purge from_slot {} to_slot {}",
+                e, from_slot, to_slot
+            )
+        })?;
         write_timer.stop();
 
         let mut purge_files_in_range_timer = Measure::start("delete_file_in_range");

From 4a95caa7eab2a72d7e2e2c6e25d5d0a4986d356d Mon Sep 17 00:00:00 2001
From: Ashwin Sekar
Date: Wed, 28 Feb 2024 18:38:11 +0000
Subject: [PATCH 7/8] pr feedback: bubble write_batch error to caller

---
 ledger/src/blockstore/blockstore_purge.rs | 10 ++--------
 1 file changed, 2 insertions(+), 8 deletions(-)

diff --git a/ledger/src/blockstore/blockstore_purge.rs b/ledger/src/blockstore/blockstore_purge.rs
index 5d756ca74ae4d1..fdd5653c65ce22 100644
--- a/ledger/src/blockstore/blockstore_purge.rs
+++ b/ledger/src/blockstore/blockstore_purge.rs
@@ -155,10 +155,7 @@ impl Blockstore {
         let Some(mut slot_meta) = self.meta(slot)? 
else { return Err(BlockstoreError::SlotUnavailable); }; - let mut write_batch = self - .db - .batch() - .expect("Database Error: Failed to get write batch"); + let mut write_batch = self.db.batch()?; let columns_purged = self.purge_range(&mut write_batch, slot, slot, PurgeType::Exact)?; @@ -209,10 +206,7 @@ impl Blockstore { purge_type: PurgeType, purge_stats: &mut PurgeStats, ) -> Result { - let mut write_batch = self - .db - .batch() - .expect("Database Error: Failed to get write batch"); + let mut write_batch = self.db.batch()?; let mut delete_range_timer = Measure::start("delete_range"); let columns_purged = self.purge_range(&mut write_batch, from_slot, to_slot, purge_type)?; From bc0f8885bcfcc63eab9a73078e2b495e686e7264 Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Wed, 28 Feb 2024 13:39:22 -0500 Subject: [PATCH 8/8] pr feedback: reword comments Co-authored-by: steviez --- ledger/src/blockstore.rs | 6 +++--- ledger/src/blockstore/blockstore_purge.rs | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index c140147df34d95..c9bb51530c3325 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -1164,9 +1164,9 @@ impl Blockstore { pub fn clear_unconfirmed_slot(&self, slot: Slot) { let _lock = self.insert_shreds_lock.lock().unwrap(); // Purge the slot and insert an empty `SlotMeta` with only the `next_slots` field preserved. - // Shreds inherently contain the slot of their parent which updates the parent's `next_slots` - // when the child is inserted through `Blockstore::handle_chaining()`. - // However we are only purging and repairing the parent slot here. Since the child will not be + // Shreds inherently know their parent slot, and a parent's SlotMeta `next_slots` list + // will be updated when the child is inserted (see `Blockstore::handle_chaining()`). + // However, we are only purging and repairing the parent slot here. Since the child will not be // reinserted the chaining will be lost. In order for bank forks discovery to ingest the child, // we must retain the chain by preserving `next_slots`. match self.purge_slot_cleanup_chaining(slot) { diff --git a/ledger/src/blockstore/blockstore_purge.rs b/ledger/src/blockstore/blockstore_purge.rs index fdd5653c65ce22..15a5c4890e9f05 100644 --- a/ledger/src/blockstore/blockstore_purge.rs +++ b/ledger/src/blockstore/blockstore_purge.rs @@ -147,7 +147,7 @@ impl Blockstore { /// Purges all columns relating to `slot`. /// - /// Additionally we cleanup the parent of `slot`, by clearing `slot` from + /// Additionally, we cleanup the parent of `slot` by clearing `slot` from /// the parent's `next_slots`. We reinsert an orphaned `slot_meta` for `slot` /// that preserves `slot`'s `next_slots`. This ensures that `slot`'s fork is /// replayable upon repair of `slot`. @@ -177,7 +177,7 @@ impl Blockstore { } } - // Reinsert parts of `slot_meta` that are important to retain, like the `next_slots` field. + // Retain a SlotMeta for `slot` with the `next_slots` field retained slot_meta.clear_unconfirmed_slot(); write_batch.put::(slot, &slot_meta)?;
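
The series above converges on one pattern: stage the column purges, the parent's `next_slots` fix-up, and the orphaned `SlotMeta` reinsert in a single write batch, then commit them together so a crash cannot leave the blockstore half-cleared. The sketch below illustrates that pattern in isolation. It deliberately uses simplified stand-in types (`Store`, `Batch`, and a trimmed-down `SlotMeta`) rather than the real `Blockstore`, RocksDB `WriteBatch`, and column-family APIs, so treat it as an illustration of the shape of `purge_slot_cleanup_chaining`, not a drop-in for the ledger code.

// Sketch only: `Store`, `Batch`, and this trimmed-down `SlotMeta` are stand-ins for the
// real `Blockstore`, RocksDB `WriteBatch`, and ledger `SlotMeta`, which carry far more state.
use std::collections::HashMap;

type Slot = u64;

#[derive(Clone, Debug, Default, PartialEq)]
struct SlotMeta {
    parent_slot: Option<Slot>,
    next_slots: Vec<Slot>,
}

/// Staged mutations; nothing touches the store until `commit` applies them all at once.
#[derive(Default)]
struct Batch {
    deletes: Vec<Slot>,
    puts: Vec<(Slot, SlotMeta)>,
}

#[derive(Default)]
struct Store {
    metas: HashMap<Slot, SlotMeta>,
}

impl Store {
    /// Mirrors the shape of `Blockstore::purge_slot_cleanup_chaining`: every step is staged
    /// in one batch so a failure partway through cannot leave the slot half-purged.
    fn purge_slot_cleanup_chaining(&mut self, slot: Slot) -> Result<(), String> {
        let mut slot_meta = self
            .metas
            .get(&slot)
            .cloned()
            .ok_or_else(|| format!("slot {slot} unavailable"))?;
        let mut batch = Batch::default();

        // 1) Purge everything keyed by `slot` (the real code range-deletes many columns).
        batch.deletes.push(slot);

        // 2) Detach `slot` from its parent's `next_slots`, tolerating a missing parent meta.
        if let Some(parent) = slot_meta.parent_slot {
            if let Some(mut parent_meta) = self.metas.get(&parent).cloned() {
                parent_meta.next_slots.retain(|&next| next != slot);
                batch.puts.push((parent, parent_meta));
            }
        }

        // 3) Reinsert an orphaned meta that keeps only `next_slots`, so children discovered
        //    later still chain back through `slot` once it is repaired.
        let orphan = SlotMeta {
            next_slots: std::mem::take(&mut slot_meta.next_slots),
            ..SlotMeta::default()
        };
        batch.puts.push((slot, orphan));

        // 4) Apply all staged mutations at once, from the caller's point of view.
        self.commit(batch);
        Ok(())
    }

    fn commit(&mut self, batch: Batch) {
        for slot in batch.deletes {
            self.metas.remove(&slot);
        }
        for (slot, meta) in batch.puts {
            self.metas.insert(slot, meta);
        }
    }
}

fn main() {
    let mut store = Store::default();
    store.metas.insert(4, SlotMeta { parent_slot: Some(3), next_slots: vec![5] });
    store.metas.insert(5, SlotMeta { parent_slot: Some(4), next_slots: vec![6] });

    store.purge_slot_cleanup_chaining(5).unwrap();

    assert_eq!(store.metas[&4].next_slots, Vec::<Slot>::new());
    assert_eq!(
        store.metas[&5],
        SlotMeta { parent_slot: None, next_slots: vec![6] }
    );
    println!("slot 5 purged, chain to its child preserved: {:?}", store.metas[&5]);
}

Run as a standalone program, the sketch purges slot 5, removes it from its parent's `next_slots`, and keeps an orphaned meta whose `next_slots` still points at slot 6 — the same property the new `test_purge_slot_cleanup_chaining` unit test asserts against the real blockstore.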