Skip to content

Commit

Permalink
Bump to v6.6.4 and apply Pegasus changes (facebook#27)
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 authored Mar 6, 2020
1 parent 551a110 commit f0e22c3
Show file tree
Hide file tree
Showing 33 changed files with 969 additions and 29 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -121,4 +121,4 @@ script:
esac
notifications:
email:
- leveldb@fb.com
- pegasus-help@xiaomi.com
4 changes: 1 addition & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
## RocksDB: A Persistent Key-Value Store for Flash and RAM Storage

[![Linux/Mac Build Status](https://travis-ci.org/facebook/rocksdb.svg?branch=master)](https://travis-ci.org/facebook/rocksdb)
[![Windows Build status](https://ci.appveyor.com/api/projects/status/fbgfu0so3afcno78/branch/master?svg=true)](https://ci.appveyor.com/project/Facebook/rocksdb/branch/master)
[![PPC64le Build Status](http://140.211.168.68:8080/buildStatus/icon?job=Rocksdb)](http://140.211.168.68:8080/job/Rocksdb)
[![Build Status](https://travis-ci.org/XiaoMi/pegasus-rocksdb.svg?branch=pegasus)](https://travis-ci.org/XiaoMi/pegasus-rocksdb)

RocksDB is developed and maintained by Facebook Database Engineering Team.
It is built on earlier work on [LevelDB](https://github.com/google/leveldb) by Sanjay Ghemawat ([email protected])
Expand Down
16 changes: 16 additions & 0 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1326,6 +1326,8 @@ ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
WriteController* write_controller,
BlockCacheTracer* const block_cache_tracer)
: max_column_family_(0),
pegasus_data_version_(db_options->pegasus_data_version),
last_manual_compact_finish_time_(0),
dummy_cfd_(new ColumnFamilyData(
0, "", nullptr, nullptr, nullptr, ColumnFamilyOptions(), *db_options,
env_options, nullptr, block_cache_tracer)),
Expand Down Expand Up @@ -1397,6 +1399,20 @@ size_t ColumnFamilySet::NumberOfColumnFamilies() const {
return column_families_.size();
}

uint32_t ColumnFamilySet::GetPegasusDataVersion() const { return pegasus_data_version_; }

void ColumnFamilySet::SetPegasusDataVersion(uint32_t version) {
pegasus_data_version_ = version;
}

uint64_t ColumnFamilySet::GetLastManualCompactFinishTime() const {
return last_manual_compact_finish_time_;
}

void ColumnFamilySet::SetLastManualCompactFinishTime(uint64_t ms) {
last_manual_compact_finish_time_ = ms;
}

// under a DB mutex AND write thread
ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
const std::string& name, uint32_t id, Version* dummy_versions,
Expand Down
7 changes: 6 additions & 1 deletion db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,10 @@ class ColumnFamilySet {
uint32_t GetMaxColumnFamily();
void UpdateMaxColumnFamily(uint32_t new_max_column_family);
size_t NumberOfColumnFamilies() const;

uint32_t GetPegasusDataVersion() const;
void SetPegasusDataVersion(uint32_t version);
uint64_t GetLastManualCompactFinishTime() const;
void SetLastManualCompactFinishTime(uint64_t ms);
ColumnFamilyData* CreateColumnFamily(const std::string& name, uint32_t id,
Version* dummy_version,
const ColumnFamilyOptions& options);
Expand Down Expand Up @@ -681,6 +684,8 @@ class ColumnFamilySet {
std::unordered_map<uint32_t, ColumnFamilyData*> column_family_data_;

uint32_t max_column_family_;
uint32_t pegasus_data_version_;
uint64_t last_manual_compact_finish_time_;
ColumnFamilyData* dummy_cfd_;
// We don't hold the refcount here, since default column family always exists
// We are also not responsible for cleaning up default_cfd_cache_. This is
Expand Down
57 changes: 57 additions & 0 deletions db/db_filesnapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,63 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
return Status::OK();
}

Status DBImpl::GetLiveFilesQuick(std::vector<std::string>& ret,
uint64_t* manifest_file_size,
SequenceNumber* last_sequence,
uint64_t* last_decree) const {
// ATTENTION(laiyingchun): only used for Pegasus.
assert(pegasus_data_);
*manifest_file_size = 0;
*last_sequence = 0;
*last_decree = 0;

mutex_.Lock();

// ATTENTION(qinzuoyan): only use default column family.
assert(versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1u);

// Make a set of all of the live *.sst files
std::vector<FileDescriptor> live;
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
}
cfd->current()->AddLiveFiles(&live);
// get last sequence/decree
SequenceNumber seq;
uint64_t d;
cfd->current()->GetLastFlushSeqDecree(&seq, &d);
if (seq > *last_sequence) {
assert(d >= *last_decree);
*last_sequence = seq;
*last_decree = d;
}
}

ret.clear();
ret.reserve(live.size() + 3); //*.sst + CURRENT + MANIFEST + OPTIONS

// Put current and manifest files firstly to make them copied quickly,
// because the manifest file may be deleted when copying sstables.
// TODO(qinzuoyan): but now it may be not necessary because the manifest file
// must not be deleted when copying sstables..
ret.push_back(CurrentFileName(""));
ret.push_back(DescriptorFileName("", versions_->manifest_file_number()));
ret.push_back(OptionsFileName("", versions_->options_file_number()));

// create names of the live files. The names are not absolute
// paths, instead they are relative to dbname_;
for (auto live_file : live) {
ret.push_back(MakeTableFileName("", live_file.GetNumber()));
}

// find length of manifest file while holding the mutex lock
*manifest_file_size = versions_->manifest_file_size();

mutex_.Unlock();
return Status::OK();
}

Status DBImpl::GetSortedWalFiles(VectorLogPtr& files) {
{
// If caller disabled deletions, this function should return files that are
Expand Down
28 changes: 28 additions & 0 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
// requires a custom gc for compaction, we use that to set use_custom_gc_
// as well.
use_custom_gc_(seq_per_batch),
pegasus_data_(options.pegasus_data),
shutdown_initiated_(false),
own_sfm_(options.sst_file_manager == nullptr),
preserve_deletes_(options.preserve_deletes),
Expand Down Expand Up @@ -1269,6 +1270,33 @@ SequenceNumber DBImpl::GetLatestSequenceNumber() const {
return versions_->LastSequence();
}

uint64_t DBImpl::GetLastFlushedDecree() const {
SequenceNumber seq;
uint64_t d;

mutex_.Lock();
// ATTENTION(qinzuoyan): only use default column family.
assert(!pegasus_data_ || versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1u);
versions_->GetColumnFamilySet()->GetDefault()->current()->GetLastFlushSeqDecree(&seq, &d);
mutex_.Unlock();

return d;
}

uint32_t DBImpl::GetPegasusDataVersion() const {
mutex_.Lock();
uint32_t version = versions_->GetColumnFamilySet()->GetPegasusDataVersion();
mutex_.Unlock();
return version;
}

uint64_t DBImpl::GetLastManualCompactFinishTime() const {
mutex_.Lock();
uint64_t ms = versions_->GetColumnFamilySet()->GetLastManualCompactFinishTime();
mutex_.Unlock();
return ms;
}

void DBImpl::SetLastPublishedSequence(SequenceNumber seq) {
versions_->SetLastPublishedSequence(seq);
}
Expand Down
13 changes: 13 additions & 0 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,12 @@ class DBImpl : public DB {
uint64_t start_time, uint64_t end_time,
std::unique_ptr<StatsHistoryIterator>* stats_iterator) override;

virtual uint64_t GetLastFlushedDecree() const override;

virtual uint32_t GetPegasusDataVersion() const override;

virtual uint64_t GetLastManualCompactFinishTime() const override;

#ifndef ROCKSDB_LITE
using DB::ResetStats;
virtual Status ResetStats() override;
Expand All @@ -350,6 +356,10 @@ class DBImpl : public DB {
virtual Status GetLiveFiles(std::vector<std::string>&,
uint64_t* manifest_file_size,
bool flush_memtable = true) override;
virtual Status GetLiveFilesQuick(std::vector<std::string>& ret,
uint64_t* manifest_file_size,
SequenceNumber* last_sequence,
uint64_t* last_decree) const override;
virtual Status GetSortedWalFiles(VectorLogPtr& files) override;
virtual Status GetCurrentWalFile(
std::unique_ptr<LogFile>* current_log_file) override;
Expand Down Expand Up @@ -1479,6 +1489,8 @@ class DBImpl : public DB {
// Used by WriteImpl to update bg_error_ in case of memtable insert error.
void MemTableInsertStatusCheck(const Status& memtable_insert_status);

Status UpdateManualCompactTime(ColumnFamilyHandle* column_family);

#ifndef ROCKSDB_LITE

Status CompactFilesImpl(const CompactionOptions& compact_options,
Expand Down Expand Up @@ -2027,6 +2039,7 @@ class DBImpl : public DB {
// flush/compaction and if it is not provided vis SnapshotChecker, we should
// disable gc to be safe.
const bool use_custom_gc_;
const bool pegasus_data_;
// Flag to indicate that the DB instance shutdown has been initiated. This
// different from shutting_down_ atomic in that it is set at the beginning
// of shutdown sequence, specifically in order to prevent any background
Expand Down
37 changes: 37 additions & 0 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,29 @@ void DBImpl::NotifyOnFlushCompleted(
#endif // ROCKSDB_LITE
}

Status DBImpl::UpdateManualCompactTime(ColumnFamilyHandle* column_family) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();
uint64_t ms = env_->NowMicros() / 1000;

InstrumentedMutexLock guard_lock(&mutex_);

const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();

versions_->GetColumnFamilySet()->SetLastManualCompactFinishTime(ms);

VersionEdit edit;
edit.SetColumnFamily(cfd->GetID());
edit.SetLastManualCompactFinishTime(ms);
Status status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_,
directories_.GetDbDir());
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
"[%s] Apply version edit:\n%s", cfd->GetName().c_str(),
edit.DebugString().data());

return status;
}

Status DBImpl::CompactRange(const CompactRangeOptions& options,
ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end) {
Expand Down Expand Up @@ -777,8 +800,18 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
if (s.ok()) {
s = ReFitLevel(cfd, final_output_level, options.target_level);
}
VersionStorageInfo::LevelSummaryStorage tmp;
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[%s] Level summary with lsm_state after ReFitLevel: %s\n",
cfd->GetName().c_str(),
cfd->current()->storage_info()->LevelSummary(&tmp));
ContinueBackgroundWork();
}

if (s.ok()) {
s = UpdateManualCompactTime(column_family);
}

LogFlush(immutable_db_options_.info_log);

{
Expand Down Expand Up @@ -1532,6 +1565,9 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
WriteContext context;
InstrumentedMutexLock guard_lock(&mutex_);

// ATTENTION(laiyingchun): An optimization to avoid switching empty memtable
// for Pegasus(single CF).
if (!pegasus_data_ || !cfd->mem()->IsEmpty()) {
WriteThread::Writer w;
WriteThread::Writer nonmem_w;
if (!writes_stopped) {
Expand Down Expand Up @@ -1605,6 +1641,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
}
}
} // if (!pegasus_data_ || !cfd->mem()->IsEmpty())
}
TEST_SYNC_POINT("DBImpl::FlushMemTable:AfterScheduleFlush");
TEST_SYNC_POINT("DBImpl::FlushMemTable:BeforeWaitForBgFlush");
Expand Down
1 change: 1 addition & 0 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ Status DBImpl::NewDB() {
new_db.SetLogNumber(0);
new_db.SetNextFile(2);
new_db.SetLastSequence(0);
new_db.SetPegasusDataVersion(immutable_db_options_.pegasus_data_version);

ROCKS_LOG_INFO(immutable_db_options_.info_log, "Creating manifest 1 \n");
const std::string manifest = DescriptorFileName(dbname_, 1);
Expand Down
25 changes: 24 additions & 1 deletion db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
assert(!WriteBatchInternal::IsLatestPersistentState(my_batch) ||
disable_memtable);

// ATTENTION(qinzuoyan): always only use default column family under
// replication framework.
assert(!pegasus_data_ || single_column_family_mode_);

Status status;
if (write_options.low_pri) {
status = ThrottleLowPriWritesIfNeeded(write_options, my_batch);
Expand All @@ -108,6 +112,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
}
}

// ATTENTION(laiyingchun): disable_memtable is always false in pegasus
assert(!pegasus_data_ || !disable_memtable);
if (two_write_queues_ && disable_memtable) {
AssignOrder assign_order =
seq_per_batch_ ? kDoAssignOrder : kDontAssignOrder;
Expand Down Expand Up @@ -144,6 +150,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
return status;
}

// ATTENTION(laiyingchun): enable_pipelined_write is always false in pegasus
assert(!pegasus_data_ || !immutable_db_options_.enable_pipelined_write);
if (immutable_db_options_.enable_pipelined_write) {
return PipelinedWriteImpl(write_options, my_batch, callback, log_used,
log_ref, disable_memtable, seq_used);
Expand All @@ -160,6 +168,10 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);

write_thread_.JoinBatchGroup(&w);
// ATTENTION(qinzuoyan): because write is always applied in single thread
// under replication framework, so we must be the only write batch and
// must be STATE_GROUP_LEADER.
assert(!pegasus_data_ || w.state == WriteThread::STATE_GROUP_LEADER);
if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
// we are a non-leader in a parallel group

Expand Down Expand Up @@ -249,6 +261,10 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
write_thread_.EnterAsBatchGroupLeader(&w, &write_group);

if (status.ok()) {
// ATTENTION(qinzuoyan): because write is always applied in single thread
// under replication framework, so we must be the only write batch.
assert(!pegasus_data_ || write_group.size == 1);

// Rules for when we can update the memtable concurrently
// 1. supported by memtable
// 2. Puts are not okay if inplace_update_support
Expand Down Expand Up @@ -280,6 +296,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
}
}
}
// ATTENTION(qinzuoyan): under replication framework, batch should not be empty.
assert(!pegasus_data_ || total_count > 0);
// ATTENTION(laiyingchun): seq_per_batch_ should always be false as default value.
assert(!pegasus_data_ || !seq_per_batch_);

// Note about seq_per_batch_: either disableWAL is set for the entire write
// group or not. In either case we inc seq for each write batch with no
// failed callback. This means that there could be a batch with
Expand Down Expand Up @@ -373,14 +394,16 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
if (status.ok()) {
PERF_TIMER_GUARD(write_memtable_time);

// ATTENTION(laiyingchun): parallel should always be false because write_group.size == 1.
assert(!pegasus_data_ || !parallel);
if (!parallel) {
// w.sequence will be set inside InsertInto
w.status = WriteBatchInternal::InsertInto(
write_group, current_sequence, column_family_memtables_.get(),
&flush_scheduler_, &trim_history_scheduler_,
write_options.ignore_missing_column_families,
0 /*recovery_log_number*/, this, parallel, seq_per_batch_,
batch_per_txn_);
batch_per_txn_, write_options.given_decree, pegasus_data_);
} else {
write_group.last_sequence = last_sequence;
write_thread_.LaunchParallelMemTableWriters(&write_group);
Expand Down
11 changes: 11 additions & 0 deletions db/flush_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,17 @@ void FlushJob::PickMemTable() {
// will no longer be picked up for recovery.
edit_->SetLogNumber(mems_.back()->GetNextLogNumber());
edit_->SetColumnFamily(cfd_->GetID());
if (db_options_.pegasus_data) {
// Mark the last sequence/decree of all memtables to be flushed. Although
// entries mems are sorted in ascending order by their created, we should
// iterate all mems but not take the last one because memtable may be empty.
for (auto mem : mems_) {
SequenceNumber seq;
uint64_t d;
mem->GetLastSeqDecree(&seq, &d);
edit_->UpdateLastFlushSeqDecree(seq, d);
}
}

// path 0 for level 0 file.
meta_.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
Expand Down
2 changes: 2 additions & 0 deletions db/memtable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
earliest_seqno_(latest_seq),
creation_seq_(latest_seq),
mem_next_logfile_number_(0),
last_sequence_(0),
last_decree_(0),
min_prep_log_referenced_(0),
locks_(moptions_.inplace_update_support
? moptions_.inplace_update_num_locks
Expand Down
Loading

0 comments on commit f0e22c3

Please sign in to comment.