Skip to content

Commit

Permalink
Plumb IOActivity down through ReadOptions
Browse files Browse the repository at this point in the history
  • Loading branch information
hx235 committed Apr 3, 2023
1 parent 646a708 commit e98f4ad
Show file tree
Hide file tree
Showing 31 changed files with 150 additions and 33 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,7 @@ set(SOURCES
util/stderr_logger.cc
util/string_util.cc
util/thread_local.cc
util/thread_io_activity.cc
util/threadpool_imp.cc
util/xxhash.cc
utilities/agg_merge/agg_merge.cc
Expand Down
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
### New Features
* Add experimental `PerfContext` counters `iter_{next|prev|seek}_count` for db iterator, each counting the times of corresponding API being called.
* Allow runtime changes to whether `WriteBufferManager` allows stall or not by calling `SetAllowStall()`
* New statistics `rocksdb.sst.read.{flush|compaction}.micros` that measures read time of block-based SST tables during flush or compaction, in addition to the existing aggregated statistics `rocksdb.sst.read.micros`

## 8.1.0 (03/18/2023)
### Behavior changes
Expand Down
2 changes: 2 additions & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"util/status.cc",
"util/stderr_logger.cc",
"util/string_util.cc",
"util/thread_io_activity.cc",
"util/thread_local.cc",
"util/threadpool_imp.cc",
"util/xxhash.cc",
Expand Down Expand Up @@ -608,6 +609,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[
"util/status.cc",
"util/stderr_logger.cc",
"util/string_util.cc",
"util/thread_io_activity.cc",
"util/thread_local.cc",
"util/threadpool_imp.cc",
"util/xxhash.cc",
Expand Down
1 change: 1 addition & 0 deletions db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ Status BuildTable(
// No matter whether use_direct_io_for_flush_and_compaction is true,
// the goal is to cache it here for further user reads.
ReadOptions read_options;
read_options.io_activity = GetThreadIOActivity();
std::unique_ptr<InternalIterator> it(table_cache->NewIterator(
read_options, file_options, tboptions.internal_comparator, *meta,
nullptr /* range_del_agg */, mutable_cf_options.prefix_extractor,
Expand Down
15 changes: 12 additions & 3 deletions db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
#include "table/unique_id_impl.h"
#include "test_util/sync_point.h"
#include "util/stop_watch.h"
#include "util/thread_io_activity.h"

namespace ROCKSDB_NAMESPACE {

Expand Down Expand Up @@ -505,8 +506,10 @@ void CompactionJob::GenSubcompactionBoundaries() {
for (size_t i = 0; i < num_files; i++) {
FileMetaData* f = flevel->files[i].file_metadata;
std::vector<TableReader::Anchor> my_anchors;
Status s = cfd->table_cache()->ApproximateKeyAnchors(
ReadOptions(), icomp, *f, my_anchors);
ReadOptions ro;
ro.io_activity = Env::IOActivity::kCompaction;
Status s = cfd->table_cache()->ApproximateKeyAnchors(ro, icomp, *f,
my_anchors);
if (!s.ok() || my_anchors.empty()) {
my_anchors.emplace_back(f->largest.user_key(), f->fd.GetFileSize());
}
Expand Down Expand Up @@ -617,6 +620,7 @@ Status CompactionJob::Run() {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_RUN);
TEST_SYNC_POINT("CompactionJob::Run():Start");
ThreadIOActivityGuard thread_io_activity_guard(Env::IOActivity::kCompaction);
log_buffer_->FlushBufferToLog();
LogCompaction();

Expand Down Expand Up @@ -710,6 +714,8 @@ Status CompactionJob::Run() {
compact_->compaction->mutable_cf_options()->prefix_extractor;
std::atomic<size_t> next_file_idx(0);
auto verify_table = [&](Status& output_status) {
ThreadIOActivityGuard verify_table_thread_io_activity_guard(
Env::IOActivity::kCompaction);
while (true) {
size_t file_idx = next_file_idx.fetch_add(1);
if (file_idx >= files_output.size()) {
Expand All @@ -723,6 +729,8 @@ Status CompactionJob::Run() {
// verification as user reads since the goal is to cache it here for
// further user reads
ReadOptions read_options;
read_options.io_activity = Env::IOActivity::kCompaction;

InternalIterator* iter = cfd->table_cache()->NewIterator(
read_options, file_options_, cfd->internal_comparator(),
files_output[file_idx]->meta, /*range_del_agg=*/nullptr,
Expand Down Expand Up @@ -1032,7 +1040,7 @@ void CompactionJob::NotifyOnSubcompactionCompleted(
void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
assert(sub_compact);
assert(sub_compact->compaction);

ThreadIOActivityGuard thread_io_activity_guard(Env::IOActivity::kCompaction);
if (db_options_.compaction_service) {
CompactionServiceJobStatus comp_status =
ProcessKeyValueCompactionWithCompactionService(sub_compact);
Expand Down Expand Up @@ -1083,6 +1091,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
read_options.verify_checksums = true;
read_options.fill_cache = false;
read_options.rate_limiter_priority = GetRateLimiterPriority();
read_options.io_activity = Env::IOActivity::kCompaction;
// Compaction iterators shouldn't be confined to a single prefix.
// Compactions use Seek() for
// (a) concurrent compactions,
Expand Down
5 changes: 2 additions & 3 deletions db/convenience.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ Status VerifySstFileChecksum(const Options& options,
std::unique_ptr<RandomAccessFileReader> file_reader(
new RandomAccessFileReader(
std::move(file), file_path, ioptions.clock, nullptr /* io_tracer */,
nullptr /* stats */, 0 /* hist_type */, nullptr /* file_read_hist */,
ioptions.rate_limiter.get()));
nullptr /* stats */, Histograms::SST_READ_MICROS /* hist_type */,
nullptr /* file_read_hist */, ioptions.rate_limiter.get()));
const bool kImmortal = true;
auto reader_options = TableReaderOptions(
ioptions, options.prefix_extractor, env_options, internal_comparator,
Expand All @@ -76,4 +76,3 @@ Status VerifySstFileChecksum(const Options& options,
}

} // namespace ROCKSDB_NAMESPACE

1 change: 1 addition & 0 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1080,6 +1080,7 @@ Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options,

ReadOptions ro;
ro.total_order_seek = true;
ro.io_activity = Env::IOActivity::kCompaction;
bool overlap;
for (int level = 0;
level < current_version->storage_info()->num_non_empty_levels();
Expand Down
3 changes: 3 additions & 0 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "rocksdb/wal_filter.h"
#include "test_util/sync_point.h"
#include "util/rate_limiter.h"
#include "util/thread_io_activity.h"

namespace ROCKSDB_NAMESPACE {
Options SanitizeOptions(const std::string& dbname, const Options& src,
Expand Down Expand Up @@ -1557,6 +1558,7 @@ Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& wal_numbers) {
Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
MemTable* mem, VersionEdit* edit) {
mutex_.AssertHeld();
ThreadIOActivityGuard thread_io_activity_guard(Env::IOActivity::kFlush);
assert(cfd);
assert(cfd->imm());
// The immutable memtable list must be empty.
Expand All @@ -1574,6 +1576,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
ReadOptions ro;
ro.total_order_seek = true;
ro.io_activity = Env::IOActivity::kFlush;
Arena arena;
Status s;
TableProperties table_properties;
Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1805,7 +1805,7 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, WriteThread& write_thread,
bool delayed = false;
{
StopWatch sw(immutable_db_options_.clock, stats_, WRITE_STALL,
&time_delayed);
Histograms::HISTOGRAM_ENUM_MAX, &time_delayed);
// To avoid parallel timed delays (bad throttling), only support them
// on the primary write queue.
uint64_t delay;
Expand Down
3 changes: 3 additions & 0 deletions db/flush_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include "util/coding.h"
#include "util/mutexlock.h"
#include "util/stop_watch.h"
#include "util/thread_io_activity.h"

namespace ROCKSDB_NAMESPACE {

Expand Down Expand Up @@ -211,6 +212,7 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta,
bool* switched_to_mempurge) {
TEST_SYNC_POINT("FlushJob::Start");
db_mutex_->AssertHeld();
ThreadIOActivityGuard thread_io_activity_guard(Env::IOActivity::kFlush);
assert(pick_memtable_called);
// Mempurge threshold can be dynamically changed.
// For sake of consistency, mempurge_threshold is
Expand Down Expand Up @@ -841,6 +843,7 @@ Status FlushJob::WriteLevel0Table() {
range_del_iters;
ReadOptions ro;
ro.total_order_seek = true;
ro.io_activity = Env::IOActivity::kFlush;
Arena arena;
uint64_t total_num_entries = 0, total_num_deletes = 0;
uint64_t total_data_size = 0;
Expand Down
3 changes: 2 additions & 1 deletion db/perf_context_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ TEST_F(PerfContextTest, StopWatchOverhead) {
uint64_t elapsed = 0;
std::vector<uint64_t> timings(kTotalIterations);

StopWatch timer(SystemClock::Default().get(), nullptr, 0, &elapsed);
StopWatch timer(SystemClock::Default().get(), nullptr, 0,
Histograms::HISTOGRAM_ENUM_MAX, &elapsed);
for (auto& timing : timings) {
timing = elapsed;
}
Expand Down
5 changes: 4 additions & 1 deletion db/version_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "port/port.h"
#include "table/table_reader.h"
#include "util/string_util.h"
#include "util/thread_io_activity.h"

namespace ROCKSDB_NAMESPACE {

Expand Down Expand Up @@ -1323,8 +1324,10 @@ class VersionBuilder::Rep {
auto* file_meta = files_meta[file_idx].first;
int level = files_meta[file_idx].second;
TableCache::TypedHandle* handle = nullptr;
ReadOptions read_options;
read_options.io_activity = GetThreadIOActivity();
statuses[file_idx] = table_cache_->FindTable(
ReadOptions(), file_options_,
read_options, file_options_,
*(base_vstorage_->InternalComparator()), *file_meta, &handle,
prefix_extractor, false /*no_io */, true /* record_read_stats */,
internal_stats->GetFileReadHist(level), false, level,
Expand Down
5 changes: 3 additions & 2 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1566,8 +1566,9 @@ Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
std::unique_ptr<RandomAccessFileReader> file_reader(
new RandomAccessFileReader(
std::move(file), file_name, nullptr /* env */, io_tracer_,
nullptr /* stats */, 0 /* hist_type */, nullptr /* file_read_hist */,
nullptr /* rate_limiter */, ioptions->listeners));
nullptr /* stats */, Histograms::SST_READ_MICROS /* hist_type */,
nullptr /* file_read_hist */, nullptr /* rate_limiter */,
ioptions->listeners));
std::unique_ptr<TableProperties> props;
s = ReadTableProperties(
file_reader.get(), file_meta->fd.GetFileSize(),
Expand Down
6 changes: 3 additions & 3 deletions file/file_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,9 @@ IOStatus GenerateOneFileChecksum(
if (!io_s.ok()) {
return io_s;
}
reader.reset(new RandomAccessFileReader(std::move(r_file), file_path,
nullptr /*Env*/, io_tracer, nullptr,
0, nullptr, rate_limiter));
reader.reset(new RandomAccessFileReader(
std::move(r_file), file_path, nullptr /*Env*/, io_tracer, nullptr,
Histograms::HISTOGRAM_ENUM_MAX, nullptr, rate_limiter));
}

// Found that 256 KB readahead size provides the best performance, based on
Expand Down
2 changes: 2 additions & 0 deletions file/file_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ inline IOStatus PrepareIOFromReadOptions(const ReadOptions& ro,
}

opts.rate_limiter_priority = ro.rate_limiter_priority;
opts.io_activity = ro.io_activity;

return IOStatus::OK();
}

Expand Down
23 changes: 18 additions & 5 deletions file/random_access_file_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,14 @@
#include "test_util/sync_point.h"
#include "util/random.h"
#include "util/rate_limiter.h"
#include "util/thread_io_activity.h"

namespace ROCKSDB_NAMESPACE {

const std::array<Histograms, std::size_t(Env::IOActivity::kUnknown)>
kReadHistograms{{
SST_READ_FLUSH_MICROS,
SST_READ_COMPACTION_MICROS,
}};
inline void RecordIOStats(Statistics* stats, Temperature file_temperature,
bool is_last_level, size_t size) {
IOSTATS_ADD(bytes_read, size);
Expand Down Expand Up @@ -94,6 +99,9 @@ IOStatus RandomAccessFileReader::Read(
uint64_t elapsed = 0;
{
StopWatch sw(clock_, stats_, hist_type_,
(opts.io_activity != Env::IOActivity::kUnknown)
? kReadHistograms[(std::size_t)(opts.io_activity)]
: Histograms::HISTOGRAM_ENUM_MAX,
(stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
true /*delay_enabled*/);
auto prev_perf_level = GetPerfLevel();
Expand Down Expand Up @@ -288,6 +296,9 @@ IOStatus RandomAccessFileReader::MultiRead(
uint64_t elapsed = 0;
{
StopWatch sw(clock_, stats_, hist_type_,
(opts.io_activity != Env::IOActivity::kUnknown)
? kReadHistograms[(std::size_t)(opts.io_activity)]
: Histograms::HISTOGRAM_ENUM_MAX,
(stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
true /*delay_enabled*/);
auto prev_perf_level = GetPerfLevel();
Expand Down Expand Up @@ -476,13 +487,15 @@ IOStatus RandomAccessFileReader::ReadAsync(

assert(read_async_info->buf_.CurrentSize() == 0);

StopWatch sw(clock_, nullptr /*stats*/, 0 /*hist_type*/, &elapsed,
true /*overwrite*/, true /*delay_enabled*/);
StopWatch sw(clock_, nullptr /*stats*/, 0 /*hist_type*/,
Histograms::HISTOGRAM_ENUM_MAX, &elapsed, true /*overwrite*/,
true /*delay_enabled*/);
s = file_->ReadAsync(aligned_req, opts, read_async_callback,
read_async_info, io_handle, del_fn, nullptr /*dbg*/);
} else {
StopWatch sw(clock_, nullptr /*stats*/, 0 /*hist_type*/, &elapsed,
true /*overwrite*/, true /*delay_enabled*/);
StopWatch sw(clock_, nullptr /*stats*/, 0 /*hist_type*/,
Histograms::HISTOGRAM_ENUM_MAX, &elapsed, true /*overwrite*/,
true /*delay_enabled*/);
s = file_->ReadAsync(req, opts, read_async_callback, read_async_info,
io_handle, del_fn, nullptr /*dbg*/);
}
Expand Down
3 changes: 2 additions & 1 deletion file/random_access_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ class RandomAccessFileReader {
std::unique_ptr<FSRandomAccessFile>&& raf, const std::string& _file_name,
SystemClock* clock = nullptr,
const std::shared_ptr<IOTracer>& io_tracer = nullptr,
Statistics* stats = nullptr, uint32_t hist_type = 0,
Statistics* stats = nullptr,
uint32_t hist_type = Histograms::HISTOGRAM_ENUM_MAX,
HistogramImpl* file_read_hist = nullptr,
RateLimiter* rate_limiter = nullptr,
const std::vector<std::shared_ptr<EventListener>>& listeners = {},
Expand Down
7 changes: 7 additions & 0 deletions include/rocksdb/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,13 @@ class Env : public Customizable {
IO_TOTAL = 4
};

// EXPERIMENTAL
enum class IOActivity : uint8_t {
kFlush = 0,
kCompaction = 1,
kUnknown = 2,
};

// Arrange to run "(*function)(arg)" once in a background thread, in
// the thread pool specified by pri. By default, jobs go to the 'LOW'
// priority thread pool.
Expand Down
2 changes: 2 additions & 0 deletions include/rocksdb/file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ struct IOOptions {
// directories and list only files in GetChildren API.
bool do_not_recurse;

Env::IOActivity io_activity = Env::IOActivity::kUnknown;

IOOptions() : IOOptions(false) {}

explicit IOOptions(bool force_dir_fsync_)
Expand Down
2 changes: 2 additions & 0 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -1696,6 +1696,8 @@ struct ReadOptions {
// Default: true
bool optimize_multiget_for_io;

Env::IOActivity io_activity = Env::IOActivity::kUnknown;

ReadOptions();
ReadOptions(bool cksum, bool cache);
};
Expand Down
5 changes: 5 additions & 0 deletions include/rocksdb/statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,12 @@ enum Histograms : uint32_t {
NUM_FILES_IN_SINGLE_COMPACTION,
DB_SEEK,
WRITE_STALL,
// Time spent in reading block-based or plain SST table
SST_READ_MICROS,
// Time spent in reading block-based SST table for flush or compaction
SST_READ_FLUSH_MICROS,
SST_READ_COMPACTION_MICROS,

// The number of subcompactions actually scheduled during a compaction
NUM_SUBCOMPACTIONS_SCHEDULED,
// Value size distribution in each operation
Expand Down
8 changes: 8 additions & 0 deletions java/rocksjni/portal.h
Original file line number Diff line number Diff line change
Expand Up @@ -5619,6 +5619,10 @@ class HistogramTypeJni {
return 0x38;
case ROCKSDB_NAMESPACE::Histograms::TABLE_OPEN_PREFETCH_TAIL_READ_BYTES:
return 0x39;
case ROCKSDB_NAMESPACE::Histograms::SST_READ_FLUSH_MICROS:
return 0x3A;
case ROCKSDB_NAMESPACE::Histograms::SST_READ_COMPACTION_MICROS:
return 0x3B;
case ROCKSDB_NAMESPACE::Histograms::HISTOGRAM_ENUM_MAX:
// 0x1F for backwards compatibility on current minor version.
return 0x1F;
Expand Down Expand Up @@ -5738,6 +5742,10 @@ class HistogramTypeJni {
case 0x39:
return ROCKSDB_NAMESPACE::Histograms::
TABLE_OPEN_PREFETCH_TAIL_READ_BYTES;
case 0x3A:
return ROCKSDB_NAMESPACE::Histograms::SST_READ_FLUSH_MICROS;
case 0x3B:
return ROCKSDB_NAMESPACE::Histograms::SST_READ_COMPACTION_MICROS;
case 0x1F:
// 0x1F for backwards compatibility on current minor version.
return ROCKSDB_NAMESPACE::Histograms::HISTOGRAM_ENUM_MAX;
Expand Down
4 changes: 4 additions & 0 deletions java/src/main/java/org/rocksdb/HistogramType.java
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ public enum HistogramType {
*/
TABLE_OPEN_PREFETCH_TAIL_READ_BYTES((byte) 0x39),

SST_READ_FLUSH_MICROS((byte) 0x3A),

SST_READ_COMPACTION_MICROS((byte) 0x3B),

// 0x1F for backwards compatibility on current minor version.
HISTOGRAM_ENUM_MAX((byte) 0x1F);

Expand Down
Loading

0 comments on commit e98f4ad

Please sign in to comment.