diff --git a/src/common/status.cpp b/src/common/status.cpp index 64d163f074..174133587a 100644 --- a/src/common/status.cpp +++ b/src/common/status.cpp @@ -408,8 +408,8 @@ Status Status::TransactionNotFound(TransactionID txn_id) { } // 4. TXN fail -Status Status::TxnRollback(u64 txn_id) { - return Status(ErrorCode::kTxnRollback, MakeUnique(fmt::format("Transaction: {} is rollback", txn_id))); +Status Status::TxnRollback(u64 txn_id, const String &rollback_reason) { + return Status(ErrorCode::kTxnRollback, MakeUnique(fmt::format("Transaction: {} is rollback. {}", txn_id, rollback_reason))); } Status Status::TxnConflict(u64 txn_id, const String &conflict_reason) { diff --git a/src/common/status.cppm b/src/common/status.cppm index 230a40a42b..16a1b92b50 100644 --- a/src/common/status.cppm +++ b/src/common/status.cppm @@ -275,7 +275,7 @@ public: static Status TransactionNotFound(TransactionID txn_id); // 4. TXN fail - static Status TxnRollback(u64 txn_id); + static Status TxnRollback(u64 txn_id, const String &rollback_reason = "no reason given"); static Status TxnConflict(u64 txn_id, const String &conflict_reason); // 5. 
Insufficient resource or exceed limits diff --git a/src/executor/operator/physical_compact.cpp b/src/executor/operator/physical_compact.cpp index 9082880978..455db7a9ca 100644 --- a/src/executor/operator/physical_compact.cpp +++ b/src/executor/operator/physical_compact.cpp @@ -33,6 +33,7 @@ import default_values; import logger; import infinity_exception; import third_party; +import status; namespace infinity { @@ -121,18 +122,23 @@ bool PhysicalCompact::Execute(QueryContext *query_context, OperatorState *operat const Vector &candidate_segments = compact_operator_state->segment_groups_[group_idx]; Vector compactible_segments; { + String log_str = fmt::format("PhysicalCompact::Execute: group_idx: {}, candidate_segments: ", group_idx); for (auto *candidate_segment : candidate_segments) { if (candidate_segment->TrySetCompacting(compact_state_data)) { compactible_segments.push_back(candidate_segment); } + log_str += fmt::format("{}, ", candidate_segment->segment_id()); } + LOG_INFO(log_str); } auto *txn = query_context->GetTxn(); + if (compactible_segments.empty()) { + RecoverableError(Status::TxnRollback(txn->TxnID(), "No segment to compact.")); + } auto *txn_mgr = txn->txn_mgr(); auto *buffer_mgr = query_context->storage()->buffer_manager(); TxnTimeStamp scan_ts = txn_mgr->GetNewTimeStamp(); - LOG_INFO(fmt::format("PhysicalCompact::Execute: scan_ts: {}", scan_ts)); TableEntry *table_entry = base_table_ref_->table_entry_ptr_; BlockIndex *block_index = base_table_ref_->block_index_.get(); @@ -141,6 +147,7 @@ bool PhysicalCompact::Execute(QueryContext *query_context, OperatorState *operat auto new_segment = SegmentEntry::NewSegmentEntry(table_entry, Catalog::GetNextSegmentID(table_entry), txn); SegmentID new_segment_id = new_segment->segment_id(); + LOG_INFO(fmt::format("PhysicalCompact::Execute: txn_id: {}, scan_ts: {}, new segment id: {}", txn->TxnID(), scan_ts, new_segment_id)); UniquePtr new_block = BlockEntry::NewBlockEntry(new_segment.get(), 
new_segment->GetNextBlockID(), 0 /*checkpoint_ts*/, column_count, txn); @@ -190,6 +197,9 @@ bool PhysicalCompact::Execute(QueryContext *query_context, OperatorState *operat if (new_block->row_count() > 0) { new_segment->AppendBlockEntry(std::move(new_block)); } + if (new_segment->actual_row_count() > new_segment->row_capacity()) { + UnrecoverableError(fmt::format("Compact segment {} error because of row count overflow.", new_segment_id)); + } compact_state_data->AddNewSegment(new_segment, std::move(compactible_segments), txn); compact_operator_state->compact_idx_ = ++group_idx; if (group_idx == compact_operator_state->segment_groups_.size()) { diff --git a/src/storage/compaction/DBT_compaction_alg.cpp b/src/storage/compaction/DBT_compaction_alg.cpp index 9b48ff14cb..89066ac5e1 100644 --- a/src/storage/compaction/DBT_compaction_alg.cpp +++ b/src/storage/compaction/DBT_compaction_alg.cpp @@ -48,13 +48,35 @@ void SegmentLayer::RemoveSegment(SegmentEntry *shrink_segment) { } } -Vector SegmentLayer::PickCompacting(TransactionID txn_id, SizeT M) { +Vector SegmentLayer::PickCompacting(TransactionID txn_id, SizeT M, SizeT max_capacity) { + SizeT segment_n = segments_.size(); + if (segment_n < M) { + UnrecoverableError(fmt::format("SegmentLayer::PickCompacting error.")); + } + Vector ret; - SizeT pick_n = std::min(M, segments_.size()); - for (SizeT i = 0; i < pick_n; ++i) { - auto iter = segments_.begin(); - ret.push_back(iter->second); - segments_.erase(iter); + { + Vector> segments; + for (auto &[segment_id, segment_entry] : segments_) { + segments.emplace_back(segment_entry, segment_entry->actual_row_count()); + } + Vector idx(segment_n); + std::iota(idx.begin(), idx.end(), 0); + std::nth_element(idx.begin(), idx.begin() + M, idx.end(), [&](int i, int j) { + return segments[i].second < segments[j].second; + }); + SizeT total_row_cnt = 0; + for (SizeT i = 0; i < M; ++i) { + ret.push_back(segments[idx[i]].first); + total_row_cnt += segments[idx[i]].second; + } + if 
(total_row_cnt > max_capacity) { + return {}; + } + } + + for (auto *compact_segment : ret) { + segments_.erase(compact_segment->segment_id()); } auto [iter, insert_ok] = compacting_segments_map_.emplace(txn_id, ret); // copy here if (!insert_ok) { @@ -102,10 +124,13 @@ Vector DBTCompactionAlg::CheckCompaction(TransactionID txn_id) { for (int layer = cur_layer_n - 1; layer >= 0; --layer) { auto &segment_layer = segment_layers_[layer]; if (segment_layer.LayerSize() >= config_.m_) { + Vector compact_segments = segment_layer.PickCompacting(txn_id, config_.m_, max_segment_capacity_); + if (compact_segments.empty()) { + continue; + } if (++running_task_n_ == 1) { status_ = CompactionStatus::kRunning; } - Vector compact_segments = segment_layer.PickCompacting(txn_id, config_.m_); txn_2_layer_.emplace(txn_id, layer); return compact_segments; diff --git a/src/storage/compaction/DBT_compaction_alg.cppm b/src/storage/compaction/DBT_compaction_alg.cppm index 619b716c15..ae1c1024cb 100644 --- a/src/storage/compaction/DBT_compaction_alg.cppm +++ b/src/storage/compaction/DBT_compaction_alg.cppm @@ -59,7 +59,7 @@ public: void RemoveSegment(SegmentEntry *shrink_segment); - Vector PickCompacting(TransactionID txn_id, SizeT M); + Vector PickCompacting(TransactionID txn_id, SizeT M, SizeT max_capacity); void CommitCompact(TransactionID txn_id); @@ -77,8 +77,7 @@ private: export class DBTCompactionAlg final : public CompactionAlg { public: DBTCompactionAlg(int m, int c, int s, SizeT max_segment_capacity, TableEntry *table_entry = nullptr) - : CompactionAlg(), config_(m, c, s), max_layer_(config_.CalculateLayer(max_segment_capacity)), table_entry_(table_entry), running_task_n_(0) { - } + : CompactionAlg(), config_(m, c, s), max_segment_capacity_(max_segment_capacity), table_entry_(table_entry), running_task_n_(0) {} virtual Vector CheckCompaction(TransactionID txn_id) override; @@ -102,7 +101,7 @@ private: private: const DBTConfig config_; - const int max_layer_; + const SizeT 
max_segment_capacity_; TableEntry *table_entry_; std::mutex mtx_; diff --git a/src/storage/knn_index/knn_hnsw/abstract_hnsw.cppm b/src/storage/knn_index/knn_hnsw/abstract_hnsw.cppm index f3a87dce22..d8b172ec23 100644 --- a/src/storage/knn_index/knn_hnsw/abstract_hnsw.cppm +++ b/src/storage/knn_index/knn_hnsw/abstract_hnsw.cppm @@ -148,6 +148,9 @@ private: public: static AbstractHnsw InitAbstractIndex(const IndexBase *index_base, const ColumnDef *column_def); + HnswIndexInMem(const HnswIndexInMem &) = delete; + HnswIndexInMem &operator=(const HnswIndexInMem &) = delete; + ~HnswIndexInMem(); SizeT GetRowCount() const; diff --git a/src/storage/knn_index/sparse/abstract_bmp.cppm b/src/storage/knn_index/sparse/abstract_bmp.cppm index f7a676dba1..94f550da4b 100644 --- a/src/storage/knn_index/sparse/abstract_bmp.cppm +++ b/src/storage/knn_index/sparse/abstract_bmp.cppm @@ -96,6 +96,9 @@ private: public: static AbstractBMP InitAbstractIndex(const IndexBase *index_base, const ColumnDef *column_def); + BMPIndexInMem(const BMPIndexInMem &) = delete; + BMPIndexInMem &operator=(const BMPIndexInMem &) = delete; + ~BMPIndexInMem(); SizeT GetRowCount() const; diff --git a/src/storage/meta/entry/segment_entry.cpp b/src/storage/meta/entry/segment_entry.cpp index 1d8788bef8..306d997709 100644 --- a/src/storage/meta/entry/segment_entry.cpp +++ b/src/storage/meta/entry/segment_entry.cpp @@ -394,8 +394,7 @@ void SegmentEntry::CommitSegment(TransactionID txn_id, const DeleteState *delete_state) { std::unique_lock w_lock(rw_locker_); if (status_ == SegmentStatus::kDeprecated) { - String error_message = "Assert: Should not commit delete to deprecated segment."; - UnrecoverableError(error_message); + return; } if (delete_state != nullptr) { diff --git a/src/storage/txn/txn.cpp b/src/storage/txn/txn.cpp index c85ebaf35b..a4d6d7a5e4 100644 --- a/src/storage/txn/txn.cpp +++ b/src/storage/txn/txn.cpp @@ -110,8 +110,8 @@ Status Txn::Delete(TableEntry *table_entry, const Vector &row_ids, bool c 
this->CheckTxn(db_name); if (check_conflict && table_entry->CheckDeleteConflict(row_ids, txn_id_)) { - LOG_WARN(fmt::format("Rollback delete in table {} due to conflict.", table_name)); - RecoverableError(Status::TxnRollback(TxnID())); + String log_msg = fmt::format("Rollback delete in table {} due to conflict.", table_name); + RecoverableError(Status::TxnRollback(TxnID(), log_msg)); } TxnTableStore *table_store = this->GetTxnTableStore(table_entry); diff --git a/src/storage/txn/txn_manager.cpp b/src/storage/txn/txn_manager.cpp index 285f9fd88d..3b943eec30 100644 --- a/src/storage/txn/txn_manager.cpp +++ b/src/storage/txn/txn_manager.cpp @@ -291,6 +291,7 @@ TxnTimeStamp TxnManager::GetCleanupScanTS() { } TxnTimeStamp checkpointed_ts = wal_mgr_->GetCheckpointedTS(); TxnTimeStamp res = std::min(first_uncommitted_begin_ts, checkpointed_ts); + LOG_INFO(fmt::format("Cleanup scan ts: {}, checkpoint ts: {}", res, checkpointed_ts)); for (auto *txn : finished_txns_) { res = std::min(res, txn->BeginTS()); } diff --git a/src/storage/txn/txn_store.cpp b/src/storage/txn/txn_store.cpp index 89281a5d51..db6d074bd7 100644 --- a/src/storage/txn/txn_store.cpp +++ b/src/storage/txn/txn_store.cpp @@ -251,8 +251,10 @@ Tuple, Status> TxnTableStore::Compact(Vectorsegment_id(), TxnSegmentStore(old_segment)); + } this->AddSegmentStore(new_segment.get()); this->AddSealedSegment(new_segment.get()); this->flushed_segments_.emplace_back(new_segment.get());