Optimize slice handling to accelerate the large batch transfer operation by SCDESPERTATE · Pull Request #557 · kvcache-ai/Mooncake · GitHub

Optimize slice handling to accelerate the large batch transfer operation #557

Open
wants to merge 4 commits into
base: main
3 changes: 2 additions & 1 deletion doc/en/transfer-engine.md
@@ -438,4 +438,5 @@ For advanced users, TransferEngine provides the following advanced runtime optio
- `MC_HANDSHAKE_LISTEN_BACKLOG` The backlog size of socket listening for handshaking, default value is 128
- `MC_LOG_DIR` Specify the directory path for log redirection files. If invalid, log to stderr instead.
- `MC_REDIS_PASSWORD` The password for Redis storage plugin, only takes effect when Redis is specified as the metadata server. If not set, no authentication will be attempted to log in to the Redis.
- `MC_REDIS_DB_INDEX` The database index for Redis storage plugin, must be an integer between 0 and 255. Only takes effect when Redis is specified as the metadata server. If not set or invalid, the default value is 0.
- `MC_FRAGMENT_RATIO` In `RdmaTransport::submitTransferTask`, if the last piece of data left after splitting is no larger than 1/MC_FRAGMENT_RATIO of the block size, it is merged into the previous block to reduce overhead. The default value is 4.
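The merge rule described for `MC_FRAGMENT_RATIO` can be sketched in isolation as follows. This is a minimal illustration, not code from the PR: `sliceLengths` and its parameters are hypothetical names, and the condition mirrors the `merge_final_slice` test that appears in the `submitTransferTask` diff.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper (not a Mooncake API): returns the length of each slice
// produced for a request of `length` bytes.
std::vector<std::size_t> sliceLengths(std::size_t length,
                                      std::size_t blockSize,
                                      std::size_t fragmentRatio) {
    assert(blockSize > 0 && fragmentRatio > 0);
    // A tail no larger than this is folded into the preceding slice.
    const std::size_t fragmentLimit = blockSize / fragmentRatio;
    std::vector<std::size_t> lengths;
    for (std::size_t offset = 0; offset < length; offset += blockSize) {
        const std::size_t remaining = length - offset;
        // Same test as the diff: the rest fits in one block plus one fragment.
        const bool mergeFinal = remaining <= blockSize + fragmentLimit;
        lengths.push_back(mergeFinal ? remaining : blockSize);
        if (mergeFinal) break;  // the merged slice covers everything left
    }
    return lengths;
}
```

Assuming a block size of 65536 bytes and a ratio of 4, a request of 131172 bytes would be split into two slices (65536 and 65636 bytes) rather than three, because the 100-byte tail is below the 16384-byte fragment limit.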
1 change: 1 addition & 0 deletions doc/zh/transfer-engine.md
@@ -409,3 +409,4 @@ int init(const std::string &metadata_conn_string,
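- `MC_LOG_DIR` Specifies the directory path for log redirection files. If the path is invalid, glog falls back to writing logs to standard error (stderr).
- `MC_REDIS_PASSWORD` The password for the Redis storage plugin. It only takes effect when Redis is specified as the metadata server. If not set, no password authentication is attempted when logging in to Redis.
- `MC_REDIS_DB_INDEX` The database index for the Redis storage plugin, which must be an integer between 0 and 255. It only takes effect when Redis is specified as the metadata server. If not set or invalid, the default value is 0.
- `MC_FRAGMENT_RATIO` When `RdmaTransport::submitTransferTask` splits a transfer task into transfer blocks, if the last piece of data left after splitting is no larger than 1/MC_FRAGMENT_RATIO of the block size, it is merged into the preceding block for transfer to reduce overhead. The default value is 4.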
1 change: 1 addition & 0 deletions mooncake-transfer-engine/include/config.h
@@ -46,6 +46,7 @@ struct GlobalConfig {
bool trace = false;
int64_t slice_timeout = -1;
bool use_ipv6 = false;
size_t fragment_limit = 16384;
};

void loadGlobalConfig(GlobalConfig &config);
13 changes: 10 additions & 3 deletions mooncake-transfer-engine/include/transport/transport.h
@@ -91,6 +91,7 @@ class Transport {
std::string peer_nic_path;
SliceStatus status;
TransferTask *task;
bool from_cache;

union {
struct {
@@ -154,12 +155,18 @@ class Transport {
}

Slice *allocate() {
Slice *slice;

if (head_ - tail_ == 0) {
allocated_++;
return new Slice();
slice = new Slice();
slice->from_cache = false;
} else {
slice = lazy_delete_slices_[tail_ % kLazyDeleteSliceCapacity];
tail_++;
slice->from_cache = true;
}
auto slice = lazy_delete_slices_[tail_ % kLazyDeleteSliceCapacity];
tail_++;

return slice;
}

12 changes: 12 additions & 0 deletions mooncake-transfer-engine/src/config.cpp
@@ -242,6 +242,18 @@ void loadGlobalConfig(GlobalConfig &config) {
if (std::getenv("MC_USE_IPV6")) {
config.use_ipv6 = true;
}

const char *fragment_ratio = std::getenv("MC_FRAGMENT_RATIO");
if (fragment_ratio) {
size_t val = atoi(fragment_ratio);
if (val > 0 && val < config.slice_size)
config.fragment_limit = config.slice_size / val;
else {
LOG(WARNING)
<< "Ignore value from environment variable MC_FRAGMENT_RATIO and set it to 4 as default";
config.fragment_limit = config.slice_size / 4;
}
}
}
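The parsing step above relies on `atoi`, which stops at the first non-digit character and cannot report errors. As a hedged alternative, the sketch below shows one way the same range check could be done with `std::strtoull`. The function `fragmentLimitFromText` is a hypothetical name introduced here for illustration and is not part of Mooncake. One deliberate difference is an assumption of this sketch: a null input falls back to `sliceSize / 4`, whereas in the diff an unset variable leaves `fragment_limit` at its struct default.

```cpp
#include <cassert>
#include <cctype>
#include <cerrno>
#include <cstddef>
#include <cstdlib>

// Hypothetical helper (not part of Mooncake): converts the text of
// MC_FRAGMENT_RATIO into a fragment limit in bytes.
std::size_t fragmentLimitFromText(const char *text, std::size_t sliceSize) {
    assert(sliceSize >= 4);
    const std::size_t fallback = sliceSize / 4;  // ratio 4, as documented
    // Require a leading digit so that signs and whitespace are rejected.
    if (text == nullptr || !std::isdigit(static_cast<unsigned char>(text[0])))
        return fallback;
    errno = 0;
    char *end = nullptr;
    const unsigned long long ratio = std::strtoull(text, &end, 10);
    // Reject overflow and trailing characters such as "8x".
    if (errno == ERANGE || *end != '\0') return fallback;
    // Same range check as the diff: 0 < ratio < slice size.
    if (ratio == 0 || ratio >= sliceSize) return fallback;
    return sliceSize / static_cast<std::size_t>(ratio);
}
```

A caller could pass `std::getenv("MC_FRAGMENT_RATIO")` directly, since the helper treats a null pointer as "not set".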
std::string mtuLengthToString(ibv_mtu mtu) {
@@ -271,16 +271,26 @@ Status RdmaTransport::submitTransferTask(
assert(local_segment_desc.get());
const size_t kBlockSize = globalConfig().slice_size;
const int kMaxRetryCount = globalConfig().retry_cnt;
const size_t kFragmentSize = globalConfig().fragment_limit;
const size_t kSubmitWatermark = globalConfig().max_wr * globalConfig().num_qp_per_ep;
uint64_t nr_slices;
for (size_t index = 0; index < request_list.size(); ++index) {
assert(request_list[index] && task_list[index]);
auto &request = *request_list[index];
auto &task = *task_list[index];
nr_slices = 0;
for (uint64_t offset = 0; offset < request.length;
offset += kBlockSize) {
Slice *slice = getSliceCache().allocate();
assert(slice);
if (!slice->from_cache) {
Collaborator commented: Why is this line needed? Please add a comment explaining it.

nr_slices++;
}

bool merge_final_slice = request.length - offset <= kBlockSize + kFragmentSize;

slice->source_addr = (char *)request.source + offset;
slice->length = std::min(request.length - offset, kBlockSize);
slice->length = merge_final_slice ? request.length - offset : kBlockSize;
slice->opcode = request.opcode;
slice->rdma.dest_addr = request.target_offset + offset;
slice->rdma.retry_cnt = request.advise_retry_cnt;
@@ -320,17 +330,31 @@ Status RdmaTransport::submitTransferTask(
auto source_addr = slice->source_addr;
for (auto &entry : slices_to_post)
for (auto s : entry.second) getSliceCache().deallocate(s);
nr_slices = 0;
Collaborator commented: Is this line unnecessary?

LOG(ERROR)
<< "Memory region not registered by any active device(s): "
<< source_addr;
return Status::AddressNotRegistered(
"Memory region not registered by any active device(s): " +
std::to_string(reinterpret_cast<uintptr_t>(source_addr)));
}

if (nr_slices >= kSubmitWatermark) {
for (auto &entry : slices_to_post)
entry.first->submitPostSend(entry.second);
slices_to_post.clear();
nr_slices = 0;
}

if (merge_final_slice) {
break;
}
}
}

for (auto &entry : slices_to_post)
entry.first->submitPostSend(entry.second);
if (!entry.second.empty())
Collaborator commented: Why is this `if` needed?

entry.first->submitPostSend(entry.second);
return Status::OK();
}
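The submission pattern in this hunk (post a batch once a watermark is reached, then post whatever is left at the end) can be sketched on its own. This is a simplified illustration under stated assumptions: `WatermarkBatcher` is a hypothetical class, it counts every queued item, and it does not reproduce the PR's accounting, which increments `nr_slices` only for slices not taken from the cache.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical helper (not part of Mooncake): collects items and hands them
// to a callback whenever the pending count reaches the watermark.
class WatermarkBatcher {
public:
    using Flush = std::function<void(const std::vector<int> &)>;

    WatermarkBatcher(std::size_t watermark, Flush flush)
        : watermark_(watermark), flush_(std::move(flush)) {
        assert(watermark_ > 0);
    }

    // Queue one item and flush eagerly once enough work has accumulated.
    void add(int item) {
        pending_.push_back(item);
        if (pending_.size() >= watermark_) flushNow();
    }

    // Final flush: skip the callback when an eager flush already emptied
    // the queue, so no empty batch is ever posted.
    void finish() {
        if (!pending_.empty()) flushNow();
    }

private:
    void flushNow() {
        flush_(pending_);
        pending_.clear();
    }

    std::size_t watermark_;
    Flush flush_;
    std::vector<int> pending_;
};
```

With a watermark of 3 and seven queued items, the callback would run three times with batches of 3, 3, and 1 items. The emptiness check in `finish` plays the same role as the `if (!entry.second.empty())` guard questioned in the review: after an eager flush, the remaining list may be empty.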
