feat(compaction): introduce iceberg compaction pull strategy by Li0k · Pull Request #21824 · risingwavelabs/risingwave · GitHub

feat(compaction): introduce iceberg compaction pull strategy #21824


Merged
merged 6 commits into main from li0k/iceberg_pull_task
May 14, 2025

Conversation

Contributor
@Li0k Li0k commented May 12, 2025

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

Introduce an Iceberg compaction pull strategy based on CommitInfo.

  • Implement the handle_pull_task_event function for IcebergCompactionEventDispatcher
  • Support triggering compaction at a fixed interval

Note: whether compaction is triggered for a given sink still needs to be controlled through the sink parameters. A follow-up PR will add per-sink compaction settings.

Checklist

  • I have written necessary rustdoc comments.
  • I have added necessary unit tests and integration tests.
  • I have added test labels as necessary.
  • I have added fuzzing tests or opened an issue to track them.
  • My PR contains breaking changes.
  • My PR changes performance-critical code, so I will run (micro) benchmarks and present the results.
  • I have checked the Release Timeline and Currently Supported Versions to determine which release branches I need to cherry-pick this PR into.

Documentation

  • My PR needs documentation updates.
Release note

Contributor
@Copilot Copilot AI left a comment


Pull Request Overview

This pull request introduces an iceberg compaction pull strategy to support and trigger iceberg compaction tasks based on commit information.

  • Updates protobuf build configuration to include the new IcebergCompactionTask type.
  • Refactors the IcebergCompactionManager to record commit counts alongside timing data and to provide a method for retrieving top sink IDs based on commit activity.
  • Updates the compaction event handler in the hummock module to asynchronously pull tasks and dispatch new iceberg compaction events.
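
The commit tracking summarized above can be pictured roughly as follows. This is a minimal sketch and not the code from the PR: `SinkId`, `CommitTracker`, `record_commit`, and `top_n_sink_ids` are hypothetical names, and passing `now` explicitly is an assumption made to keep the example deterministic. Only `CommitInfo` and `first_commit_time` appear in the discussion itself.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for the sink identifier type.
type SinkId = u32;

/// Per-sink commit statistics: number of commits seen and when the first arrived.
#[derive(Debug, Clone, Copy)]
struct CommitInfo {
    count: usize,
    first_commit_time: Instant,
}

/// Hypothetical tracker; the real manager in the PR may be organized differently.
#[derive(Default)]
struct CommitTracker {
    commits: HashMap<SinkId, CommitInfo>,
}

impl CommitTracker {
    /// Record one commit for `sink_id` observed at `now`.
    fn record_commit(&mut self, sink_id: SinkId, now: Instant) {
        self.commits
            .entry(sink_id)
            .and_modify(|info| info.count += 1)
            .or_insert(CommitInfo { count: 1, first_commit_time: now });
    }

    /// Return up to `n` sink ids whose first commit is older than `min_age`,
    /// ordered by commit count (descending), then by earliest first commit,
    /// then by id so the result is deterministic.
    fn top_n_sink_ids(&self, n: usize, min_age: Duration, now: Instant) -> Vec<SinkId> {
        let mut candidates: Vec<(SinkId, CommitInfo)> = self
            .commits
            .iter()
            .filter(|(_, info)| now.duration_since(info.first_commit_time) > min_age)
            .map(|(id, info)| (*id, *info))
            .collect();
        candidates.sort_by(|a, b| {
            b.1.count
                .cmp(&a.1.count)
                .then_with(|| a.1.first_commit_time.cmp(&b.1.first_commit_time))
                .then_with(|| a.0.cmp(&b.0))
        });
        candidates.into_iter().take(n).map(|(id, _)| id).collect()
    }
}
```

A caller would record commits as they arrive and periodically ask for the top candidates, passing the minimum age (one hour in the reviewed snippet) as `min_age`.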

Reviewed Changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated no comments.

| File | Description |
| --- | --- |
| src/prost/build.rs | Added the new IcebergCompactionTask type for protobuf generation. |
| src/meta/src/manager/iceberg_compaction.rs | Introduced a CommitInfo struct and updated commit recording and retrieval logic with sorting based on commit count and time. |
| src/meta/src/hummock/manager/compaction/compaction_event_loop.rs | Converted the pull task handling to async and integrated the new top-N logic for iceberg compaction events. |
Comments suppressed due to low confidence (2)

src/meta/src/manager/iceberg_compaction.rs:126

  • The sorting logic for commit times is unclear; using duration_since with a zero duration may not produce the intended order. Consider comparing the Instants directly or computing durations relative to a common reference for a more natural ordering.
b.1.first_commit_time.duration_since(a.1.first_commit_time).cmp(&std::time::Duration::from_secs(0))
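
To illustrate the concern, here is a small sketch comparing the quoted pattern with a direct comparison of the two `Instant` values. The function names are hypothetical. It relies on the documented behavior of current Rust releases, where `Instant::duration_since` saturates to zero when the argument is later than the receiver; the standard library notes that this may change, so treat the first function as illustrative only.

```rust
use std::cmp::Ordering;
use std::time::{Duration, Instant};

/// The pattern quoted above. Because `duration_since` saturates to zero,
/// the result is `Greater` or `Equal` but never `Less`, so it does not
/// describe a proper ordering when used as a sort comparator.
fn cmp_via_duration(a: Instant, b: Instant) -> Ordering {
    b.duration_since(a).cmp(&Duration::from_secs(0))
}

/// Comparing the instants directly gives a total order (earliest first).
fn cmp_direct(a: Instant, b: Instant) -> Ordering {
    a.cmp(&b)
}
```

With `cmp_direct`, swapping the arguments flips the result, which is what `sort_by` expects from a comparator; `cmp_via_duration` does not have that property.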

src/meta/src/hummock/manager/compaction/compaction_event_loop.rs:646

  • [nitpick] Using .expect here may lead to runtime panics if sink parameters are missing; consider handling the error gracefully to improve robustness.
.await.expect("sink params not found")
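
One way to avoid the panic is to skip sinks whose parameters cannot be found and report them to the caller. The sketch below is synchronous and uses an in-memory map purely for illustration; `SinkParam`, `collect_sink_params`, and the `HashMap` catalog are hypothetical and do not reflect the actual async lookup in the PR.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the sink identifier type.
type SinkId = u32;

/// Hypothetical, simplified sink parameters.
#[derive(Debug, Clone, PartialEq)]
struct SinkParam {
    name: String,
}

/// Look up params for each requested sink. Sinks with missing params are
/// returned separately instead of causing a panic, so the caller can log
/// them and continue with the rest.
fn collect_sink_params(
    catalog: &HashMap<SinkId, SinkParam>,
    sink_ids: &[SinkId],
) -> (Vec<(SinkId, SinkParam)>, Vec<SinkId>) {
    let mut found = Vec::new();
    let mut missing = Vec::new();
    for &id in sink_ids {
        match catalog.get(&id) {
            Some(param) => found.push((id, param.clone())),
            None => missing.push(id),
        }
    }
    (found, missing)
}
```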

Contributor
@chenzl25 chenzl25 left a comment


LGTM!

Comment on lines 133 to 139
const MIN_SINK_COMMIT_INTERVAL: u64 = 3600; // 1 hour

sink_ids
.iter()
.max_by_key(|&(_, count)| count)
.map(|(sink_id, _)| *sink_id)
.filter(|(_, commit_info)| {
Instant::now().duration_since(commit_info.first_commit_time)
> std::time::Duration::from_secs(MIN_SINK_COMMIT_INTERVAL)
Collaborator


Once the initial 1 hour has passed since the sink's first commit, are we going to trigger compaction aggressively? I thought we would still trigger compaction at some interval, rather than every time get_top_n_iceberg_commit_sink_ids is called.

Contributor Author


.clear_iceberg_commits_by_sink_id(sink_id);

You’re right, so we need to actively clean up this record after sending the msg to avoid repeated execution. I don’t think this is very intuitive.
I also considered recording last_compaction_ts, but this still depends on the external call.
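
The clear-after-dispatch approach described in this reply can be sketched as follows, assuming a simplified tracker that only stores the first uncompacted commit time per sink. `PendingCommits`, `due`, and `clear` are hypothetical names; only the idea of clearing the record after the task message is sent comes from the discussion.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for the sink identifier type.
type SinkId = u32;

/// Tracks, per sink, the time of the first commit since the last dispatch.
#[derive(Default)]
struct PendingCommits {
    first_commit_time: HashMap<SinkId, Instant>,
}

impl PendingCommits {
    /// Remember the first commit time; later commits keep the original timestamp.
    fn record_commit(&mut self, sink_id: SinkId, now: Instant) {
        self.first_commit_time.entry(sink_id).or_insert(now);
    }

    /// Sinks whose oldest pending commit is older than `interval`, sorted by id.
    fn due(&self, interval: Duration, now: Instant) -> Vec<SinkId> {
        let mut ids: Vec<SinkId> = self
            .first_commit_time
            .iter()
            .filter(|(_, first)| now.duration_since(**first) > interval)
            .map(|(id, _)| *id)
            .collect();
        ids.sort();
        ids
    }

    /// Drop the record once a compaction task has been sent, so the sink
    /// only becomes due again a full interval after its next commit.
    fn clear(&mut self, sink_id: SinkId) {
        self.first_commit_time.remove(&sink_id);
    }
}
```

Because the record is removed after dispatch, the interval restarts at the next commit, which addresses the repeated-trigger concern but, as noted above, depends on the caller remembering to call `clear`.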

Contributor Author


PTAL @hzxa21

@Li0k Li0k enabled auto-merge May 14, 2025 09:13
@Li0k Li0k added this pull request to the merge queue May 14, 2025
Merged via the queue into main with commit 436277b May 14, 2025
32 of 33 checks passed
@Li0k Li0k deleted the li0k/iceberg_pull_task branch May 14, 2025 10:20
4 participants