Fix compact by small-turtle-1 · Pull Request #1537 · infiniflow/infinity · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Fix compact #1537

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
4 changes: 2 additions & 2 deletions src/common/status.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -408,8 +408,8 @@ Status Status::TransactionNotFound(TransactionID txn_id) {
}

// 4. TXN fail
Status Status::TxnRollback(u64 txn_id) {
return Status(ErrorCode::kTxnRollback, MakeUnique<String>(fmt::format("Transaction: {} is rollback", txn_id)));
Status Status::TxnRollback(u64 txn_id, const String &rollback_reason) {
return Status(ErrorCode::kTxnRollback, MakeUnique<String>(fmt::format("Transaction: {} is rollback. {}", txn_id, rollback_reason)));
}

Status Status::TxnConflict(u64 txn_id, const String &conflict_reason) {
Expand Down
2 changes: 1 addition & 1 deletion src/common/status.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ public:
static Status TransactionNotFound(TransactionID txn_id);

// 4. TXN fail
static Status TxnRollback(u64 txn_id);
static Status TxnRollback(u64 txn_id, const String &rollback_reason = "no reanson gived");
static Status TxnConflict(u64 txn_id, const String &conflict_reason);

// 5. Insufficient resource or exceed limits
Expand Down
12 changes: 11 additions & 1 deletion src/executor/operator/physical_compact.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import default_values;
import logger;
import infinity_exception;
import third_party;
import status;

namespace infinity {

Expand Down Expand Up @@ -121,18 +122,23 @@ bool PhysicalCompact::Execute(QueryContext *query_context, OperatorState *operat
const Vector<SegmentEntry *> &candidate_segments = compact_operator_state->segment_groups_[group_idx];
Vector<SegmentEntry *> compactible_segments;
{
String log_str = fmt::format("PhysicalCompact::Execute: group_idx: {}, candidate_segments: ", group_idx);
for (auto 10000 *candidate_segment : candidate_segments) {
if (candidate_segment->TrySetCompacting(compact_state_data)) {
compactible_segments.push_back(candidate_segment);
}
log_str += fmt::format("{}, ", candidate_segment->segment_id());
}
LOG_INFO(log_str);
}

auto *txn = query_context->GetTxn();
if (compactible_segments.empty()) {
RecoverableError(Status::TxnRollback(txn->TxnID(), "No segment to compact."));
}
auto *txn_mgr = txn->txn_mgr();
auto *buffer_mgr = query_context->storage()->buffer_manager();
TxnTimeStamp scan_ts = txn_mgr->GetNewTimeStamp();
LOG_INFO(fmt::format("PhysicalCompact::Execute: scan_ts: {}", scan_ts));

TableEntry *table_entry = base_table_ref_->table_entry_ptr_;
BlockIndex *block_index = base_table_ref_->block_index_.get();
Expand All @@ -141,6 +147,7 @@ bool PhysicalCompact::Execute(QueryContext *query_context, OperatorState *operat

auto new_segment = SegmentEntry::NewSegmentEntry(table_entry, Catalog::GetNextSegmentID(table_entry), txn);
SegmentID new_segment_id = new_segment->segment_id();
LOG_INFO(fmt::format("PhysicalCompact::Execute: txn_id: {}, scan_ts: {}, new segment id: {}", txn->TxnID(), scan_ts, new_segment_id));

UniquePtr<BlockEntry> new_block =
BlockEntry::NewBlockEntry(new_segment.get(), new_segment->GetNextBlockID(), 0 /*checkpoint_ts*/, column_count, txn);
Expand Down Expand Up @@ -190,6 +197,9 @@ bool PhysicalCompact::Execute(QueryContext *query_context, OperatorState *operat
if (new_block->row_count() > 0) {
new_segment->AppendBlockEntry(std::move(new_block));
}
if (new_segment->actual_row_count() > new_segment->row_capacity()) {
UnrecoverableError(fmt::format("Compact segment {} error because of row count overflow.", new_segment_id));
}
compact_state_data->AddNewSegment(new_segment, std::move(compactible_segments), txn);
compact_operator_state->compact_idx_ = ++group_idx;
if (group_idx == compact_operator_state->segment_groups_.size()) {
Expand Down
39 changes: 32 additions & 7 deletions src/storage/compaction/DBT_compaction_alg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,35 @@ void SegmentLayer::RemoveSegment(SegmentEntry *shrink_segment) {
}
}

Vector<SegmentEntry *> SegmentLayer::PickCompacting(TransactionID txn_id, SizeT M) {
Vector<SegmentEntry *> SegmentLayer::PickCompacting(TransactionID txn_id, SizeT M, SizeT max_capacity) {
SizeT segment_n = segments_.size();
if (segment_n < M) {
UnrecoverableError(fmt::format("SegmentLayer::PickCompacting error."));
}

Vector<SegmentEntry *> ret;
SizeT pick_n = std::min(M, segments_.size());
for (SizeT i = 0; i < pick_n; ++i) {
auto iter = segments_.begin();
ret.push_back(iter->second);
segments_.erase(iter);
{
Vector<Pair<SegmentEntry *, SizeT>> segments;
for (auto &[segment_id, segment_entry] : segments_) {
segments.emplace_back(segment_entry, segment_entry->actual_row_count());
}
Vector<int> idx(segment_n);
std::iota(idx.begin(), idx.end(), 0);
std::nth_element(idx.begin(), idx.begin() + M, idx.end(), [&](int i, int j) {
return segments[i].second < segments[j].second;
});
SizeT total_row_cnt = 0;
for (SizeT i = 0; i < M; ++i) {
ret.push_back(segments[idx[i]].first);
total_row_cnt += segments[idx[i]].second;
}
if (total_row_cnt > max_capacity) {
return {};
}
}

for (auto *compact_segment : ret) {
segments_.erase(compact_segment->segment_id());
}
auto [iter, insert_ok] = compacting_segments_map_.emplace(txn_id, ret); // copy here
if (!insert_ok) {
Expand Down Expand Up @@ -102,10 +124,13 @@ Vector<SegmentEntry *> DBTCompactionAlg::CheckCompaction(TransactionID txn_id) {
for (int layer = cur_layer_n - 1; layer >= 0; --layer) {
auto &segment_layer = segment_layers_[layer];
if (segment_layer.LayerSize() >= config_.m_) {
Vector<SegmentEntry *> compact_segments = segment_layer.PickCompacting(txn_id, config_.m_, max_segment_capacity_);
if (compact_segments.empty()) {
continue;
}
if (++running_task_n_ == 1) {
status_ = CompactionStatus::kRunning;
}
Vector<SegmentEntry *> compact_segments = segment_layer.PickCompacting(txn_id, config_.m_);

txn_2_layer_.emplace(txn_id, layer);
return compact_segments;
Expand Down
7 changes: 3 additions & 4 deletions src/storage/compaction/DBT_compaction_alg.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public:

void RemoveSegment(SegmentEntry *shrink_segment);

Vector<SegmentEntry *> PickCompacting(TransactionID txn_id, SizeT M);
Vector<SegmentEntry *> PickCompacting(TransactionID txn_id, SizeT M, SizeT layer);

void CommitCompact(TransactionID txn_id);

Expand All @@ -77,8 +77,7 @@ private:
export class DBTCompactionAlg final : public CompactionAlg {
public:
DBTCompactionAlg(int m, int c, int s, SizeT max_segment_capacity, TableEntry *table_entry = nullptr)
: CompactionAlg(), config_(m, c, s), max_layer_(config_.CalculateLayer(max_segment_capacity)), table_entry_(table_entry), running_task_n_(0) {
}
: CompactionAlg(), config_(m, c, s), max_segment_capacity_(max_segment_capacity), table_entry_(table_entry), running_task_n_(0) {}

virtual Vector<SegmentEntry *> CheckCompaction(TransactionID txn_id) override;

Expand All @@ -102,7 +101,7 @@ private:

private:
const DBTConfig config_;
const int max_layer_;
const SizeT max_segment_capacity_;
TableEntry *table_entry_;

std::mutex mtx_;
Expand Down
3 changes: 3 additions & 0 deletions src/storage/knn_index/knn_hnsw/abstract_hnsw.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ private:
public:
static AbstractHnsw InitAbstractIndex(const IndexBase *index_base, const ColumnDef *column_def);

HnswIndexInMem(const HnswIndexInMem &) = delete;
HnswIndexInMem &operator=(const HnswIndexInMem &) = delete;

~HnswIndexInMem();

SizeT GetRowCount() const;
Expand Down
3 changes: 3 additions & 0 deletions src/storage/knn_index/sparse/abstract_bmp.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ private:
public:
static AbstractBMP InitAbstractIndex(const IndexBase *index_base, const ColumnDef *column_def);

BMPIndexInMem(const BMPIndexInMem &) = delete;
BMPIndexInMem &operator=(const BMPIndexInMem &) = delete;

~BMPIndexInMem();

SizeT GetRowCount() const;
Expand Down
3 changes: 1 addition & 2 deletions src/storage/meta/entry/segment_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,7 @@ void SegmentEntry::CommitSegment(TransactionID txn_id,
const DeleteState *delete_state) {
std::unique_lock w_lock(rw_locker_);
if (status_ == SegmentStatus::kDeprecated) {
String error_message = "Assert: Should not commit delete to deprecated segment.";
UnrecoverableError(error_message);
return;
}

if (delete_state != nullptr) {
Expand Down
4 changes: 2 additions & 2 deletions src/storage/txn/txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ Status Txn::Delete(TableEntry *table_entry, const Vector<RowID> &row_ids, bool c
this->CheckTxn(db_name);

if (check_conflict && table_entry->CheckDeleteConflict(row_ids, txn_id_)) {
LOG_WARN(fmt::format("Rollback delete in table {} due to conflict.", table_name));
RecoverableError(Status::TxnRollback(TxnID()));
String log_msg = fmt::format("Rollback delete in table {} due to conflict.", table_name);
RecoverableError(Status::TxnRollback(TxnID(), log_msg));
}

TxnTableStore *table_store = this->GetTxnTableStore(table_entry);
Expand Down
1 change: 1 addition & 0 deletions src/storage/txn/txn_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ TxnTimeStamp TxnManager::GetCleanupScanTS() {
}
TxnTimeStamp checkpointed_ts = wal_mgr_->GetCheckpointedTS();
TxnTimeStamp res = std::min(first_uncommitted_begin_ts, checkpointed_ts);
LOG_INFO(fmt::format("Cleanup scan ts: {}, checkpoint ts: {}", res, checkpointed_ts));
for (auto *txn : finished_txns_) {
res = std::min(res, txn->BeginTS());
}
Expand Down
6 changes: 4 additions & 2 deletions src/storage/txn/txn_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,10 @@ Tuple<UniquePtr<String>, Status> TxnTableStore::Compact(Vector<Pair<SharedPtr<Se
compact_state_ = TxnCompactStore(type);
for (auto &[new_segment, old_segments] : segment_data) {
auto txn_segment_store = TxnSegmentStore::AddSegmentStore(new_segment.get());
compact_state_.compact_data_.emplace_back(std::move(txn_segment_store), std::move(old_segments));

compact_state_.compact_data_.emplace_back(std::move(txn_segment_store), old_segments);
for (auto *old_segment : old_segments) {
txn_segments_store_.emplace(old_segment->segment_id(), TxnSegmentStore(old_segment));
}
this->AddSegmentStore(new_segment.get());
this->AddSealedSegment(new_segment.get());
this->flushed_segments_.emplace_back(new_segment.get());
Expand Down
Loading
0