[storage/archive] Use `items_per_section` instead of `section_mask` by patrick-ogrady · Pull Request #1193 · commonwarexyz/monorepo · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

[storage/archive] Use items_per_section instead of section_mask #1193

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jul 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions storage/fuzz/fuzz_targets/archive_operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ fn fuzz(data: FuzzInput) {
runner.start(|context| async move {
let cfg = Config {
partition: "test".into(),
section_mask: 0xffff_ffff_ffff_ff00u64,
items_per_section: 256,
write_buffer: 1024,
translator: EightCap,
replay_buffer: 1024*1024,
Expand Down Expand Up @@ -186,7 +186,7 @@ fn fuzz(data: FuzzInput) {
}

ArchiveOperation::Prune(min) => {
let min = cfg.section_mask & min;
let min = (*min / cfg.items_per_section) * cfg.items_per_section;
archive.prune(min).await.expect("prune failed");
match oldest_allowed {
None => {
Expand Down
6 changes: 3 additions & 3 deletions storage/src/archive/benches/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ pub const PARTITION: &str = "archive_bench_partition";
/// Number of bytes that can be buffered in a section before being written to disk.
const WRITE_BUFFER: usize = 1024;

/// Section-mask that yields reasonably small blobs for local testing.
const SECTION_MASK: u64 = 0xffff_ffff_ffff_ff00u64;
/// Number of items per section.
const ITEMS_PER_SECTION: u64 = 256;

/// Number of bytes to buffer when replaying.
const REPLAY_BUFFER: usize = 1024 * 1024; // 1MB
Expand All @@ -39,7 +39,7 @@ pub async fn init(ctx: Context, compression: Option<u8>) -> ArchiveType {
translator: TwoCap,
compression,
codec_config: (),
section_mask: SECTION_MASK,
items_per_section: ITEMS_PER_SECTION,
write_buffer: WRITE_BUFFER,
replay_buffer: REPLAY_BUFFER,
};
Expand Down
42 changes: 20 additions & 22 deletions storage/src/archive/fast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@
//! partition: "demo".into(),
//! compression: Some(3),
//! codec_config: (),
//! section_mask: 0xffff_ffff_ffff_0000u64,
//! items_per_section: 1024,
//! pending_writes: 10,
//! write_buffer: 1024 * 1024,
//! replay_buffer: 4096,
Expand Down Expand Up @@ -166,10 +166,8 @@ pub struct Config<T: Translator, C> {
/// The codec configuration to use for the value stored in the archive.
pub codec_config: C,

/// Mask to apply to indices to determine section.
///
/// This value is `index & section_mask`.
pub section_mask: u64,
/// The number of items per section.
pub items_per_section: u64,

/// The amount of bytes that can be buffered in a section before being written to disk.
pub write_buffer: usize,
Expand All @@ -193,7 +191,7 @@ mod tests {
use rand::Rng;
use std::collections::BTreeMap;

const DEFAULT_SECTION_MASK: u64 = 0xffff_ffff_ffff_0000u64;
const DEFAULT_ITEMS_PER_SECTION: u64 = 65536;
const DEFAULT_WRITE_BUFFER: usize = 1024;
const DEFAULT_REPLAY_BUFFER: usize = 4096;

Expand All @@ -217,7 +215,7 @@ mod tests {
codec_config: (),
write_buffer: DEFAULT_WRITE_BUFFER,
replay_buffer: DEFAULT_REPLAY_BUFFER,
section_mask: DEFAULT_SECTION_MASK,
items_per_section: DEFAULT_ITEMS_PER_SECTION,
};
let mut archive = Archive::init(context.clone(), cfg.clone())
.await
Expand Down Expand Up @@ -315,7 +313,7 @@ mod tests {
compression: Some(3),
write_buffer: DEFAULT_WRITE_BUFFER,
replay_buffer: DEFAULT_REPLAY_BUFFER,
section_mask: DEFAULT_SECTION_MASK,
items_per_section: DEFAULT_ITEMS_PER_SECTION,
};
let mut archive = Archive::init(context.clone(), cfg.clone())
.await
Expand All @@ -341,7 +339,7 @@ mod tests {
compression: None,
write_buffer: 1024,
replay_buffer: 4096,
section_mask: DEFAULT_SECTION_MASK,
items_per_section: DEFAULT_ITEMS_PER_SECTION,
};
let result = Archive::<_, _, FixedBytes<64>, i32>::init(context, cfg.clone()).await;
assert!(matches!(
Expand All @@ -364,7 +362,7 @@ mod tests {
compression: None,
write_buffer: DEFAULT_WRITE_BUFFER,
replay_buffer: DEFAULT_REPLAY_BUFFER,
section_mask: DEFAULT_SECTION_MASK,
items_per_section: DEFAULT_ITEMS_PER_SECTION,
};
let mut archive = Archive::init(context.clone(), cfg.clone())
.await
Expand All @@ -384,7 +382,7 @@ mod tests {
archive.close().await.expect("Failed to close archive");

// Corrupt the value
let section = index & DEFAULT_SECTION_MASK;
let section = (index / DEFAULT_ITEMS_PER_SECTION) * DEFAULT_ITEMS_PER_SECTION;
let (blob, _) = context
.open("test_partition", &section.to_be_bytes())
.await
Expand All @@ -403,7 +401,7 @@ mod tests {
compression: None,
write_buffer: DEFAULT_WRITE_BUFFER,
replay_buffer: DEFAULT_REPLAY_BUFFER,
section_mask: DEFAULT_SECTION_MASK,
items_per_section: DEFAULT_ITEMS_PER_SECTION,
},
)
.await.expect("Failed to initialize archive");
Expand All @@ -430,7 +428,7 @@ mod tests {
compression: None,
write_buffer: DEFAULT_WRITE_BUFFER,
replay_buffer: DEFAULT_REPLAY_BUFFER,
section_mask: DEFAULT_SECTION_MASK,
items_per_section: DEFAULT_ITEMS_PER_SECTION,
};
let mut archive = Archive::init(context.clone(), cfg.clone())
.await
Expand Down Expand Up @@ -488,7 +486,7 @@ mod tests {
compression: None,
write_buffer: DEFAULT_WRITE_BUFFER,
replay_buffer: DEFAULT_REPLAY_BUFFER,
section_mask: DEFAULT_SECTION_MASK,
items_per_section: DEFAULT_ITEMS_PER_SECTION,
};
let archive = Archive::init(context.clone(), cfg.clone())
.await
Expand Down Expand Up @@ -531,7 +529,7 @@ mod tests {
compression: None,
write_buffer: DEFAULT_WRITE_BUFFER,
replay_buffer: DEFAULT_REPLAY_BUFFER,
section_mask: DEFAULT_SECTION_MASK,
items_per_section: DEFAULT_ITEMS_PER_SECTION,
};
let mut archive = Archive::init(context.clone(), cfg.clone())
.await
Expand Down Expand Up @@ -593,7 +591,7 @@ mod tests {
compression: None,
write_buffer: DEFAULT_WRITE_BUFFER,
replay_buffer: DEFAULT_REPLAY_BUFFER,
section_mask: DEFAULT_SECTION_MASK,
items_per_section: DEFAULT_ITEMS_PER_SECTION,
};
let mut archive = Archive::init(context.clone(), cfg.clone())
.await
Expand Down Expand Up @@ -649,7 +647,7 @@ mod tests {
compression: None,
write_buffer: DEFAULT_WRITE_BUFFER,
replay_buffer: DEFAULT_REPLAY_BUFFER,
section_mask: 0xffff_ffff_ffff_ffffu64, // no mask
items_per_section: 1, // no mask - each item is its own section
};
let mut archive = Archive::init(context.clone(), cfg.clone())
.await
Expand Down Expand Up @@ -726,15 +724,15 @@ mod tests {
let executor = deterministic::Runner::default();
executor.start(|mut context| async move {
// Initialize the archive
let section_mask = 0xffff_ffff_ffff_ff00u64;
let items_per_section = 256u64;
let cfg = Config {
partition: "test_partition".into(),
translator: TwoCap,
codec_config: (),
compression: None,
write_buffer: DEFAULT_WRITE_BUFFER,
replay_buffer: DEFAULT_REPLAY_BUFFER,
section_mask,
items_per_section,
};
let mut archive = Archive::init(context.clone(), cfg.clone())
.await
Expand Down Expand Up @@ -791,7 +789,7 @@ mod tests {
compression: None,
write_buffer: DEFAULT_WRITE_BUFFER,
replay_buffer: DEFAULT_REPLAY_BUFFER,
section_mask,
items_per_section,
};
let mut archive =
Archive::<_, _, _, FixedBytes<1024>>::init(context.clone(), cfg.clone())
Expand Down Expand Up @@ -819,7 +817,7 @@ mod tests {
archive.prune(min).await.expect("Failed to prune");

// Ensure all keys can be retrieved that haven't been pruned
let min = min & section_mask;
let min = (min / items_per_section) * items_per_section;
let mut removed = 0;
for (key, (index, data)) in keys {
if index >= min {
Expand Down Expand Up @@ -888,7 +886,7 @@ mod tests {
compression: None,
write_buffer: DEFAULT_WRITE_BUFFER,
replay_buffer: DEFAULT_REPLAY_BUFFER,
section_mask: DEFAULT_SECTION_MASK,
items_per_section: DEFAULT_ITEMS_PER_SECTION,
};
let mut archive = Archive::init(context.clone(), cfg.clone())
.await
Expand Down
18 changes: 11 additions & 7 deletions storage/src/archive/fast/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ impl<K: Array, V: Codec> EncodeSize for Record<K, V> {

/// Implementation of `Archive` storage.
pub struct Archive<T: Translator, E: Storage + Metrics, K: Array, V: Codec> {
// The section mask is used to determine which section of the journal to write to.
section_mask: u64,
items_per_section: u64,
journal: Journal<E, Record<K, V>>,
pending: BTreeSet<u64>,
<span class='blob-code-inner blob-code-marker ' data-code-marker=" ">
Expand All @@ -85,6 +84,11 @@ pub struct Archive<T: Translator, E: Storage + Metrics, K: Array, V: Codec> {
}

impl<T: Translator, E: Storage + Metrics, K: Array, V: Codec> Archive<T, E, K, V> {
/// Map an item index to the section that stores it.
///
/// A section is identified by the first index it covers, so the result is
/// `index` rounded down to the nearest multiple of `items_per_section`.
///
/// # Panics
///
/// Panics if `items_per_section` is zero (integer division by zero).
fn section(&self, index: u64) -> u64 {
    let per_section = self.items_per_section;
    // Number of whole sections that precede the one containing `index`.
    let ordinal = index / per_section;
    // Scale back up to the first index of that section; this never exceeds
    // `index`, so the multiplication cannot overflow.
    ordinal * per_section
}

/// Initialize a new `Archive` instance.
///
/// The in-memory index for `Archive` is populated during this call
Expand Down Expand Up @@ -155,7 +159,7 @@ impl<T: Translator, E: Storage + Metrics, K: Array, V: Codec> Archive<T, E, K, V

// Return populated archive
Ok(Self {
section_mask: cfg.section_mask,
items_per_section: cfg.items_per_section,
journal,
pending: BTreeSet::new(),
oldest_allowed: None,
Expand All @@ -182,7 +186,7 @@ impl<T: Translator, E: Storage + Metrics, K: Array, V: Codec> Archive<T, E, K, V
};

// Fetch item from disk
let section = self.section_mask & index;
let section = self.section(index);
let record = self
.journal
.get_exact(section, location.offset, location.len)
Expand All @@ -206,7 +210,7 @@ impl<T: Translator, E: Storage + Metrics, K: Array, V: Codec> Archive<T, E, K, V

// Fetch item from disk
let location = self.indices.get(index).ok_or(Error::RecordCorrupted)?;
let section = self.section_mask & index;
let section = self.section(*index);
let record = self
.journal
.get_exact(section, location.offset, location.len)
Expand Down Expand Up @@ -249,7 +253,7 @@ impl<T: Translator, E: Storage + Metrics, K: Array, V: Codec> crate::archive::Ar

// Store item in journal
let record = Record::new(index, key.clone(), data);
let section = self.section_mask & index;
let section = self.section(index);
let (offset, len) = self.journal.append(section, record).await?;

// Store index
Expand Down Expand Up @@ -287,7 +291,7 @@ impl<T: Translator, E: Storage + Metrics, K: Array, V: Codec> crate::archive::Ar

async fn prune(&mut self, min: u64) -> Result<(), Error> {
// Round `min` down to the first index of its section
let min = self.section_mask & min;
let min = self.section(min);

// Check if min is less than last pruned
if let Some(oldest_allowed) = self.oldest_allowed {
Expand Down
Loading
0