Skip to content

Commit

Permalink
draft
Browse files Browse the repository at this point in the history
  • Loading branch information
hx235 committed Mar 31, 2023
1 parent 646a708 commit 6e5a8e9
Show file tree
Hide file tree
Showing 23 changed files with 125 additions and 20 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,7 @@ set(SOURCES
util/stderr_logger.cc
util/string_util.cc
util/thread_local.cc
util/thread_io_activity.cc
util/threadpool_imp.cc
util/xxhash.cc
utilities/agg_merge/agg_merge.cc
Expand Down
2 changes: 2 additions & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"util/status.cc",
"util/stderr_logger.cc",
"util/string_util.cc",
"util/thread_io_activity.cc",
"util/thread_local.cc",
"util/threadpool_imp.cc",
"util/xxhash.cc",
Expand Down Expand Up @@ -608,6 +609,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[
"util/status.cc",
"util/stderr_logger.cc",
"util/string_util.cc",
"util/thread_io_activity.cc",
"util/thread_local.cc",
"util/threadpool_imp.cc",
"util/xxhash.cc",
Expand Down
1 change: 1 addition & 0 deletions db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ Status BuildTable(
// No matter whether use_direct_io_for_flush_and_compaction is true,
// the goal is to cache it here for further user reads.
ReadOptions read_options;
read_options.io_activity = Env::IOActivity::kFlush;
std::unique_ptr<InternalIterator> it(table_cache->NewIterator(
read_options, file_options, tboptions.internal_comparator, *meta,
nullptr /* range_del_agg */, mutable_cf_options.prefix_extractor,
Expand Down
9 changes: 8 additions & 1 deletion db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
#include "table/unique_id_impl.h"
#include "test_util/sync_point.h"
#include "util/stop_watch.h"
#include "util/thread_io_activity.h"

namespace ROCKSDB_NAMESPACE {

Expand Down Expand Up @@ -617,6 +618,7 @@ Status CompactionJob::Run() {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_RUN);
TEST_SYNC_POINT("CompactionJob::Run():Start");
ThreadIOActivityGuard thread_io_activity_guard(Env::IOActivity::kCompaction);
log_buffer_->FlushBufferToLog();
LogCompaction();

Expand Down Expand Up @@ -710,6 +712,8 @@ Status CompactionJob::Run() {
compact_->compaction->mutable_cf_options()->prefix_extractor;
std::atomic<size_t> next_file_idx(0);
auto verify_table = [&](Status& output_status) {
ThreadIOActivityGuard verify_table_thread_io_activity_guard(
Env::IOActivity::kCompaction);
while (true) {
size_t file_idx = next_file_idx.fetch_add(1);
if (file_idx >= files_output.size()) {
Expand All @@ -723,6 +727,8 @@ Status CompactionJob::Run() {
// verification as user reads since the goal is to cache it here for
// further user reads
ReadOptions read_options;
read_options.io_activity = Env::IOActivity::kCompaction;

InternalIterator* iter = cfd->table_cache()->NewIterator(
read_options, file_options_, cfd->internal_comparator(),
files_output[file_idx]->meta, /*range_del_agg=*/nullptr,
Expand Down Expand Up @@ -1032,7 +1038,7 @@ void CompactionJob::NotifyOnSubcompactionCompleted(
void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
assert(sub_compact);
assert(sub_compact->compaction);

ThreadIOActivityGuard thread_io_activity_guard(Env::IOActivity::kCompaction);
if (db_options_.compaction_service) {
CompactionServiceJobStatus comp_status =
ProcessKeyValueCompactionWithCompactionService(sub_compact);
Expand Down Expand Up @@ -1083,6 +1089,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
read_options.verify_checksums = true;
read_options.fill_cache = false;
read_options.rate_limiter_priority = GetRateLimiterPriority();
read_options.io_activity = Env::IOActivity::kCompaction;
// Compaction iterators shouldn't be confined to a single prefix.
// Compactions use Seek() for
// (a) concurrent compactions,
Expand Down
3 changes: 3 additions & 0 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "rocksdb/wal_filter.h"
#include "test_util/sync_point.h"
#include "util/rate_limiter.h"
#include "util/thread_io_activity.h"

namespace ROCKSDB_NAMESPACE {
Options SanitizeOptions(const std::string& dbname, const Options& src,
Expand Down Expand Up @@ -1557,6 +1558,7 @@ Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& wal_numbers) {
Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
MemTable* mem, VersionEdit* edit) {
mutex_.AssertHeld();
ThreadIOActivityGuard thread_io_activity_guard(Env::IOActivity::kFlush);
assert(cfd);
assert(cfd->imm());
// The immutable memtable list must be empty.
Expand Down Expand Up @@ -1700,6 +1702,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
InternalStats::BYTES_FLUSHED,
stats.bytes_written + stats.bytes_written_blob);
RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize());
thread_io_activity = Env::IOActivity::kUnknown;
return s;
}

Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1805,7 +1805,7 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, WriteThread& write_thread,
bool delayed = false;
{
StopWatch sw(immutable_db_options_.clock, stats_, WRITE_STALL,
&time_delayed);
Histograms::HISTOGRAM_ENUM_MAX, &time_delayed);
// To avoid parallel timed delays (bad throttling), only support them
// on the primary write queue.
uint64_t delay;
Expand Down
3 changes: 2 additions & 1 deletion db/flush_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include "util/coding.h"
#include "util/mutexlock.h"
#include "util/stop_watch.h"
#include "util/thread_io_activity.h"

namespace ROCKSDB_NAMESPACE {

Expand Down Expand Up @@ -211,6 +212,7 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta,
bool* switched_to_mempurge) {
TEST_SYNC_POINT("FlushJob::Start");
db_mutex_->AssertHeld();
ThreadIOActivityGuard thread_io_activity_guard(Env::IOActivity::kFlush);
assert(pick_memtable_called);
// Mempurge threshold can be dynamically changed.
// For sake of consistency, mempurge_threshold is
Expand Down Expand Up @@ -350,7 +352,6 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta,
stream << "file_cpu_read_nanos"
<< (IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos);
}

return s;
}

Expand Down
3 changes: 2 additions & 1 deletion db/perf_context_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ TEST_F(PerfContextTest, StopWatchOverhead) {
uint64_t elapsed = 0;
std::vector<uint64_t> timings(kTotalIterations);

StopWatch timer(SystemClock::Default().get(), nullptr, 0, &elapsed);
StopWatch timer(SystemClock::Default().get(), nullptr, 0,
Histograms::HISTOGRAM_ENUM_MAX, &elapsed);
for (auto& timing : timings) {
timing = elapsed;
}
Expand Down
5 changes: 4 additions & 1 deletion db/version_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "port/port.h"
#include "table/table_reader.h"
#include "util/string_util.h"
#include "util/thread_io_activity.h"

namespace ROCKSDB_NAMESPACE {

Expand Down Expand Up @@ -1323,8 +1324,10 @@ class VersionBuilder::Rep {
auto* file_meta = files_meta[file_idx].first;
int level = files_meta[file_idx].second;
TableCache::TypedHandle* handle = nullptr;
ReadOptions read_options;
read_options.io_activity = thread_io_activity;
statuses[file_idx] = table_cache_->FindTable(
ReadOptions(), file_options_,
read_options, file_options_,
*(base_vstorage_->InternalComparator()), *file_meta, &handle,
prefix_extractor, false /*no_io */, true /* record_read_stats */,
internal_stats->GetFileReadHist(level), false, level,
Expand Down
2 changes: 2 additions & 0 deletions file/file_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ inline IOStatus PrepareIOFromReadOptions(const ReadOptions& ro,
}

opts.rate_limiter_priority = ro.rate_limiter_priority;
opts.io_activity = ro.io_activity;

return IOStatus::OK();
}

Expand Down
23 changes: 18 additions & 5 deletions file/random_access_file_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,14 @@
#include "test_util/sync_point.h"
#include "util/random.h"
#include "util/rate_limiter.h"
#include "util/thread_io_activity.h"

namespace ROCKSDB_NAMESPACE {

const std::array<Histograms, std::size_t(Env::IOActivity::kUnknown)>
kReadHistograms{{
SST_READ_FLUSH_MICROS,
SST_READ_COMPACTION_MICROS,
}};
inline void RecordIOStats(Statistics* stats, Temperature file_temperature,
bool is_last_level, size_t size) {
IOSTATS_ADD(bytes_read, size);
Expand Down Expand Up @@ -94,6 +99,9 @@ IOStatus RandomAccessFileReader::Read(
uint64_t elapsed = 0;
{
StopWatch sw(clock_, stats_, hist_type_,
(opts.io_activity != Env::IOActivity::kUnknown)
? kReadHistograms[(std::size_t)(opts.io_activity)]
: Histograms::HISTOGRAM_ENUM_MAX,
(stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
true /*delay_enabled*/);
auto prev_perf_level = GetPerfLevel();
Expand Down Expand Up @@ -288,6 +296,9 @@ IOStatus RandomAccessFileReader::MultiRead(
uint64_t elapsed = 0;
{
StopWatch sw(clock_, stats_, hist_type_,
(opts.io_activity != Env::IOActivity::kUnknown)
? kReadHistograms[(std::size_t)(opts.io_activity)]
: Histograms::HISTOGRAM_ENUM_MAX,
(stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
true /*delay_enabled*/);
auto prev_perf_level = GetPerfLevel();
Expand Down Expand Up @@ -476,13 +487,15 @@ IOStatus RandomAccessFileReader::ReadAsync(

assert(read_async_info->buf_.CurrentSize() == 0);

StopWatch sw(clock_, nullptr /*stats*/, 0 /*hist_type*/, &elapsed,
true /*overwrite*/, true /*delay_enabled*/);
StopWatch sw(clock_, nullptr /*stats*/, 0 /*hist_type*/,
Histograms::HISTOGRAM_ENUM_MAX, &elapsed, true /*overwrite*/,
true /*delay_enabled*/);
s = file_->ReadAsync(aligned_req, opts, read_async_callback,
read_async_info, io_handle, del_fn, nullptr /*dbg*/);
} else {
StopWatch sw(clock_, nullptr /*stats*/, 0 /*hist_type*/, &elapsed,
true /*overwrite*/, true /*delay_enabled*/);
StopWatch sw(clock_, nullptr /*stats*/, 0 /*hist_type*/,
Histograms::HISTOGRAM_ENUM_MAX, &elapsed, true /*overwrite*/,
true /*delay_enabled*/);
s = file_->ReadAsync(req, opts, read_async_callback, read_async_info,
io_handle, del_fn, nullptr /*dbg*/);
}
Expand Down
7 changes: 7 additions & 0 deletions include/rocksdb/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,13 @@ class Env : public Customizable {
IO_TOTAL = 4
};

// EXPERIMENTAL
// Tags an IO with the internal activity it is issued for. ReadOptions and
// IOOptions carry this tag and default it to kUnknown.
// NOTE: the numeric values are significant. The value of kUnknown is used as
// the size of a per-activity histogram table, and the other values are used
// as indices into it, so kUnknown must remain the last enumerator and the
// preceding values must stay contiguous starting from 0.
enum class IOActivity : uint8_t {
// IO issued while flushing (including building a level-0 table in recovery).
kFlush = 0,
// IO issued while running a compaction.
kCompaction = 1,
// No specific activity recorded; no per-activity histogram is updated.
kUnknown = 2,
};

// Arrange to run "(*function)(arg)" once in a background thread, in
// the thread pool specified by pri. By default, jobs go to the 'LOW'
// priority thread pool.
Expand Down
2 changes: 2 additions & 0 deletions include/rocksdb/file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ struct IOOptions {
// directories and list only files in GetChildren API.
bool do_not_recurse;

Env::IOActivity io_activity = Env::IOActivity::kUnknown;

IOOptions() : IOOptions(false) {}

explicit IOOptions(bool force_dir_fsync_)
Expand Down
2 changes: 2 additions & 0 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -1696,6 +1696,8 @@ struct ReadOptions {
// Default: true
bool optimize_multiget_for_io;

Env::IOActivity io_activity = Env::IOActivity::kUnknown;

ReadOptions();
ReadOptions(bool cksum, bool cache);
};
Expand Down
3 changes: 3 additions & 0 deletions include/rocksdb/statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,9 @@ enum Histograms : uint32_t {
DB_SEEK,
WRITE_STALL,
SST_READ_MICROS,
SST_READ_FLUSH_MICROS,
SST_READ_COMPACTION_MICROS,

// The number of subcompactions actually scheduled during a compaction
NUM_SUBCOMPACTIONS_SCHEDULED,
// Value size distribution in each operation
Expand Down
8 changes: 8 additions & 0 deletions java/rocksjni/portal.h
Original file line number Diff line number Diff line change
Expand Up @@ -5619,6 +5619,10 @@ class HistogramTypeJni {
return 0x38;
case ROCKSDB_NAMESPACE::Histograms::TABLE_OPEN_PREFETCH_TAIL_READ_BYTES:
return 0x39;
case ROCKSDB_NAMESPACE::Histograms::SST_READ_FLUSH_MICROS:
return 0x3A;
case ROCKSDB_NAMESPACE::Histograms::SST_READ_COMPACTION_MICROS:
return 0x3B;
case ROCKSDB_NAMESPACE::Histograms::HISTOGRAM_ENUM_MAX:
// 0x1F for backwards compatibility on current minor version.
return 0x1F;
Expand Down Expand Up @@ -5738,6 +5742,10 @@ class HistogramTypeJni {
case 0x39:
return ROCKSDB_NAMESPACE::Histograms::
TABLE_OPEN_PREFETCH_TAIL_READ_BYTES;
case 0x3A:
return ROCKSDB_NAMESPACE::Histograms::SST_READ_FLUSH_MICROS;
case 0x3B:
return ROCKSDB_NAMESPACE::Histograms::SST_READ_COMPACTION_MICROS;
case 0x1F:
// 0x1F for backwards compatibility on current minor version.
return ROCKSDB_NAMESPACE::Histograms::HISTOGRAM_ENUM_MAX;
Expand Down
4 changes: 4 additions & 0 deletions java/src/main/java/org/rocksdb/HistogramType.java
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ public enum HistogramType {
*/
TABLE_OPEN_PREFETCH_TAIL_READ_BYTES((byte) 0x39),

SST_READ_FLUSH_MICROS((byte) 0x3A),

SST_READ_COMPACTION_MICROS((byte) 0x3B),

// 0x1F for backwards compatibility on current minor version.
HISTOGRAM_ENUM_MAX((byte) 0x1F);

Expand Down
2 changes: 2 additions & 0 deletions monitoring/statistics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = {
{DB_SEEK, "rocksdb.db.seek.micros"},
{WRITE_STALL, "rocksdb.db.write.stall"},
{SST_READ_MICROS, "rocksdb.sst.read.micros"},
{SST_READ_FLUSH_MICROS, "rocksdb.sst.read.flush.micros"},
{SST_READ_COMPACTION_MICROS, "rocksdb.sst.read.compaction.micros"},
{NUM_SUBCOMPACTIONS_SCHEDULED, "rocksdb.num.subcompactions.scheduled"},
{BYTES_PER_READ, "rocksdb.bytes.per.read"},
{BYTES_PER_WRITE, "rocksdb.bytes.per.write"},
Expand Down
1 change: 1 addition & 0 deletions src.mk
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ LIB_SOURCES = \
util/stderr_logger.cc \
util/string_util.cc \
util/thread_local.cc \
util/thread_io_activity.cc \
util/threadpool_imp.cc \
util/xxhash.cc \
utilities/agg_merge/agg_merge.cc \
Expand Down
1 change: 1 addition & 0 deletions table/block_based/block_based_table_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,7 @@ Status BlockBasedTable::Open(
ro.io_timeout = read_options.io_timeout;
ro.rate_limiter_priority = read_options.rate_limiter_priority;
ro.verify_checksums = read_options.verify_checksums;
ro.io_activity = read_options.io_activity;

// prefetch both index and filters, down to all partitions
const bool prefetch_all = prefetch_index_and_filter_in_cache || level == 0;
Expand Down
29 changes: 19 additions & 10 deletions util/stop_watch.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,28 @@

namespace ROCKSDB_NAMESPACE {
// Auto-scoped.
// Records the measure time into the corresponding histogram if statistics
// Records the measured time into the corresponding histogram(s) if statistics
// is not nullptr. It is also saved into *elapsed if the pointer is not nullptr
// and overwrite is true; it is added to *elapsed if overwrite is false.
class StopWatch {
public:
StopWatch(SystemClock* clock, Statistics* statistics,
const uint32_t hist_type, uint64_t* elapsed = nullptr,
bool overwrite = true, bool delay_enabled = false)
const uint32_t hist_type_1,
const uint32_t hist_type_2 = Histograms::HISTOGRAM_ENUM_MAX,
uint64_t* elapsed = nullptr, bool overwrite = true,
bool delay_enabled = false)
: clock_(clock),
statistics_(statistics),
hist_type_(hist_type),
hist_type_1_(hist_type_1),
hist_type_2_(hist_type_2),
elapsed_(elapsed),
overwrite_(overwrite),
stats_enabled_(statistics &&
statistics->get_stats_level() >=
StatsLevel::kExceptTimers &&
statistics->HistEnabledForType(hist_type)),
statistics->HistEnabledForType(hist_type_1) &&
(hist_type_2 == Histograms::HISTOGRAM_ENUM_MAX ||
statistics->HistEnabledForType(hist_type_2))),
delay_enabled_(delay_enabled),
total_delay_(0),
delay_start_time_(0),
Expand All @@ -44,10 +49,13 @@ class StopWatch {
*elapsed_ -= total_delay_;
}
if (stats_enabled_) {
statistics_->reportTimeToHistogram(
hist_type_, (elapsed_ != nullptr)
? *elapsed_
: (clock_->NowMicros() - start_time_));
const auto time = (elapsed_ != nullptr)
? *elapsed_
: (clock_->NowMicros() - start_time_);
statistics_->reportTimeToHistogram(hist_type_1_, time);
if (hist_type_2_ != Histograms::HISTOGRAM_ENUM_MAX) {
statistics_->reportTimeToHistogram(hist_type_2_, time);
}
}
}

Expand Down Expand Up @@ -75,7 +83,8 @@ class StopWatch {
private:
SystemClock* clock_;
Statistics* statistics_;
const uint32_t hist_type_;
const uint32_t hist_type_1_;
const uint32_t hist_type_2_;
uint64_t* elapsed_;
bool overwrite_;
bool stats_enabled_;
Expand Down
Loading

0 comments on commit 6e5a8e9

Please sign in to comment.