From 44aceb88d0de120847719c061aa3a8465daaee48 Mon Sep 17 00:00:00 2001
From: Yu Zhang
Date: Thu, 6 Jun 2024 17:29:01 -0700
Subject: [PATCH] Add an OnManualFlushScheduled callback in event listener
 (#12631)

Summary:
As titled. Also added the newest user-defined timestamp to `MemTableInfo`,
since it can be useful info in the callback.

Added some unit tests as examples of how users can use two separate
approaches to allow manual flushes / manual compactions to go through when
the user-defined timestamps in memtable only feature is enabled. One approach
relies on selectively increasing the cutoff timestamp in the
`OnMemTableSealed` callback when the seal is initiated by a manual flush.
Another approach is to increase the cutoff timestamp in the
`OnManualFlushScheduled` callback. The caveats of each approach are also
documented in the unit tests.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12631

Reviewed By: ajkr

Differential Revision: D58260528

Pulled By: jowlyzhang

fbshipit-source-id: bf446d7140affdf124744095e0a179fa6e427532
---
 db/column_family_test.cc               | 228 +++++++++++++++++++++----
 db/db_impl/db_impl.h                   |   3 +
 db/db_impl/db_impl_compaction_flush.cc |  20 +++
 db/db_impl/db_impl_write.cc            |   5 +
 include/rocksdb/listener.h             |  21 +++
 5 files changed, 245 insertions(+), 32 deletions(-)

diff --git a/db/column_family_test.cc b/db/column_family_test.cc
index 90f66077cea..02ed3b0049f 100644
--- a/db/column_family_test.cc
+++ b/db/column_family_test.cc
@@ -23,6 +23,7 @@
 #include "rocksdb/db.h"
 #include "rocksdb/env.h"
 #include "rocksdb/iterator.h"
+#include "rocksdb/listener.h"
 #include "rocksdb/utilities/object_registry.h"
 #include "test_util/sync_point.h"
 #include "test_util/testharness.h"
@@ -35,6 +36,14 @@
 
 namespace ROCKSDB_NAMESPACE {
 
+namespace {
+std::string EncodeAsUint64(uint64_t v) {
+  std::string dst;
+  PutFixed64(&dst, v);
+  return dst;
+}
+}  // namespace
+
 static const int kValueSize = 1000;
 
 // counts how many operations were performed
@@ -3674,21 +3683,17 @@ TEST_F(ColumnFamilyRetainUDTTest, FullHistoryTsLowNotSet) {
   SyncPoint::GetInstance()->EnableProcessing();
   Open();
-  std::string write_ts;
-  PutFixed64(&write_ts, 1);
-  ASSERT_OK(Put(0, "foo", write_ts, "v1"));
+  ASSERT_OK(Put(0, "foo", EncodeAsUint64(1), "v1"));
   // No `full_history_ts_low` explicitly set by user, flush is continued
   // without checking if its UDTs expired.
   ASSERT_OK(Flush(0));
 
   // After flush, `full_history_ts_low` should be automatically advanced to
   // the effective cutoff timestamp: write_ts + 1
-  std::string cutoff_ts;
-  PutFixed64(&cutoff_ts, 2);
   std::string effective_full_history_ts_low;
   ASSERT_OK(
       db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
-  ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
+  ASSERT_EQ(EncodeAsUint64(2), effective_full_history_ts_low);
   Close();
 
   SyncPoint::GetInstance()->DisableProcessing();
@@ -3705,12 +3710,8 @@ TEST_F(ColumnFamilyRetainUDTTest, AllKeysExpired) {
   SyncPoint::GetInstance()->EnableProcessing();
   Open();
-  std::string write_ts;
-  PutFixed64(&write_ts, 1);
-  ASSERT_OK(Put(0, "foo", write_ts, "v1"));
-  std::string cutoff_ts;
-  PutFixed64(&cutoff_ts, 3);
-  ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
+  ASSERT_OK(Put(0, "foo", EncodeAsUint64(1), "v1"));
+  ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(3)));
   // All keys expired w.r.t the configured `full_history_ts_low`, flush continue
   // without the need for a re-schedule.
   ASSERT_OK(Flush(0));
@@ -3719,13 +3720,13 @@ TEST_F(ColumnFamilyRetainUDTTest, AllKeysExpired) {
   std::string effective_full_history_ts_low;
   ASSERT_OK(
       db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
-  ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
+  ASSERT_EQ(EncodeAsUint64(3), effective_full_history_ts_low);
   Close();
 
   SyncPoint::GetInstance()->DisableProcessing();
   SyncPoint::GetInstance()->ClearAllCallBacks();
 }
 
-TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushToAvoidWriteStall) {
+TEST_F(ColumnFamilyRetainUDTTest, IncreaseCutoffInMemtableSealCb) {
   SyncPoint::GetInstance()->SetCallBack(
       "DBImpl::BackgroundFlush:CheckFlushRequest:cb", [&](void* arg) {
         ASSERT_NE(nullptr, arg);
@@ -3735,12 +3736,8 @@ TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushToAvoidWriteStall) {
   SyncPoint::GetInstance()->EnableProcessing();
   Open();
-  std::string cutoff_ts;
-  std::string write_ts;
-  PutFixed64(&write_ts, 1);
-  ASSERT_OK(Put(0, "foo", write_ts, "v1"));
-  PutFixed64(&cutoff_ts, 1);
-  ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
+  ASSERT_OK(Put(0, "foo", EncodeAsUint64(1), "v1"));
+  ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(1)));
   ASSERT_OK(db_->SetOptions(handles_[0], {{"max_write_buffer_number", "1"}}));
   // Not all keys expired, but flush is continued without a re-schedule because
   // of risk of write stall.
@@ -3752,24 +3749,194 @@ TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushToAvoidWriteStall) {
   ASSERT_OK(
       db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
-  cutoff_ts.clear();
-  PutFixed64(&cutoff_ts, 2);
-  ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
+  ASSERT_EQ(EncodeAsUint64(2), effective_full_history_ts_low);
   Close();
 
   SyncPoint::GetInstance()->DisableProcessing();
   SyncPoint::GetInstance()->ClearAllCallBacks();
 }
 
+// The user selectively increases the cutoff timestamp in the
+// `OnMemTableSealed` callback when it is invoked during a manual flush. This
+// approach is suitable when the user does not know an effective new cutoff
+// timestamp up front and relies on the callback to provide that info.
+// The caveat of this approach is that the user needs to track when a manual
+// flush is ongoing. In this example listener, the `manual_flush_count_`
+// variable serves that purpose; it is a counter so that concurrent manual
+// flushes can control the cutoff-increasing behavior independently.
+// Also, many operations can indirectly cause a manual flush, such as manual
+// compaction and file ingestion, and the user needs to explicitly track each
+// such operation. So this callback is not ideal. Check out
+// `ManualFlushScheduledEventListener` below for a different approach.
+class MemtableSealEventListener : public EventListener {
+ private:
+  DB* db_;
+  std::vector<ColumnFamilyHandle*> handles_;
+  std::atomic<int> manual_flush_count_{0};
+
+ public:
+  std::atomic<int> memtable_seal_count_{0};
+  std::atomic<int> increase_cutoff_count_{0};
+
+  void OnMemTableSealed(const MemTableInfo& info) override {
+    memtable_seal_count_.fetch_add(1);
+    if (manual_flush_count_.load() == 0) {
+      return;
+    }
+    if (!info.newest_udt.empty()) {
+      uint64_t int_newest_udt = 0;
+      Slice udt_slice = info.newest_udt;
+      Status s = DecodeU64Ts(udt_slice, &int_newest_udt);
+      if (!s.ok()) {
+        return;
+      }
+      // An error indicates others have already set the cutoff to a higher
+      // point, so it's OK to proceed.
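+      // Advancing the cutoff to newest_udt + 1 marks every key in the sealed
+      // memtable as expired, so the pending manual flush can proceed.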
+      db_->IncreaseFullHistoryTsLow(handles_[0],
+                                    EncodeAsUint64(int_newest_udt + 1))
+          .PermitUncheckedError();
+      increase_cutoff_count_.fetch_add(1);
+    }
+  }
+
+  void PopulateDBAndHandles(DB* db, std::vector<ColumnFamilyHandle*> handles) {
+    db_ = db;
+    handles_ = handles;
+  }
+
+  void MarkManualFlushStart() { manual_flush_count_.fetch_add(1); }
+
+  void MarkManualFlushEnd() { manual_flush_count_.fetch_sub(1); }
+};
+
+TEST_F(ColumnFamilyRetainUDTTest, IncreaseCutoffOnMemtableSealedCb) {
+  std::shared_ptr<MemtableSealEventListener> listener =
+      std::make_shared<MemtableSealEventListener>();
+  db_options_.listeners.push_back(listener);
+  const int kNumEntriesPerMemTable = 2;
+  column_family_options_.memtable_factory.reset(
+      test::NewSpecialSkipListFactory(kNumEntriesPerMemTable - 1));
+  // Make sure there is no memory pressure that prevents retaining UDTs.
+  column_family_options_.max_write_buffer_number = 8;
+  Open();
+
+  listener->PopulateDBAndHandles(db_, handles_);
+  ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(1)));
+  ASSERT_OK(Put(0, "bar", EncodeAsUint64(2), "v1"));
+  ASSERT_OK(Put(0, "baz", EncodeAsUint64(2), "v1"));
+  // The event listener does not attempt to increase the cutoff timestamp if
+  // there is no manual flush going on.
+  ASSERT_EQ(listener->memtable_seal_count_.load(), 1);
+  ASSERT_EQ(listener->increase_cutoff_count_.load(), 0);
+
+  // Created the first memtable and scheduled it for flush.
+  ASSERT_OK(Put(0, "foo", EncodeAsUint64(2), "v1"));
+  listener->MarkManualFlushStart();
+  // Cutoff increased to 3 in the `OnMemTableSealed` callback.
+  ASSERT_OK(dbfull()->Flush(FlushOptions(), handles_[0]));
+  listener->MarkManualFlushEnd();
+
+  std::string effective_full_history_ts_low;
+  ASSERT_OK(
+      db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
+  ASSERT_EQ(EncodeAsUint64(3), effective_full_history_ts_low);
+
+  ASSERT_OK(Put(0, "foo", EncodeAsUint64(4), "v2"));
+  // Cutoff increased to 5 in the `OnMemTableSealed` callback.
+  listener->MarkManualFlushStart();
+  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[0], nullptr,
+                                   nullptr));
+  listener->MarkManualFlushEnd();
+
+  ASSERT_OK(
+      db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
+  ASSERT_EQ(EncodeAsUint64(5), effective_full_history_ts_low);
+
+  // There are two attempts to increase the cutoff timestamp, one for the
+  // manual flush and one for the manual compaction.
+  ASSERT_EQ(listener->increase_cutoff_count_.load(), 2);
+  Close();
+}
+
+// The user explicitly increases the cutoff timestamp in the
+// `OnManualFlushScheduled` callback. This approach is suitable when the user
+// already knows an effective cutoff timestamp to let the flush proceed.
+class ManualFlushScheduledEventListener : public EventListener {
+ private:
+  std::vector<ColumnFamilyHandle*> handles_;
+  // This is a workaround to get a meaningful cutoff timestamp to use.
+  std::atomic<int> counter{0};
+
+ public:
+  void OnManualFlushScheduled(
+      DB* db, const std::vector<ManualFlushInfo>& manual_flush_info) override {
+    // This vector should always have size 1 in the non-atomic-flush case.
+    EXPECT_EQ(manual_flush_info.size(), 1);
+    EXPECT_EQ(manual_flush_info[0].cf_name, kDefaultColumnFamilyName);
+    if (counter.load() == 0) {
+      EXPECT_EQ(manual_flush_info[0].flush_reason, FlushReason::kManualFlush);
+      // An error indicates others have already set the cutoff to a higher
+      // point, so it's OK to proceed.
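+      // The test writes keys at timestamp 2 before this manual flush, so a
+      // cutoff of 3 is known in advance to cover the memtable being flushed.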
+      db->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(3))
+          .PermitUncheckedError();
+    } else if (counter.load() == 1) {
+      EXPECT_EQ(manual_flush_info[0].flush_reason,
+                FlushReason::kManualCompaction);
+      db->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(5))
+          .PermitUncheckedError();
+    }
+    counter.fetch_add(1);
+  }
+
+  void PopulateHandles(std::vector<ColumnFamilyHandle*> handles) {
+    handles_ = handles;
+  }
+};
+
+TEST_F(ColumnFamilyRetainUDTTest, IncreaseCutoffOnManualFlushScheduledCb) {
+  std::shared_ptr<ManualFlushScheduledEventListener> listener =
+      std::make_shared<ManualFlushScheduledEventListener>();
+  db_options_.listeners.push_back(listener);
+  const int kNumEntriesPerMemTable = 2;
+  column_family_options_.memtable_factory.reset(
+      test::NewSpecialSkipListFactory(kNumEntriesPerMemTable - 1));
+  // Make sure there is no memory pressure that prevents retaining UDTs.
+  column_family_options_.max_write_buffer_number = 8;
+  Open();
+
+  listener->PopulateHandles(handles_);
+  ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(1)));
+  ASSERT_OK(Put(0, "bar", EncodeAsUint64(2), "v1"));
+  ASSERT_OK(Put(0, "baz", EncodeAsUint64(2), "v1"));
+  // Created the first memtable and scheduled it for flush.
+  ASSERT_OK(Put(0, "foo", EncodeAsUint64(2), "v1"));
+  // Cutoff increased to 3 in the `OnManualFlushScheduled` callback.
+  ASSERT_OK(dbfull()->Flush(FlushOptions(), handles_[0]));
+
+  std::string effective_full_history_ts_low;
+  ASSERT_OK(
+      db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
+  ASSERT_EQ(EncodeAsUint64(3), effective_full_history_ts_low);
+
+  ASSERT_OK(Put(0, "foo", EncodeAsUint64(4), "v2"));
+  // Cutoff increased to 5 in the `OnManualFlushScheduled` callback.
+  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[0], nullptr,
+                                   nullptr));
+
+  ASSERT_OK(
+      db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
+  ASSERT_EQ(EncodeAsUint64(5), effective_full_history_ts_low);
+  Close();
+}
+
 TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushRescheduled) {
   std::string cutoff_ts;
   SyncPoint::GetInstance()->SetCallBack(
       "DBImpl::AfterRetainUDTReschedule:cb", [&](void* /*arg*/) {
         // Increasing full_history_ts_low so all keys expired after the initial
         // FlushRequest is rescheduled
-        cutoff_ts.clear();
-        PutFixed64(&cutoff_ts, 3);
-        ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
+        ASSERT_OK(
+            db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(3)));
       });
   SyncPoint::GetInstance()->SetCallBack(
       "DBImpl::BackgroundFlush:CheckFlushRequest:cb", [&](void* arg) {
@@ -3780,11 +3947,8 @@ TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushRescheduled) {
   SyncPoint::GetInstance()->EnableProcessing();
   Open();
-  std::string write_ts;
-  PutFixed64(&write_ts, 1);
-  ASSERT_OK(Put(0, "foo", write_ts, "v1"));
-  PutFixed64(&cutoff_ts, 1);
-  ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
+  ASSERT_OK(Put(0, "foo", EncodeAsUint64(1), "v1"));
+  ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], EncodeAsUint64(1)));
   // Not all keys expired, and there is no risk of write stall. Flush is
   // rescheduled. The actual flush happens after `full_history_ts_low` is
   // increased to mark all keys expired.
@@ -3794,7 +3958,7 @@ TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushRescheduled) {
   ASSERT_OK(
       db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
   // `full_history_ts_low` stays unchanged.
-  ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
+  ASSERT_EQ(EncodeAsUint64(3), effective_full_history_ts_low);
   Close();
 
   SyncPoint::GetInstance()->DisableProcessing();
diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h
index b45f69b3090..b22152cb9b7 100644
--- a/db/db_impl/db_impl.h
+++ b/db/db_impl/db_impl.h
@@ -1460,6 +1460,9 @@ class DBImpl : public DB {
   Status RenameTempFileToOptionsFile(const std::string& file_name);
   Status DeleteObsoleteOptionsFiles();
 
+  void NotifyOnManualFlushScheduled(autovector<ColumnFamilyData*> cfds,
+                                    FlushReason flush_reason);
+
   void NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
                           const MutableCFOptions& mutable_cf_options,
                           int job_id, FlushReason flush_reason);
diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc
index 3eef4c5fdfb..f2ce88a86d7 100644
--- a/db/db_impl/db_impl_compaction_flush.cc
+++ b/db/db_impl/db_impl_compaction_flush.cc
@@ -2240,6 +2240,23 @@ void DBImpl::GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
   }
 }
 
+void DBImpl::NotifyOnManualFlushScheduled(autovector<ColumnFamilyData*> cfds,
+                                          FlushReason flush_reason) {
+  if (immutable_db_options_.listeners.size() == 0U) {
+    return;
+  }
+  if (shutting_down_.load(std::memory_order_acquire)) {
+    return;
+  }
+  std::vector<ManualFlushInfo> info;
+  for (ColumnFamilyData* cfd : cfds) {
+    info.push_back({cfd->GetID(), cfd->GetName(), flush_reason});
+  }
+  for (const auto& listener : immutable_db_options_.listeners) {
+    listener->OnManualFlushScheduled(this, info);
+  }
+}
+
 Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
                              const FlushOptions& flush_options,
                              FlushReason flush_reason,
@@ -2356,6 +2373,8 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
       }
     }
   }
+
+  NotifyOnManualFlushScheduled({cfd}, flush_reason);
   TEST_SYNC_POINT("DBImpl::FlushMemTable:AfterScheduleFlush");
   TEST_SYNC_POINT("DBImpl::FlushMemTable:BeforeWaitForBgFlush");
   if (s.ok() && flush_options.wait) {
@@ -2500,6 +2519,7 @@ Status DBImpl::AtomicFlushMemTables(
       }
     }
   }
+  NotifyOnManualFlushScheduled(cfds, flush_reason);
   TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:AfterScheduleFlush");
   TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:BeforeWaitForBgFlush");
   if (s.ok() && flush_options.wait) {
diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc
index ed95549b0ac..de2b8f75079 100644
--- a/db/db_impl/db_impl_write.cc
+++ b/db/db_impl/db_impl_write.cc
@@ -2239,6 +2239,11 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
   memtable_info.earliest_seqno = cfd->mem()->GetEarliestSequenceNumber();
   memtable_info.num_entries = cfd->mem()->num_entries();
   memtable_info.num_deletes = cfd->mem()->num_deletes();
+  if (!cfd->ioptions()->persist_user_defined_timestamps &&
+      cfd->user_comparator()->timestamp_size() > 0) {
+    const Slice& newest_udt = cfd->mem()->GetNewestUDT();
+    memtable_info.newest_udt.assign(newest_udt.data(), newest_udt.size());
+  }
   // Log this later after lock release. It may be outdated, e.g., if background
   // flush happens before logging, but that should be ok.
   int num_imm_unflushed = cfd->imm()->NumNotFlushed();
diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h
index 452ae54cdfb..8911bc299b0 100644
--- a/include/rocksdb/listener.h
+++ b/include/rocksdb/listener.h
@@ -328,6 +328,15 @@ struct BlobFileGarbageInfo : public BlobFileInfo {
   uint64_t garbage_blob_bytes;
 };
 
+struct ManualFlushInfo {
+  // the id of the column family
+  uint32_t cf_id;
+  // the name of the column family
+  std::string cf_name;
+  // Reason that triggered this manual flush
+  FlushReason flush_reason;
+};
+
 struct FlushJobInfo {
   // the id of the column family
   uint32_t cf_id;
@@ -492,6 +501,10 @@ struct MemTableInfo {
   uint64_t num_entries;
   // Total number of deletes in memtable
   uint64_t num_deletes;
+
+  // The newest user-defined timestamp in the memtable. Note this field is
+  // only populated when `persist_user_defined_timestamps` is false.
+  std::string newest_udt;
 };
 
 struct ExternalFileIngestionInfo {
@@ -595,6 +608,14 @@ class EventListener : public Customizable {
   virtual void OnFlushBegin(DB* /*db*/,
                             const FlushJobInfo& /*flush_job_info*/) {}
 
+  // A callback function to RocksDB which will be called after a manual flush
+  // is scheduled. The default implementation is a no-op.
+  // The size of the `manual_flush_info` vector should only be bigger than 1 if
+  // the DB enables atomic flush and has more than one column family. Its size
+  // should be 1 in all other cases.
+  virtual void OnManualFlushScheduled(
+      DB* /*db*/, const std::vector<ManualFlushInfo>& /*manual_flush_info*/) {}
+
   // A callback function for RocksDB which will be called whenever
   // a SST file is deleted. Different from OnCompactionCompleted and
   // OnFlushCompleted, this callback is designed for external logging