Merge remote-tracking branch 'origin/main' into uncache_obsolete_files

pdillinger committed Jun 7, 2024
2 parents d079d4f + 44aceb8 commit a6be066
Showing 5 changed files with 245 additions and 32 deletions.
228 changes: 196 additions & 32 deletions db/column_family_test.cc
@@ -23,6 +23,7 @@
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/listener.h"
#include "rocksdb/utilities/object_registry.h"
#include "test_util/sync_point.h"
#include "test_util/testharness.h"
@@ -35,6 +36,14 @@

namespace ROCKSDB_NAMESPACE {

namespace {
std::string EncodeAsUint64(uint64_t v) {
std::string dst;
PutFixed64(&dst, v);
return dst;
}
} // namespace
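
// A minimal usage sketch (not part of this commit; assumes, per the listener
// code below, that RocksDB's DecodeU64Ts reverses this fixed64 encoding):
//
//   std::string ts = EncodeAsUint64(42);
//   uint64_t decoded = 0;
//   Status s = DecodeU64Ts(ts, &decoded);  // expect s.ok() && decoded == 42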

static const int kValueSize = 1000;

// counts how many operations were performed
@@ -3674,21 +3683,17 @@ TEST_F(ColumnFamilyRetainUDTTest, FullHistoryTsLowNotSet) {

SyncPoint::GetInstance()->EnableProcessing();
Open();
- std::string write_ts;
- PutFixed64(&write_ts, 1);
- ASSERT_OK(Put(0, "foo", write_ts, "v1"));
+ ASSERT_OK(Put(0, "foo", EncodeAsUint64(1), "v1"));
// No `full_history_ts_low` was explicitly set by the user, so the flush
// continues without checking whether its UDTs have expired.
ASSERT_OK(Flush(0));

// After flush, `full_history_ts_low` should be automatically advanced to
// the effective cutoff timestamp: write_ts + 1
- std::string cutoff_ts;
- PutFixed64(&cutoff_ts, 2);
std::string effective_full_history_ts_low;
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
- ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
+ ASSERT_EQ(EncodeAsUint64(2), effective_full_history_ts_low);
Close();

SyncPoint::GetInstance()->DisableProcessing();
@@ -3705,12 +3710,8 @@ TEST_F(ColumnFamilyRetainUDTTest, AllKeysExpired) {

SyncPoint::GetInstance()->EnableProcessing();
Open();
- std::string write_ts;
- PutFixed64(&write_ts, 1);
- ASSERT_OK(Put(0, "foo", write_ts, "v1"));
- std::string cutoff_ts;
- PutFixed64(&cutoff_ts, 3);
- ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
+ ASSERT_OK(Put(0, "foo", EncodeAsUint64(1), "v1"));
+ ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(3)));
// All keys expired w.r.t. the configured `full_history_ts_low`, so the flush
// continues without the need for a re-schedule.
ASSERT_OK(Flush(0));
@@ -3719,13 +3720,13 @@ TEST_F(ColumnFamilyRetainUDTTest, AllKeysExpired) {
std::string effective_full_history_ts_low;
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
- ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
+ ASSERT_EQ(EncodeAsUint64(3), effective_full_history_ts_low);
Close();

SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
- TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushToAvoidWriteStall) {
+ TEST_F(ColumnFamilyRetainUDTTest, IncreaseCutoffInMemtableSealCb) {
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundFlush:CheckFlushRequest:cb", [&](void* arg) {
ASSERT_NE(nullptr, arg);
@@ -3735,12 +3736,8 @@ TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushToAvoidWriteStall) {

SyncPoint::GetInstance()->EnableProcessing();
Open();
- std::string cutoff_ts;
- std::string write_ts;
- PutFixed64(&write_ts, 1);
- ASSERT_OK(Put(0, "foo", write_ts, "v1"));
- PutFixed64(&cutoff_ts, 1);
- ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
+ ASSERT_OK(Put(0, "foo", EncodeAsUint64(1), "v1"));
+ ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(1)));
ASSERT_OK(db_->SetOptions(handles_[0], {{"max_write_buffer_number", "1"}}));
// Not all keys expired, but the flush continues without a re-schedule because
// of the risk of a write stall.
@@ -3752,24 +3749,194 @@ TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushToAvoidWriteStall) {
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));

- cutoff_ts.clear();
- PutFixed64(&cutoff_ts, 2);
- ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
+ ASSERT_EQ(EncodeAsUint64(2), effective_full_history_ts_low);
Close();

SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}

// The user selectively increases the cutoff timestamp in the `OnMemTableSealed`
// callback when it is invoked during a manual flush. This is suitable when the
// user does not know an effective new cutoff timestamp in advance and relies on
// the callback to provide that info.
// The caveat of this approach is that the user needs to track when a manual
// flush is ongoing. In this example listener, the `manual_flush_count_`
// variable serves that purpose: it is a counter so that concurrent manual
// flushes can control the cutoff-increasing behavior independently.
// Also, many operations can indirectly cause a manual flush, such as manual
// compaction or file ingestion, and the user needs to explicitly track each
// such operation. So this callback is not ideal. See
// `ManualFlushScheduledEventListener` below for a different approach.
class MemtableSealEventListener : public EventListener {
private:
DB* db_;
std::vector<ColumnFamilyHandle*> handles_;
std::atomic<int> manual_flush_count_{0};

public:
std::atomic<int> memtable_seal_count_{0};
std::atomic<int> increase_cutoff_count_{0};

void OnMemTableSealed(const MemTableInfo& info) override {
memtable_seal_count_.fetch_add(1);
if (manual_flush_count_.load() == 0) {
return;
}
if (!info.newest_udt.empty()) {
uint64_t int_newest_udt = 0;
Slice udt_slice = info.newest_udt;
Status s = DecodeU64Ts(udt_slice, &int_newest_udt);
if (!s.ok()) {
return;
}
// An error indicates others have already set the cutoff to a higher
// point, so it's OK to proceed.
db_->IncreaseFullHistoryTsLow(handles_[0],
EncodeAsUint64(int_newest_udt + 1))
.PermitUncheckedError();
increase_cutoff_count_.fetch_add(1);
}
}

void PopulateDBAndHandles(DB* db, std::vector<ColumnFamilyHandle*> handles) {
db_ = db;
handles_ = handles;
}

void MarkManualFlushStart() { manual_flush_count_.fetch_add(1); }

void MarkManualFlushEnd() { manual_flush_count_.fetch_sub(1); }
};

TEST_F(ColumnFamilyRetainUDTTest, IncreaseCutoffOnMemtableSealedCb) {
std::shared_ptr<MemtableSealEventListener> listener =
std::make_shared<MemtableSealEventListener>();
db_options_.listeners.push_back(listener);
const int kNumEntriesPerMemTable = 2;
column_family_options_.memtable_factory.reset(
test::NewSpecialSkipListFactory(kNumEntriesPerMemTable - 1));
// Make sure there is no memory pressure that would cause UDTs not to be
// retained.
column_family_options_.max_write_buffer_number = 8;
Open();

listener->PopulateDBAndHandles(db_, handles_);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(1)));
ASSERT_OK(Put(0, "bar", EncodeAsUint64(2), "v1"));
ASSERT_OK(Put(0, "baz", EncodeAsUint64(2), "v1"));
// The event listener does not attempt to increase the cutoff timestamp if
// there is no manual flush going on.
ASSERT_EQ(listener->memtable_seal_count_.load(), 1);
ASSERT_EQ(listener->increase_cutoff_count_.load(), 0);

// Created the first memtable and scheduled it for flush.
ASSERT_OK(Put(0, "foo", EncodeAsUint64(2), "v1"));
listener->MarkManualFlushStart();
// Cutoff increased to 3 in `OnMemTableSealed` callback.
ASSERT_OK(dbfull()->Flush(FlushOptions(), handles_[0]));
listener->MarkManualFlushEnd();

std::string effective_full_history_ts_low;
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
ASSERT_EQ(EncodeAsUint64(3), effective_full_history_ts_low);

ASSERT_OK(Put(0, "foo", EncodeAsUint64(4), "v2"));
// Cutoff increased to 5 in the `OnMemTableSealed` callback.
listener->MarkManualFlushStart();
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[0], nullptr,
nullptr));
listener->MarkManualFlushEnd();

ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
ASSERT_EQ(EncodeAsUint64(5), effective_full_history_ts_low);

// There are two attempts to increase the cutoff timestamp, one for each of
// the two manual-flush passes above (the explicit flush and the manual
// compaction).
ASSERT_EQ(listener->increase_cutoff_count_.load(), 2);
Close();
}

// The user explicitly increases the cutoff timestamp in the
// `OnManualFlushScheduled` callback. This is suitable when the user already
// knows an effective cutoff timestamp to let the flush proceed.
class ManualFlushScheduledEventListener : public EventListener {
private:
std::vector<ColumnFamilyHandle*> handles_;
// This is a workaround to get a meaningful cutoff timestamp to use.
std::atomic<uint64_t> counter{0};

public:
void OnManualFlushScheduled(
DB* db, const std::vector<ManualFlushInfo>& manual_flush_info) override {
// This vector should always have size 1 in the non-atomic-flush case.
EXPECT_EQ(manual_flush_info.size(), 1);
EXPECT_EQ(manual_flush_info[0].cf_name, kDefaultColumnFamilyName);
if (counter.load() == 0) {
EXPECT_EQ(manual_flush_info[0].flush_reason, FlushReason::kManualFlush);
// An error indicates others have already set the cutoff to a higher
// point, so it's OK to proceed.
db->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(3))
.PermitUncheckedError();
} else if (counter.load() == 1) {
EXPECT_EQ(manual_flush_info[0].flush_reason,
FlushReason::kManualCompaction);
db->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(5))
.PermitUncheckedError();
}
counter.fetch_add(1);
}

void PopulateHandles(std::vector<ColumnFamilyHandle*> handles) {
handles_ = handles;
}
};

TEST_F(ColumnFamilyRetainUDTTest, IncreaseCutoffOnManualFlushScheduledCb) {
std::shared_ptr<ManualFlushScheduledEventListener> listener =
std::make_shared<ManualFlushScheduledEventListener>();
db_options_.listeners.push_back(listener);
const int kNumEntriesPerMemTable = 2;
column_family_options_.memtable_factory.reset(
test::NewSpecialSkipListFactory(kNumEntriesPerMemTable - 1));
// Make sure there is no memory pressure that would cause UDTs not to be
// retained.
column_family_options_.max_write_buffer_number = 8;
Open();

listener->PopulateHandles(handles_);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(1)));
ASSERT_OK(Put(0, "bar", EncodeAsUint64(2), "v1"));
ASSERT_OK(Put(0, "baz", EncodeAsUint64(2), "v1"));
// Created the first memtable and scheduled it for flush.
ASSERT_OK(Put(0, "foo", EncodeAsUint64(2), "v1"));
// Cutoff increased to 3 in the `OnManualFlushScheduled` callback.
ASSERT_OK(dbfull()->Flush(FlushOptions(), handles_[0]));

std::string effective_full_history_ts_low;
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
ASSERT_EQ(EncodeAsUint64(3), effective_full_history_ts_low);

ASSERT_OK(Put(0, "foo", EncodeAsUint64(4), "v2"));
// Cutoff increased to 5 in the `OnManualFlushScheduled` callback.
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[0], nullptr,
nullptr));

ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
ASSERT_EQ(EncodeAsUint64(5), effective_full_history_ts_low);
Close();
}

TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushRescheduled) {
- std::string cutoff_ts;
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::AfterRetainUDTReschedule:cb", [&](void* /*arg*/) {
// Increase full_history_ts_low so that all keys are expired by the time the
// initial FlushRequest is rescheduled.
- cutoff_ts.clear();
- PutFixed64(&cutoff_ts, 3);
- ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
+ ASSERT_OK(
+     db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(3)));
});
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundFlush:CheckFlushRequest:cb", [&](void* arg) {
@@ -3780,11 +3947,8 @@ TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushRescheduled) {
SyncPoint::GetInstance()->EnableProcessing();

Open();
- std::string write_ts;
- PutFixed64(&write_ts, 1);
- ASSERT_OK(Put(0, "foo", write_ts, "v1"));
- PutFixed64(&cutoff_ts, 1);
- ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
+ ASSERT_OK(Put(0, "foo", EncodeAsUint64(1), "v1"));
+ ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(1)));
// Not all keys expired, and there is no risk of write stall. Flush is
// rescheduled. The actual flush happens after `full_history_ts_low` is
// increased to mark all keys expired.
@@ -3794,7 +3958,7 @@ TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushRescheduled) {
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
// `full_history_ts_low` remains at the value set in the callback.
- ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
+ ASSERT_EQ(EncodeAsUint64(3), effective_full_history_ts_low);
Close();

SyncPoint::GetInstance()->DisableProcessing();
3 changes: 3 additions & 0 deletions db/db_impl/db_impl.h
@@ -1460,6 +1460,9 @@ class DBImpl : public DB {
Status RenameTempFileToOptionsFile(const std::string& file_name);
Status DeleteObsoleteOptionsFiles();

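// Notify any registered listeners that a manual flush has been scheduled for
// the given column families (see OnManualFlushScheduled in the listener API).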
void NotifyOnManualFlushScheduled(autovector<ColumnFamilyData*> cfds,
FlushReason flush_reason);

void NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
const MutableCFOptions& mutable_cf_options,
int job_id, FlushReason flush_reason);
20 changes: 20 additions & 0 deletions db/db_impl/db_impl_compaction_flush.cc
@@ -2240,6 +2240,23 @@ void DBImpl::GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
}
}

void DBImpl::NotifyOnManualFlushScheduled(autovector<ColumnFamilyData*> cfds,
FlushReason flush_reason) {
if (immutable_db_options_.listeners.size() == 0U) {
return;
}
if (shutting_down_.load(std::memory_order_acquire)) {
return;
}
std::vector<ManualFlushInfo> info;
for (ColumnFamilyData* cfd : cfds) {
info.push_back({cfd->GetID(), cfd->GetName(), flush_reason});
}
for (const auto& listener : immutable_db_options_.listeners) {
listener->OnManualFlushScheduled(this, info);
}
}
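
// A sketch of a listener consuming this notification (an assumption for
// illustration: per the aggregate init above and the test file, ManualFlushInfo
// exposes cf_id, cf_name, and flush_reason):
//
//   class CutoffListener : public EventListener {
//    public:
//     void OnManualFlushScheduled(
//         DB* db, const std::vector<ManualFlushInfo>& infos) override {
//       for (const auto& info : infos) {
//         // e.g. advance full_history_ts_low for the CF named info.cf_name
//       }
//     }
//   };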

Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
const FlushOptions& flush_options,
FlushReason flush_reason,
@@ -2356,6 +2373,8 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
}
}
}

NotifyOnManualFlushScheduled({cfd}, flush_reason);
TEST_SYNC_POINT("DBImpl::FlushMemTable:AfterScheduleFlush");
TEST_SYNC_POINT("DBImpl::FlushMemTable:BeforeWaitForBgFlush");
if (s.ok() && flush_options.wait) {
@@ -2500,6 +2519,7 @@ Status DBImpl::AtomicFlushMemTables(
}
}
}
NotifyOnManualFlushScheduled(cfds, flush_reason);
TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:AfterScheduleFlush");
TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:BeforeWaitForBgFlush");
if (s.ok() && flush_options.wait) {
5 changes: 5 additions & 0 deletions db/db_impl/db_impl_write.cc
@@ -2239,6 +2239,11 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
memtable_info.earliest_seqno = cfd->mem()->GetEarliestSequenceNumber();
memtable_info.num_entries = cfd->mem()->num_entries();
memtable_info.num_deletes = cfd->mem()->num_deletes();
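// When user-defined timestamps are enabled but not persisted, surface the
// memtable's newest UDT to listeners (e.g. OnMemTableSealed) so they can
// derive an effective cutoff timestamp.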
if (!cfd->ioptions()->persist_user_defined_timestamps &&
cfd->user_comparator()->timestamp_size() > 0) {
const Slice& newest_udt = cfd->mem()->GetNewestUDT();
memtable_info.newest_udt.assign(newest_udt.data(), newest_udt.size());
}
// Log this later after lock release. It may be outdated, e.g., if background
// flush happens before logging, but that should be ok.
int num_imm_unflushed = cfd->imm()->NumNotFlushed();