feat(compaction): introduce iceberg compaction pull strategy #21824
Pull Request Overview
This pull request introduces an iceberg compaction pull strategy to support and trigger iceberg compaction tasks based on commit information.
- Updates protobuf build configuration to include the new IcebergCompactionTask type.
- Refactors the IcebergCompactionManager to record commit counts alongside timing data and to provide a method for retrieving top sink IDs based on commit activity.
- Updates the compaction event handler in the hummock module to asynchronously pull tasks and dispatch new iceberg compaction events.
Reviewed Changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| src/prost/build.rs | Added the new IcebergCompactionTask type for protobuf generation. |
| src/meta/src/manager/iceberg_compaction.rs | Introduced a CommitInfo struct and updated commit recording and retrieval logic with sorting based on commit count and time. |
| src/meta/src/hummock/manager/compaction/compaction_event_loop.rs | Converted the pull task handling to async and integrated the new top-N logic for iceberg compaction events. |
Comments suppressed due to low confidence (2)
src/meta/src/manager/iceberg_compaction.rs:126
- The sorting logic for commit times is unclear; using duration_since with a zero duration may not produce the intended order. Consider comparing the Instants directly or computing durations relative to a common reference for a more natural ordering.
b.1.first_commit_time.duration_since(a.1.first_commit_time).cmp(&std::time::Duration::from_secs(0))
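To make the suggestion above concrete: a `Duration` is never negative, so comparing the result of `duration_since` against a zero duration can never yield `Less`, and the comparator cannot describe a full ordering. A minimal sketch of comparing the `Instant`s directly is shown here. The `CommitInfo` fields, the `u32` sink id type, and the `top_n_sink_ids` helper are assumptions for illustration, not the PR's actual definitions.

```rust
use std::collections::HashMap;
use std::time::Instant;

/// Hypothetical stand-in for the PR's `CommitInfo`; field names are assumed.
#[derive(Debug, Clone, Copy)]
pub struct CommitInfo {
    pub count: usize,
    pub first_commit_time: Instant,
}

/// Returns up to `n` sink ids, ordered by commit count (highest first).
/// Ties go to the sink whose first commit is oldest, then to the smaller id
/// so the result is deterministic.
pub fn top_n_sink_ids(commits: &HashMap<u32, CommitInfo>, n: usize) -> Vec<u32> {
    let mut entries: Vec<(u32, CommitInfo)> =
        commits.iter().map(|(id, info)| (*id, *info)).collect();

    entries.sort_by(|a, b| {
        b.1.count
            .cmp(&a.1.count)
            // `Instant` implements `Ord`, so it can be compared directly.
            .then_with(|| a.1.first_commit_time.cmp(&b.1.first_commit_time))
            .then_with(|| a.0.cmp(&b.0))
    });

    entries.into_iter().take(n).map(|(id, _)| id).collect()
}
```

Whether ties should favor the oldest or the newest first commit is a policy choice; swapping `a` and `b` in the second comparison reverses it.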
src/meta/src/hummock/manager/compaction/compaction_event_loop.rs:646
- [nitpick] Using .expect here may lead to runtime panics if sink parameters are missing; consider handling the error gracefully to improve robustness.
.await.expect("sink params not found")
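One way to act on this nitpick is to skip sinks whose parameters cannot be found instead of panicking. The sketch below is synchronous and uses a plain map as a stand-in for the catalog lookup; `SinkParam`, `lookup_sink_param`, and `collect_dispatchable` are hypothetical names, not functions from the PR.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the sink parameters looked up before dispatch.
#[derive(Debug, Clone, PartialEq)]
pub struct SinkParam {
    pub sink_name: String,
}

/// Returns an error instead of panicking when a sink has no parameters.
pub fn lookup_sink_param(
    catalog: &HashMap<u32, SinkParam>,
    sink_id: u32,
) -> Result<&SinkParam, String> {
    catalog
        .get(&sink_id)
        .ok_or_else(|| format!("sink params not found for sink {sink_id}"))
}

/// Splits the requested sinks into those that can be dispatched and those
/// that are skipped because their parameters are missing.
pub fn collect_dispatchable(
    catalog: &HashMap<u32, SinkParam>,
    sink_ids: &[u32],
) -> (Vec<(u32, SinkParam)>, Vec<u32>) {
    let mut ready = Vec::new();
    let mut skipped = Vec::new();
    for &sink_id in sink_ids {
        match lookup_sink_param(catalog, sink_id) {
            Ok(param) => ready.push((sink_id, param.clone())),
            // A real handler would likely log the error before skipping.
            Err(_) => skipped.push(sink_id),
        }
    }
    (ready, skipped)
}
```

With this shape, one dropped sink does not take down the event loop; the caller decides whether a skipped id is worth a warning or a retry.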
LGTM!
const MIN_SINK_COMMIT_INTERVAL: u64 = 3600; // 1 hour

sink_ids
    .iter()
    .max_by_key(|&(_, count)| count)
    .map(|(sink_id, _)| *sink_id)
    .filter(|(_, commit_info)| {
        Instant::now().duration_since(commit_info.first_commit_time)
            > std::time::Duration::from_secs(MIN_SINK_COMMIT_INTERVAL)
Once the initial hour has passed since a sink's first commit, are we going to trigger compaction aggressively? I thought we would still trigger compaction on some interval rather than every time get_top_n_iceberg_commit_sink_ids is called.
.clear_iceberg_commits_by_sink_id(sink_id);
You’re right, so we need to actively clean up this record after sending the message to avoid repeated execution. I don’t think this is very intuitive.
I also considered recording last_compaction_ts, but that still depends on an external call.
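The interplay discussed in this thread can be sketched as follows: a sink becomes eligible only after the minimum interval has elapsed since its first recorded commit, and clearing its record after dispatch restarts that clock, so the next pull does not pick it up again. `CommitTracker` and its methods are hypothetical names; only the clear-after-dispatch idea and the one-hour constant come from the PR. The current time is passed in as a parameter so the behavior can be exercised without sleeping.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical per-sink commit record; field names are assumed.
#[derive(Debug, Clone, Copy)]
pub struct CommitInfo {
    pub count: usize,
    pub first_commit_time: Instant,
}

/// Hypothetical tracker standing in for the manager's commit bookkeeping.
#[derive(Default)]
pub struct CommitTracker {
    commits: HashMap<u32, CommitInfo>,
}

impl CommitTracker {
    /// Records a commit; the first commit for a sink starts its clock.
    pub fn record_commit(&mut self, sink_id: u32, now: Instant) {
        self.commits
            .entry(sink_id)
            .and_modify(|info| info.count += 1)
            .or_insert(CommitInfo { count: 1, first_commit_time: now });
    }

    /// Sinks whose first recorded commit is older than `min_interval`.
    pub fn eligible_sink_ids(&self, now: Instant, min_interval: Duration) -> Vec<u32> {
        let mut ids: Vec<u32> = self
            .commits
            .iter()
            .filter(|(_, info)| {
                now.saturating_duration_since(info.first_commit_time) > min_interval
            })
            .map(|(id, _)| *id)
            .collect();
        ids.sort_unstable();
        ids
    }

    /// Drops the record after a task is dispatched, so the sink must
    /// accumulate commits for another `min_interval` before it is eligible.
    pub fn clear_commits(&mut self, sink_id: u32) {
        self.commits.remove(&sink_id);
    }
}
```

Under these assumptions, the caller would dispatch a task for each id returned by `eligible_sink_ids` and then call `clear_commits` for it. Skipping that call leaves the sink eligible on every later pull, which is the aggressive triggering raised above.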
PTAL @hzxa21
…nto li0k/iceberg_pull_task
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
- Introduce an iceberg compaction pull strategy based on CommitInfo.
- handle_pull_task_event function for IcebergCompactionEventDispatcher.
Tips: Whether compaction is triggered needs to be supported by sink-param. A PR will be opened to support per-sink compaction settings.
Checklist
Documentation
Release note