Refactor meta data by JinHai-CN · Pull Request #1459 · infiniflow/infinity · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Refactor meta data #1459

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "Debug")
set(CMAKE_C_FLAGS "-O0 -g")

set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fno-stack-protector -fno-var-tracking ")
add_compile_options(-fsanitize=address -fsanitize-recover=all -fsanitize=leak)
add_link_options(-fsanitize=address -fsanitize-recover=all -fsanitize=leak)
add_compile_options(-fsanitize=address -fsanitize-recover=all)
add_link_options(-fsanitize=address -fsanitize-recover=all)

option(LEAK "Memory leak detection" OFF)
message("Check memory leak: " "${LEAK}")
Expand Down
13 changes: 6 additions & 7 deletions docs/getstarted/quickstart.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,12 @@ See [Build from Source](https://infiniflow.org/docs/dev/build_from_source).

### Try our Python examples

- [Create table, insert data, and search](../../example/simple_example.py)
- [Import file and export data](../../example/import_data.py)
- [Delete or update data](../../example/delete_update_data.py)
- [Conduct a vector search](../../example/vector_search.py)
- [Conduct a full-text search](../../example/fulltext_search.py)
- [Conduct a fused search](../../example/fusion_search.py)
- [ColBERT reranker examples](../../example)
- [Create table, insert data, and search](https://github.com/infiniflow/infinity/blob/main/example/simple_example.py)
- [Import file and export data](https://github.com/infiniflow/infinity/blob/main/example/import_data.py)
- [Delete or update data](https://github.com/infiniflow/infinity/blob/main/example/delete_update_data.py)
- [Conduct a vector search](https://github.com/infiniflow/infinity/blob/main/example/vector_search.py)
- [Conduct a full-text search](https://github.com/infiniflow/infinity/blob/main/example/fulltext_search.py)
- [Conduct a fused search](https://github.com/infiniflow/infinity/blob/main/example/fusion_search.py)

## Python API reference

Expand Down
3 changes: 1 addition & 2 deletions src/storage/meta/db_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,8 @@ DBMeta::GetDatabaseInfo(std::shared_lock<std::shared_mutex> &&r_lock, Transactio
}

// Builds a one-line, human-readable summary of this DBMeta: its data directory,
// database name and the number of entries in db_entry_list_.
//
// Returns a newly allocated shared string; the meta object itself is not modified.
SharedPtr<String> DBMeta::ToString() {
    // The trailing "{}" is required: without it the third argument (the entry
    // count) is silently dropped and the message ends with "entry count: ".
    // NOTE(review): db_entry_list_.size() presumably takes the list's own shared
    // lock, so no outer lock is taken here — confirm against EntryList::size().
    SharedPtr<String> res = MakeShared<String>(
        fmt::format("DBMeta, data_dir: {}, db name: {}, entry count: {}", *this->data_dir_, *this->db_name_, db_entry_list_.size()));
    return res;
}

Expand Down
35 changes: 35 additions & 0 deletions src/storage/meta/entry/base_entry.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,41 @@ export enum class EntryType : i8 {
kBlockColumn,
};

// Maps an EntryType to its lowercase, snake_case display name
// (for example EntryType::kTableIndex -> "table_index").
//
// @param entry_type  the entry kind to describe.
// @return the name for the listed enumerators, or "unknown" when the value
//         matches none of the cases below (e.g. a value produced by a cast).
export String ToString(EntryType entry_type) {
    switch (entry_type) {
        case EntryType::kDatabase: {
            return "database";
        }
        case EntryType::kTable: {
            return "table";
        }
        case EntryType::kTableIndex: {
            return "table_index";
        }
        case EntryType::kSegmentIndex: {
            return "segment_index";
        }
        case EntryType::kChunkIndex: {
            return "chunk_index";
        }
        case EntryType::kView: {
            return "view";
        }
        case EntryType::kColumn: {
            return "column";
        }
        case EntryType::kSegment: {
            return "segment";
        }
        case EntryType::kBlock: {
            return "block";
        }
        case EntryType::kBlockColumn: {
            return "block_column";
        }
    }
    // Flowing off the end of a non-void function is undefined behaviour, so an
    // unmatched value must still produce a result. No `default:` label is used,
    // so the compiler can keep warning about enumerators missing from the switch.
    return "unknown";
}

export struct BaseEntry {
explicit BaseEntry(EntryType entry_type, bool is_delete, String encode)
: deleted_(is_delete), entry_type_(entry_type), base_dir_(nullptr), encode_(MakeUnique<String>(std::move(encode))) {}
Expand Down
42 changes: 28 additions & 14 deletions src/storage/meta/entry/entry_list.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -89,29 +89,42 @@ public:

void Iterate(std::function<void(Entry *)> func, TxnTimeStamp visible_ts);

// Reports whether entry_list_ currently holds no entries. The check is made
// while holding a shared lock on rw_locker_.
// NOTE(review): this span carries two signature lines, the non-const one and
// the `inline ... const` one; they look like the before/after sides of the
// change — confirm which one the file is meant to keep.
bool Empty() {
inline bool Empty() const {
std::shared_lock lock(rw_locker_);
return entry_list_.empty();
}

// Returns how many entries entry_list_ holds at the moment of the call.
// The count is read while a shared lock on rw_locker_ is held.
inline SizeT size() const {
    std::shared_lock<std::shared_mutex> read_guard{rw_locker_};
    const SizeT entry_count = entry_list_.size();
    return entry_count;
}
inline void EDBE PushFrontEntry(const SharedPtr<Entry>& entry) {
std::unique_lock lock(rw_locker_);
entry_list_.push_front(entry);
}

inline void PushBackEntry(const SharedPtr<Entry>& entry) {
std::unique_lock lock(rw_locker_);
entry_list_.push_back(entry);
}
private:
// helper
FindResult FindEntry(TransactionID txn_id, TxnTimeStamp begin_ts, TxnManager *txn_mgr);
FindResult FindEntryNoLock(TransactionID txn_id, TxnTimeStamp begin_ts, TxnManager *txn_mgr);

FindResult FindEntryReplay(TransactionID txn_id, TxnTimeStamp begin_ts);

Pair<Entry *, FindResult> GetEntryInner1(TransactionID txn_id, TxnTimeStamp begin_ts);
Pair<Entry *, FindResult> GetEntryInner1NoLock(TransactionID txn_id, TxnTimeStamp begin_ts);

Pair<Entry *, Status> GetEntryInner2(Entry *entry_ptr, FindResult find_res);
Pair<Entry *, Status> GetEntryInner2NoLock(Entry *entry_ptr, FindResult find_res);

public: // TODO: make both private
std::shared_mutex rw_locker_{};
mutable std::shared_mutex rw_locker_{};

List<SharedPtr<Entry>> entry_list_;
};

template <EntryConcept Entry>
FindResult EntryList<Entry>::FindEntry(TransactionID txn_id, TxnTimeStamp begin_ts, TxnManager *txn_mgr) {
FindResult EntryList<Entry>::FindEntryNoLock(TransactionID txn_id, TxnTimeStamp begin_ts, TxnManager *txn_mgr) {
FindResult find_res = FindResult::kNotFound;
bool continue_loop = true;
for (auto iter = entry_list_.begin(); iter != entry_list_.end() && continue_loop; ++iter) {
Expand Down Expand Up @@ -194,7 +207,7 @@ Tuple<Entry *, Status> EntryList<Entry>::AddEntry(std::shared_lock<std::shared_m
ConflictType conflict_type) {
std::unique_lock lock(rw_locker_);
parent_r_lock.unlock();
FindResult find_res = this->FindEntry(txn_id, begin_ts, txn_mgr);
FindResult find_res = this->FindEntryNoLock(txn_id, begin_ts, txn_mgr);
switch (find_res) {
case FindResult::kUncommittedDelete:
case FindResult::kNotFound: {
Expand Down Expand Up @@ -254,7 +267,7 @@ Tuple<SharedPtr<Entry>, Status> EntryList<Entry>::DropEntry(std::shared_lock<std
ConflictType conflict_type) {
std::unique_lock lock(rw_locker_);
parent_r_lock.unlock();
FindResult find_res = this->FindEntry(txn_id, begin_ts, txn_mgr);
FindResult find_res = this->FindEntryNoLock(txn_id, begin_ts, txn_mgr);
switch (find_res) {
case FindResult::kUncommittedDelete:
case FindResult::kNotFound: {
Expand Down Expand Up @@ -306,7 +319,7 @@ Tuple<SharedPtr<Entry>, Status> EntryList<Entry>::DropEntry(std::shared_lock<std
}

template <EntryConcept Entry>
Pair<Entry *, FindResult> EntryList<Entry>::GetEntryInner1(TransactionID txn_id, TxnTimeStamp begin_ts) {
Pair<Entry *, FindResult> EntryList<Entry>::GetEntryInner1NoLock(TransactionID txn_id, TxnTimeStamp begin_ts) {
Entry *entry_ptr = nullptr;
FindResult find_res = FindResult::kNotFound;
for (const auto &entry : entry_list_) {
Expand Down Expand Up @@ -334,7 +347,7 @@ Pair<Entry *, FindResult> EntryList<Entry>::GetEntryInner1(TransactionID txn_id,
}

template <EntryConcept Entry>
Pair<Entry *, Status> EntryList<Entry>::GetEntryInner2(Entry *entry_ptr, FindResult find_res) {
Pair<Entry *, Status> EntryList<Entry>::GetEntryInner2NoLock(Entry *entry_ptr, FindResult find_res) {
switch (find_res) {
case FindResult::kNotFound: {
auto err_msg = MakeUnique<String>("Not existed entry.");
Expand Down Expand Up @@ -367,18 +380,18 @@ template <EntryConcept Entry>
// Looks up the entry for (txn_id, begin_ts) in two stages and returns the
// entry pointer together with a Status.
//
// parent_lock: a shared lock handed over by the caller; it is released here
//              only after this list's own shared lock has been taken.
// NOTE(review): the paired Inner1/Inner1NoLock and Inner2/Inner2NoLock lines
// look like the before/after sides of a rename — confirm which pair is kept.
Tuple<Entry *, Status> EntryList<Entry>::GetEntry(std::shared_lock<std::shared_mutex> &&parent_lock, TransactionID txn_id, TxnTimeStamp begin_ts) {
// Take this list's lock before dropping the parent's, so the two overlap.
std::shared_lock r_lock(rw_locker_);
parent_lock.unlock();
// Stage 1 runs while r_lock is held.
auto [entry_ptr, find_res] = this->GetEntryInner1(txn_id, begin_ts);
auto [entry_ptr, find_res] = this->GetEntryInner1NoLock(txn_id, begin_ts);
r_lock.unlock();

// Stage 2 turns the stage-1 result into the returned pair; it runs after
// r_lock has been released.
return this->GetEntryInner2(entry_ptr, find_res);
return this->GetEntryInner2NoLock(entry_ptr, find_res);
}

// Looks up the entry for (txn_id, begin_ts) and returns the entry pointer
// together with a Status. Unlike GetEntry, it takes no parent lock argument.
// NOTE(review): despite the "Nolock" suffix, this function does acquire a
// shared lock on rw_locker_ below — presumably the suffix refers to the parent
// lock; confirm and consider documenting that at the declaration.
// NOTE(review): the paired Inner1/Inner1NoLock and Inner2/Inner2NoLock lines
// look like the before/after sides of a rename — confirm which pair is kept.
template <EntryConcept Entry>
Tuple<Entry *, Status> EntryList<Entry>::GetEntryNolock(TransactionID txn_id, TxnTimeStamp begin_ts) {
// Stage 1 runs while the shared lock is held.
std::shared_lock r_lock(rw_locker_);
auto [entry_ptr, find_res] = this->GetEntryInner1(txn_id, begin_ts);
auto [entry_ptr, find_res] = this->GetEntryInner1NoLock(txn_id, begin_ts);
r_lock.unlock();
// Stage 2 runs after the lock has been released.
return this->GetEntryInner2(entry_ptr, find_res);
return this->GetEntryInner2NoLock(entry_ptr, find_res);
}

template <EntryConcept Entry>
Expand Down Expand Up @@ -505,6 +518,7 @@ bool EntryList<Entry>::PickCleanup(CleanupScanner *scanner) {
return entry_list_.empty();
}

// TODO: check if this need to lock
template <EntryConcept Entry>
void EntryList<Entry>::Cleanup() {
for (auto iter = entry_list_.begin(); iter != entry_list_.end(); ++iter) {
Expand Down
10 changes: 10 additions & 0 deletions src/storage/meta/entry/segment_entry.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,16 @@ export enum class SegmentStatus : u8 {
kDeprecated,
};

// Maps a SegmentStatus to its display name (for example
// SegmentStatus::kCompacting -> "Compacting").
//
// @param segment_status  the status to describe.
// @return the name for the listed enumerators, or "Unknown" when the value
//         matches none of the cases below (e.g. a value produced by a cast).
export String ToString(SegmentStatus segment_status) {
    switch (segment_status) {
        case SegmentStatus::kUnsealed:
            return "Unsealed";
        case SegmentStatus::kSealed:
            return "Sealed";
        case SegmentStatus::kCompacting:
            return "Compacting";
        case SegmentStatus::kNoDelete:
            return "NoDelete";
        case SegmentStatus::kDeprecated:
            return "Deprecated";
    }
    // Flowing off the end of a non-void function is undefined behaviour, so an
    // unmatched value must still produce a result. No `default:` label is used,
    // so the compiler can keep warning about enumerators missing from the switch.
    return "Unknown";
}

export struct SegmentEntry : public BaseEntry, public EntryInterface {
public:
friend class BlockEntryIter;
Expand Down
8 changes: 6 additions & 2 deletions src/storage/meta/meta_map.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public: // TODO: make both private

template <MetaConcept Meta>
Tuple<Meta *, std::shared_lock<std::shared_mutex>> MetaMap<Meta>::GetMeta(const String &name, std::function<UniquePtr<Meta>()> &&init_func) {
Meta* return_meta_ptr{nullptr};
std::shared_lock r_lock(rw_locker_);
auto iter = meta_map_.find(name);
if (iter == meta_map_.end()) {
Expand All @@ -89,13 +90,16 @@ Tuple<Meta *, std::shared_lock<std::shared_mutex>> MetaMap<Meta>::GetMeta(const
} else {
LOG_TRACE("Add new entry in existed meta_map");
iter = meta_map_.emplace(name, std::move(new_meta)).first;

}
return_meta_ptr = iter->second.get();
}
r_lock.lock();
r_lock.lock(); // FIXME: This lock gap might introduce bug.
} else {
LOG_TRACE("Add new entry in existed meta_map");
return_meta_ptr = iter->second.get();
}
return {iter->second.get(), std::move(r_lock)};
return {return_meta_ptr, std::move(r_lock)};
}

template <MetaConcept Meta>
Expand Down
1 change: 1 addition & 0 deletions src/storage/meta/table_index_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import extra_ddl_info;
import local_file_system;
import txn;
import create_index_info;
import base_entry;

namespace infinity {

Expand Down
3 changes: 1 addition & 2 deletions src/storage/meta/table_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,8 @@ TableEntry *TableMeta::GetEntryReplay(TransactionID txn_id, TxnTimeStamp begin_t
const SharedPtr<String> &TableMeta::db_name_ptr() const { return db_entry_->db_name_ptr(); }

// Builds a one-line, human-readable summary of this TableMeta: the owning
// database entry directory, the table name and the number of entries in
// table_entry_list_.
//
// Returns a newly allocated shared string; the meta object itself is not modified.
SharedPtr<String> TableMeta::ToString() {
    // The trailing "{}" is required: without it the third argument (the entry
    // count) is silently dropped and the message ends with "entry count: ".
    // NOTE(review): table_entry_list_.size() presumably takes the list's own
    // shared lock, so no outer lock is taken here — confirm against EntryList::size().
    SharedPtr<String> res = MakeShared<String>(
        fmt::format("TableMeta, db_entry_dir: {}, table name: {}, entry count: {}", *db_entry_dir_, *table_name_, table_entry_list_.size()));
    return res;
}

Expand Down
6 changes: 4 additions & 2 deletions src/storage/wal/wal_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -992,12 +992,14 @@ void WalManager::WalCmdCompactReplay(const WalCmdCompact &cmd, TransactionID txn
for (const SegmentID segment_id : cmd.deprecated_segment_ids_) {
auto segment_entry = table_entry->GetSegmentByID(segment_id, commit_ts);
if (!segment_entry->TrySetCompacting(nullptr)) { // fake set because check
String error_message = "Assert: Replay segment should be compactable.";
String error_message = fmt::format("Replaying segment: {} from table: {} with status: {}, can't be compacted",
segment_id, cmd.table_name_, ToString(segment_entry->status()));
LOG_CRITICAL(error_message);
UnrecoverableError(error_message);
}
if (!segment_entry->SetNoDelete()) {
String error_message = "Assert: Replay segment should be compactable.";
String error_message = fmt::format("Replaying segment: {} from table: {} can't be set no delete, can't be compacted",
segment_id, cmd.table_name_);
LOG_CRITICAL(error_message);
UnrecoverableError(error_message);
}
Expand Down
Loading
0