Skip to content

Commit

Permalink
Merge branch 'main' into wip-execmodes
Browse files Browse the repository at this point in the history
  • Loading branch information
zhztheplayer authored Aug 27, 2024
2 parents ec7fadd + 763c19c commit 97a8b8e
Show file tree
Hide file tree
Showing 122 changed files with 4,312 additions and 917 deletions.
1 change: 1 addition & 0 deletions CMake/resolve_dependency_modules/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ if(VELOX_ENABLE_ARROW)
set(ARROW_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/arrow_ep")
set(ARROW_CMAKE_ARGS
-DARROW_PARQUET=OFF
-DARROW_DEPENDENCY_SOURCE=AUTO
-DARROW_WITH_THRIFT=ON
-DARROW_WITH_LZ4=ON
-DARROW_WITH_SNAPPY=ON
Expand Down
180 changes: 91 additions & 89 deletions velox/common/base/SpillStats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,34 +42,34 @@ SpillStats::SpillStats(
uint64_t _spilledRows,
uint32_t _spilledPartitions,
uint64_t _spilledFiles,
uint64_t _spillFillTimeUs,
uint64_t _spillSortTimeUs,
uint64_t _spillSerializationTimeUs,
uint64_t _spillFillTimeNanos,
uint64_t _spillSortTimeNanos,
uint64_t _spillSerializationTimeNanos,
uint64_t _spillWrites,
uint64_t _spillFlushTimeUs,
uint64_t _spillWriteTimeUs,
uint64_t _spillFlushTimeNanos,
uint64_t _spillWriteTimeNanos,
uint64_t _spillMaxLevelExceededCount,
uint64_t _spillReadBytes,
uint64_t _spillReads,
uint64_t _spillReadTimeUs,
uint64_t _spillDeserializationTimeUs)
uint64_t _spillReadTimeNanos,
uint64_t _spillDeserializationTimeNanos)
: spillRuns(_spillRuns),
spilledInputBytes(_spilledInputBytes),
spilledBytes(_spilledBytes),
spilledRows(_spilledRows),
spilledPartitions(_spilledPartitions),
spilledFiles(_spilledFiles),
spillFillTimeUs(_spillFillTimeUs),
spillSortTimeUs(_spillSortTimeUs),
spillSerializationTimeUs(_spillSerializationTimeUs),
spillFillTimeNanos(_spillFillTimeNanos),
spillSortTimeNanos(_spillSortTimeNanos),
spillSerializationTimeNanos(_spillSerializationTimeNanos),
spillWrites(_spillWrites),
spillFlushTimeUs(_spillFlushTimeUs),
spillWriteTimeUs(_spillWriteTimeUs),
spillFlushTimeNanos(_spillFlushTimeNanos),
spillWriteTimeNanos(_spillWriteTimeNanos),
spillMaxLevelExceededCount(_spillMaxLevelExceededCount),
spillReadBytes(_spillReadBytes),
spillReads(_spillReads),
spillReadTimeUs(_spillReadTimeUs),
spillDeserializationTimeUs(_spillDeserializationTimeUs) {}
spillReadTimeNanos(_spillReadTimeNanos),
spillDeserializationTimeNanos(_spillDeserializationTimeNanos) {}

SpillStats& SpillStats::operator+=(const SpillStats& other) {
spillRuns += other.spillRuns;
Expand All @@ -78,17 +78,17 @@ SpillStats& SpillStats::operator+=(const SpillStats& other) {
spilledRows += other.spilledRows;
spilledPartitions += other.spilledPartitions;
spilledFiles += other.spilledFiles;
spillFillTimeUs += other.spillFillTimeUs;
spillSortTimeUs += other.spillSortTimeUs;
spillSerializationTimeUs += other.spillSerializationTimeUs;
spillFillTimeNanos += other.spillFillTimeNanos;
spillSortTimeNanos += other.spillSortTimeNanos;
spillSerializationTimeNanos += other.spillSerializationTimeNanos;
spillWrites += other.spillWrites;
spillFlushTimeUs += other.spillFlushTimeUs;
spillWriteTimeUs += other.spillWriteTimeUs;
spillFlushTimeNanos += other.spillFlushTimeNanos;
spillWriteTimeNanos += other.spillWriteTimeNanos;
spillMaxLevelExceededCount += other.spillMaxLevelExceededCount;
spillReadBytes += other.spillReadBytes;
spillReads += other.spillReads;
spillReadTimeUs += other.spillReadTimeUs;
spillDeserializationTimeUs += other.spillDeserializationTimeUs;
spillReadTimeNanos += other.spillReadTimeNanos;
spillDeserializationTimeNanos += other.spillDeserializationTimeNanos;
return *this;
}

Expand All @@ -100,20 +100,20 @@ SpillStats SpillStats::operator-(const SpillStats& other) const {
result.spilledRows = spilledRows - other.spilledRows;
result.spilledPartitions = spilledPartitions - other.spilledPartitions;
result.spilledFiles = spilledFiles - other.spilledFiles;
result.spillFillTimeUs = spillFillTimeUs - other.spillFillTimeUs;
result.spillSortTimeUs = spillSortTimeUs - other.spillSortTimeUs;
result.spillSerializationTimeUs =
spillSerializationTimeUs - other.spillSerializationTimeUs;
result.spillFillTimeNanos = spillFillTimeNanos - other.spillFillTimeNanos;
result.spillSortTimeNanos = spillSortTimeNanos - other.spillSortTimeNanos;
result.spillSerializationTimeNanos =
spillSerializationTimeNanos - other.spillSerializationTimeNanos;
result.spillWrites = spillWrites - other.spillWrites;
result.spillFlushTimeUs = spillFlushTimeUs - other.spillFlushTimeUs;
result.spillWriteTimeUs = spillWriteTimeUs - other.spillWriteTimeUs;
result.spillFlushTimeNanos = spillFlushTimeNanos - other.spillFlushTimeNanos;
result.spillWriteTimeNanos = spillWriteTimeNanos - other.spillWriteTimeNanos;
result.spillMaxLevelExceededCount =
spillMaxLevelExceededCount - other.spillMaxLevelExceededCount;
result.spillReadBytes = spillReadBytes - other.spillReadBytes;
result.spillReads = spillReads - other.spillReads;
result.spillReadTimeUs = spillReadTimeUs - other.spillReadTimeUs;
result.spillDeserializationTimeUs =
spillDeserializationTimeUs - other.spillDeserializationTimeUs;
result.spillReadTimeNanos = spillReadTimeNanos - other.spillReadTimeNanos;
result.spillDeserializationTimeNanos =
spillDeserializationTimeNanos - other.spillDeserializationTimeNanos;
return result;
}

Expand All @@ -135,17 +135,17 @@ bool SpillStats::operator<(const SpillStats& other) const {
UPDATE_COUNTER(spilledRows);
UPDATE_COUNTER(spilledPartitions);
UPDATE_COUNTER(spilledFiles);
UPDATE_COUNTER(spillFillTimeUs);
UPDATE_COUNTER(spillSortTimeUs);
UPDATE_COUNTER(spillSerializationTimeUs);
UPDATE_COUNTER(spillFillTimeNanos);
UPDATE_COUNTER(spillSortTimeNanos);
UPDATE_COUNTER(spillSerializationTimeNanos);
UPDATE_COUNTER(spillWrites);
UPDATE_COUNTER(spillFlushTimeUs);
UPDATE_COUNTER(spillWriteTimeUs);
UPDATE_COUNTER(spillFlushTimeNanos);
UPDATE_COUNTER(spillWriteTimeNanos);
UPDATE_COUNTER(spillMaxLevelExceededCount);
UPDATE_COUNTER(spillReadBytes);
UPDATE_COUNTER(spillReads);
UPDATE_COUNTER(spillReadTimeUs);
UPDATE_COUNTER(spillDeserializationTimeUs);
UPDATE_COUNTER(spillReadTimeNanos);
UPDATE_COUNTER(spillDeserializationTimeNanos);
#undef UPDATE_COUNTER
VELOX_CHECK(
!((gtCount > 0) && (ltCount > 0)),
Expand Down Expand Up @@ -175,35 +175,35 @@ bool SpillStats::operator==(const SpillStats& other) const {
spilledRows,
spilledPartitions,
spilledFiles,
spillFillTimeUs,
spillSortTimeUs,
spillSerializationTimeUs,
spillFillTimeNanos,
spillSortTimeNanos,
spillSerializationTimeNanos,
spillWrites,
spillFlushTimeUs,
spillWriteTimeUs,
spillFlushTimeNanos,
spillWriteTimeNanos,
spillMaxLevelExceededCount,
spillReadBytes,
spillReads,
spillReadTimeUs,
spillDeserializationTimeUs) ==
spillReadTimeNanos,
spillDeserializationTimeNanos) ==
std::tie(
other.spillRuns,
other.spilledInputBytes,
other.spilledBytes,
other.spilledRows,
other.spilledPartitions,
other.spilledFiles,
other.spillFillTimeUs,
other.spillSortTimeUs,
other.spillSerializationTimeUs,
other.spillFillTimeNanos,
other.spillSortTimeNanos,
other.spillSerializationTimeNanos,
other.spillWrites,
other.spillFlushTimeUs,
other.spillWriteTimeUs,
other.spillFlushTimeNanos,
other.spillWriteTimeNanos,
spillMaxLevelExceededCount,
spillReadBytes,
spillReads,
spillReadTimeUs,
spillDeserializationTimeUs);
spillReadTimeNanos,
spillDeserializationTimeNanos);
}

void SpillStats::reset() {
Expand All @@ -213,44 +213,44 @@ void SpillStats::reset() {
spilledRows = 0;
spilledPartitions = 0;
spilledFiles = 0;
spillFillTimeUs = 0;
spillSortTimeUs = 0;
spillSerializationTimeUs = 0;
spillFillTimeNanos = 0;
spillSortTimeNanos = 0;
spillSerializationTimeNanos = 0;
spillWrites = 0;
spillFlushTimeUs = 0;
spillWriteTimeUs = 0;
spillFlushTimeNanos = 0;
spillWriteTimeNanos = 0;
spillMaxLevelExceededCount = 0;
spillReadBytes = 0;
spillReads = 0;
spillReadTimeUs = 0;
spillDeserializationTimeUs = 0;
spillReadTimeNanos = 0;
spillDeserializationTimeNanos = 0;
}

std::string SpillStats::toString() const {
return fmt::format(
"spillRuns[{}] spilledInputBytes[{}] spilledBytes[{}] spilledRows[{}] "
"spilledPartitions[{}] spilledFiles[{}] spillFillTimeUs[{}] "
"spillSortTime[{}] spillSerializationTime[{}] spillWrites[{}] "
"spillFlushTime[{}] spillWriteTime[{}] maxSpillExceededLimitCount[{}] "
"spillReadBytes[{}] spillReads[{}] spillReadTime[{}] "
"spillReadDeserializationTime[{}]",
"spilledPartitions[{}] spilledFiles[{}] spillFillTimeNanos[{}] "
"spillSortTimeNanos[{}] spillSerializationTimeNanos[{}] spillWrites[{}] "
"spillFlushTimeNanos[{}] spillWriteTimeNanos[{}] maxSpillExceededLimitCount[{}] "
"spillReadBytes[{}] spillReads[{}] spillReadTimeNanos[{}] "
"spillReadDeserializationTimeNanos[{}]",
spillRuns,
succinctBytes(spilledInputBytes),
succinctBytes(spilledBytes),
spilledRows,
spilledPartitions,
spilledFiles,
succinctMicros(spillFillTimeUs),
succinctMicros(spillSortTimeUs),
succinctMicros(spillSerializationTimeUs),
succinctNanos(spillFillTimeNanos),
succinctNanos(spillSortTimeNanos),
succinctNanos(spillSerializationTimeNanos),
spillWrites,
succinctMicros(spillFlushTimeUs),
succinctMicros(spillWriteTimeUs),
succinctNanos(spillFlushTimeNanos),
succinctNanos(spillWriteTimeNanos),
spillMaxLevelExceededCount,
succinctBytes(spillReadBytes),
spillReads,
succinctMicros(spillReadTimeUs),
succinctMicros(spillDeserializationTimeUs));
succinctNanos(spillReadTimeNanos),
succinctNanos(spillDeserializationTimeNanos));
}

void updateGlobalSpillRunStats(uint64_t numRuns) {
Expand All @@ -260,52 +260,54 @@ void updateGlobalSpillRunStats(uint64_t numRuns) {

void updateGlobalSpillAppendStats(
uint64_t numRows,
uint64_t serializationTimeUs) {
uint64_t serializationTimeNs) {
RECORD_METRIC_VALUE(kMetricSpilledRowsCount, numRows);
RECORD_HISTOGRAM_METRIC_VALUE(
kMetricSpillSerializationTimeMs, serializationTimeUs / 1'000);
kMetricSpillSerializationTimeMs, serializationTimeNs / 1'000'000);
auto statsLocked = localSpillStats().wlock();
statsLocked->spilledRows += numRows;
statsLocked->spillSerializationTimeUs += serializationTimeUs;
statsLocked->spillSerializationTimeNanos += serializationTimeNs;
}

void incrementGlobalSpilledPartitionStats() {
++localSpillStats().wlock()->spilledPartitions;
}

void updateGlobalSpillFillTime(uint64_t timeUs) {
RECORD_HISTOGRAM_METRIC_VALUE(kMetricSpillFillTimeMs, timeUs / 1'000);
localSpillStats().wlock()->spillFillTimeUs += timeUs;
void updateGlobalSpillFillTime(uint64_t timeNs) {
RECORD_HISTOGRAM_METRIC_VALUE(kMetricSpillFillTimeMs, timeNs / 1'000'000);
localSpillStats().wlock()->spillFillTimeNanos += timeNs;
}

void updateGlobalSpillSortTime(uint64_t timeUs) {
RECORD_HISTOGRAM_METRIC_VALUE(kMetricSpillSortTimeMs, timeUs / 1'000);
localSpillStats().wlock()->spillSortTimeUs += timeUs;
void updateGlobalSpillSortTime(uint64_t timeNs) {
RECORD_HISTOGRAM_METRIC_VALUE(kMetricSpillSortTimeMs, timeNs / 1'000'000);
localSpillStats().wlock()->spillSortTimeNanos += timeNs;
}

void updateGlobalSpillWriteStats(
uint64_t spilledBytes,
uint64_t flushTimeUs,
uint64_t writeTimeUs) {
uint64_t flushTimeNs,
uint64_t writeTimeNs) {
RECORD_METRIC_VALUE(kMetricSpillWritesCount);
RECORD_METRIC_VALUE(kMetricSpilledBytes, spilledBytes);
RECORD_HISTOGRAM_METRIC_VALUE(kMetricSpillFlushTimeMs, flushTimeUs / 1'000);
RECORD_HISTOGRAM_METRIC_VALUE(kMetricSpillWriteTimeMs, writeTimeUs / 1'000);
RECORD_HISTOGRAM_METRIC_VALUE(
kMetricSpillFlushTimeMs, flushTimeNs / 1'000'000);
RECORD_HISTOGRAM_METRIC_VALUE(
kMetricSpillWriteTimeMs, writeTimeNs / 1'000'000);
auto statsLocked = localSpillStats().wlock();
++statsLocked->spillWrites;
statsLocked->spilledBytes += spilledBytes;
statsLocked->spillFlushTimeUs += flushTimeUs;
statsLocked->spillWriteTimeUs += writeTimeUs;
statsLocked->spillFlushTimeNanos += flushTimeNs;
statsLocked->spillWriteTimeNanos += writeTimeNs;
}

void updateGlobalSpillReadStats(
uint64_t spillReads,
uint64_t spillReadBytes,
uint64_t spillRadTimeUs) {
uint64_t spillReadTimeNs) {
auto statsLocked = localSpillStats().wlock();
statsLocked->spillReads += spillReads;
statsLocked->spillReadBytes += spillReadBytes;
statsLocked->spillReadTimeUs += spillRadTimeUs;
statsLocked->spillReadTimeNanos += spillReadTimeNs;
}

void updateGlobalSpillMemoryBytes(uint64_t spilledInputBytes) {
Expand All @@ -325,8 +327,8 @@ void updateGlobalMaxSpillLevelExceededCount(
maxSpillLevelExceededCount;
}

void updateGlobalSpillDeserializationTimeUs(uint64_t timeUs) {
localSpillStats().wlock()->spillDeserializationTimeUs += timeUs;
void updateGlobalSpillDeserializationTimeNs(uint64_t timeNs) {
localSpillStats().wlock()->spillDeserializationTimeNanos += timeNs;
}

SpillStats globalSpillStats() {
Expand Down
Loading

0 comments on commit 97a8b8e

Please sign in to comment.