[BugFix] Fix some bugs in scenarios where file_bundling and alter operations intersect by sevev · Pull Request #60091 · StarRocks/starrocks · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

[BugFix] Fix some bugs in scenarios where file_bundling and alter operations intersect #60091

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 93 additions & 63 deletions be/src/storage/lake/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,67 +280,103 @@ Status TabletManager::put_tablet_metadata(const TabletMetadata& metadata) {
return put_tablet_metadata(std::move(metadata_ptr));
}

DEFINE_FAIL_POINT(get_real_location_failed);
DEFINE_FAIL_POINT(tablet_meta_not_found);
// NOTE: tablet_metas is non-const and we will clear schemas for optimization.
// Callers should ensure thread safety.
Status TabletManager::put_bundle_tablet_metadata(std::map<int64_t, TabletMetadataPB>& tablet_metas) {
if (tablet_metas.empty()) {
return Status::InternalError("tablet_metas cannot be empty");
}

BundleTabletMetadataPB bundle_meta;
auto partition_location = tablet_metadata_root_location(tablet_metas.begin()->first);
std::unordered_map<int64_t, TabletSchemaPB> unique_schemas;
for (auto& [tablet_id, meta] : tablet_metas) {
(*bundle_meta.mutable_tablet_to_schema())[tablet_id] = meta.schema().id();
unique_schemas.emplace(meta.schema().id(), meta.schema());
for (const auto& [schema_id, schema] : meta.historical_schemas()) {
unique_schemas.emplace(schema_id, schema);
// For compatibility
// tablets under the same partition should be in the same directory.
// However, due to a bug in the LakeRollup implementation(https://github.com/StarRocks/starrocks/pull/60073),
// the rollup and base tables might not be in the same directory.
// When upgrading from an old version and enabling file bundling, publishing may incorrectly reference the directory.
std::map<std::string, std::set<int64_t>> partition_location_to_tablets;
for (auto& [tablet_id, _] : tablet_metas) {
auto real_location = _location_provider->real_location(tablet_metadata_root_location(tablet_id));
FAIL_POINT_TRIGGER_EXECUTE(get_real_location_failed,
{ real_location = Status::InternalError("get location failed"); });
if (!real_location.ok()) {
std::string msg = strings::Substitute("tablet:$0 get real location failed, $1", tablet_id,
real_location.status().to_string());
LOG(WARNING) << msg;
return Status::InternalError(msg);
}
auto iter = partition_location_to_tablets.find(*real_location);
if (iter == partition_location_to_tablets.end()) {
partition_location_to_tablets[*real_location] = {tablet_id};
} else {
iter->second.insert(tablet_id);
}
}

for (auto& [schema_id, schema] : unique_schemas) {
(*bundle_meta.mutable_schemas())[schema_id] = std::move(schema);
}
for (auto& [partition_location, tablets] : partition_location_to_tablets) {
BundleTabletMetadataPB bundle_meta;
std::unordered_map<int64_t, TabletSchemaPB> unique_schemas;
int64_t commit_version = 0;
for (auto& tablet_id : tablets) {
auto iter = tablet_metas.find(tablet_id);
FAIL_POINT_TRIGGER_EXECUTE(tablet_meta_not_found, { iter = tablet_metas.end(); });
RETURN_IF((iter == tablet_metas.end()),
Status::InternalError(strings::Substitute("tablet {} metadata not found", tablet_id)));
const auto& meta = iter->second;
RETURN_IF((commit_version != 0 && commit_version != meta.version()),
Status::InternalError(strings::Substitute("commit version not match: $0 vs $1", commit_version,
meta.version())));
commit_version = meta.version();
(*bundle_meta.mutable_tablet_to_schema())[tablet_id] = meta.schema().id();
unique_schemas.emplace(meta.schema().id(), meta.schema());
for (const auto& [schema_id, schema] : meta.historical_schemas()) {
unique_schemas.emplace(schema_id, schema);
}
}

auto make_page_pointer = [](int64_t offset, int64_t size) {
PagePointerPB pointer;
pointer.set_offset(offset);
pointer.set_size(size);
return pointer;
};
for (auto& [schema_id, schema] : unique_schemas) {
(*bundle_meta.mutable_schemas())[schema_id] = std::move(schema);
}

const std::string meta_location =
bundle_tablet_metadata_location(tablet_metas.begin()->first, tablet_metas.begin()->second.version());

ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(meta_location));
WritableFileOptions opts{.sync_on_close = true, .mode = FileSystem::CREATE_OR_OPEN_WITH_TRUNCATE};
ASSIGN_OR_RETURN(auto meta_file, fs->new_writable_file(opts, meta_location));
std::string serialized_buf;
int64_t current_offset = 0;
for (auto& [tablet_id, meta] : tablet_metas) {
meta.clear_schema();
meta.mutable_historical_schemas()->clear();
serialized_buf.clear();
if (!meta.SerializeToString(&serialized_buf)) {
return Status::InternalError("Failed to serialize tablet metadata");
auto make_page_pointer = [](int64_t offset, int64_t size) {
PagePointerPB pointer;
pointer.set_offset(offset);
pointer.set_size(size);
return pointer;
};

const std::string meta_location = bundle_tablet_metadata_location(*tablets.begin(), commit_version);
ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(meta_location));
WritableFileOptions opts{.sync_on_close = true, .mode = FileSystem::CREATE_OR_OPEN_WITH_TRUNCATE};
ASSIGN_OR_RETURN(auto meta_file, fs->new_writable_file(opts, meta_location));
std::string serialized_buf;
int64_t current_offset = 0;
for (auto& tablet_id : tablets) {
auto iter = tablet_metas.find(tablet_id);
iter->second.clear_schema();
iter->second.mutable_historical_schemas()->clear();
serialized_buf.clear();
if (!iter->second.SerializeToString(&serialized_buf)) {
return Status::InternalError("Failed to serialize tablet metadata");
}

(*bundle_meta.mutable_tablet_meta_pages())[tablet_id] =
make_page_pointer(current_offset, serialized_buf.size());
RETURN_IF_ERROR(meta_file->append(Slice(serialized_buf)));
current_offset += serialized_buf.size();
}

(*bundle_meta.mutable_tablet_meta_pages())[tablet_id] =
make_page_pointer(current_offset, serialized_buf.size());
serialized_buf.clear();
if (!bundle_meta.SerializeToString(&serialized_buf)) {
return Status::IOError("Failed to write shared metadata header");
}
RETURN_IF_ERROR(meta_file->append(Slice(serialized_buf)));
current_offset += serialized_buf.size();
}

serialized_buf.clear();
if (!bundle_meta.SerializeToString(&serialized_buf)) {
return Status::IOError("Failed to write shared metadata header");
std::string fixed_buf;
put_fixed64_le(&fixed_buf, serialized_buf.size());
RETURN_IF_ERROR(meta_file->append(Slice(fixed_buf)));
RETURN_IF_ERROR(meta_file->close());
_metacache->cache_aggregation_partition(partition_location, true);
}
RETURN_IF_ERROR(meta_file->append(Slice(serialized_buf)));
std::string fixed_buf;
put_fixed64_le(&fixed_buf, serialized_buf.size());
RETURN_IF_ERROR(meta_file->append(Slice(fixed_buf)));
RETURN_IF_ERROR(meta_file->close());
_metacache->cache_aggregation_partition(partition_location, true);
return Status::OK();
}

Expand Down Expand Up @@ -393,31 +429,24 @@ StatusOr<TabletMetadataPtr> TabletManager::get_tablet_metadata(int64_t tablet_id
int64_t expected_gtid,
const std::shared_ptr<FileSystem>& fs) {
StatusOr<TabletMetadataPtr> tablet_metadata_or;
if (_metacache->lookup_aggregation_partition(tablet_metadata_root_location(tablet_id))) {
if (version == kInitialVersion) {
// Handle tablet initial metadata
if (version == kInitialVersion) {
tablet_metadata_or =
get_tablet_metadata(tablet_metadata_location(tablet_id, version), fill_cache, expected_gtid, fs);
if (tablet_metadata_or.status().is_not_found()) {
tablet_metadata_or =
get_tablet_metadata(tablet_initial_metadata_location(tablet_id), fill_cache, expected_gtid, fs);
} else {
}
} else {
auto cache_key = _location_provider->real_location(tablet_metadata_root_location(tablet_id));
if (cache_key.ok() && _metacache->lookup_aggregation_partition(*cache_key)) {
tablet_metadata_or = get_single_tablet_metadata(tablet_id, version, fill_cache, expected_gtid, fs);
if (tablet_metadata_or.status().is_not_found()) {
tablet_metadata_or = get_tablet_metadata(tablet_metadata_location(tablet_id, version), fill_cache,
expected_gtid, fs);
}
}
} else {
tablet_metadata_or =
get_tablet_metadata(tablet_metadata_location(tablet_id, version), fill_cache, expected_gtid, fs);
if (!tablet_metadata_or.status().is_not_found()) {
return tablet_metadata_or;
}
if (tablet_metadata_or.status().is_not_found() && version == kInitialVersion) {
// Handle tablet initial metadata
} else {
tablet_metadata_or =
get_tablet_metadata(tablet_initial_metadata_location(tablet_id), fill_cache, expected_gtid, fs);
} else if (tablet_metadata_or.status().is_not_found()) {
// get single tablet metadata
tablet_metadata_or = get_single_tablet_metadata(tablet_id, version, fill_cache, expected_gtid, fs);
get_tablet_metadata(tablet_metadata_location(tablet_id, version), fill_cache, expected_gtid, fs);
}
}

Expand All @@ -439,7 +468,8 @@ StatusOr<TabletMetadataPtr> TabletManager::get_tablet_metadata(const string& pat
}
StatusOr<TabletMetadataPtr> metadata_or;
auto [tablet_id, version] = parse_tablet_metadata_filename(basename(path));
if (_metacache->lookup_aggregation_partition(tablet_metadata_root_location(tablet_id))) {
auto cache_key = _location_provider->real_location(tablet_metadata_root_location(tablet_id));
if (cache_key.ok() && _metacache->lookup_aggregation_partition(*cache_key)) {
metadata_or = get_single_tablet_metadata(tablet_id, version, fill_cache, expected_gtid, fs);
if (metadata_or.status().is_not_found()) {
metadata_or = load_tablet_metadata(path, fill_cache, expected_gtid, fs);
Expand Down
10 changes: 10 additions & 0 deletions be/src/storage/lake/vacuum.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,16 @@ static Status vacuum_tablet_metadata(TabletManager* tablet_mgr, std::string_view
// collect meta files to vacuum at partition level
AsyncFileDeleter metafile_deleter(INT64_MAX, metafile_delete_cb);
auto meta_dir = join_path(root_dir, kMetadataDirectoryName);
// A special case:
// if a table enables file_bundling and has finished an alter job, each newly created tablet writes its initial
// tablet metadata with its own tablet_id to avoid overwriting the initial tablet metadata.
// After that, we need to vacuum these metadata files using each tablet's own tablet_id.
if (vacuum_version_range->min_version <= 1) {
for (auto& tablet_info : tablet_infos) {
RETURN_IF_ERROR(metafile_deleter.delete_file(
join_path(meta_dir, tablet_metadata_filename(tablet_info.tablet_id(), 1))));
}
}
for (auto v = vacuum_version_range->min_version; v < vacuum_version_range->max_version; v++) {
RETURN_IF_ERROR(metafile_deleter.delete_file(join_path(meta_dir, tablet_metadata_filename(0, v))));
}
Expand Down
28 changes: 26 additions & 2 deletions be/test/storage/lake/tablet_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,7 @@ TEST_F(LakeTabletManagerTest, put_bundle_tablet_metadata) {
starrocks::TabletMetadataPB metadata2;
{
metadata2.set_id(2);
metadata2.set_version(2);
metadata2.set_version(3);
metadata2.mutable_schema()->CopyFrom(schema_pb2);
auto& item1 = (*metadata2.mutable_historical_schemas())[10];
item1.CopyFrom(schema_pb1);
Expand All @@ -646,7 +646,31 @@ TEST_F(LakeTabletManagerTest, put_bundle_tablet_metadata) {

metadatas.emplace(1, metadata1);
metadatas.emplace(2, metadata2);
EXPECT_OK(_tablet_manager->put_bundle_tablet_metadata(metadatas));

{
auto fp = starrocks::failpoint::FailPointRegistry::GetInstance()->get("get_real_location_failed");
PFailPointTriggerMode trigger_mode;
trigger_mode.set_mode(FailPointTriggerModeType::ENABLE);
fp->setMode(trigger_mode);
ASSERT_FALSE(_tablet_manager->put_bundle_tablet_metadata(metadatas).ok());
trigger_mode.set_mode(FailPointTriggerModeType::DISABLE);
fp->setMode(trigger_mode);

fp = starrocks::failpoint::FailPointRegistry::GetInstance()->get("tablet_meta_not_found");
trigger_mode.set_mode(FailPointTriggerModeType::ENABLE);
fp->setMode(trigger_mode);
ASSERT_FALSE(_tablet_manager->put_bundle_tablet_metadata(metadatas).ok());
trigger_mode.set_mode(FailPointTriggerModeType::DISABLE);
fp->setMode(trigger_mode);
}

ASSERT_FALSE(_tablet_manager->put_bundle_tablet_metadata(metadatas).ok());

metadata2.set_version(2);
metadatas.clear();
metadatas.emplace(1, metadata1);
metadatas.emplace(2, metadata2);
ASSERT_OK(_tablet_manager->put_bundle_tablet_metadata(metadatas));

{
auto res = _tablet_manager->get_tablet_metadata(1, 2);
Expand Down
10 changes: 5 additions & 5 deletions be/test/storage/lake/vacuum_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1771,7 +1771,7 @@ TEST_P(LakeVacuumTest, test_vacuum_bundle_metadata) {
vacuum(_tablet_mgr.get(), request, &response);
ASSERT_TRUE(response.has_status());
EXPECT_EQ(0, response.status().status_code()) << response.status().error_msgs(0);
EXPECT_EQ(0, response.vacuumed_files());
EXPECT_EQ(2, response.vacuumed_files());
// The size of deleted metadata files is not counted in vacuumed_file_size.
EXPECT_EQ(0, response.vacuumed_file_size());

Expand Down Expand Up @@ -1802,7 +1802,7 @@ TEST_P(LakeVacuumTest, test_vacuum_bundle_metadata) {
vacuum(_tablet_mgr.get(), request, &response);
ASSERT_TRUE(response.has_status());
EXPECT_EQ(0, response.status().status_code()) << response.status().error_msgs(0);
EXPECT_EQ(2, response.vacuumed_files());
EXPECT_EQ(4, response.vacuumed_files());
EXPECT_EQ(0, response.vacuumed_file_size());

EXPECT_FALSE(file_exist(tablet_metadata_filename(0, 1)));
Expand Down Expand Up @@ -2054,7 +2054,7 @@ TEST_P(LakeVacuumTest, test_vacuum_shared_data_files) {
vacuum(_tablet_mgr.get(), request, &response);
ASSERT_TRUE(response.has_status());
EXPECT_EQ(0, response.status().status_code()) << response.status().error_msgs(0);
EXPECT_EQ(0, response.vacuumed_files());
EXPECT_EQ(2, response.vacuumed_files());
// The size of deleted metadata files is not counted in vacuumed_file_size.
EXPECT_EQ(0, response.vacuumed_file_size());

Expand Down Expand Up @@ -2083,7 +2083,7 @@ TEST_P(LakeVacuumTest, test_vacuum_shared_data_files) {
vacuum(_tablet_mgr.get(), request, &response);
ASSERT_TRUE(response.has_status());
EXPECT_EQ(0, response.status().status_code()) << response.status().error_msgs(0);
EXPECT_EQ(2, response.vacuumed_files());
EXPECT_EQ(4, response.vacuumed_files());
EXPECT_EQ(0, response.vacuumed_file_size());

EXPECT_FALSE(file_exist(tablet_metadata_filename(0, 1)));
Expand Down Expand Up @@ -2123,7 +2123,7 @@ TEST_P(LakeVacuumTest, test_vacuum_shared_data_files) {
vacuum(_tablet_mgr.get(), request, &response);
ASSERT_TRUE(response.has_status());
EXPECT_EQ(0, response.status().status_code()) << response.status().error_msgs(0);
EXPECT_EQ(5, response.vacuumed_files());
EXPECT_EQ(7, response.vacuumed_files());
EXPECT_EQ(16384, response.vacuumed_file_size());

EXPECT_FALSE(file_exist(tablet_metadata_filename(0, 1)));
Expand Down
Loading
Loading
0