Skip to content

Commit

Permalink
titandb: add TitanSnapshot (facebook#28)
Browse files Browse the repository at this point in the history
* titandb: add TitanSnapshot

Wraps the current version together with the snapshot from base DB
so that we can safely recycle a stale version when it is dropped.
This also implies a guarantee that the current version must contain
all the data accessible from base DB.

* titandb: fix some warnings

* titandb: fix some warnings
  • Loading branch information
huachaohuang authored and DorianZheng committed Sep 5, 2018
1 parent 9cd8875 commit 190e9e2
Show file tree
Hide file tree
Showing 8 changed files with 131 additions and 88 deletions.
4 changes: 2 additions & 2 deletions utilities/titandb/blob_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ class BlobFileReader {
// Constructs a reader with the shared blob file. The provided blob
// file must be corresponding to the "file".
BlobFileReader(const TitanDBOptions& options,
std::shared_ptr<BlobFile> blob_file,
std::shared_ptr<BlobFile> _blob_file,
std::unique_ptr<RandomAccessFileReader> file)
: options_(options),
blob_file_(blob_file),
blob_file_(_blob_file),
file_(std::move(file)) {}

// Gets the blob record pointed by the handle in this file. The data
Expand Down
91 changes: 72 additions & 19 deletions utilities/titandb/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -164,17 +164,22 @@ Status TitanDBImpl::Close() {
Status TitanDBImpl::Get(const ReadOptions& options,
ColumnFamilyHandle* cf_handle,
const Slice& key, PinnableSlice* value) {
ReadContext ctx(db_, vset_, &mutex_, options);
return GetImpl(ctx, cf_handle, key, value);
if (options.snapshot) {
return GetImpl(options, cf_handle, key, value);
}
ReadOptions ro(options);
ManagedSnapshot snapshot(this);
ro.snapshot = snapshot.snapshot();
return GetImpl(ro, cf_handle, key, value);
}

Status TitanDBImpl::GetImpl(const ReadContext& ctx,
Status TitanDBImpl::GetImpl(const ReadOptions& options,
ColumnFamilyHandle* cf_handle,
const Slice& key, PinnableSlice* value) {
Status s;
auto current = ctx.current();
auto options = ctx.options();
auto snapshot = reinterpret_cast<const TitanSnapshot*>(options.snapshot);
auto current = snapshot->current();

Status s;
bool is_blob_index = false;
s = db_impl_->GetImpl(options, cf_handle, key, value,
nullptr /*value_found*/,
Expand All @@ -200,7 +205,19 @@ std::vector<Status> TitanDBImpl::MultiGet(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& cf_handles,
const std::vector<Slice>& keys, std::vector<std::string>* values) {
ReadContext ctx(db_, vset_, &mutex_, options);
if (options.snapshot) {
return MultiGetImpl(options, cf_handles, keys, values);
}
ReadOptions ro(options);
ManagedSnapshot snapshot(this);
ro.snapshot = snapshot.snapshot();
return MultiGetImpl(ro, cf_handles, keys, values);
}

std::vector<Status> TitanDBImpl::MultiGetImpl(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& cf_handles,
const std::vector<Slice>& keys, std::vector<std::string>* values) {
std::vector<Status> res;
res.reserve(keys.size());
values->clear();
Expand All @@ -209,7 +226,7 @@ std::vector<Status> TitanDBImpl::MultiGet(
Status s;
std::string value;
PinnableSlice pinnable_value(&value);
s = GetImpl(ctx, cf_handles[i], keys[i], &pinnable_value);
s = GetImpl(options, cf_handles[i], keys[i], &pinnable_value);
res.emplace_back(s);
values->emplace_back(value);
}
Expand All @@ -218,35 +235,71 @@ std::vector<Status> TitanDBImpl::MultiGet(

Iterator* TitanDBImpl::NewIterator(const ReadOptions& options,
ColumnFamilyHandle* cf_handle) {
std::shared_ptr<ReadContext> ctx(
new ReadContext(db_, vset_, &mutex_, options));
return NewIteratorImpl(ctx, cf_handle);
std::shared_ptr<ManagedSnapshot> snapshot;
if (options.snapshot) {
return NewIteratorImpl(options, cf_handle, snapshot);
}
ReadOptions ro(options);
snapshot.reset(new ManagedSnapshot(this));
ro.snapshot = snapshot->snapshot();
return NewIteratorImpl(ro, cf_handle, snapshot);
}

Iterator* TitanDBImpl::NewIteratorImpl(std::shared_ptr<ReadContext> ctx,
ColumnFamilyHandle* cf_handle) {
Iterator* TitanDBImpl::NewIteratorImpl(
const ReadOptions& options,
ColumnFamilyHandle* cf_handle,
std::shared_ptr<ManagedSnapshot> snapshot) {
assert(snapshot->snapshot() == nullptr ||
snapshot->snapshot() == options.snapshot);

auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cf_handle)->cfd();
std::unique_ptr<ArenaWrappedDBIter> iter(
db_impl_->NewIteratorImpl(
ctx->options(), cfd,
ctx->options().snapshot->GetSequenceNumber(),
options, cfd,
options.snapshot->GetSequenceNumber(),
nullptr /*read_callback*/, true /*allow_blob*/));
return new TitanDBIterator(ctx, std::move(iter));
return new TitanDBIterator(options, snapshot, std::move(iter));
}

Status TitanDBImpl::NewIterators(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& cf_handles,
std::vector<Iterator*>* iterators) {
std::shared_ptr<ReadContext> ctx(
new ReadContext(db_, vset_, &mutex_, options));
ReadOptions ro(options);
std::shared_ptr<ManagedSnapshot> snapshot;
if (!ro.snapshot) {
snapshot.reset(new ManagedSnapshot(this));
ro.snapshot = snapshot->snapshot();
}
iterators->clear();
iterators->reserve(cf_handles.size());
for (auto& cf_handle : cf_handles) {
iterators->emplace_back(NewIteratorImpl(ctx, cf_handle));
iterators->emplace_back(NewIteratorImpl(ro, cf_handle, snapshot));
}
return Status::OK();
}

// Creates a TitanSnapshot that pairs the current version of the version set
// with a snapshot taken from the base DB. A reference is taken on the version
// here and dropped again in ReleaseSnapshot().
//
// NOTE(review): if the base DB can hand back a null snapshot, the returned
// wrapper will hold a null pointer -- confirm callers never hit that case.
const Snapshot* TitanDBImpl::GetSnapshot() {
  Version* version = nullptr;
  const Snapshot* base_snapshot = nullptr;
  {
    // Grab the version and the base snapshot under the same lock so that the
    // two are captured together.
    MutexLock guard(&mutex_);
    version = vset_->current();
    version->Ref();
    base_snapshot = db_->GetSnapshot();
  }
  // The wrapper itself is allocated after the lock has been released.
  return new TitanSnapshot(version, base_snapshot);
}

// Releases a snapshot obtained from GetSnapshot(): drops the reference held
// on the captured version, releases the wrapped base DB snapshot, and frees
// the wrapper object. Passing nullptr is a no-op.
void TitanDBImpl::ReleaseSnapshot(const Snapshot* snapshot) {
  if (snapshot == nullptr) return;
  // The argument is expected to be a TitanSnapshot allocated by
  // GetSnapshot(); static_cast is the appropriate named cast for this
  // base-to-derived conversion.
  auto s = static_cast<const TitanSnapshot*>(snapshot);
  {
    MutexLock l(&mutex_);
    s->current()->Unref();
    db_->ReleaseSnapshot(s->snapshot());
  }
  // The wrapper is deleted after the lock has been released.
  delete s;
}

} // namespace titandb
} // namespace rocksdb
17 changes: 13 additions & 4 deletions utilities/titandb/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#include "db/db_impl.h"
#include "utilities/titandb/db.h"
#include "utilities/titandb/version_set.h"
#include "utilities/titandb/read_context.h"
#include "utilities/titandb/blob_file_manager.h"

namespace rocksdb {
Expand Down Expand Up @@ -41,16 +40,26 @@ class TitanDBImpl : public TitanDB {
const std::vector<ColumnFamilyHandle*>& cf_handles,
std::vector<Iterator*>* iterators) override;

const Snapshot* GetSnapshot() override;

void ReleaseSnapshot(const Snapshot* snapshot) override;

private:
class FileManager;
friend class FileManager;

Status GetImpl(const ReadContext& ctx,
Status GetImpl(const ReadOptions& options,
ColumnFamilyHandle* cf_handle,
const Slice& key, PinnableSlice* value);

Iterator* NewIteratorImpl(std::shared_ptr<ReadContext> ctx,
ColumnFamilyHandle* cf_handle);
std::vector<Status> MultiGetImpl(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& cf_handles,
const std::vector<Slice>& keys, std::vector<std::string>* values);

Iterator* NewIteratorImpl(const ReadOptions& options,
ColumnFamilyHandle* cf_handle,
std::shared_ptr<ManagedSnapshot> snapshot);

Env* env_;
EnvOptions env_options_;
Expand Down
46 changes: 36 additions & 10 deletions utilities/titandb/db_iter.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,42 @@

#include "rocksdb/iterator.h"
#include "db/db_iter.h"
#include "utilities/titandb/version.h"
#include "utilities/titandb/blob_format.h"
#include "utilities/titandb/version_set.h"
#include "utilities/titandb/blob_file_reader.h"

namespace rocksdb {
namespace titandb {

// Wraps the current version together with the snapshot from base DB
// so that we can safely recycle a stale version when it is dropped.
// This also implies a guarantee that the current version must contain
// all the data accessible from base DB.
class TitanSnapshot : public Snapshot {
 public:
  // Stores both pointers as given; no reference counting happens here.
  TitanSnapshot(Version* version, const Snapshot* base_snapshot)
      : current_(version), snapshot_(base_snapshot) {}

  // Returns the version that was captured with this snapshot.
  Version* current() const { return current_; }

  // Returns the wrapped base DB snapshot.
  const Snapshot* snapshot() const { return snapshot_; }

  // Forwards to the wrapped base DB snapshot.
  SequenceNumber GetSequenceNumber() const override {
    return snapshot()->GetSequenceNumber();
  }

 private:
  Version* current_;
  const Snapshot* snapshot_;
};

class TitanDBIterator : public Iterator {
public:
TitanDBIterator(std::shared_ptr<ReadContext> ctx,
TitanDBIterator(const ReadOptions& options,
std::shared_ptr<ManagedSnapshot> snap,
std::unique_ptr<ArenaWrappedDBIter> iter)
: ctx_(ctx),
: options_(options),
snap_(snap),
iter_(std::move(iter)) {}

bool Valid() const override {
Expand Down Expand Up @@ -72,27 +97,28 @@ class TitanDBIterator : public Iterator {
status_ = DecodeInto(iter_->value(), &index);
if (!status_.ok()) return;

auto current = ctx_->current();
auto options = ctx_->options();
if (!options.readahead_size) {
status_ = current->Get(options, index, &record_, &buffer_);
auto snapshot = reinterpret_cast<const TitanSnapshot*>(options_.snapshot);
auto current = snapshot->current();
if (!options_.readahead_size) {
status_ = current->Get(options_, index, &record_, &buffer_);
return;
}

auto it = cache_.find(index.file_number);
if (it == cache_.end()) {
std::unique_ptr<BlobFileReader> reader;
status_ = current->NewReader(options, index.file_number, &reader);
status_ = current->NewReader(options_, index.file_number, &reader);
if (!status_.ok()) return;
it = cache_.emplace(index.file_number, std::move(reader)).first;
}
status_ = it->second->Get(options, index.blob_handle, &record_, &buffer_);
status_ = it->second->Get(options_, index.blob_handle, &record_, &buffer_);
}

Status status_;
BlobRecord record_;
std::string buffer_;
std::shared_ptr<ReadContext> ctx_;
ReadOptions options_;
std::shared_ptr<ManagedSnapshot> snap_;
std::unique_ptr<ArenaWrappedDBIter> iter_;
std::map<uint64_t, std::unique_ptr<BlobFileReader>> cache_;
};
Expand Down
45 changes: 0 additions & 45 deletions utilities/titandb/read_context.h

This file was deleted.

4 changes: 2 additions & 2 deletions utilities/titandb/version_edit.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ class VersionEdit {

void AddBlobFile(const BlobFileMeta& file) {
auto it = added_files_.emplace(file.file_number, file);
assert(it.second == true);
if (!it.second) abort();
}

void DeleteBlobFile(uint64_t file_number) {
auto it = deleted_files_.emplace(file_number);
assert(it.second == true);
if (!it.second) abort();
}

void EncodeTo(std::string* dst) const;
Expand Down
10 changes: 5 additions & 5 deletions utilities/titandb/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,16 @@ Status VersionSet::Recover() {
};

// Reads "CURRENT" file, which contains the name of the current manifest file.
std::string current;
Status s = ReadFileToString(env_, CurrentFileName(dirname_), &current);
std::string manifest;
Status s = ReadFileToString(env_, CurrentFileName(dirname_), &manifest);
if (!s.ok()) return s;
if (current.empty() || current.back() != '\n') {
if (manifest.empty() || manifest.back() != '\n') {
return Status::Corruption("CURRENT file does not end with newline");
}
current.resize(current.size() - 1);
manifest.resize(manifest.size() - 1);

// Opens the current manifest file.
auto file_name = dirname_ + "/" + current;
auto file_name = dirname_ + "/" + manifest;
std::unique_ptr<SequentialFileReader> file;
{
std::unique_ptr<SequentialFile> f;
Expand Down
2 changes: 1 addition & 1 deletion utilities/titandb/version_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class VersionSet {

// Sets up the storage specified in "tdb_options.dirname".
// If the manifest doesn't exist, it will create one.
// If the manifest exists, it will recover from the lastest one.
// If the manifest exists, it will recover from the latest one.
Status Open();

// Applies *edit on the current version to form a new version that is
Expand Down

0 comments on commit 190e9e2

Please sign in to comment.