-
Notifications
You must be signed in to change notification settings - Fork 4.9k
blockstore: atomize slot clearing, relax parent slot meta check #35124
Changes from all commits
42b2668
8c142ba
d0e8b7f
68a02a4
69431c6
a6bcc31
4a95caa
bc0f888
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -135,6 +135,7 @@ impl Blockstore { | |
} | ||
} | ||
|
||
#[cfg(test)] | ||
pub(crate) fn run_purge( | ||
&self, | ||
from_slot: Slot, | ||
|
@@ -144,90 +145,181 @@ impl Blockstore { | |
self.run_purge_with_stats(from_slot, to_slot, purge_type, &mut PurgeStats::default()) | ||
} | ||
|
||
/// Purges all columns relating to `slot`. | ||
/// | ||
/// Additionally, we cleanup the parent of `slot` by clearing `slot` from | ||
/// the parent's `next_slots`. We reinsert an orphaned `slot_meta` for `slot` | ||
/// that preserves `slot`'s `next_slots`. This ensures that `slot`'s fork is | ||
/// replayable upon repair of `slot`. | ||
pub(crate) fn purge_slot_cleanup_chaining(&self, slot: Slot) -> Result<bool> { | ||
let Some(mut slot_meta) = self.meta(slot)? else { | ||
return Err(BlockstoreError::SlotUnavailable); | ||
}; | ||
let mut write_batch = self.db.batch()?; | ||
|
||
let columns_purged = self.purge_range(&mut write_batch, slot, slot, PurgeType::Exact)?; | ||
|
||
if let Some(parent_slot) = slot_meta.parent_slot { | ||
let parent_slot_meta = self.meta(parent_slot)?; | ||
if let Some(mut parent_slot_meta) = parent_slot_meta { | ||
// .retain() is a linear scan; however, next_slots should | ||
// only contain several elements so this isn't so bad | ||
parent_slot_meta | ||
.next_slots | ||
.retain(|&next_slot| next_slot != slot); | ||
write_batch.put::<cf::SlotMeta>(parent_slot, &parent_slot_meta)?; | ||
} else { | ||
error!( | ||
"Parent slot meta {} for child {} is missing or cleaned up. | ||
Falling back to orphan repair to remedy the situation", | ||
parent_slot, slot | ||
); | ||
} | ||
} | ||
|
||
// Retain a SlotMeta for `slot` with the `next_slots` field retained | ||
slot_meta.clear_unconfirmed_slot(); | ||
write_batch.put::<cf::SlotMeta>(slot, &slot_meta)?; | ||
|
||
self.db.write(write_batch).inspect_err(|e| { | ||
error!( | ||
"Error: {:?} while submitting write batch for slot {:?}", | ||
e, slot | ||
) | ||
})?; | ||
Ok(columns_purged) | ||
} | ||
|
||
/// A helper function to `purge_slots` that executes the ledger clean up. | ||
/// The cleanup applies to \[`from_slot`, `to_slot`\]. | ||
/// | ||
/// When `from_slot` is 0, any sst-file with a key-range completely older | ||
/// than `to_slot` will also be deleted. | ||
/// | ||
/// Note: slots > `to_slot` that chained to a purged slot are not properly | ||
/// cleaned up. This function is not intended to be used if such slots need | ||
/// to be replayed. | ||
pub(crate) fn run_purge_with_stats( | ||
&self, | ||
from_slot: Slot, | ||
to_slot: Slot, | ||
purge_type: PurgeType, | ||
purge_stats: &mut PurgeStats, | ||
) -> Result<bool> { | ||
let mut write_batch = self | ||
.db | ||
.batch() | ||
.expect("Database Error: Failed to get write batch"); | ||
let mut write_batch = self.db.batch()?; | ||
|
||
steviez marked this conversation as resolved.
Show resolved
Hide resolved
|
||
let mut delete_range_timer = Measure::start("delete_range"); | ||
let columns_purged = self.purge_range(&mut write_batch, from_slot, to_slot, purge_type)?; | ||
delete_range_timer.stop(); | ||
|
||
let mut write_timer = Measure::start("write_batch"); | ||
self.db.write(write_batch).inspect(|e| { | ||
error!( | ||
"Error: {:?} while submitting write batch for purge from_slot {} to_slot {}", | ||
e, from_slot, to_slot | ||
) | ||
})?; | ||
write_timer.stop(); | ||
|
||
let mut purge_files_in_range_timer = Measure::start("delete_file_in_range"); | ||
// purge_files_in_range delete any files whose slot range is within | ||
// [from_slot, to_slot]. When from_slot is 0, it is safe to run | ||
// purge_files_in_range because if purge_files_in_range deletes any | ||
// sst file that contains any range-deletion tombstone, the deletion | ||
// range of that tombstone will be completely covered by the new | ||
// range-delete tombstone (0, to_slot) issued above. | ||
// | ||
// On the other hand, purge_files_in_range is more effective and | ||
// efficient than the compaction filter (which runs key-by-key) | ||
// because all the sst files that have key range below to_slot | ||
// can be deleted immediately. | ||
if columns_purged && from_slot == 0 { | ||
self.purge_files_in_range(from_slot, to_slot); | ||
} | ||
purge_files_in_range_timer.stop(); | ||
|
||
purge_stats.delete_range += delete_range_timer.as_us(); | ||
purge_stats.write_batch += write_timer.as_us(); | ||
purge_stats.delete_files_in_range += purge_files_in_range_timer.as_us(); | ||
|
||
Ok(columns_purged) | ||
} | ||
|
||
fn purge_range( | ||
&self, | ||
write_batch: &mut WriteBatch, | ||
from_slot: Slot, | ||
to_slot: Slot, | ||
purge_type: PurgeType, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: I think it's cleaner encapsulation to pass a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I see the merit of adding the new function The enum is part of the public API, so I think it is reasonable to expect someone to know about it. And the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
) -> Result<bool> { | ||
let columns_purged = self | ||
.db | ||
.delete_range_cf::<cf::SlotMeta>(&mut write_batch, from_slot, to_slot) | ||
.delete_range_cf::<cf::SlotMeta>(write_batch, from_slot, to_slot) | ||
.is_ok() | ||
& self | ||
.db | ||
.delete_range_cf::<cf::BankHash>(&mut write_batch, from_slot, to_slot) | ||
.delete_range_cf::<cf::BankHash>(write_batch, from_slot, to_slot) | ||
.is_ok() | ||
& self | ||
.db | ||
.delete_range_cf::<cf::Root>(&mut write_batch, from_slot, to_slot) | ||
.delete_range_cf::<cf::Root>(write_batch, from_slot, to_slot) | ||
.is_ok() | ||
& self | ||
.db | ||
.delete_range_cf::<cf::ShredData>(&mut write_batch, from_slot, to_slot) | ||
.delete_range_cf::<cf::ShredData>(write_batch, from_slot, to_slot) | ||
.is_ok() | ||
& self | ||
.db | ||
.delete_range_cf::<cf::ShredCode>(&mut write_batch, from_slot, to_slot) | ||
.delete_range_cf::<cf::ShredCode>(write_batch, from_slot, to_slot) | ||
.is_ok() | ||
& self | ||
.db | ||
.delete_range_cf::<cf::DeadSlots>(&mut write_batch, from_slot, to_slot) | ||
.delete_range_cf::<cf::DeadSlots>(write_batch, from_slot, to_slot) | ||
.is_ok() | ||
& self | ||
.db | ||
.delete_range_cf::<cf::DuplicateSlots>(&mut write_batch, from_slot, to_slot) | ||
.delete_range_cf::<cf::DuplicateSlots>(write_batch, from_slot, to_slot) | ||
.is_ok() | ||
& self | ||
.db | ||
.delete_range_cf::<cf::ErasureMeta>(&mut write_batch, from_slot, to_slot) | ||
.delete_range_cf::<cf::ErasureMeta>(write_batch, from_slot, to_slot) | ||
.is_ok() | ||
& self | ||
.db | ||
.delete_range_cf::<cf::Orphans>(&mut write_batch, from_slot, to_slot) | ||
.delete_range_cf::<cf::Orphans>(write_batch, from_slot, to_slot) | ||
.is_ok() | ||
& self | ||
.db | ||
.delete_range_cf::<cf::Index>(&mut write_batch, from_slot, to_slot) | ||
.delete_range_cf::<cf::Index>(write_batch, from_slot, to_slot) | ||
.is_ok() | ||
& self | ||
.db | ||
.delete_range_cf::<cf::Rewards>(&mut write_batch, from_slot, to_slot) | ||
.delete_range_cf::<cf::Rewards>(write_batch, from_slot, to_slot) | ||
.is_ok() | ||
& self | ||
.db | ||
.delete_range_cf::<cf::Blocktime>(&mut write_batch, from_slot, to_slot) | ||
.delete_range_cf::<cf::Blocktime>(write_batch, from_slot, to_slot) | ||
.is_ok() | ||
& self | ||
.db | ||
.delete_range_cf::<cf::PerfSamples>(&mut write_batch, from_slot, to_slot) | ||
.delete_range_cf::<cf::PerfSamples>(write_batch, from_slot, to_slot) | ||
.is_ok() | ||
& self | ||
.db | ||
.delete_range_cf::<cf::BlockHeight>(&mut write_batch, from_slot, to_slot) | ||
.delete_range_cf::<cf::BlockHeight>(write_batch, from_slot, to_slot) | ||
.is_ok() | ||
& self | ||
.db | ||
.delete_range_cf::<cf::OptimisticSlots>(&mut write_batch, from_slot, to_slot) | ||
.delete_range_cf::<cf::OptimisticSlots>(write_batch, from_slot, to_slot) | ||
.is_ok() | ||
& self | ||
.db | ||
.delete_range_cf::<cf::MerkleRootMeta>(&mut write_batch, from_slot, to_slot) | ||
.delete_range_cf::<cf::MerkleRootMeta>(write_batch, from_slot, to_slot) | ||
.is_ok(); | ||
|
||
match purge_type { | ||
PurgeType::Exact => { | ||
self.purge_special_columns_exact(&mut write_batch, from_slot, to_slot)?; | ||
self.purge_special_columns_exact(write_batch, from_slot, to_slot)?; | ||
} | ||
PurgeType::CompactionFilter => { | ||
// No explicit action is required here because this purge type completely and | ||
|
@@ -237,39 +329,6 @@ impl Blockstore { | |
// in no spiky periodic huge delete_range for them. | ||
} | ||
} | ||
delete_range_timer.stop(); | ||
|
||
let mut write_timer = Measure::start("write_batch"); | ||
if let Err(e) = self.db.write(write_batch) { | ||
error!( | ||
"Error: {:?} while submitting write batch for slot {:?} retrying...", | ||
e, from_slot | ||
); | ||
return Err(e); | ||
} | ||
write_timer.stop(); | ||
|
||
let mut purge_files_in_range_timer = Measure::start("delete_file_in_range"); | ||
// purge_files_in_range delete any files whose slot range is within | ||
// [from_slot, to_slot]. When from_slot is 0, it is safe to run | ||
// purge_files_in_range because if purge_files_in_range deletes any | ||
// sst file that contains any range-deletion tombstone, the deletion | ||
// range of that tombstone will be completely covered by the new | ||
// range-delete tombstone (0, to_slot) issued above. | ||
// | ||
// On the other hand, purge_files_in_range is more effective and | ||
// efficient than the compaction filter (which runs key-by-key) | ||
// because all the sst files that have key range below to_slot | ||
// can be deleted immediately. | ||
if columns_purged && from_slot == 0 { | ||
self.purge_files_in_range(from_slot, to_slot); | ||
} | ||
purge_files_in_range_timer.stop(); | ||
|
||
purge_stats.delete_range += delete_range_timer.as_us(); | ||
purge_stats.write_batch += write_timer.as_us(); | ||
purge_stats.delete_files_in_range += purge_files_in_range_timer.as_us(); | ||
|
||
Ok(columns_purged) | ||
} | ||
|
||
|
@@ -1103,4 +1162,51 @@ pub mod tests { | |
} | ||
assert_eq!(count, 1); | ||
} | ||
|
||
/// Purging a slot that has no `SlotMeta` must fail with
/// `BlockstoreError::SlotUnavailable`.
#[test]
fn test_purge_slot_cleanup_chaining_missing_slot_meta() {
    let ledger_path = get_tmp_ledger_path_auto_delete!();
    let blockstore = Blockstore::open(ledger_path.path()).unwrap();

    // NOTE(review): presumably this populates slots 0 through 9 only, which
    // leaves slot 11 without a SlotMeta - confirm against the helper.
    let (shreds, _) = make_many_slot_entries(0, 10, 5);
    blockstore.insert_shreds(shreds, None, false).unwrap();

    let purge_result = blockstore.purge_slot_cleanup_chaining(11);
    assert!(matches!(
        purge_result,
        Err(BlockstoreError::SlotUnavailable)
    ));
}
|
||
/// Purging slot 5 must leave an otherwise-default `SlotMeta` for slot 5 that
/// still lists its children, remove slot 5 from its parent's `next_slots`,
/// and leave the children's `parent_slot` pointing at slot 5.
#[test]
fn test_purge_slot_cleanup_chaining() {
    let ledger_path = get_tmp_ledger_path_auto_delete!();
    let blockstore = Blockstore::open(ledger_path.path()).unwrap();

    let (shreds, _) = make_many_slot_entries(0, 10, 5);
    blockstore.insert_shreds(shreds, None, false).unwrap();

    // Add two extra forks: slot 11 chained to slot 4, slot 12 chained to slot 5.
    for (fork_slot, fork_parent) in [(11, 4), (12, 5)] {
        let (fork_shreds, _) = make_slot_entries(fork_slot, fork_parent, 5, true);
        blockstore.insert_shreds(fork_shreds, None, false).unwrap();
    }

    blockstore.purge_slot_cleanup_chaining(5).unwrap();

    // Only `next_slots` should survive on the purged slot's meta.
    let purged_meta = blockstore.meta(5).unwrap().unwrap();
    assert_eq!(
        purged_meta,
        SlotMeta {
            slot: 5,
            next_slots: vec![6, 12],
            ..SlotMeta::default()
        }
    );

    // The parent (slot 4) no longer lists slot 5, only the slot 11 fork.
    let parent_meta = blockstore.meta(4).unwrap().unwrap();
    assert_eq!(parent_meta.next_slots, vec![11]);

    // Both children still point back at slot 5.
    for child in [6, 12] {
        let child_meta = blockstore.meta(child).unwrap().unwrap();
        assert_eq!(child_meta.parent_slot.unwrap(), 5);
    }
}
} |
Uh oh!
There was an error while loading. Please reload this page.