From 6e5a8e9ae4395800a9d10396b7d408ef51602760 Mon Sep 17 00:00:00 2001 From: Hui Xiao Date: Fri, 31 Mar 2023 15:28:13 -0700 Subject: [PATCH] draft --- CMakeLists.txt | 1 + TARGETS | 2 ++ db/builder.cc | 1 + db/compaction/compaction_job.cc | 9 +++++- db/db_impl/db_impl_open.cc | 3 ++ db/db_impl/db_impl_write.cc | 2 +- db/flush_job.cc | 3 +- db/perf_context_test.cc | 3 +- db/version_builder.cc | 5 +++- file/file_util.h | 2 ++ file/random_access_file_reader.cc | 23 +++++++++++---- include/rocksdb/env.h | 7 +++++ include/rocksdb/file_system.h | 2 ++ include/rocksdb/options.h | 2 ++ include/rocksdb/statistics.h | 3 ++ java/rocksjni/portal.h | 8 +++++ .../main/java/org/rocksdb/HistogramType.java | 4 +++ monitoring/statistics.cc | 2 ++ src.mk | 1 + table/block_based/block_based_table_reader.cc | 1 + util/stop_watch.h | 29 ++++++++++++------- util/thread_io_activity.cc | 10 +++++++ util/thread_io_activity.h | 22 ++++++++++++++ 23 files changed, 125 insertions(+), 20 deletions(-) create mode 100644 util/thread_io_activity.cc create mode 100644 util/thread_io_activity.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 598c7281543e..4a87bc4f63a8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -895,6 +895,7 @@ set(SOURCES util/stderr_logger.cc util/string_util.cc util/thread_local.cc + util/thread_io_activity.cc util/threadpool_imp.cc util/xxhash.cc utilities/agg_merge/agg_merge.cc diff --git a/TARGETS b/TARGETS index 2514e09a7cd8..5a3e8477a6be 100644 --- a/TARGETS +++ b/TARGETS @@ -262,6 +262,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[ "util/status.cc", "util/stderr_logger.cc", "util/string_util.cc", + "util/thread_io_activity.cc", "util/thread_local.cc", "util/threadpool_imp.cc", "util/xxhash.cc", @@ -608,6 +609,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[ "util/status.cc", "util/stderr_logger.cc", "util/string_util.cc", + "util/thread_io_activity.cc", "util/thread_local.cc", "util/threadpool_imp.cc", "util/xxhash.cc", diff --git 
a/db/builder.cc b/db/builder.cc index b86dd6b9ce53..23fa39ee9308 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -370,6 +370,7 @@ Status BuildTable( // No matter whether use_direct_io_for_flush_and_compaction is true, // the goal is to cache it here for further user reads. ReadOptions read_options; + read_options.io_activity = Env::IOActivity::kFlush; std::unique_ptr it(table_cache->NewIterator( read_options, file_options, tboptions.internal_comparator, *meta, nullptr /* range_del_agg */, mutable_cf_options.prefix_extractor, diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 331be915e216..9f5f6dfc9b1a 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -56,6 +56,7 @@ #include "table/unique_id_impl.h" #include "test_util/sync_point.h" #include "util/stop_watch.h" +#include "util/thread_io_activity.h" namespace ROCKSDB_NAMESPACE { @@ -617,6 +618,7 @@ Status CompactionJob::Run() { AutoThreadOperationStageUpdater stage_updater( ThreadStatus::STAGE_COMPACTION_RUN); TEST_SYNC_POINT("CompactionJob::Run():Start"); + ThreadIOActivityGuard thread_io_activity_guard(Env::IOActivity::kCompaction); log_buffer_->FlushBufferToLog(); LogCompaction(); @@ -710,6 +712,8 @@ Status CompactionJob::Run() { compact_->compaction->mutable_cf_options()->prefix_extractor; std::atomic next_file_idx(0); auto verify_table = [&](Status& output_status) { + ThreadIOActivityGuard verify_table_thread_io_activity_guard( + Env::IOActivity::kCompaction); while (true) { size_t file_idx = next_file_idx.fetch_add(1); if (file_idx >= files_output.size()) { @@ -723,6 +727,8 @@ Status CompactionJob::Run() { // verification as user reads since the goal is to cache it here for // further user reads ReadOptions read_options; + read_options.io_activity = Env::IOActivity::kCompaction; + InternalIterator* iter = cfd->table_cache()->NewIterator( read_options, file_options_, cfd->internal_comparator(), files_output[file_idx]->meta, 
/*range_del_agg=*/nullptr, @@ -1032,7 +1038,7 @@ void CompactionJob::NotifyOnSubcompactionCompleted( void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { assert(sub_compact); assert(sub_compact->compaction); - + ThreadIOActivityGuard thread_io_activity_guard(Env::IOActivity::kCompaction); if (db_options_.compaction_service) { CompactionServiceJobStatus comp_status = ProcessKeyValueCompactionWithCompactionService(sub_compact); @@ -1083,6 +1089,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { read_options.verify_checksums = true; read_options.fill_cache = false; read_options.rate_limiter_priority = GetRateLimiterPriority(); + read_options.io_activity = Env::IOActivity::kCompaction; // Compaction iterators shouldn't be confined to a single prefix. // Compactions use Seek() for // (a) concurrent compactions, diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 478384c919fa..6f9a5067abbc 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -24,6 +24,7 @@ #include "rocksdb/wal_filter.h" #include "test_util/sync_point.h" #include "util/rate_limiter.h" +#include "util/thread_io_activity.h" namespace ROCKSDB_NAMESPACE { Options SanitizeOptions(const std::string& dbname, const Options& src, @@ -1557,6 +1558,7 @@ Status DBImpl::RestoreAliveLogFiles(const std::vector& wal_numbers) { Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, MemTable* mem, VersionEdit* edit) { mutex_.AssertHeld(); + ThreadIOActivityGuard thread_io_activity_guard(Env::IOActivity::kFlush); assert(cfd); assert(cfd->imm()); // The immutable memtable list must be empty. 
@@ -1700,6 +1702,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, InternalStats::BYTES_FLUSHED, stats.bytes_written + stats.bytes_written_blob); RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize()); + thread_io_activity = Env::IOActivity::kUnknown; return s; } diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 89a054e4c098..abf68fe8dd45 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1805,7 +1805,7 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, WriteThread& write_thread, bool delayed = false; { StopWatch sw(immutable_db_options_.clock, stats_, WRITE_STALL, - &time_delayed); + Histograms::HISTOGRAM_ENUM_MAX, &time_delayed); // To avoid parallel timed delays (bad throttling), only support them // on the primary write queue. uint64_t delay; diff --git a/db/flush_job.cc b/db/flush_job.cc index 8193f594f8b4..5f6b769e5cf4 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -46,6 +46,7 @@ #include "util/coding.h" #include "util/mutexlock.h" #include "util/stop_watch.h" +#include "util/thread_io_activity.h" namespace ROCKSDB_NAMESPACE { @@ -211,6 +212,7 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta, bool* switched_to_mempurge) { TEST_SYNC_POINT("FlushJob::Start"); db_mutex_->AssertHeld(); + ThreadIOActivityGuard thread_io_activity_guard(Env::IOActivity::kFlush); assert(pick_memtable_called); // Mempurge threshold can be dynamically changed. 
// For sake of consistency, mempurge_threshold is @@ -350,7 +352,6 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta, stream << "file_cpu_read_nanos" << (IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos); } - return s; } diff --git a/db/perf_context_test.cc b/db/perf_context_test.cc index 3e78dbe273ce..bb8691b969e0 100644 --- a/db/perf_context_test.cc +++ b/db/perf_context_test.cc @@ -187,7 +187,8 @@ TEST_F(PerfContextTest, StopWatchOverhead) { uint64_t elapsed = 0; std::vector timings(kTotalIterations); - StopWatch timer(SystemClock::Default().get(), nullptr, 0, &elapsed); + StopWatch timer(SystemClock::Default().get(), nullptr, 0, + Histograms::HISTOGRAM_ENUM_MAX, &elapsed); for (auto& timing : timings) { timing = elapsed; } diff --git a/db/version_builder.cc b/db/version_builder.cc index 4f0e3a8413c0..0a7aa64b6e89 100644 --- a/db/version_builder.cc +++ b/db/version_builder.cc @@ -33,6 +33,7 @@ #include "port/port.h" #include "table/table_reader.h" #include "util/string_util.h" +#include "util/thread_io_activity.h" namespace ROCKSDB_NAMESPACE { @@ -1323,8 +1324,10 @@ class VersionBuilder::Rep { auto* file_meta = files_meta[file_idx].first; int level = files_meta[file_idx].second; TableCache::TypedHandle* handle = nullptr; + ReadOptions read_options; + read_options.io_activity = thread_io_activity; statuses[file_idx] = table_cache_->FindTable( - ReadOptions(), file_options_, + read_options, file_options_, *(base_vstorage_->InternalComparator()), *file_meta, &handle, prefix_extractor, false /*no_io */, true /* record_read_stats */, internal_stats->GetFileReadHist(level), false, level, diff --git a/file/file_util.h b/file/file_util.h index d46a7ba0e1a9..e279cfba0abc 100644 --- a/file/file_util.h +++ b/file/file_util.h @@ -80,6 +80,8 @@ inline IOStatus PrepareIOFromReadOptions(const ReadOptions& ro, } opts.rate_limiter_priority = ro.rate_limiter_priority; + opts.io_activity = ro.io_activity; + return IOStatus::OK(); } diff --git 
a/file/random_access_file_reader.cc b/file/random_access_file_reader.cc index 226970641da6..8c4d40009415 100644 --- a/file/random_access_file_reader.cc +++ b/file/random_access_file_reader.cc @@ -20,9 +20,14 @@ #include "test_util/sync_point.h" #include "util/random.h" #include "util/rate_limiter.h" +#include "util/thread_io_activity.h" namespace ROCKSDB_NAMESPACE { - +const std::array + kReadHistograms{{ + SST_READ_FLUSH_MICROS, + SST_READ_COMPACTION_MICROS, + }}; inline void RecordIOStats(Statistics* stats, Temperature file_temperature, bool is_last_level, size_t size) { IOSTATS_ADD(bytes_read, size); @@ -94,6 +99,9 @@ IOStatus RandomAccessFileReader::Read( uint64_t elapsed = 0; { StopWatch sw(clock_, stats_, hist_type_, + (opts.io_activity != Env::IOActivity::kUnknown) + ? kReadHistograms[(std::size_t)(opts.io_activity)] + : Histograms::HISTOGRAM_ENUM_MAX, (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/, true /*delay_enabled*/); auto prev_perf_level = GetPerfLevel(); @@ -288,6 +296,9 @@ IOStatus RandomAccessFileReader::MultiRead( uint64_t elapsed = 0; { StopWatch sw(clock_, stats_, hist_type_, + (opts.io_activity != Env::IOActivity::kUnknown) + ? kReadHistograms[(std::size_t)(opts.io_activity)] + : Histograms::HISTOGRAM_ENUM_MAX, (stats_ != nullptr) ? 
&elapsed : nullptr, true /*overwrite*/, true /*delay_enabled*/); auto prev_perf_level = GetPerfLevel(); @@ -476,13 +487,15 @@ IOStatus RandomAccessFileReader::ReadAsync( assert(read_async_info->buf_.CurrentSize() == 0); - StopWatch sw(clock_, nullptr /*stats*/, 0 /*hist_type*/, &elapsed, - true /*overwrite*/, true /*delay_enabled*/); + StopWatch sw(clock_, nullptr /*stats*/, 0 /*hist_type*/, + Histograms::HISTOGRAM_ENUM_MAX, &elapsed, true /*overwrite*/, + true /*delay_enabled*/); s = file_->ReadAsync(aligned_req, opts, read_async_callback, read_async_info, io_handle, del_fn, nullptr /*dbg*/); } else { - StopWatch sw(clock_, nullptr /*stats*/, 0 /*hist_type*/, &elapsed, - true /*overwrite*/, true /*delay_enabled*/); + StopWatch sw(clock_, nullptr /*stats*/, 0 /*hist_type*/, + Histograms::HISTOGRAM_ENUM_MAX, &elapsed, true /*overwrite*/, + true /*delay_enabled*/); s = file_->ReadAsync(req, opts, read_async_callback, read_async_info, io_handle, del_fn, nullptr /*dbg*/); } diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 62af602c6294..dcd52dcc895c 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -436,6 +436,13 @@ class Env : public Customizable { IO_TOTAL = 4 }; + // EXPERIMENTAL + enum class IOActivity : uint8_t { + kFlush = 0, + kCompaction = 1, + kUnknown = 2, + }; + // Arrange to run "(*function)(arg)" once in a background thread, in // the thread pool specified by pri. By default, jobs go to the 'LOW' // priority thread pool. diff --git a/include/rocksdb/file_system.h b/include/rocksdb/file_system.h index 97b21e286e6b..ae59ef8009cc 100644 --- a/include/rocksdb/file_system.h +++ b/include/rocksdb/file_system.h @@ -116,6 +116,8 @@ struct IOOptions { // directories and list only files in GetChildren API. 
bool do_not_recurse; + Env::IOActivity io_activity = Env::IOActivity::kUnknown; + IOOptions() : IOOptions(false) {} explicit IOOptions(bool force_dir_fsync_) diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 669afc1d49f2..6e91af880d8b 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1696,6 +1696,8 @@ struct ReadOptions { // Default: true bool optimize_multiget_for_io; + Env::IOActivity io_activity = Env::IOActivity::kUnknown; + ReadOptions(); ReadOptions(bool cksum, bool cache); }; diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index c10c67919023..3ed1b61032f1 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -467,6 +467,9 @@ enum Histograms : uint32_t { DB_SEEK, WRITE_STALL, SST_READ_MICROS, + SST_READ_FLUSH_MICROS, + SST_READ_COMPACTION_MICROS, + // The number of subcompactions actually scheduled during a compaction NUM_SUBCOMPACTIONS_SCHEDULED, // Value size distribution in each operation diff --git a/java/rocksjni/portal.h b/java/rocksjni/portal.h index ee87f89472ca..34bc79a8a847 100644 --- a/java/rocksjni/portal.h +++ b/java/rocksjni/portal.h @@ -5619,6 +5619,10 @@ class HistogramTypeJni { return 0x38; case ROCKSDB_NAMESPACE::Histograms::TABLE_OPEN_PREFETCH_TAIL_READ_BYTES: return 0x39; + case ROCKSDB_NAMESPACE::Histograms::SST_READ_FLUSH_MICROS: + return 0x3A; + case ROCKSDB_NAMESPACE::Histograms::SST_READ_COMPACTION_MICROS: + return 0x3B; case ROCKSDB_NAMESPACE::Histograms::HISTOGRAM_ENUM_MAX: // 0x1F for backwards compatibility on current minor version. return 0x1F; @@ -5738,6 +5742,10 @@ class HistogramTypeJni { case 0x39: return ROCKSDB_NAMESPACE::Histograms:: TABLE_OPEN_PREFETCH_TAIL_READ_BYTES; + case 0x3A: + return ROCKSDB_NAMESPACE::Histograms::SST_READ_FLUSH_MICROS; + case 0x3B: + return ROCKSDB_NAMESPACE::Histograms::SST_READ_COMPACTION_MICROS; case 0x1F: // 0x1F for backwards compatibility on current minor version. 
return ROCKSDB_NAMESPACE::Histograms::HISTOGRAM_ENUM_MAX; diff --git a/java/src/main/java/org/rocksdb/HistogramType.java b/java/src/main/java/org/rocksdb/HistogramType.java index 20c54422c17e..4cd35570d113 100644 --- a/java/src/main/java/org/rocksdb/HistogramType.java +++ b/java/src/main/java/org/rocksdb/HistogramType.java @@ -169,6 +169,10 @@ public enum HistogramType { */ TABLE_OPEN_PREFETCH_TAIL_READ_BYTES((byte) 0x39), + SST_READ_FLUSH_MICROS((byte) 0x3A), + + SST_READ_COMPACTION_MICROS((byte) 0x3B), + // 0x1F for backwards compatibility on current minor version. HISTOGRAM_ENUM_MAX((byte) 0x1F); diff --git a/monitoring/statistics.cc b/monitoring/statistics.cc index 206372c7c77f..d2454d5d42b7 100644 --- a/monitoring/statistics.cc +++ b/monitoring/statistics.cc @@ -240,6 +240,8 @@ const std::vector> HistogramsNameMap = { {DB_SEEK, "rocksdb.db.seek.micros"}, {WRITE_STALL, "rocksdb.db.write.stall"}, {SST_READ_MICROS, "rocksdb.sst.read.micros"}, + {SST_READ_FLUSH_MICROS, "rocksdb.sst.read.flush.micros"}, + {SST_READ_COMPACTION_MICROS, "rocksdb.sst.read.compaction.micros"}, {NUM_SUBCOMPACTIONS_SCHEDULED, "rocksdb.num.subcompactions.scheduled"}, {BYTES_PER_READ, "rocksdb.bytes.per.read"}, {BYTES_PER_WRITE, "rocksdb.bytes.per.write"}, diff --git a/src.mk b/src.mk index e1ab947a06c5..80ed27486946 100644 --- a/src.mk +++ b/src.mk @@ -250,6 +250,7 @@ LIB_SOURCES = \ util/stderr_logger.cc \ util/string_util.cc \ util/thread_local.cc \ + util/thread_io_activity.cc \ util/threadpool_imp.cc \ util/xxhash.cc \ utilities/agg_merge/agg_merge.cc \ diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc index 6b0e2f158408..18cf6563610e 100644 --- a/table/block_based/block_based_table_reader.cc +++ b/table/block_based/block_based_table_reader.cc @@ -583,6 +583,7 @@ Status BlockBasedTable::Open( ro.io_timeout = read_options.io_timeout; ro.rate_limiter_priority = read_options.rate_limiter_priority; ro.verify_checksums = 
read_options.verify_checksums; + ro.io_activity = read_options.io_activity; // prefetch both index and filters, down to all partitions const bool prefetch_all = prefetch_index_and_filter_in_cache || level == 0; diff --git a/util/stop_watch.h b/util/stop_watch.h index e26380d97cb0..f2fce6933954 100644 --- a/util/stop_watch.h +++ b/util/stop_watch.h @@ -9,23 +9,28 @@ namespace ROCKSDB_NAMESPACE { // Auto-scoped. -// Records the measure time into the corresponding histogram if statistics +// Records the measure time into the corresponding histogram(s) if statistics // is not nullptr. It is also saved into *elapsed if the pointer is not nullptr // and overwrite is true, it will be added to *elapsed if overwrite is false. class StopWatch { public: StopWatch(SystemClock* clock, Statistics* statistics, - const uint32_t hist_type, uint64_t* elapsed = nullptr, - bool overwrite = true, bool delay_enabled = false) + const uint32_t hist_type_1, + const uint32_t hist_type_2 = Histograms::HISTOGRAM_ENUM_MAX, + uint64_t* elapsed = nullptr, bool overwrite = true, + bool delay_enabled = false) : clock_(clock), statistics_(statistics), - hist_type_(hist_type), + hist_type_1_(hist_type_1), + hist_type_2_(hist_type_2), elapsed_(elapsed), overwrite_(overwrite), stats_enabled_(statistics && statistics->get_stats_level() >= StatsLevel::kExceptTimers && - statistics->HistEnabledForType(hist_type)), + statistics->HistEnabledForType(hist_type_1) && + (hist_type_2 == Histograms::HISTOGRAM_ENUM_MAX || + statistics->HistEnabledForType(hist_type_2))), delay_enabled_(delay_enabled), total_delay_(0), delay_start_time_(0), @@ -44,10 +49,13 @@ class StopWatch { *elapsed_ -= total_delay_; } if (stats_enabled_) { - statistics_->reportTimeToHistogram( - hist_type_, (elapsed_ != nullptr) - ? *elapsed_ - : (clock_->NowMicros() - start_time_)); + const auto time = (elapsed_ != nullptr) + ? 
*elapsed_
+                            : (clock_->NowMicros() - start_time_);
+      statistics_->reportTimeToHistogram(hist_type_1_, time);
+      if (hist_type_2_ != Histograms::HISTOGRAM_ENUM_MAX) {
+        statistics_->reportTimeToHistogram(hist_type_2_, time);
+      }
     }
   }
 
@@ -75,7 +83,8 @@ class StopWatch {
  private:
   SystemClock* clock_;
   Statistics* statistics_;
-  const uint32_t hist_type_;
+  const uint32_t hist_type_1_;
+  const uint32_t hist_type_2_;
   uint64_t* elapsed_;
   bool overwrite_;
   bool stats_enabled_;
diff --git a/util/thread_io_activity.cc b/util/thread_io_activity.cc
new file mode 100644
index 000000000000..cd89f746ccc4
--- /dev/null
+++ b/util/thread_io_activity.cc
@@ -0,0 +1,10 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+#include "util/thread_io_activity.h"
+
+namespace ROCKSDB_NAMESPACE {
+thread_local Env::IOActivity thread_io_activity = Env::IOActivity::kUnknown;
+}  // namespace ROCKSDB_NAMESPACE
diff --git a/util/thread_io_activity.h b/util/thread_io_activity.h
new file mode 100644
index 000000000000..f07239fc8fdd
--- /dev/null
+++ b/util/thread_io_activity.h
@@ -0,0 +1,36 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+#pragma once
+#include "rocksdb/env.h"
+
+namespace ROCKSDB_NAMESPACE {
+// Per-thread tag naming the kind of IO work the current thread is doing.
+// Starts out as Env::IOActivity::kUnknown on every thread.
+extern thread_local Env::IOActivity thread_io_activity;
+
+// Scoped setter for thread_io_activity. The constructor tags the current
+// thread with io_activity; the destructor puts back whatever value was in
+// effect when the guard was created (instead of unconditionally writing
+// kUnknown), so guards can nest on one thread without the inner guard
+// erasing the outer tag.
+class ThreadIOActivityGuard {
+ public:
+  explicit ThreadIOActivityGuard(Env::IOActivity io_activity)
+      : prev_io_activity_(thread_io_activity) {
+    thread_io_activity = io_activity;
+  }
+
+  // Non-copyable: every guard must restore its saved value exactly once.
+  ThreadIOActivityGuard(const ThreadIOActivityGuard&) = delete;
+  ThreadIOActivityGuard& operator=(const ThreadIOActivityGuard&) = delete;
+
+  ~ThreadIOActivityGuard() { thread_io_activity = prev_io_activity_; }
+
+ private:
+  // Value of thread_io_activity at construction time, restored on exit.
+  const Env::IOActivity prev_io_activity_;
+};
+}  // namespace ROCKSDB_NAMESPACE