Skip to content

Commit

Permalink
Add/Populate SST_READ_FLUSH/COMPACTION_MICROS new stat by IOActivity,…
Browse files Browse the repository at this point in the history
… get IOActivity from TableReaderCaller, fallback behavior on TableCache for SST_READ stat
  • Loading branch information
hx235 committed Mar 10, 2023
1 parent b7631b9 commit 4c871ca
Show file tree
Hide file tree
Showing 22 changed files with 140 additions and 91 deletions.
6 changes: 3 additions & 3 deletions db/blob/blob_file_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@ Status BlobFileReader::OpenFile(

file_reader->reset(new RandomAccessFileReader(
std::move(file), blob_file_path, immutable_options.clock, io_tracer,
immutable_options.stats, BLOB_DB_BLOB_FILE_READ_MICROS,
blob_file_read_hist, immutable_options.rate_limiter.get(),
immutable_options.listeners));
immutable_options.stats, IOActivity::kBlob,
Histograms::HISTOGRAM_ENUM_MAX, blob_file_read_hist,
immutable_options.rate_limiter.get(), immutable_options.listeners));

return Status::OK();
}
Expand Down
7 changes: 3 additions & 4 deletions db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ Status BuildTable(
const std::string& dbname, VersionSet* versions,
const ImmutableDBOptions& db_options, const TableBuilderOptions& tboptions,
const FileOptions& file_options, TableCache* table_cache,
InternalIterator* iter,
TableReaderCaller table_reader_caller, InternalIterator* iter,
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
range_del_iters,
FileMetaData* meta, std::vector<BlobFileAddition>* blob_file_additions,
Expand Down Expand Up @@ -256,8 +256,7 @@ Status BuildTable(
approx_opts.files_size_error_margin = 0.1;
meta->compensated_range_deletion_size += versions->ApproximateSize(
approx_opts, version, kv.first.Encode(), tombstone_end.Encode(),
0 /* start_level */, -1 /* end_level */,
TableReaderCaller::kFlush);
0 /* start_level */, -1 /* end_level */, table_reader_caller);
}
last_tombstone_start_user_key = range_del_it->start_key();
}
Expand Down Expand Up @@ -376,7 +375,7 @@ Status BuildTable(
nullptr,
(internal_stats == nullptr) ? nullptr
: internal_stats->GetFileReadHist(0),
TableReaderCaller::kFlush, /*arena=*/nullptr,
table_reader_caller, /*arena=*/nullptr,
/*skip_filter=*/false, tboptions.level_at_creation,
MaxFileSizeForL0MetaPin(mutable_cf_options),
/*smallest_compaction_key=*/nullptr,
Expand Down
2 changes: 1 addition & 1 deletion db/builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ extern Status BuildTable(
const std::string& dbname, VersionSet* versions,
const ImmutableDBOptions& db_options, const TableBuilderOptions& tboptions,
const FileOptions& file_options, TableCache* table_cache,
InternalIterator* iter,
TableReaderCaller table_reader_caller, InternalIterator* iter,
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
range_del_iters,
FileMetaData* meta, std::vector<BlobFileAddition>* blob_file_additions,
Expand Down
3 changes: 2 additions & 1 deletion db/convenience.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ Status VerifySstFileChecksum(const Options& options,
new RandomAccessFileReader(
std::move(file), file_path, ioptions.clock, nullptr /* io_tracer */,
nullptr /* stats */, IOActivity::kUnknown,
nullptr /* file_read_hist */, ioptions.rate_limiter.get()));
Histograms::HISTOGRAM_ENUM_MAX, nullptr /* file_read_hist */,
ioptions.rate_limiter.get()));
const bool kImmortal = true;
auto reader_options = TableReaderOptions(
ioptions, options.prefix_extractor, env_options, internal_comparator,
Expand Down
21 changes: 11 additions & 10 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1538,16 +1538,17 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
SeqnoToTimeMapping empty_seqno_time_mapping;
Version* version = cfd->current();
version->Ref();
s = BuildTable(
dbname_, versions_.get(), immutable_db_options_, tboptions,
file_options_for_compaction_, cfd->table_cache(), iter.get(),
std::move(range_del_iters), &meta, &blob_file_additions,
snapshot_seqs, earliest_write_conflict_snapshot, kMaxSequenceNumber,
snapshot_checker, paranoid_file_checks, cfd->internal_stats(), &io_s,
io_tracer_, BlobFileCreationReason::kRecovery,
empty_seqno_time_mapping, &event_logger_, job_id, Env::IO_HIGH,
nullptr /* table_properties */, write_hint,
nullptr /*full_history_ts_low*/, &blob_callback_, version);
s = BuildTable(dbname_, versions_.get(), immutable_db_options_, tboptions,
file_options_for_compaction_, cfd->table_cache(),
TableReaderCaller::kFlush, iter.get(),
std::move(range_del_iters), &meta, &blob_file_additions,
snapshot_seqs, earliest_write_conflict_snapshot,
kMaxSequenceNumber, snapshot_checker, paranoid_file_checks,
cfd->internal_stats(), &io_s, io_tracer_,
BlobFileCreationReason::kRecovery,
empty_seqno_time_mapping, &event_logger_, job_id,
Env::IO_HIGH, nullptr /* table_properties */, write_hint,
nullptr /*full_history_ts_low*/, &blob_callback_, version);
version->Unref();
LogFlush(immutable_db_options_.info_log);
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
Expand Down
23 changes: 12 additions & 11 deletions db/flush_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -930,17 +930,18 @@ Status FlushJob::WriteLevel0Table() {
meta_.fd.GetNumber());
const SequenceNumber job_snapshot_seq =
job_context_->GetJobSnapshotSequence();
s = BuildTable(
dbname_, versions_, db_options_, tboptions, file_options_,
cfd_->table_cache(), iter.get(), std::move(range_del_iters), &meta_,
&blob_file_additions, existing_snapshots_,
earliest_write_conflict_snapshot_, job_snapshot_seq,
snapshot_checker_, mutable_cf_options_.paranoid_file_checks,
cfd_->internal_stats(), &io_s, io_tracer_,
BlobFileCreationReason::kFlush, seqno_to_time_mapping_, event_logger_,
job_context_->job_id, io_priority, &table_properties_, write_hint,
full_history_ts_low, blob_callback_, base_, &num_input_entries,
&memtable_payload_bytes, &memtable_garbage_bytes);
s = BuildTable(dbname_, versions_, db_options_, tboptions, file_options_,
cfd_->table_cache(), TableReaderCaller::kFlush, iter.get(),
std::move(range_del_iters), &meta_, &blob_file_additions,
existing_snapshots_, earliest_write_conflict_snapshot_,
job_snapshot_seq, snapshot_checker_,
mutable_cf_options_.paranoid_file_checks,
cfd_->internal_stats(), &io_s, io_tracer_,
BlobFileCreationReason::kFlush, seqno_to_time_mapping_,
event_logger_, job_context_->job_id, io_priority,
&table_properties_, write_hint, full_history_ts_low,
blob_callback_, base_, &num_input_entries,
&memtable_payload_bytes, &memtable_garbage_bytes);
// TODO: Cleanup io_status in BuildTable and table builders
assert(!s.ok() || io_s.ok());
io_s.PermitUncheckedError();
Expand Down
16 changes: 8 additions & 8 deletions db/repair.cc
Original file line number Diff line number Diff line change
Expand Up @@ -456,13 +456,14 @@ class Repairer {
SeqnoToTimeMapping empty_seqno_time_mapping;
status = BuildTable(
dbname_, /* versions */ nullptr, immutable_db_options_, tboptions,
file_options_, table_cache_.get(), iter.get(),
std::move(range_del_iters), &meta, nullptr /* blob_file_additions */,
{}, kMaxSequenceNumber, kMaxSequenceNumber, snapshot_checker,
false /* paranoid_file_checks*/, nullptr /* internal_stats */, &io_s,
nullptr /*IOTracer*/, BlobFileCreationReason::kRecovery,
empty_seqno_time_mapping, nullptr /* event_logger */, 0 /* job_id */,
Env::IO_HIGH, nullptr /* table_properties */, write_hint);
file_options_, table_cache_.get(), TableReaderCaller::kFlush,
iter.get(), std::move(range_del_iters), &meta,
nullptr /* blob_file_additions */, {}, kMaxSequenceNumber,
kMaxSequenceNumber, snapshot_checker, false /* paranoid_file_checks*/,
nullptr /* internal_stats */, &io_s, nullptr /*IOTracer*/,
BlobFileCreationReason::kRecovery, empty_seqno_time_mapping,
nullptr /* event_logger */, 0 /* job_id */, Env::IO_HIGH,
nullptr /* table_properties */, write_hint);
ROCKS_LOG_INFO(db_options_.info_log,
"Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s",
log, counter, meta.fd.GetNumber(),
Expand Down Expand Up @@ -809,4 +810,3 @@ Status RepairDB(const std::string& dbname, const Options& options) {
}

} // namespace ROCKSDB_NAMESPACE

70 changes: 45 additions & 25 deletions db/table_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ Status TableCache::GetTableReader(
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta, bool sequential_mode, bool record_read_stats,
HistogramImpl* file_read_hist, std::unique_ptr<TableReader>* table_reader,
TableReaderCaller caller,
const std::shared_ptr<const SliceTransform>& prefix_extractor,
bool skip_filters, int level, bool prefetch_index_and_filter_in_cache,
size_t max_file_size_for_l0_meta_pin, Temperature file_temperature) {
Expand Down Expand Up @@ -124,11 +125,23 @@ Status TableCache::GetTableReader(
if (!sequential_mode && ioptions_.advise_random_on_open) {
file->Hint(FSRandomAccessFile::kRandom);
}
StopWatch sw(ioptions_.clock, ioptions_.stats, TABLE_OPEN_IO_MICROS);
IOActivity io_activity;
if (caller == TableReaderCaller::kFlush) {
io_activity = IOActivity::kFlush;
} else if (caller == TableReaderCaller::kCompaction ||
caller == TableReaderCaller::kCompactionRefill) {
io_activity = IOActivity::kCompaction;
} else {
io_activity = IOActivity::kUnknown;
}
StopWatch sw(ioptions_.clock, ioptions_.stats, {TABLE_OPEN_IO_MICROS});
std::unique_ptr<RandomAccessFileReader> file_reader(
new RandomAccessFileReader(
std::move(file), fname, ioptions_.clock, io_tracer_,
record_read_stats ? ioptions_.stats : nullptr, SST_READ_MICROS,
record_read_stats ? ioptions_.stats : nullptr, io_activity,
io_activity == IOActivity::kUnknown
? Histograms::SST_READ_MICROS
: Histograms::HISTOGRAM_ENUM_MAX,
file_read_hist, ioptions_.rate_limiter.get(), ioptions_.listeners,
file_temperature, level == ioptions_.num_levels - 1));
UniqueId64x2 expected_unique_id;
Expand Down Expand Up @@ -156,6 +169,7 @@ Status TableCache::FindTable(
const ReadOptions& ro, const FileOptions& file_options,
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta, TypedHandle** handle,
TableReaderCaller caller,
const std::shared_ptr<const SliceTransform>& prefix_extractor,
const bool no_io, bool record_read_stats, HistogramImpl* file_read_hist,
bool skip_filters, int level, bool prefetch_index_and_filter_in_cache,
Expand All @@ -182,7 +196,7 @@ Status TableCache::FindTable(
Status s =
GetTableReader(ro, file_options, internal_comparator, file_meta,
false /* sequential mode */, record_read_stats,
file_read_hist, &table_reader, prefix_extractor,
file_read_hist, &table_reader, caller, prefix_extractor,
skip_filters, level, prefetch_index_and_filter_in_cache,
max_file_size_for_l0_meta_pin, file_temperature);
if (!s.ok()) {
Expand Down Expand Up @@ -226,7 +240,7 @@ InternalIterator* TableCache::NewIterator(
table_reader = fd.table_reader;
if (table_reader == nullptr) {
s = FindTable(
options, file_options, icomparator, file_meta, &handle,
options, file_options, icomparator, file_meta, &handle, caller,
prefix_extractor, options.read_tier == kBlockCacheTier /* no_io */,
!for_compaction /* record_read_stats */, file_read_hist, skip_filters,
level, true /* prefetch_index_and_filter_in_cache */,
Expand Down Expand Up @@ -317,7 +331,7 @@ Status TableCache::GetRangeTombstoneIterator(
TypedHandle* handle = nullptr;
if (t == nullptr) {
s = FindTable(options, file_options_, internal_comparator, file_meta,
&handle);
&handle, TableReaderCaller::kUncategorized);
if (s.ok()) {
t = cache_.Value(handle);
}
Expand Down Expand Up @@ -430,7 +444,8 @@ Status TableCache::Get(
assert(s.ok());
if (t == nullptr) {
s = FindTable(options, file_options_, internal_comparator, file_meta,
&handle, prefix_extractor,
&handle, TableReaderCaller::kUncategorized,
prefix_extractor,
options.read_tier == kBlockCacheTier /* no_io */,
true /* record_read_stats */, file_read_hist, skip_filters,
level, true /* prefetch_index_and_filter_in_cache */,
Expand Down Expand Up @@ -531,12 +546,13 @@ Status TableCache::MultiGetFilter(
MultiGetContext::Range tombstone_range(*mget_range, mget_range->begin(),
mget_range->end());
if (t == nullptr) {
s = FindTable(
options, file_options_, internal_comparator, file_meta, &handle,
prefix_extractor, options.read_tier == kBlockCacheTier /* no_io */,
true /* record_read_stats */, file_read_hist, /*skip_filters=*/false,
level, true /* prefetch_index_and_filter_in_cache */,
/*max_file_size_for_l0_meta_pin=*/0, file_meta.temperature);
s = FindTable(options, file_options_, internal_comparator, file_meta,
&handle, TableReaderCaller::kUncategorized, prefix_extractor,
options.read_tier == kBlockCacheTier /* no_io */,
true /* record_read_stats */, file_read_hist,
/*skip_filters=*/false, level,
true /* prefetch_index_and_filter_in_cache */,
/*max_file_size_for_l0_meta_pin=*/0, file_meta.temperature);
if (s.ok()) {
t = cache_.Value(handle);
}
Expand Down Expand Up @@ -574,8 +590,10 @@ Status TableCache::GetTableProperties(
}

TypedHandle* table_handle = nullptr;
Status s = FindTable(ReadOptions(), file_options, internal_comparator,
file_meta, &table_handle, prefix_extractor, no_io);
Status s =
FindTable(ReadOptions(), file_options, internal_comparator, file_meta,
&table_handle, TableReaderCaller::kUncategorized,
prefix_extractor, no_io);
if (!s.ok()) {
return s;
}
Expand All @@ -593,7 +611,8 @@ Status TableCache::ApproximateKeyAnchors(
TableReader* t = file_meta.fd.table_reader;
TypedHandle* handle = nullptr;
if (t == nullptr) {
s = FindTable(ro, file_options_, internal_comparator, file_meta, &handle);
s = FindTable(ro, file_options_, internal_comparator, file_meta, &handle,
TableReaderCaller::kUncategorized);
if (s.ok()) {
t = cache_.Value(handle);
}
Expand All @@ -619,8 +638,9 @@ size_t TableCache::GetMemoryUsageByTableReader(
}

TypedHandle* table_handle = nullptr;
Status s = FindTable(ReadOptions(), file_options, internal_comparator,
file_meta, &table_handle, prefix_extractor, true);
Status s = FindTable(
ReadOptions(), file_options, internal_comparator, file_meta,
&table_handle, TableReaderCaller::kUncategorized, prefix_extractor, true);
if (!s.ok()) {
return 0;
}
Expand All @@ -644,10 +664,10 @@ uint64_t TableCache::ApproximateOffsetOf(
TypedHandle* table_handle = nullptr;
if (table_reader == nullptr) {
const bool for_compaction = (caller == TableReaderCaller::kCompaction);
Status s =
FindTable(ReadOptions(), file_options_, internal_comparator, file_meta,
&table_handle, prefix_extractor, false /* no_io */,
!for_compaction /* record_read_stats */);
Status s = FindTable(
ReadOptions(), file_options_, internal_comparator, file_meta,
&table_handle, TableReaderCaller::kUncategorized, prefix_extractor,
false /* no_io */, !for_compaction /* record_read_stats */);
if (s.ok()) {
table_reader = cache_.Value(table_handle);
}
Expand All @@ -672,10 +692,10 @@ uint64_t TableCache::ApproximateSize(
TypedHandle* table_handle = nullptr;
if (table_reader == nullptr) {
const bool for_compaction = (caller == TableReaderCaller::kCompaction);
Status s =
FindTable(ReadOptions(), file_options_, internal_comparator, file_meta,
&table_handle, prefix_extractor, false /* no_io */,
!for_compaction /* record_read_stats */);
Status s = FindTable(
ReadOptions(), file_options_, internal_comparator, file_meta,
&table_handle, TableReaderCaller::kUncategorized, prefix_extractor,
false /* no_io */, !for_compaction /* record_read_stats */);
if (s.ok()) {
table_reader = cache_.Value(table_handle);
}
Expand Down
4 changes: 2 additions & 2 deletions db/table_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ class TableCache {
Status FindTable(
const ReadOptions& ro, const FileOptions& toptions,
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta, TypedHandle**,
const FileMetaData& file_meta, TypedHandle**, TableReaderCaller caller,
const std::shared_ptr<const SliceTransform>& prefix_extractor = nullptr,
const bool no_io = false, bool record_read_stats = true,
HistogramImpl* file_read_hist = nullptr, bool skip_filters = false,
Expand Down Expand Up @@ -234,7 +234,7 @@ class TableCache {
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta, bool sequential_mode,
bool record_read_stats, HistogramImpl* file_read_hist,
std::unique_ptr<TableReader>* table_reader,
std::unique_ptr<TableReader>* table_reader, TableReaderCaller caller,
const std::shared_ptr<const SliceTransform>& prefix_extractor = nullptr,
bool skip_filters = false, int level = -1,
bool prefetch_index_and_filter_in_cache = true,
Expand Down
3 changes: 2 additions & 1 deletion db/table_cache_sync_and_async.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ DEFINE_SYNC_AND_ASYNC(Status, TableCache::MultiGet)
if (t == nullptr) {
assert(handle == nullptr);
s = FindTable(options, file_options_, internal_comparator, file_meta,
&handle, prefix_extractor,
&handle, TableReaderCaller::kUncategorized,
prefix_extractor,
options.read_tier == kBlockCacheTier /* no_io */,
true /* record_read_stats */, file_read_hist, skip_filters,
level, true /* prefetch_index_and_filter_in_cache */,
Expand Down
3 changes: 2 additions & 1 deletion db/version_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1326,7 +1326,8 @@ class VersionBuilder::Rep {
statuses[file_idx] = table_cache_->FindTable(
ReadOptions(), file_options_,
*(base_vstorage_->InternalComparator()), *file_meta, &handle,
prefix_extractor, false /*no_io */, true /* record_read_stats */,
TableReaderCaller::kUncategorized, prefix_extractor,
false /*no_io */, true /* record_read_stats */,
internal_stats->GetFileReadHist(level), false, level,
prefetch_index_and_filter_in_cache, max_file_size_for_l0_meta_pin,
file_meta->temperature);
Expand Down
7 changes: 4 additions & 3 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1567,8 +1567,8 @@ Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
new RandomAccessFileReader(
std::move(file), file_name, nullptr /* env */, io_tracer_,
nullptr /* stats */, IOActivity::kUnknown,
nullptr /* file_read_hist */, nullptr /* rate_limiter */,
ioptions->listeners));
Histograms::HISTOGRAM_ENUM_MAX, nullptr /* file_read_hist */,
nullptr /* rate_limiter */, ioptions->listeners));
std::unique_ptr<TableProperties> props;
s = ReadTableProperties(
file_reader.get(), file_meta->fd.GetFileSize(),
Expand Down Expand Up @@ -6927,7 +6927,8 @@ Status VersionSet::VerifyFileMetadata(ColumnFamilyData* cfd,
TableCache::TypedHandle* handle = nullptr;
FileMetaData meta_copy = meta;
status = table_cache->FindTable(
ReadOptions(), file_opts, *icmp, meta_copy, &handle, pe,
ReadOptions(), file_opts, *icmp, meta_copy, &handle,
TableReaderCaller::kUncategorized, pe,
/*no_io=*/false, /*record_read_stats=*/true,
internal_stats->GetFileReadHist(level), false, level,
/*prefetch_index_and_filter_in_cache*/ false, max_sz_for_l0_meta_pin,
Expand Down
Loading

0 comments on commit 4c871ca

Please sign in to comment.