8000 Using Read/Write Lock guard in all corresponding places by toktarev · Pull Request #7148 · facebook/rocksdb · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Using Read/Write Lock guard in all corresponding places #7148

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 39 additions & 35 deletions db/memtable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <array>
#include <limits>
#include <memory>

#include "db/dbformat.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
Expand Down Expand Up @@ -239,9 +240,8 @@ int MemTable::KeyComparator::operator()(const char* prefix_len_key1,
return comparator.CompareKeySeq(k1, k2);
}

int MemTable::KeyComparator::operator()(const char* prefix_len_key,
const KeyComparator::DecodedType& key)
const {
int MemTable::KeyComparator::operator()(
const char* prefix_len_key, const KeyComparator::DecodedType& key) const {
// Internal keys are encoded as length-prefixed strings.
Slice a = GetLengthPrefixedSlice(prefix_len_key);
return comparator.CompareKeySeq(a, key);
Expand Down Expand Up @@ -623,6 +623,35 @@ struct Saver {
};
} // namespace

// Processes the value stored in a memtable entry on behalf of a lookup.
//
// The value is read as a length-prefixed slice located `key_length` bytes
// past `key_ptr`. `*(s->status)` is first reset to OK, then exactly one of the
// following happens:
//   * `s->do_merge` is false: the value is pushed onto `merge_context` as a
//     raw merge operand, whether or not a merge is in progress.
//   * `s->do_merge` is true and `s->value` is null: nothing further is done.
//   * `s->do_merge` is true and a merge is in progress: the value is combined
//     with the operands collected so far via MergeHelper::TimedFullMerge, and
//     the resulting status is stored in `*(s->status)`.
//   * otherwise: the value bytes are copied into `*(s->value)`.
static void mergeValue(Saver* s, const char* key_ptr, uint32_t key_length,
                       const MergeOperator* merge_operator,
                       MergeContext* merge_context) {
  Slice stored_value = GetLengthPrefixedSlice(key_ptr + key_length);
  *(s->status) = Status::OK();
  const bool merging = *(s->merge_in_progress);

  if (!s->do_merge) {
    // Preserve the value with the goal of returning it as part of
    // raw merge operands to the user
    merge_context->PushOperand(
        stored_value,
        s->inplace_update_support == false /* operand_pinned */);
    return;
  }

  if (s->value == nullptr) {
    // The caller did not ask for the value itself; nothing to produce.
    return;
  }

  if (merging) {
    *(s->status) = MergeHelper::TimedFullMerge(
        merge_operator, s->key->user_key(), &stored_value,
        merge_context->GetOperands(), s->value, s->logger, s->statistics,
        s->env_, nullptr /* result_operand */, true);
  } else {
    s->value->assign(stored_value.data(), stored_value.size());
  }
}

static bool SaveValue(void* arg, const char* entry) {
Saver* s = reinterpret_cast<Saver*>(arg);
assert(s != nullptr);
Expand Down Expand Up @@ -684,35 +713,12 @@ static bool SaveValue(void* arg, const char* entry) {
FALLTHROUGH_INTENDED;
case kTypeValue: {
if (s->inplace_update_support) {
s->mem->GetLock(s->key->user_key())->ReadLock();
}
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
*(s->status) = Status::OK();
if (*(s->merge_in_progress)) {
if (s->do_merge) {
if (s->value != nullptr) {
*(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), &v,
merge_context->GetOperands(), s->value, s->logger,
s->statistics, s->env_, nullptr /* result_operand */, true);
}
} else {
// Preserve the value with the goal of returning it as part of
// raw merge operands to the user
merge_context->PushOperand(
v, s->inplace_update_support == false /* operand_pinned */);
}
} else if (!s->do_merge) {
// Preserve the value with the goal of returning it as part of
// raw merge operands to the user
merge_context->PushOperand(
v, s->inplace_update_support == false /* operand_pinned */);
} else if (s->value != nullptr) {
s->value->assign(v.data(), v.size());
}
if (s->inplace_update_support) {
s->mem->GetLock(s->key->user_key())->ReadUnlock();
ReadLock aReadLock(s->mem->GetLock(s->key->user_key()));
mergeValue(s, key_ptr, key_length, merge_operator, merge_context);
} else {
mergeValue(s, key_ptr, key_length, merge_operator, merge_context);
}

*(s->found_final_value) = true;
if (s->is_blob_index != nullptr) {
*(s->is_blob_index) = (type == kTypeBlobIndex);
Expand Down Expand Up @@ -948,8 +954,7 @@ void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
PERF_COUNTER_ADD(get_from_memtable_count, 1);
}

void MemTable::Update(SequenceNumber seq,
const Slice& key,
void MemTable::Update(SequenceNumber seq, const Slice& key,
const Slice& value) {
LookupKey lkey(key, seq);
Slice mem_key = lkey.memtable_key();
Expand Down Expand Up @@ -1007,8 +1012,7 @@ void MemTable::Update(SequenceNumber seq,
assert(add_res);
}

bool MemTable::UpdateCallback(SequenceNumber seq,
const Slice& key,
bool MemTable::UpdateCallback(SequenceNumber seq, const Slice& key,
const Slice& delta) {
LookupKey lkey(key, seq);
Slice memkey = lkey.memtable_key();
Expand Down
86 changes: 45 additions & 41 deletions memtable/vectorrep.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,17 @@
// (found in the LICENSE.Apache file in the root directory).
//
#ifndef ROCKSDB_LITE
#include "rocksdb/memtablerep.h"

#include <unordered_set>
#include <set>
#include <memory>
#include <algorithm>
#include <memory>
#include <set>
#include <type_traits>
#include <unordered_set>

#include "db/memtable.h"
#include "memory/arena.h"
#include "memtable/stl_wrappers.h"
#include "port/port.h"
#include "rocksdb/memtablerep.h"
#include "util/mutexlock.h"

namespace ROCKSDB_NAMESPACE {
Expand Down Expand Up @@ -50,13 +49,14 @@ class VectorRep : public MemTableRep {
std::shared_ptr<std::vector<const char*>> bucket_;
std::vector<const char*>::const_iterator mutable cit_;
const KeyComparator& compare_;
std::string tmp_; // For passing to EncodeKey
std::string tmp_; // For passing to EncodeKey
bool mutable sorted_;
void DoSort() const;

public:
explicit Iterator(class VectorRep* vrep,
std::shared_ptr<std::vector<const char*>> bucket,
const KeyComparator& compare);
std::shared_ptr<std::vector<const char*>> bucket,
const KeyComparator& compare);

// Initialize an iterator over the specified collection.
// The returned iterator is not valid.
Expand Down Expand Up @@ -125,12 +125,10 @@ void VectorRep::MarkReadOnly() {
}

// Returns an estimate of the bytes used by this rep: the size of the
// `bucket_` handle, plus the size of the vector object it points to, plus
// one element's size for each entry currently stored in the vector.
size_t VectorRep::ApproximateMemoryUsage() {
return
sizeof(bucket_) + sizeof(*bucket_) +
bucket_->size() *
sizeof(
std::remove_reference<decltype(*bucket_)>::type::value_type
);
return sizeof(bucket_) + sizeof(*bucket_) +
bucket_->size() *
sizeof(
std::remove_reference<decltype(*bucket_)>::type::value_type);
}

VectorRep::VectorRep(const KeyComparator& compare, Allocator* allocator,
Expand All @@ -144,13 +142,13 @@ VectorRep::VectorRep(const KeyComparator& compare, Allocator* allocator,
}

// Builds an iterator over `bucket`, ordering entries with `compare`.
// The iterator starts positioned at the end of the bucket and is marked as
// not yet sorted. `vrep` is stored as given and may be null.
VectorRep::Iterator::Iterator(class VectorRep* vrep,
std::shared_ptr<std::vector<const char*>> bucket,
const KeyComparator& compare)
: vrep_(vrep),
bucket_(bucket),
cit_(bucket_->end()),
compare_(compare),
sorted_(false) { }
std::shared_ptr<std::vector<const char*>> bucket,
const KeyComparator& compare)
: vrep_(vrep),
bucket_(bucket),
cit_(bucket_->end()),
compare_(compare),
sorted_(false) {}

void VectorRep::Iterator::DoSort() const {
// vrep is non-null means that we are working on an immutable memtable
Expand Down Expand Up @@ -216,12 +214,11 @@ void VectorRep::Iterator::Seek(const Slice& user_key,
// Do binary search to find first value not less than the target
const char* encoded_key =
(memtable_key != nullptr) ? memtable_key : EncodeKey(&tmp_, user_key);
cit_ = std::equal_range(bucket_->begin(),
bucket_->end(),
encoded_key,
[this] (const char* a, const char* b) {
cit_ = std::equal_range(bucket_->begin(), bucket_->end(), encoded_key,
[this](const char* a, const char* b) {
return compare_(a, b) < 0;
}).first;
})
.first;
}

// Advance to the first entry with a key <= target
Expand Down Expand Up @@ -249,20 +246,27 @@ void VectorRep::Iterator::SeekToLast() {

// Looks up `k` and feeds matching entries to `callback_func`.
//
// While holding the read lock on `rwlock_`, an iterator is created over
// either the shared `bucket_` (when `immutable_` is set) or a private copy of
// it (otherwise). The lock is released before iteration begins. The iterator
// is then positioned with Seek and advanced entry by entry; iteration stops
// when the iterator is no longer valid or `callback_func` returns false.
void VectorRep::Get(const LookupKey& k, void* callback_args,
bool (*callback_func)(void* arg, const char* entry)) {
rwlock_.ReadLock();
VectorRep* vector_rep;
std::shared_ptr<Bucket> bucket;
if (immutable_) {
vector_rep = this;
} else {
vector_rep = nullptr;
bucket.reset(new Bucket(*bucket_)); // make a copy
std::unique_ptr<VectorRep::Iterator> iter{};

{
ReadLock aReadLock(&rwlock_);
VectorRep* vector_rep;
std::shared_ptr<Bucket> bucket;

if (immutable_) {
vector_rep = this;
} else {
vector_rep = nullptr;
bucket = std::make_shared<Bucket>(*bucket_); // make a copy
}

iter.reset(new VectorRep::Iterator(
vector_rep, immutable_ ? bucket_ : bucket, compare_));
}
VectorRep::Iterator iter(vector_rep, immutable_ ? bucket_ : bucket, compare_);
rwlock_.ReadUnlock();

for (iter.Seek(k.user_key(), k.memtable_key().data());
iter.Valid() && callback_func(callback_args, iter.key()); iter.Next()) {
for (iter->Seek(k.user_key(), k.memtable_key().data());
iter->Valid() && callback_func(callback_args, iter->key());
iter->Next()) {
}
}

Expand All @@ -281,16 +285,16 @@ MemTableRep::Iterator* VectorRep::GetIterator(Arena* arena) {
return new (mem) Iterator(this, bucket_, compare_);
}
} else {
std::shared_ptr<Bucket> tmp;
tmp.reset(new Bucket(*bucket_)); // make a copy
std::shared_ptr<Bucket> tmp =
std::make_shared<Bucket>(*bucket_); // make a copy
if (arena == nullptr) {
return new Iterator(nullptr, tmp, compare_);
} else {
return new (mem) Iterator(nullptr, tmp, compare_);
}
}
}
} // anon namespace
} // namespace

MemTableRep* VectorRepFactory::CreateMemTableRep(
const MemTableRep::KeyComparator& compare, Allocator* allocator,
Expand Down
0