Skip to content

Commit

Permalink
Fix a bug in WAL tracking (facebook#10087)
Browse files Browse the repository at this point in the history
Summary:
Closing facebook#10080

When `SyncWAL()` calls `MarkLogsSynced()`, even if there is only one active WAL file,
this event should still be added to the MANIFEST.

Pull Request resolved: facebook#10087

Test Plan: make check

Reviewed By: ajkr

Differential Revision: D36797580

Pulled By: riversand963

fbshipit-source-id: 24184c9dd606b3939a454ed41de6e868d1519999
  • Loading branch information
riversand963 committed Jun 8, 2022
1 parent c8bae6e commit 1095f37
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 13 deletions.
4 changes: 4 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
# Rocksdb Change Log
## 7.3.1 (06/08/2022)
### Bug Fixes
* Fix a bug in WAL tracking. Before this PR (#10087), calling `SyncWAL()` on the only WAL file of the db will not log the event in MANIFEST, thus allowing a subsequent `DB::Open` even if the WAL file is missing or corrupted.

## 7.3.0 (05/20/2022)
### Bug Fixes
* Fixed a bug where manual flush would block forever even though flush options had wait=false.
Expand Down
21 changes: 21 additions & 0 deletions db/db_basic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4096,6 +4096,27 @@ TEST_F(DBBasicTest, VerifyFileChecksums) {
Reopen(options);
ASSERT_TRUE(db_->VerifyFileChecksums(ReadOptions()).IsInvalidArgument());
}

TEST_F(DBBasicTest, ManualWalSync) {
Options options = CurrentOptions();
options.track_and_verify_wals_in_manifest = true;
options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency;
DestroyAndReopen(options);

ASSERT_OK(Put("x", "y"));
// This does not create a new WAL.
ASSERT_OK(db_->SyncWAL());
EXPECT_FALSE(dbfull()->GetVersionSet()->GetWalSet().GetWals().empty());

std::unique_ptr<LogFile> wal;
Status s = db_->GetCurrentWalFile(&wal);
ASSERT_OK(s);
Close();

EXPECT_OK(env_->DeleteFile(LogFileName(dbname_, wal->LogNumber())));

ASSERT_TRUE(TryReopen(options).IsCorruption());
}
#endif // !ROCKSDB_LITE

// A test class for intercepting random reads and injecting artificial
Expand Down
11 changes: 6 additions & 5 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1441,12 +1441,13 @@ Status DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir) {
for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) {
auto& wal = *it;
assert(wal.getting_synced);
if (immutable_db_options_.track_and_verify_wals_in_manifest &&
wal.writer->file()->GetFileSize() > 0) {
synced_wals.AddWal(wal.number,
WalMetadata(wal.writer->file()->GetFileSize()));
}

if (logs_.size() > 1) {
if (immutable_db_options_.track_and_verify_wals_in_manifest &&
wal.writer->file()->GetFileSize() > 0) {
synced_wals.AddWal(wal.number,
WalMetadata(wal.writer->file()->GetFileSize()));
}
logs_to_free_.push_back(wal.ReleaseWriter());
// To modify logs_ both mutex_ and log_write_mutex_ must be held
InstrumentedMutexLock l(&log_write_mutex_);
Expand Down
16 changes: 10 additions & 6 deletions file/writable_file_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ IOStatus WritableFileWriter::Append(const Slice& data, uint32_t crc32c_checksum,

TEST_KILL_RANDOM("WritableFileWriter::Append:1");
if (s.ok()) {
filesize_ += data.size();
uint64_t cur_size = filesize_.load(std::memory_order_acquire);
filesize_.store(cur_size + data.size(), std::memory_order_release);
}
return s;
}
Expand Down Expand Up @@ -191,7 +192,8 @@ IOStatus WritableFileWriter::Pad(const size_t pad_bytes,
cap = buf_.Capacity() - buf_.CurrentSize();
}
pending_sync_ = true;
filesize_ += pad_bytes;
uint64_t cur_size = filesize_.load(std::memory_order_acquire);
filesize_.store(cur_size + pad_bytes, std::memory_order_release);
if (perform_data_verification_) {
buffered_data_crc32c_checksum_ =
crc32c::Extend(buffered_data_crc32c_checksum_,
Expand Down Expand Up @@ -227,14 +229,15 @@ IOStatus WritableFileWriter::Close() {
start_ts = FileOperationInfo::StartNow();
}
#endif
interim = writable_file_->Truncate(filesize_, io_options, nullptr);
uint64_t filesz = filesize_.load(std::memory_order_acquire);
interim = writable_file_->Truncate(filesz, io_options, nullptr);
#ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) {
auto finish_ts = FileOperationInfo::FinishNow();
NotifyOnFileTruncateFinish(start_ts, finish_ts, s);
if (!interim.ok()) {
NotifyOnIOError(interim, FileOperationType::kTruncate, file_name(),
filesize_);
filesz);
}
}
#endif
Expand Down Expand Up @@ -372,8 +375,9 @@ IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) {
const uint64_t kBytesNotSyncRange =
1024 * 1024; // recent 1MB is not synced.
const uint64_t kBytesAlignWhenSync = 4 * 1024; // Align 4KB.
if (filesize_ > kBytesNotSyncRange) {
uint64_t offset_sync_to = filesize_ - kBytesNotSyncRange;
uint64_t cur_size = filesize_.load(std::memory_order_acquire);
if (cur_size > kBytesNotSyncRange) {
uint64_t offset_sync_to = cur_size - kBytesNotSyncRange;
offset_sync_to -= offset_sync_to % kBytesAlignWhenSync;
assert(offset_sync_to >= last_sync_size_);
if (offset_sync_to > 0 &&
Expand Down
6 changes: 4 additions & 2 deletions file/writable_file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ class WritableFileWriter {
size_t max_buffer_size_;
// Actually written data size can be used for truncate
// not counting padding data
uint64_t filesize_;
std::atomic<uint64_t> filesize_;
#ifndef ROCKSDB_LITE
// This is necessary when we use unbuffered access
// and writes must happen on aligned offsets
Expand Down Expand Up @@ -255,7 +255,9 @@ class WritableFileWriter {
// returns NotSupported status.
IOStatus SyncWithoutFlush(bool use_fsync);

uint64_t GetFileSize() const { return filesize_; }
uint64_t GetFileSize() const {
return filesize_.load(std::memory_order_acquire);
}

IOStatus InvalidateCache(size_t offset, size_t length) {
return writable_file_->InvalidateCache(offset, length);
Expand Down

0 comments on commit 1095f37

Please sign in to comment.