8000 Supports cleaning up vfs by Ma-cat · Pull Request #1529 · infiniflow/infinity · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Supports cleaning up vfs #1529

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 23 additions & 23 deletions conf/infinity_conf.toml
Original file line number Diff line number Diff line change
@@ -1,56 +1,56 @@
[general]
version = "0.3.0"
version = "0.3.0"
time_zone = "utc-8"

[network]
server_address = "0.0.0.0"
postgres_port = 5432
http_port = 23820
client_port = 23817
connection_pool_size = 128
server_address = "0.0.0.0"
postgres_port = 5432
http_port = 23820
client_port = 23817
connection_pool_size = 128

[log]
log_filename = "infinity.log"
log_dir = "/var/infinity/log"
log_to_stdout = false
log_file_max_size = "10GB"
log_file_rotate_count = 10
log_filename = "infinity.log"
log_dir = "/var/infinity/log"
log_to_stdout = false
log_file_max_size = "10GB"
log_file_rotate_count = 10

# 5 log levels: trace/info/warning/error/critical; default: info
log_level = "info"
log_level = "info"

[storage]
data_dir = "/var/infinity/data"
data_dir = "/var/infinity/data"

# periodically activates garbage collection:
# 0 means real-time,
# s means seconds, for example "60s", 60 seconds
# m means minutes, for example "60m", 60 minutes
# h means hours, for example "1h", 1 hour
cleanup_interval = "60s"
compact_interval = "120s"
cleanup_interval = "60s"
compact_interval = "120s"

# dump memory index entry when it reaches the capacity
mem_index_capacity = 1048576

[buffer]
buffer_manager_size = "4GB"
lru_num = 7
temp_dir = "/var/infinity/tmp"
buffer_manager_size = "4GB"
lru_num = 7
temp_dir = "/var/infinity/tmp"

[wal]
wal_dir = "/var/infinity/wal"
wal_dir = "/var/infinity/wal"
full_checkpoint_interval = "86400s"
delta_checkpoint_interval = "60s"
# delta_checkpoint_threshold = 1000000000
wal_compact_threshold = "1GB"
# delta_checkpoint_threshold = 1000000000
wal_compact_threshold = "1GB"

# flush_at_once: write and flush log each commit
# only_write: write log, the OS controls when to flush the log (default)
# flush_per_second: logs are written after each commit and flushed to disk per second.
wal_flush = "only_write"
wal_flush = "only_write"

[resource]
resource_dir = "/var/infinity/resource"
resource_dir = "/var/infinity/resource"

[persistence]
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public:
protected:
void WriteToFileImpl(bool to_spill, bool &prepare_success) override;

void ReadFromFileImpl() override;
void ReadFromFileImpl(SizeT file_size) override;

private:
EmbeddingDataType GetType() const;
Expand Down Expand Up @@ -136,7 +136,7 @@ void AnnIVFFlatIndexFileWorker<DataType>::WriteToFileImpl(bool to_spill, bool &p
}

template <typename DataType>
void AnnIVFFlatIndexFileWorker<DataType>::ReadFromFileImpl() {
void AnnIVFFlatIndexFileWorker<DataType>::ReadFromFileImpl(SizeT file_size) {
data_ = new AnnIVFFlatIndexData<DataType>();
auto *index = static_cast<AnnIVFFlatIndexData<DataType> *>(data_);
index->ReadIndexInner(*file_handler_);
Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/bmp_index_file_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ void BMPIndexFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success) {
prepare_success = true;
}

void BMPIndexFileWorker::ReadFromFileImpl() {
void BMPIndexFileWorker::ReadFromFileImpl(SizeT file_size) {
if (data_ != nullptr) {
UnrecoverableError("Data is already allocated.");
}
Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/bmp_index_file_worker.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public:
protected:
void WriteToFileImpl(bool to_spill, bool &prepare_success) override;

void ReadFromFileImpl() override;
void ReadFromFileImpl(SizeT file_size) override;

private:
SizeT index_size_{};
Expand Down
3 changes: 1 addition & 2 deletions src/storage/buffer/file_worker/data_file_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,9 @@ void DataFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success) {
prepare_success = true; // Not run defer_fn
}

void DataFileWorker::ReadFromFileImpl() {
void DataFileWorker::ReadFromFileImpl(SizeT file_size) {
LocalFileSystem fs;

SizeT file_size = fs.GetFileSize(*file_handler_);
if (file_size < sizeof(u64) * 3) {
Status status = Status::DataIOError(fmt::format("Incorrect file length {}.", file_size));
RecoverableError(status);
Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/data_file_worker.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public:
protected:
void WriteToFileImpl(bool to_spill, bool &prepare_success) override;

void ReadFromFileImpl() override;
void ReadFromFileImpl(SizeT file_size) override;

private:
const SizeT buffer_size_;
Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/emvb_index_file_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ void EMVBIndexFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success)
prepare_success = true;
}

void EMVBIndexFileWorker::ReadFromFileImpl() {
void EMVBIndexFileWorker::ReadFromFileImpl(SizeT file_size) {
if (data_) {
const auto error_message = "Data is already allocated.";
UnrecoverableError(error_message);
Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/emvb_index_file_worker.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public:
protected:
void WriteToFileImpl(bool to_spill, bool &prepare_success) override;

void ReadFromFileImpl() override;
void ReadFromFileImpl(SizeT file_size) override;

private:
const EmbeddingInfo *GetEmbeddingInfo() const;
Expand Down
19 changes: 13 additions & 6 deletions src/storage/buffer/file_worker/file_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,17 @@ void FileWorker::ReadFromFile(bool from_spill) {
} else {
read_path = fmt::format("{}/{}", ChooseFileDir(from_spill), *file_name_);
}

SizeT file_size = 0;
u8 flags = FileFlags::READ_FLAG;
auto [file_handler, status] = fs.OpenFile(read_path, flags, FileLockType::kReadLock);
if(!status.ok()) {
UnrecoverableError(status.message());
}
if (use_object_cache) {
fs.Seek(*file_handler, obj_addr_.part_offset_);
file_size = obj_addr_.part_size_;
} else {
file_size = fs.GetFileSize(*file_handler);
}
file_handler_ = std::move(file_handler);
DeferFn defer_fn([&]() {
Expand All @@ -106,14 +109,10 @@ void FileWorker::ReadFromFile(bool from_spill) {
InfinityContext::instance().persistence_manager()->PutObjCache(obj_addr_);
}
});
ReadFromFileImpl();
ReadFromFileImpl(file_size);
}

void FileWorker::MoveFile() {
if (InfinityContext::instance().persistence_manager() != nullptr) {
LOG_DEBUG(fmt::format("Skipped MoveFile file since persistence manager is enabled: {}", *file_name_));
return;
}
LocalFileSystem fs;

String src_path = fmt::format("{}/{}", ChooseFileDir(true), *file_name_);
Expand All @@ -130,9 +129,17 @@ void FileWorker::MoveFile() {
// UnrecoverableError(fmt::format("File {} was already been created before.", dest_path));
// }
fs.Rename(src_path, dest_path);
if (InfinityContext::instance().persistence_manager() != nullptr) {
obj_addr_ = InfinityContext::instance().persistence_manager()->Persist(dest_path);
fs.DeleteFile(dest_path);
}
}

void FileWorker::CleanupFile() const {
if (InfinityContext::instance().persistence_manager() != nullptr) {
InfinityContext::instance().persistence_manager()->Cleanup(obj_addr_);
return;
}
LocalFileSystem fs;

String path = fmt::format("{}/{}", ChooseFileDir(false), *file_name_);
Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/file_worker.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public:
protected:
virtual void WriteToFileImpl(bool to_spill, bool &prepare_success) = 0;

virtual void ReadFromFileImpl() = 0;
virtual void ReadFromFileImpl(SizeT file_size) = 0;

private:
String ChooseFileDir(bool spill) const { return spill ? fmt::format("{}{}", *temp_dir_, *file_dir_) : *file_dir_; }
Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/hnsw_file_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ void HnswFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success) {
prepare_success = true;
}

void HnswFileWorker::ReadFromFileImpl() {
void HnswFileWorker::ReadFromFileImpl(SizeT file_size) {
if (data_ != nullptr) {
UnrecoverableError("Data is already allocated.");
}
Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/hnsw_file_worker.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public:
protected:
void WriteToFileImpl(bool to_spill, bool &prepare_success) override;

void ReadFromFileImpl() override;
void ReadFromFileImpl(SizeT file_size) override;

private:
SizeT index_size_{};
Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/raw_file_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ void RawFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success) {
prepare_success = true; // Not run defer_fn
}

void RawFileWorker::ReadFromFileImpl() {
void RawFileWorker::ReadFromFileImpl(SizeT file_size) {
LocalFileSystem fs;
buffer_size_ = fs.GetFileSize(*file_handler_);
data_ = static_cast<void *>(new char[buffer_size_]);
Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/raw_file_worker.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public:
protected:
void WriteToFileImpl(bool to_spill, bool &prepare_success) override;

void ReadFromFileImpl() override;
void ReadFromFileImpl(SizeT file_size) override;

private:
SizeT buffer_size_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ void SecondaryIndexFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_succ
}
}

void SecondaryIndexFileWorker::ReadFromFileImpl() {
void SecondaryIndexFileWorker::ReadFromFileImpl(SizeT file_size) {
if (!data_) [[likely]] {
auto index = GetSecondaryIndexData(column_def_->type(), row_count_, false);
index->ReadIndexInner(*file_handler_);
Expand Down Expand Up @@ -142,7 +142,7 @@ void SecondaryIndexFileWorkerParts::WriteToFileImpl(bool to_spill, bool &prepare
}
}

void SecondaryIndexFileWorkerParts::ReadFromFileImpl() {
void SecondaryIndexFileWorkerParts::ReadFromFileImpl(SizeT file_size) {
if (row_count_ < part_id_ * 8192) {
String error_message = fmt::format("ReadFromFileImpl: row_count_: {} < part_id_ * 8192: {}", row_count_, part_id_ * 8192);
UnrecoverableError(error_message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public:
protected:
void WriteToFileImpl(bool to_spill, bool &prepare_success) override;

void ReadFromFileImpl() override;
void ReadFromFileImpl(SizeT file_size) override;

const u32 row_count_{};
};
Expand All @@ -81,7 +81,7 @@ public:
protected:
void WriteToFileImpl(bool to_spill, bool &prepare_success) override;

void ReadFromFileImpl() override;
void ReadFromFileImpl(SizeT file_size) override;

const u32 row_count_;
const u32 part_id_;
Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/version_file_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ void VersionFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success) {
}
}

void VersionFileWorker::ReadFromFileImpl() {
void VersionFileWorker::ReadFromFileImpl(SizeT file_size) {
if (data_ != nullptr) {
String error_message = "Data is already allocated.";
UnrecoverableError(error_message);
Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/version_file_worker.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public:
protected:
void WriteToFileImpl(bool to_spill, bool &prepare_success) override;

void ReadFromFileImpl() override;
void ReadFromFileImpl(SizeT file_size) override;

private:
SizeT capacity_{};
Expand Down
30 changes: 22 additions & 8 deletions src/storage/meta/entry/chunk_index_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import secondary_index_file_worker;
import emvb_index_file_worker;
import bmp_index_file_worker;
import column_def;
import infinity_context;

namespace infinity {

Expand Down Expand Up @@ -292,15 +293,28 @@ void ChunkIndexEntry::Cleanup() {
const auto &index_dir = segment_index_entry_->index_dir();
const IndexBase *index_base = table_index_entry->index_base();
if (index_base->index_type_ == IndexType::kFullText) {
Path path = Path(*index_dir) / base_name_;
String index_prefix = path.string();
String posting_file = index_prefix + POSTING_SUFFIX;
String dict_file = index_prefix + DICT_SUFFIX;
if (InfinityContext::instance().persistence_manager() != nullptr) {
// TODO: clean up fulltext file objects.
Path path = Path(*index_dir) / base_name_;
String index_prefix = path.string();
String posting_file = index_prefix + POSTING_SUFFIX;
String dict_file = index_prefix + DICT_SUFFIX;

LocalFileSystem fs;
fs.DeleteFile(posting_file);
fs.DeleteFile(dict_file);
LOG_DEBUG(fmt::format("Cleaned chunk index entry {}, posting: {}, dictionary file: {}", index_prefix, posting_file, dict_file));
LocalFileSystem fs;
fs.DeleteFile(posting_file);
fs.DeleteFile(dict_file);
LOG_DEBUG(fmt::format("Cleaned chunk index entry {}, posting: {}, dictionary file: {}", index_prefix, posting_file, dict_file));
} else {
Path path = Path(*index_dir) / base_name_;
String index_prefix = path.string();
String posting_file = index_prefix + POSTING_SUFFIX;
String dict_file = index_prefix + DICT_SUFFIX;

LocalFileSystem fs;
fs.DeleteFile(posting_file);
fs.DeleteFile(dict_file);
LOG_DEBUG(fmt::format("Cleaned chunk index entry {}, posting: {}, dictionary file: {}", index_prefix, posting_file, dict_file));
}
} else {
LOG_DEBUG(fmt::format("Cleaned chunk index entry {}/{}", *index_dir, chunk_id_));
}
Expand Down
Loading
Loading
0