feat(meta): support move state-table between compaction-group by Little-Wallace · Pull Request #8390 · risingwavelabs/risingwave · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

feat(meta): support move state-table between compaction-group #8390

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Mar 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ message GroupMetaChange {
repeated uint32 table_ids_remove = 2;
}

// Group delta that moves state tables from one compaction group into another,
// already-existing compaction group (as opposed to GroupConstruct, which creates a new group).
message GroupTableChange {
// Ids of the state tables being moved.
repeated uint32 table_ids = 1;
// Compaction group that receives `table_ids`.
uint64 target_group_id = 2;
// Compaction group that `table_ids` are moved out of.
uint64 origin_group_id = 3;
// First id of the SST id interval allocated for the SSTs created by this move.
uint64 new_sst_start_id = 4;
}

// Group delta with no fields; presumably signals that the compaction group it is recorded
// under is removed — TODO confirm against the version-delta apply logic.
message GroupDestroy {}

message GroupDelta {
Expand All @@ -79,6 +86,7 @@ message GroupDelta {
GroupConstruct group_construct = 2;
GroupDestroy group_destroy = 3;
GroupMetaChange group_meta_change = 4;
GroupTableChange group_table_change = 5;
}
}

Expand Down
241 changes: 157 additions & 84 deletions src/meta/src/hummock/manager/compaction_group_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
use risingwave_hummock_sdk::compaction_group::{StateTableId, StaticCompactionGroupId};
use risingwave_hummock_sdk::CompactionGroupId;
use risingwave_pb::hummock::group_delta::DeltaType;
use risingwave_pb::hummock::hummock_version_delta::GroupDeltas;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig;
use risingwave_pb::hummock::{
CompactionConfig, CompactionGroupInfo, GroupConstruct, GroupDelta, GroupDestroy,
GroupMetaChange,
compact_task, CompactionConfig, CompactionGroupInfo, GroupConstruct, GroupDelta, GroupDestroy,
GroupMetaChange, GroupTableChange,
};
use tokio::sync::{OnceCell, RwLock};

Expand Down Expand Up @@ -61,7 +62,7 @@ impl<S: MetaStore> HummockManager<S> {
) -> Result<RwLock<CompactionGroupManager>> {
let compaction_group_manager = RwLock::new(CompactionGroupManager {
compaction_groups: BTreeMap::new(),
provided_default_config_for_test: config,
default_config: config,
});
compaction_group_manager
.write()
Expand Down Expand Up @@ -421,11 +422,24 @@ impl<S: MetaStore> HummockManager<S> {

/// Splits `table_ids` out of `parent_group_id` into a brand-new compaction group.
///
/// Thin wrapper over `move_state_table_to_compaction_group` with no explicit target group
/// (so a fresh group id is allocated) and `allow_split_by_table` disabled. The new group
/// starts from the default compaction config.
///
/// Returns the id of the newly created group, or `parent_group_id` itself when `table_ids`
/// is empty (nothing is moved in that case).
///
/// # Errors
///
/// Propagates any error returned by `move_state_table_to_compaction_group`.
#[named]
pub async fn split_compaction_group(
    &self,
    parent_group_id: CompactionGroupId,
    table_ids: &[StateTableId],
) -> Result<CompactionGroupId> {
    // No target group: a plain split always creates a new group instead of merging.
    let target_group_id: Option<CompactionGroupId> = None;
    // Plain splits keep the new group's `split_by_state_table` flag off.
    let allow_split_by_table = false;
    self.move_state_table_to_compaction_group(
        parent_group_id,
        table_ids,
        target_group_id,
        allow_split_by_table,
    )
    .await
}

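/// Moves `table_ids` out of `parent_group_id` into another compaction group.
/// If `target_group_id` is `None`, a new compaction group is created for them (its config's
/// `split_by_state_table` is set to `allow_split_by_table`); otherwise the target group must
/// already exist and must not contain any of `table_ids`. Returns the id of the group that
/// received the tables, or `parent_group_id` unchanged when `table_ids` is empty.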
#[named]
pub async fn move_state_table_to_compaction_group(
&self,
parent_group_id: CompactionGroupId,
table_ids: &[StateTableId],
target_group_id: Option<CompactionGroupId>,
allow_split_by_table: bool,
) -> Result<CompactionGroupId> {
if table_ids.is_empty() {
return Ok(parent_group_id);
Expand Down Expand Up @@ -453,120 +467,176 @@ impl<S: MetaStore> HummockManager<S> {
parent_group_id
)));
}
if let Some(compaction_group_id) = target_group_id {
if !versioning.check_branched_sst_in_target_group(
&table_ids,
&parent_group_id,
&compaction_group_id,
) {
return Err(Error::CompactionGroup(format!(
"invalid split attempt for group {}: we shall wait some time for parent group and target group could compact stale sst files",
parent_group_id
)));
}
}

let mut new_version_delta = BTreeMapEntryTransaction::new_insert(
&mut versioning.hummock_version_deltas,
current_version.id + 1,
build_version_delta_after_version(current_version),
);

// Remove tables from parent group.
for table_id in &table_ids {
let group_deltas = &mut new_version_delta
.group_deltas
.entry(parent_group_id)
.or_default()
.group_deltas;
group_deltas.push(GroupDelta {
delta_type: Some(DeltaType::GroupMetaChange(GroupMetaChange {
table_ids_remove: vec![*table_id],
..Default::default()
})),
});
}

// Add tables to new group.
let new_group_id = self
.env
.id_gen_manager()
.generate::<{ IdCategory::CompactionGroup }>()
.await?;
let new_sst_start_id = self
.env
.id_gen_manager()
.generate_interval::<{ IdCategory::HummockSstableId }>(
versioning.current_version.count_new_ssts_in_group_split(
current_version.count_new_ssts_in_group_split(
parent_group_id,
&HashSet::from_iter(table_ids.iter().cloned()),
HashSet::from_iter(table_ids.clone()),
),
)
.await?;
let group_deltas = &mut new_version_delta
.group_deltas
.entry(new_group_id)
.or_default()
.group_deltas;
let config = self
.compaction_group_manager
.read()
.await
.get_compaction_group_config(new_group_id)
.compaction_config
.as_ref()
.clone();
group_deltas.push(GroupDelta {
delta_type: Some(DeltaType::GroupConstruct(GroupConstruct {
group_config: Some(config),
group_id: new_group_id,
parent_group_id,
table_ids,
new_sst_start_id,
})),
});
let mut new_group = None;
let target_compaction_group_id = match target_group_id {
Some(compaction_group_id) => {
match current_version.levels.get(&compaction_group_id) {
Some(group) => {
for table_id in &table_ids {
if group.member_table_ids.contains(table_id) {
return Err(Error::CompactionGroup(format!(
"table {} already exist in group {}",
*table_id, compaction_group_id,
)));
}
}
}
None => {
return Err(Error::CompactionGroup(format!(
"target group {} does not exist",
compaction_group_id,
)));
}
}
let group_deltas = &mut new_version_delta
.group_deltas
.entry(compaction_group_id)
.or_default()
.group_deltas;
group_deltas.push(GroupDelta {
delta_type: Some(DeltaType::GroupTableChange(GroupTableChange {
table_ids: table_ids.to_vec(), 5D40
origin_group_id: parent_group_id,
target_group_id: compaction_group_id,
new_sst_start_id,
})),
});
compaction_group_id
}
None => {
// All NewCompactionGroup pairs are mapped to one new compaction group.
let new_compaction_group_id = self
.env
.id_gen_manager()
.generate::<{ IdCategory::CompactionGroup }>()
.await?;
let mut config = self
.compaction_group_manager
.read()
.await
.get_default_compaction_group_config();
config.split_by_state_table = allow_split_by_table;

new_version_delta.group_deltas.insert(
new_compaction_group_id,
GroupDeltas {
group_deltas: vec![GroupDelta {
delta_type: Some(DeltaType::GroupConstruct(GroupConstruct {
group_config: Some(config.clone()),
group_id: new_compaction_group_id,
parent_group_id,
new_sst_start_id,
table_ids: table_ids.to_vec(),
})),
}],
},
);

new_group = Some((new_compaction_group_id, config));
new_version_delta.group_deltas.insert(
parent_group_id,
GroupDeltas {
group_deltas: vec![GroupDelta {
delta_type: Some(DeltaType::GroupMetaChange(GroupMetaCha AE88 nge {
table_ids_remove: table_ids.to_vec(),
..Default::default()
})),
}],
},
);
new_compaction_group_id
}
};
let mut branched_ssts = BTreeMapTransaction::new(&mut versioning.branched_ssts);
let mut trx = Transaction::default();
new_version_delta.apply_to_txn(&mut trx)?;
self.env.meta_store().txn(trx).await?;
if let Some((new_compaction_group_id, config)) = new_group {
let mut compaction_group_manager = self.compaction_group_manager.write().await;
let insert = BTreeMapEntryTransaction::new_insert(
&mut compaction_group_manager.compaction_groups,
new_compaction_group_id,
CompactionGroup {
group_id: new_compaction_group_id,
compaction_config: Arc::new(config),
},
);
insert.apply_to_txn(&mut trx)?;
self.env.meta_store().txn(trx).await?;
insert.commit();
} else {
self.env.meta_store().txn(trx).await?;
}
let sst_split_info = versioning
.current_version
.apply_version_delta(&new_version_delta);
// Updates SST split info
for (object_id, sst_id, parent_old_sst_id, parent_new_sst_id) in sst_split_info {
for (object_id, sst_id, _parent_old_sst_id, parent_new_sst_id) in sst_split_info {
match branched_ssts.get_mut(object_id) {
Some(mut entry) => {
let p = entry.get_mut(&parent_group_id).unwrap();
let parent_pos = p.iter().position(|id| *id == parent_old_sst_id).unwrap();
if let Some(parent_new_sst_id) = parent_new_sst_id {
p[parent_pos] = parent_new_sst_id;
entry.insert(parent_group_id, parent_new_sst_id);
} else {
p.remove(parent_pos);
if p.is_empty() {
entry.remove(&parent_group_id);
}
entry.remove(&parent_group_id);
}
entry.entry(new_group_id).or_default().push(sst_id);
entry.insert(target_compaction_group_id, sst_id);
}
None => {
branched_ssts.insert(
object_id,
if let Some(parent_new_sst_id) = parent_new_sst_id {
[
(parent_group_id, vec![parent_new_sst_id]),
(new_group_id, vec![sst_id]),
]
.into_iter()
.collect()
} else {
[(new_group_id, vec![sst_id])].into_iter().collect()
},
);
let mut groups = HashMap::from_iter([(target_compaction_group_id, sst_id)]);
if let Some(parent_new_sst_id) = parent_new_sst_id {
groups.insert(parent_group_id, parent_new_sst_id);
}
branched_ssts.insert(object_id, groups);
}
}
}
new_version_delta.commit();
branched_ssts.commit_memory();
self.notify_last_version_delta(versioning);

Ok(new_group_id)
// Don't trigger compactions if we enable deterministic compaction
if !self.env.opts.compaction_deterministic_test {
// commit_epoch may contains SSTs from any compaction group
self.try_send_compaction_request(parent_group_id, compact_task::TaskType::SpaceReclaim);
self.try_send_compaction_request(
target_compaction_group_id,
compact_task::TaskType::SpaceReclaim,
);
}
Ok(target_compaction_group_id)
}
}

/// Tracks compaction groups together with their compaction configs.
#[derive(Default)]
pub(super) struct CompactionGroupManager {
/// Compaction groups that have an explicitly stored config, keyed by group id.
compaction_groups: BTreeMap<CompactionGroupId, CompactionGroup>,
/// Provided default config, only used in test.
/// NOTE(review): overlaps with `default_config`; confirm both fields are still needed.
provided_default_config_for_test: CompactionConfig,
/// Config used for groups that have no entry in `compaction_groups`, and the starting
/// config handed out by `get_default_compaction_group_config`.
default_config: CompactionConfig,
}

impl CompactionGroupManager {
Expand Down Expand Up @@ -602,14 +672,20 @@ impl CompactionGroupManager {
compaction_group_ids
.iter()
.map(|id| {
let group = self.compaction_groups.get(id).cloned().unwrap_or_else(|| {
CompactionGroup::new(*id, self.provided_default_config_for_test.clone())
});
let group = self
.compaction_groups
.get(id)
.cloned()
.unwrap_or_else(|| CompactionGroup::new(*id, self.default_config.clone()));
(*id, group)
})
.collect()
}

fn get_default_compaction_group_config(&self) -> CompactionConfig {
self.default_config.clone()
}

async fn update_compaction_config<S: MetaStore>(
&mut self,
compaction_group_ids: &[CompactionGroupId],
Expand All @@ -621,10 +697,7 @@ impl CompactionGroupManager {
if !compaction_groups.contains_key(compaction_group_id) {
compaction_groups.insert(
*compaction_group_id,
CompactionGroup::new(
*compaction_group_id,
self.provided_default_config_for_test.clone(),
),
CompactionGroup::new(*compaction_group_id, self.default_config.clone()),
);
}
let group = compaction_groups.get(compaction_group_id).unwrap();
Expand Down
Loading
0