Support for single-primary, multi-secondary instances (#4899)
Summary:
This PR allows RocksDB to run in single-primary, multi-secondary process mode.
The writer is a regular RocksDB (e.g. a `DBImpl`) instance playing the role of a primary.
Multiple `DBImplSecondary` processes (secondaries) share the same set of SST files, MANIFEST, and WAL files with the primary. Secondaries tail the MANIFEST of the primary and apply updates to their own in-memory state of the file system, e.g. `VersionStorageInfo`.

This PR has several components:
1. (Originally in #4745). Add a `PathNotFound` subcode to `IOError` to denote the failure when a secondary tries to open a file which has been deleted by the primary.

2. (Similar to #4602). Add `FragmentBufferedReader` to handle a partially read trailing record at the end of a log, from which a future read can continue.

3. (Originally in #4710 and #4820). Add implementation of the secondary, i.e. `DBImplSecondary`.
3.1 Tail the primary's MANIFEST during recovery.
3.2 Tail the primary's MANIFEST during normal processing by calling `ReadAndApply`.
3.3 Tailing WAL will be in a future PR.

4. Add an example in 'examples/multi_processes_example.cc' to demonstrate the usage of a secondary RocksDB instance in a multi-process setting. Instructions for running the example can be found at the beginning of the source code.
Pull Request resolved: #4899

Differential Revision: D14510945

Pulled By: riversand963

fbshipit-source-id: 4ac1c5693e6012ad23f7b4b42d3c374fecbe8886
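
The idea behind item 2 of the summary (keeping a partially read trailing record so that a later read can finish it) can be sketched with a toy reader. This is only an illustration under stated assumptions: `GrowingLog`, `TailingReader`, and the newline-terminated record format are hypothetical and are not RocksDB's log format or the actual `FragmentBufferedReader` implementation.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for a log file that a primary keeps appending to.
struct GrowingLog {
  std::string bytes;
};

// Toy tailing reader. Records are '\n'-terminated, which is an assumption
// made for this sketch and not RocksDB's on-disk record format.
class TailingReader {
 public:
  explicit TailingReader(const GrowingLog* log) : log_(log) {
    assert(log_ != nullptr);
  }

  // Returns every record completed since the previous call. An unterminated
  // tail stays in fragment_ and is completed by a later call.
  std::vector<std::string> ReadNewRecords() {
    std::vector<std::string> records;
    while (offset_ < log_->bytes.size()) {
      const char c = log_->bytes[offset_++];
      if (c == '\n') {
        records.push_back(fragment_);
        fragment_.clear();
      } else {
        fragment_.push_back(c);
      }
    }
    return records;
  }

  // Bytes of the trailing record that have been read but not yet terminated.
  const std::string& pending_fragment() const { return fragment_; }

 private:
  const GrowingLog* log_;
  std::size_t offset_ = 0;  // next unread byte in the log
  std::string fragment_;    // buffered partial record
};
```

A caller would invoke `ReadNewRecords()` repeatedly as the writer appends; a record that was split across two appends is returned once, by the call that sees its terminator.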
riversand963 authored and facebook-github-bot committed Mar 26, 2019
1 parent 2a5463a commit 9358178
Showing 32 changed files with 2,597 additions and 283 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
@@ -489,6 +489,7 @@ set(SOURCES
db/db_impl_debug.cc
db/db_impl_experimental.cc
db/db_impl_readonly.cc
db/db_impl_secondary.cc
db/db_info_dumper.cc
db/db_iter.cc
db/dbformat.cc
@@ -873,6 +874,7 @@ if(WITH_TESTS)
db/db_options_test.cc
db/db_properties_test.cc
db/db_range_del_test.cc
db/db_secondary_test.cc
db/db_sst_test.cc
db/db_statistics_test.cc
db/db_table_properties_test.cc
13 changes: 3 additions & 10 deletions HISTORY.md
@@ -2,24 +2,17 @@
## Unreleased
### New Features
* Introduce two more stats levels, kExceptHistogramOrTimers and kExceptTimers.
* Added a feature to perform data-block sampling for compressibility, and report stats to user.
* Add support for trace filtering.

### Public API Change
* Remove bundled fbson library.
* statistics.stats_level_ becomes atomic. It is preferred to use statistics.set_stats_level() and statistics.get_stats_level() to access it.

* Introduce a new IOError subcode, PathNotFound, to indicate trying to open a nonexistent file or directory for read.
* Add initial support for multiple db instances sharing the same data in single-writer, multi-reader mode.
### Bug Fixes
* Fix JEMALLOC_CXX_THROW macro missing from older Jemalloc versions, causing build failures on some platforms.
* Fix SstFileReader not able to open file ingested with write_global_seqno=true.


## Unreleased
### New Features
* Added a feature to perform data-block sampling for compressibility, and report stats to user.
### Public API Change
### Bug fixes


## 6.0.0 (2/19/2019)
### New Features
* Enabled checkpoint on readonly db (DBImplReadOnly).
5 changes: 5 additions & 0 deletions Makefile
@@ -443,6 +443,7 @@ TESTS = \
db_merge_operator_test \
db_options_test \
db_range_del_test \
db_secondary_test \
db_sst_test \
db_tailing_iter_test \
db_io_failure_test \
@@ -547,6 +548,7 @@ TESTS = \
range_tombstone_fragmenter_test \
range_del_aggregator_test \
sst_file_reader_test \
db_secondary_test \

PARALLEL_TEST = \
backupable_db_test \
@@ -1571,6 +1573,9 @@ range_tombstone_fragmenter_test: db/range_tombstone_fragmenter_test.o db/db_test
sst_file_reader_test: table/sst_file_reader_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

db_secondary_test: db/db_secondary_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

#-------------------------------------------------
# make install related stuff
INSTALL_PATH ?= /usr/local
6 changes: 6 additions & 0 deletions TARGETS
@@ -98,6 +98,7 @@ cpp_library(
"db/db_impl_files.cc",
"db/db_impl_open.cc",
"db/db_impl_readonly.cc",
"db/db_impl_secondary.cc",
"db/db_impl_write.cc",
"db/db_info_dumper.cc",
"db/db_iter.cc",
@@ -605,6 +606,11 @@ ROCKS_TESTS = [
"db/db_range_del_test.cc",
"serial",
],
[
"db_secondary_test",
"db/db_secondary_test.cc",
"serial",
],
[
"db_sst_test",
"db/db_sst_test.cc",
12 changes: 6 additions & 6 deletions db/db_impl.cc
@@ -148,18 +148,21 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
immutable_db_options_(initial_db_options_),
mutable_db_options_(initial_db_options_),
stats_(immutable_db_options_.statistics.get()),
db_lock_(nullptr),
mutex_(stats_, env_, DB_MUTEX_WAIT_MICROS,
immutable_db_options_.use_adaptive_mutex),
default_cf_handle_(nullptr),
max_total_in_memory_state_(0),
env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)),
env_options_for_compaction_(env_->OptimizeForCompactionTableWrite(
env_options_, immutable_db_options_)),
db_lock_(nullptr),
shutting_down_(false),
bg_cv_(&mutex_),
logfile_number_(0),
log_dir_synced_(false),
log_empty_(true),
default_cf_handle_(nullptr),
log_sync_cv_(&mutex_),
total_log_size_(0),
max_total_in_memory_state_(0),
is_snapshot_supported_(true),
write_buffer_manager_(immutable_db_options_.write_buffer_manager.get()),
write_thread_(immutable_db_options_),
@@ -186,9 +189,6 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
next_job_id_(1),
has_unpersisted_data_(false),
unable_to_release_oldest_log_(false),
env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)),
env_options_for_compaction_(env_->OptimizeForCompactionTableWrite(
env_options_, immutable_db_options_)),
num_running_ingest_file_(0),
#ifndef ROCKSDB_LITE
wal_manager_(immutable_db_options_, env_options_, seq_per_batch),
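
The initializer moves in this file appear to track the member reordering in db/db_impl.h: C++ constructs non-static data members in declaration order, not in the order the initializers are written, so a member such as `env_options_for_compaction_` can only safely read `env_options_` if `env_options_` is declared first. A minimal sketch of that rule follows; `ToyEnvOptions`, `ToyDB`, and the `buffer_size` field are hypothetical names, not RocksDB types.

```cpp
#include <cassert>

// Hypothetical options struct; the field name is an assumption for the sketch.
struct ToyEnvOptions {
  int buffer_size;
};

class ToyDB {
 public:
  // The initializer list is written in declaration order. Writing it in a
  // different order would not change the construction order, and compilers
  // typically warn about the mismatch (-Wreorder in GCC and Clang).
  explicit ToyDB(int buffer_size)
      : env_options_{buffer_size},
        env_options_for_compaction_{env_options_.buffer_size * 2} {
    assert(buffer_size > 0);
  }

  int compaction_buffer_size() const {
    return env_options_for_compaction_.buffer_size;
  }

 private:
  // Declared first, so constructed first.
  const ToyEnvOptions env_options_;
  // Declared second, so it may read env_options_ during construction.
  ToyEnvOptions env_options_for_compaction_;
};
```

If the two declarations were swapped, `env_options_for_compaction_` would be built from an uninitialized `env_options_` regardless of how the initializer list is written.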
60 changes: 33 additions & 27 deletions db/db_impl.h
@@ -758,6 +758,29 @@ class DBImpl : public DB {
std::unique_ptr<Tracer> tracer_;
InstrumentedMutex trace_mutex_;

// State below is protected by mutex_
// With two_write_queues enabled, some of the variables that accessed during
// WriteToWAL need different synchronization: log_empty_, alive_log_files_,
// logs_, logfile_number_. Refer to the definition of each variable below for
// more description.
mutable InstrumentedMutex mutex_;

ColumnFamilyHandleImpl* default_cf_handle_;
InternalStats* default_cf_internal_stats_;

// only used for dynamically adjusting max_total_wal_size. it is a sum of
// [write_buffer_size * max_write_buffer_number] over all column families
uint64_t max_total_in_memory_state_;
// If true, we have only one (default) column family. We use this to optimize
// some code-paths
bool single_column_family_mode_;

// The options to access storage files
const EnvOptions env_options_;

// Additional options for compaction and flush
EnvOptions env_options_for_compaction_;

// Except in DB::Open(), WriteOptionsFile can only be called when:
// Persist options to options file.
// If need_mutex_lock = false, the method will lock DB mutex.
@@ -845,6 +868,14 @@ class DBImpl : public DB {
// Actual implementation of Close()
Status CloseImpl();

// Recover the descriptor from persistent storage. May do a significant
// amount of work to recover recently logged updates. Any changes to
// be made to the descriptor are added to *edit.
virtual Status Recover(
const std::vector<ColumnFamilyDescriptor>& column_families,
bool read_only = false, bool error_if_log_file_exist = false,
bool error_if_data_exists_in_logs = false);

private:
friend class DB;
friend class ErrorHandler;
@@ -893,13 +924,6 @@ class DBImpl : public DB {
struct PrepickedCompaction;
struct PurgeFileInfo;

// Recover the descriptor from persistent storage. May do a significant
// amount of work to recover recently logged updates. Any changes to
// be made to the descriptor are added to *edit.
Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families,
bool read_only = false, bool error_if_log_file_exist = false,
bool error_if_data_exists_in_logs = false);

Status ResumeImpl();

void MaybeIgnoreError(Status* s) const;
@@ -1216,12 +1240,6 @@ class DBImpl : public DB {
// and log_empty_. Refer to the definition of each variable below for more
// details.
InstrumentedMutex log_write_mutex_;
// State below is protected by mutex_
// With two_write_queues enabled, some of the variables that accessed during
// WriteToWAL need different synchronization: log_empty_, alive_log_files_,
// logs_, logfile_number_. Refer to the definition of each variable below for
// more description.
mutable InstrumentedMutex mutex_;

std::atomic<bool> shutting_down_;
// This condition variable is signaled on these conditions:
@@ -1253,8 +1271,7 @@ class DBImpl : public DB {
// read and writes are protected by log_write_mutex_ instead. This is to avoid
// expensive mutex_ lock during WAL write, which update log_empty_.
bool log_empty_;
ColumnFamilyHandleImpl* default_cf_handle_;
InternalStats* default_cf_internal_stats_;

std::unique_ptr<ColumnFamilyMemTablesImpl> column_family_memtables_;
struct LogFileNumberSize {
explicit LogFileNumberSize(uint64_t _number)
@@ -1321,12 +1338,7 @@ class DBImpl : public DB {
WriteBatch cached_recoverable_state_;
std::atomic<bool> cached_recoverable_state_empty_ = {true};
std::atomic<uint64_t> total_log_size_;
// only used for dynamically adjusting max_total_wal_size. it is a sum of
// [write_buffer_size * max_write_buffer_number] over all column families
uint64_t max_total_in_memory_state_;
// If true, we have only one (default) column family. We use this to optimize
// some code-paths
bool single_column_family_mode_;

// If this is non-empty, we need to delete these log files in background
// threads. Protected by db mutex.
autovector<log::Writer*> logs_to_free_;
@@ -1545,12 +1557,6 @@ class DBImpl : public DB {

std::string db_absolute_path_;

// The options to access storage files
const EnvOptions env_options_;

// Additonal options for compaction and flush
EnvOptions env_options_for_compaction_;

// Number of running IngestExternalFile() calls.
// REQUIRES: mutex held
int num_running_ingest_file_;
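
The hunks above move `Recover` out of the private section and declare it `virtual`, which lets a subclass substitute its own recovery step while reusing the rest of the base class. A minimal sketch of that pattern, assuming hypothetical `ToyDB` and `ToySecondary` classes with a simplified, argument-free `Recover` (not the real `DBImpl` or `DBImplSecondary` signatures):

```cpp
#include <cassert>
#include <string>

// Hypothetical base class standing in for a regular (primary) instance.
class ToyDB {
 public:
  virtual ~ToyDB() = default;

  // Shared open path: it calls Recover() through the virtual table, so a
  // subclass override is picked up without changing this function.
  std::string Open() {
    assert(!opened_);
    opened_ = true;
    return Recover();
  }

 protected:
  // Not private, so that subclasses are able to override it.
  virtual std::string Recover() { return "replay MANIFEST and WAL"; }

 private:
  bool opened_ = false;
};

// Hypothetical secondary: same open path, different recovery step.
class ToySecondary : public ToyDB {
 protected:
  std::string Recover() override { return "tail primary MANIFEST"; }
};
```

Calling `Open()` through a `ToyDB&` that refers to a `ToySecondary` runs the secondary's `Recover`, which is the kind of dispatch the `virtual` keyword enables here.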
3 changes: 1 addition & 2 deletions db/db_impl_open.cc
@@ -629,8 +629,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// to be skipped instead of propagating bad information (like overly
// large sequence numbers).
log::Reader reader(immutable_db_options_.info_log, std::move(file_reader),
&reporter, true /*checksum*/, log_number,
false /* retry_after_eof */);
&reporter, true /*checksum*/, log_number);

// Determine if we should tolerate incomplete records at the tail end of the
// Read all the records and add to a memtable