Fix and optimize read IO pattern for flat map column (facebookincubator#11236)

Summary:

1. `numReferences` and `numReads` in `TrackingData` can overflow under a large volume of reads on a flat map column, which makes the read percentage negative and prevents coalescing (illustrated in the sketch below).
2. Increase the limit on the maximum number of regions per coalesced read, reducing the number of IO reads for a typical flat map column to 1/3.
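
To make the first point concrete, here is a minimal standalone sketch (illustrative numbers only, not Velox code) of how a wrapped 32-bit `numReferences` drives the read percentage negative, while 64-bit counters keep it in range:

```cpp
#include <cstdint>
#include <iostream>

int main() {
  // Hypothetical totals accumulated while scanning a large flat map column.
  const int64_t trueReferences = 2'400'000'000;  // exceeds INT32_MAX
  const int64_t trueReads = 2'000'000'000;       // still fits in int32_t

  // What the old int32_t reference counter ends up holding: the out-of-range
  // value wraps modularly on common implementations and goes negative.
  const int32_t references32 = static_cast<int32_t>(trueReferences);
  const int32_t reads32 = static_cast<int32_t>(trueReads);

  const auto pct32 = (100LL * reads32) / (1 + references32);    // negative
  const auto pct64 = (100 * trueReads) / (1 + trueReferences);  // ~83

  std::cout << "32-bit counters: " << pct32 << "%\n";
  std::cout << "64-bit counters: " << pct64 << "%\n";
  return 0;
}
```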

Reviewed By: oerling

Differential Revision: D64225777
Yuhta authored and facebook-github-bot committed Oct 12, 2024
1 parent 6cc4152 commit fe8d3a1
Showing 7 changed files with 75 additions and 89 deletions.
16 changes: 9 additions & 7 deletions velox/common/caching/ScanTracker.h
@@ -21,6 +21,7 @@
#include <mutex>

#include "velox/common/base/BitUtil.h"
#include "velox/common/base/CheckedArithmetic.h"
#include "velox/common/base/Exceptions.h"

namespace facebook::velox::cache {
@@ -73,10 +74,10 @@ class FileGroupStats;

/// Records references and actual uses of a stream.
struct TrackingData {
int64_t referencedBytes{};
int64_t readBytes{};
int32_t numReferences{};
int32_t numReads{};
double referencedBytes{};
double readBytes{};
int64_t numReferences{};
int64_t numReads{};

/// Marks that 'bytes' worth of data in the tracked object has been referenced
/// and may later be accessed. If 'bytes' is larger than a single
@@ -87,15 +88,16 @@ struct TrackingData {
void incrementReference(uint64_t bytes, int32_t loadQuantum) {
referencedBytes += bytes;
if (loadQuantum == 0) {
++numReferences;
numReferences = checkedPlus<int64_t>(numReferences, 1);
} else {
numReferences += bits::roundUp(bytes, loadQuantum) / loadQuantum;
numReferences = checkedPlus<int64_t>(
numReferences, bits::divRoundUp(bytes, loadQuantum));
}
}

void incrementRead(uint64_t bytes) {
readBytes += bytes;
++numReads;
numReads = checkedPlus<int64_t>(numReads, 1);
}
};
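
As a side note on the two helpers used above: `bits::divRoundUp` converts the referenced byte count into the number of quantum-sized loads it may trigger, and `checkedPlus` throws on overflow instead of silently wrapping. A rough standalone sketch of the same behavior (simplified stand-ins, not the Velox implementations):

```cpp
#include <cstdint>
#include <iostream>
#include <limits>
#include <stdexcept>

// Stand-in for bits::divRoundUp: ceil(bytes / quantum).
int64_t divRoundUp(uint64_t bytes, uint64_t quantum) {
  return static_cast<int64_t>((bytes + quantum - 1) / quantum);
}

// Stand-in for checkedPlus<int64_t>: throws instead of wrapping
// (assumes non-negative operands for brevity).
int64_t checkedPlus(int64_t a, int64_t b) {
  if (a > std::numeric_limits<int64_t>::max() - b) {
    throw std::overflow_error("checkedPlus overflow");
  }
  return a + b;
}

int main() {
  int64_t numReferences = 0;
  // Referencing a 20 MB stream with an 8 MB load quantum counts as 3
  // references, one per quantum-sized read it may later trigger.
  numReferences = checkedPlus(numReferences, divRoundUp(20 << 20, 8 << 20));
  std::cout << numReferences << "\n";  // 3
  return 0;
}
```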

14 changes: 14 additions & 0 deletions velox/dwio/common/BufferedInput.h
@@ -16,6 +16,7 @@

#pragma once

#include "velox/common/caching/ScanTracker.h"
#include "velox/common/memory/AllocationPool.h"
#include "velox/dwio/common/SeekableInputStream.h"
#include "velox/dwio/common/StreamIdentifier.h"
@@ -149,6 +150,19 @@ class BufferedInput {
virtual uint64_t nextFetchSize() const;

protected:
static int32_t adjustedReadPct(const cache::TrackingData& trackingData) {
// When this method is called, there is one more reference that is already
// counted, but the corresponding read (if exists) has not happened yet. So
// we must count one fewer reference at this point.
if (trackingData.numReferences < 2) {
return 0;
}
const auto pct = checkedMultiply<int64_t>(100, trackingData.numReads) /
(trackingData.numReferences - 1);
VELOX_CHECK(0 <= pct && pct <= 100, "Bad read percentage: {}", pct);
return pct;
}

const std::shared_ptr<ReadFileInputStream> input_;
memory::MemoryPool* const pool_;

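A quick numeric illustration (hypothetical counts) of the off-by-one that `adjustedReadPct` corrects for: the reference belonging to the request currently being scheduled is already counted, but its read has not happened yet, so dividing by the full `numReferences` would understate how often this stream is actually read:

```cpp
#include <cstdint>
#include <iostream>

int main() {
  // 5 references so far; 4 reads completed; the 5th reference is the request
  // being scheduled right now, whose read has not happened yet.
  const int64_t numReferences = 5;
  const int64_t numReads = 4;
  std::cout << 100 * numReads / numReferences << "\n";        // 80
  std::cout << 100 * numReads / (numReferences - 1) << "\n";  // 100
  return 0;
}
```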
83 changes: 34 additions & 49 deletions velox/dwio/common/CachedBufferedInput.cpp
@@ -124,9 +124,9 @@ std::vector<CacheRequest*> makeRequestParts(
const auto readPct =
(100 * trackingData.numReads) / (1 + trackingData.numReferences);
const auto readDensity =
(100 * trackingData.readBytes) / (1 + trackingData.referencedBytes);
trackingData.readBytes / (1 + trackingData.referencedBytes);
const bool prefetch = trackingData.referencedBytes > 0 &&
(isPrefetchPct(readPct) && readDensity >= 80);
(isPrefetchPct(readPct) && readDensity >= 0.8);
std::vector<CacheRequest*> parts;
for (uint64_t offset = 0; offset < request.size; offset += loadQuantum) {
const int32_t size = std::min<int32_t>(loadQuantum, request.size - offset);
@@ -143,14 +143,6 @@
return parts;
}

int32_t adjustedReadPct(const cache::TrackingData& trackingData) {
// When called, there will be one more reference that read, since references
// are counted before reading.
if (trackingData.numReferences < 2) {
return 0;
}
return (100 * trackingData.numReads) / (trackingData.numReferences - 1);
}
} // namespace
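
For reference, a sketch (assumed values, and an assumed default of 80 for the gflags-controlled prefetch threshold) of how the prefetch test in `makeRequestParts` now combines the read percentage with the read density, the latter compared as a fraction (0.8) because the byte counters are doubles:

```cpp
#include <cstdint>
#include <iostream>

// Assumed stand-in for the flag-controlled threshold behind isPrefetchPct.
bool isPrefetchPct(int64_t pct) {
  return pct >= 80;
}

int main() {
  // Hypothetical tracking data for one stream.
  const double referencedBytes = 1e9;
  const double readBytes = 9e8;
  const int64_t numReferences = 1000;
  const int64_t numReads = 950;

  const auto readPct = (100 * numReads) / (1 + numReferences);
  const auto readDensity = readBytes / (1 + referencedBytes);
  const bool prefetch = referencedBytes > 0 &&
      isPrefetchPct(readPct) && readDensity >= 0.8;
  std::cout << std::boolalpha << prefetch << "\n";  // true
  return 0;
}
```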

void CachedBufferedInput::load(const LogType /*unused*/) {
@@ -165,49 +157,43 @@ void CachedBufferedInput::load(const LogType /*unused*/) {
// Extra requests made for pre-loadable regions that are larger than
// 'loadQuantum'.
std::vector<std::unique_ptr<CacheRequest>> extraRequests;
// We loop over access frequency buckets. For example readPct 80 will get all
// streams where 80% or more of the referenced data is actually loaded.
for (const auto readPct : std::vector<int32_t>{80, 50, 20, 0}) {
std::vector<CacheRequest*> storageLoad;
std::vector<CacheRequest*> ssdLoad;
for (auto& request : requests) {
if (request.processed) {
std::vector<CacheRequest*> storageLoad[2];
std::vector<CacheRequest*> ssdLoad[2];
for (auto& request : requests) {
cache::TrackingData trackingData;
const bool prefetchAnyway = request.trackingId.empty() ||
request.trackingId.id() == StreamIdentifier::sequentialFile().id_;
if (!prefetchAnyway && (tracker_ != nullptr)) {
trackingData = tracker_->trackingData(request.trackingId);
}
const int prefetch =
(prefetchAnyway || isPrefetchPct(adjustedReadPct(trackingData))) ? 1
: 0;
auto parts = makeRequestParts(
request, trackingData, options_.loadQuantum(), extraRequests);
for (auto part : parts) {
if (cache_->exists(part->key)) {
continue;
}
cache::TrackingData trackingData;
const bool prefetchAnyway = request.trackingId.empty() ||
request.trackingId.id() == StreamIdentifier::sequentialFile().id_;
if (!prefetchAnyway && (tracker_ != nullptr)) {
trackingData = tracker_->trackingData(request.trackingId);
}
if (prefetchAnyway || adjustedReadPct(trackingData) >= readPct) {
request.processed = true;
auto parts = makeRequestParts(
request, trackingData, options_.loadQuantum(), extraRequests);
for (auto part : parts) {
if (cache_->exists(part->key)) {
continue;
}
if (ssdFile != nullptr) {
part->ssdPin = ssdFile->find(part->key);
if (!part->ssdPin.empty() &&
part->ssdPin.run().size() < part->size) {
LOG(INFO) << "IOERR: Ignoring SSD shorter than requested: "
<< part->ssdPin.run().size() << " vs " << part->size;
part->ssdPin.clear();
}
if (!part->ssdPin.empty()) {
ssdLoad.push_back(part);
continue;
}
}
storageLoad.push_back(part);
if (ssdFile != nullptr) {
part->ssdPin = ssdFile->find(part->key);
if (!part->ssdPin.empty() && part->ssdPin.run().size() < part->size) {
LOG(INFO) << "IOERR: Ignoring SSD shorter than requested: "
<< part->ssdPin.run().size() << " vs " << part->size;
part->ssdPin.clear();
}
if (!part->ssdPin.empty()) {
ssdLoad[prefetch].push_back(part);
continue;
}
}
storageLoad[prefetch].push_back(part);
}
makeLoads(std::move(storageLoad), isPrefetchPct(readPct));
makeLoads(std::move(ssdLoad), isPrefetchPct(readPct));
}
makeLoads(std::move(storageLoad[1]), true);
makeLoads(std::move(ssdLoad[1]), true);
makeLoads(std::move(storageLoad[0]), false);
makeLoads(std::move(ssdLoad[0]), false);
}
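
A condensed sketch (simplified types, not the Velox classes) of the new scheduling shape above: instead of re-scanning all requests for each percentage bucket {80, 50, 20, 0}, every request is classified once as prefetchable or not, and the two groups are loaded with the appropriate urgency:

```cpp
#include <array>
#include <cstdint>
#include <iostream>
#include <vector>

struct Request {
  int32_t readPct;      // adjusted read percentage from tracking data
  bool prefetchAnyway;  // e.g. sequential file or empty tracking id
};

int main() {
  std::vector<Request> requests = {{95, false}, {10, false}, {0, true}};
  std::array<std::vector<const Request*>, 2> storageLoad;
  for (const auto& request : requests) {
    const int prefetch =
        (request.prefetchAnyway || request.readPct >= 80) ? 1 : 0;
    storageLoad[prefetch].push_back(&request);
  }
  // Prefetchable loads are issued eagerly, the rest stay lazy.
  std::cout << storageLoad[1].size() << " prefetched, "
            << storageLoad[0].size() << " on demand\n";  // 2 prefetched, 1 on demand
  return 0;
}
```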

void CachedBufferedInput::makeLoads(
@@ -235,8 +221,7 @@ void CachedBufferedInput::makeLoads(
coalesceIo<CacheRequest*, CacheRequest*>(
requests,
maxDistance,
// Break batches up. Better load more short ones in parallel.
40,
std::numeric_limits<int32_t>::max(),
[&](int32_t index) {
return isSsd ? requests[index]->ssdPin.run().offset()
: requests[index]->key.offset;
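To see why lifting the per-batch region cap matters for flat maps, whose values arrive as many small adjacent streams, here is a toy model (not the coalesceIo API) of counting IOs when nearby regions can merge but at most N regions may share one IO; per the comment above, the real code still bounds each coalesced read by size rather than by count:

```cpp
#include <cstdint>
#include <iostream>
#include <vector>

struct Region {
  uint64_t offset;
  uint64_t size;
};

// Count IOs when regions within 'maxDistance' of each other can be merged,
// but at most 'maxRegionsPerIo' regions may share one IO.
int32_t countIos(
    const std::vector<Region>& regions,
    uint64_t maxDistance,
    int32_t maxRegionsPerIo) {
  int32_t ios = 0;
  int32_t inCurrent = 0;
  uint64_t end = 0;
  for (const auto& region : regions) {
    if (ios == 0 || region.offset > end + maxDistance ||
        inCurrent == maxRegionsPerIo) {
      ++ios;
      inCurrent = 0;
    }
    ++inCurrent;
    end = region.offset + region.size;
  }
  return ios;
}

int main() {
  // 1200 contiguous 10 KB streams, e.g. the values of one flat map column.
  std::vector<Region> regions;
  for (int32_t i = 0; i < 1200; ++i) {
    regions.push_back({static_cast<uint64_t>(i) * 10'000, 10'000});
  }
  std::cout << countIos(regions, 20'000, 40) << " IOs with a cap of 40\n";  // 30
  std::cout << countIos(regions, 20'000, 1'000'000) << " IOs uncapped\n";   // 1
  return 0;
}
```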
2 changes: 0 additions & 2 deletions velox/dwio/common/CachedBufferedInput.h
@@ -44,8 +44,6 @@ struct CacheRequest {
cache::CachePin pin;
cache::SsdPin ssdPin;

bool processed{false};

/// True if this should be coalesced into a CoalescedLoad with other nearby
/// requests with a similar load probability. This is false for sparsely
/// accessed large columns where hitting one piece should not load the
45 changes: 15 additions & 30 deletions velox/dwio/common/DirectBufferedInput.cpp
@@ -81,42 +81,27 @@ bool isPrefetchablePct(int32_t pct) {
return pct >= FLAGS_cache_prefetch_min_pct;
}

int32_t adjustedReadPct(const cache::TrackingData& trackingData) {
// When called, there will be one more reference that read, since references
// are counted before reading.
if (trackingData.numReferences < 2) {
return 0;
}
return (100 * trackingData.numReads) / (trackingData.numReferences - 1);
}
} // namespace

void DirectBufferedInput::load(const LogType /*unused*/) {
// After load, new requests cannot be merged into pre-load ones.
auto requests = std::move(requests_);

// We loop over access frequency buckets. For example readPct 80
// will get all streams where 80% or more of the referenced data is
// actually loaded.
for (auto readPct : std::vector<int32_t>{80, 50, 20, 0}) {
std::vector<LoadRequest*> storageLoad;
for (auto& request : requests) {
if (request.processed) {
continue;
}
cache::TrackingData trackingData;
const bool prefetchAnyway = request.trackingId.empty() ||
request.trackingId.id() == StreamIdentifier::sequentialFile().id_;
if (!prefetchAnyway && tracker_) {
trackingData = tracker_->trackingData(request.trackingId);
}
if (prefetchAnyway || adjustedReadPct(trackingData) >= readPct) {
request.processed = true;
storageLoad.push_back(&request);
}
std::vector<LoadRequest*> storageLoad[2];
for (auto& request : requests) {
cache::TrackingData trackingData;
const bool prefetchAnyway = request.trackingId.empty() ||
request.trackingId.id() == StreamIdentifier::sequentialFile().id_;
if (!prefetchAnyway && tracker_) {
trackingData = tracker_->trackingData(request.trackingId);
}
makeLoads(std::move(storageLoad), isPrefetchablePct(readPct));
const int prefetch =
(prefetchAnyway || isPrefetchablePct(adjustedReadPct(trackingData)))
? 1
: 0;
storageLoad[prefetch].push_back(&request);
}
makeLoads(std::move(storageLoad[1]), true);
makeLoads(std::move(storageLoad[0]), false);
}

void DirectBufferedInput::makeLoads(
@@ -148,7 +133,7 @@ void DirectBufferedInput::makeLoads(
requests,
maxDistance,
// Break batches up. Better load more short ones i parallel.
1000, // limit coalesce by size, not count.
std::numeric_limits<int32_t>::max(), // limit coalesce by size, not count.
[&](int32_t index) { return requests[index]->region.offset; },
[&](int32_t index) -> int32_t {
auto size = requests[index]->region.length;
1 change: 0 additions & 1 deletion velox/dwio/common/DirectBufferedInput.h
@@ -36,7 +36,6 @@ struct LoadRequest {

velox::common::Region region;
cache::TrackingId trackingId;
bool processed{false};

const SeekableInputStream* stream;

3 changes: 3 additions & 0 deletions velox/dwio/dwrf/common/Common.h
@@ -171,6 +171,9 @@ class DwrfStreamIdentifier : public dwio::common::StreamIdentifier {
uint32_t sequence,
uint32_t column,
StreamKind kind)
// Pruned flat map keys are not enqueued thus all flatmap values on same
// column should have similar read percentage, so it is ok for them to
// share the same TrackingData.
: StreamIdentifier(
velox::cache::TrackingId((node << kNodeShift) | kind).id()),
column_{column},
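A small sketch of the point the new comment makes (the shift width and stream-kind value below are assumed, purely illustrative): because the TrackingId is built only from the node and the stream kind, the value streams produced for different flat map keys on the same column all collapse to one id, and therefore share one TrackingData:

```cpp
#include <cstdint>
#include <iostream>

int main() {
  // Assumed illustrative constants; the real kNodeShift and StreamKind values
  // live in velox/dwio/dwrf/common/Common.h.
  constexpr uint32_t kNodeShift = 8;
  constexpr uint32_t kStreamKindData = 1;

  const uint32_t node = 7;  // node id of the flat map column
  // Two different map keys mean two different sequences, but the tracking id
  // ignores the sequence, so both map to the same TrackingData.
  const uint32_t idForKeyA = (node << kNodeShift) | kStreamKindData;
  const uint32_t idForKeyB = (node << kNodeShift) | kStreamKindData;
  std::cout << std::boolalpha << (idForKeyA == idForKeyB) << "\n";  // true
  return 0;
}
```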
