Skip to content

Commit

Permalink
[native] Advance Velox and update metric code
Browse files (browse the repository at this point in the history)
  • Loading branch information
xiaoxmeng committed Dec 5, 2023
1 parent 67dd546 commit b435c29
Show file tree
Hide file tree
Showing 8 changed files with 195 additions and 243 deletions.
171 changes: 82 additions & 89 deletions presto-native-execution/presto_cpp/main/PeriodicTaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@
#include <sys/resource.h>

namespace {
#define REPORT_IF_NOT_ZERO(name, counter) \
if ((counter) != 0) { \
REPORT_ADD_STAT_VALUE((name), (counter)); \
#define REPORT_IF_NOT_ZERO(name, counter) \
if ((counter) != 0) { \
RECORD_METRIC_VALUE((name), (counter)); \
}
} // namespace

Expand Down Expand Up @@ -128,14 +128,14 @@ void PeriodicTaskManager::stop() {
void PeriodicTaskManager::updateExecutorStats() {
if (driverCPUExecutor_ != nullptr) {
// Report the current queue size of the thread pool.
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterDriverCPUExecutorQueueSize,
driverCPUExecutor_->getTaskQueueSize());

// Report driver execution latency.
folly::stop_watch<std::chrono::milliseconds> timer;
driverCPUExecutor_->add([timer = timer]() {
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterDriverCPUExecutorLatencyMs, timer.elapsed().count());
});
}
Expand All @@ -144,7 +144,7 @@ void PeriodicTaskManager::updateExecutorStats() {
// Report the latency between scheduling the task and its execution.
folly::stop_watch<std::chrono::milliseconds> timer;
httpExecutor_->add([timer = timer]() {
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterHTTPExecutorLatencyMs, timer.elapsed().count());
});
}
Expand All @@ -161,25 +161,25 @@ void PeriodicTaskManager::updateTaskStats() {
// Report the number of tasks and drivers in the system.
size_t numTasks{0};
auto taskNumbers = taskManager_->getTaskNumbers(numTasks);
REPORT_ADD_STAT_VALUE(kCounterNumTasks, taskManager_->getNumTasks());
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(kCounterNumTasks, taskManager_->getNumTasks());
RECORD_METRIC_VALUE(
kCounterNumTasksRunning, taskNumbers[velox::exec::TaskState::kRunning]);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterNumTasksFinished, taskNumbers[velox::exec::TaskState::kFinished]);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterNumTasksCancelled,
taskNumbers[velox::exec::TaskState::kCanceled]);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterNumTasksAborted, taskNumbers[velox::exec::TaskState::kAborted]);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterNumTasksFailed, taskNumbers[velox::exec::TaskState::kFailed]);

auto driverCountStats = taskManager_->getDriverCountStats();
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterNumRunningDrivers, driverCountStats.numRunningDrivers);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterNumBlockedDrivers, driverCountStats.numBlockedDrivers);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterTotalPartitionedOutputBuffer,
velox::exec::OutputBufferManager::getInstance().lock()->numBuffers());
}
Expand All @@ -206,20 +206,20 @@ void PeriodicTaskManager::addOldTaskCleanupTask() {
}

void PeriodicTaskManager::updateMemoryAllocatorStats() {
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterMappedMemoryBytes,
(velox::memory::AllocationTraits::pageBytes(
memoryAllocator_->numMapped())));
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterAllocatedMemoryBytes,
(velox::memory::AllocationTraits::pageBytes(
memoryAllocator_->numAllocated())));
// TODO(jtan6): Remove condition after T150019700 is done
if (auto* mmapAllocator =
dynamic_cast<const velox::memory::MmapAllocator*>(memoryAllocator_)) {
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterMmapRawAllocBytesSmall, (mmapAllocator->numMallocBytes()));
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterMmapExternalMappedBytes,
velox::memory::AllocationTraits::pageBytes(
(mmapAllocator->numExternalMapped())));
Expand All @@ -240,7 +240,7 @@ void PeriodicTaskManager::updatePrestoExchangeSourceMemoryStats() {
PrestoExchangeSource::getMemoryUsage(
currQueuedMemoryBytes, peakQueuedMemoryBytes);
PrestoExchangeSource::resetPeakMemoryUsage();
REPORT_ADD_HISTOGRAM_VALUE(
RECORD_HISTOGRAM_METRIC_VALUE(
kCounterExchangeSourcePeakQueuedBytes, peakQueuedMemoryBytes);
}

Expand All @@ -255,49 +255,49 @@ void PeriodicTaskManager::updateCacheStats() {
const auto memoryCacheStats = asyncDataCache_->refreshStats();

// Snapshots.
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterMemoryCacheNumEntries, memoryCacheStats.numEntries);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterMemoryCacheNumEmptyEntries, memoryCacheStats.numEmptyEntries);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterMemoryCacheNumSharedEntries, memoryCacheStats.numShared);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterMemoryCacheNumExclusiveEntries, memoryCacheStats.numExclusive);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterMemoryCacheNumPrefetchedEntries, memoryCacheStats.numPrefetch);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterMemoryCacheTotalTinyBytes, memoryCacheStats.tinySize);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterMemoryCacheTotalLargeBytes, memoryCacheStats.largeSize);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterMemoryCacheTotalTinyPaddingBytes, memoryCacheStats.tinyPadding);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterMemoryCacheTotalLargePaddingBytes, memoryCacheStats.largePadding);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterMemoryCacheTotalPrefetchBytes, memoryCacheStats.prefetchBytes);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterMemoryCacheSumEvictScore, memoryCacheStats.sumEvictScore);

// Interval cumulatives.
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterMemoryCacheNumHit,
memoryCacheStats.numHit - lastMemoryCacheHits_);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterMemoryCacheHitBytes,
memoryCacheStats.hitBytes - lastMemoryCacheHitsBytes_);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterMemoryCacheNumNew,
memoryCacheStats.numNew - lastMemoryCacheInserts_);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterMemoryCacheNumEvict,
memoryCacheStats.numEvict - lastMemoryCacheEvictions_);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterMemoryCacheNumEvictChecks,
memoryCacheStats.numEvictChecks - lastMemoryCacheEvictionChecks_);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterMemoryCacheNumWaitExclusive,
memoryCacheStats.numWaitExclusive - lastMemoryCacheStalls_);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterMemoryCacheNumAllocClocks,
memoryCacheStats.allocClocks - lastMemoryCacheAllocClocks_);

Expand All @@ -310,67 +310,67 @@ void PeriodicTaskManager::updateCacheStats() {
lastMemoryCacheAllocClocks_ = memoryCacheStats.allocClocks;

// All time cumulatives.
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterMemoryCacheNumCumulativeHit, memoryCacheStats.numHit);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterMemoryCacheCumulativeHitBytes, memoryCacheStats.hitBytes);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterMemoryCacheNumCumulativeNew, memoryCacheStats.numNew);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterMemoryCacheNumCumulativeEvict, memoryCacheStats.numEvict);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterMemoryCacheNumCumulativeEvictChecks,
memoryCacheStats.numEvictChecks);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterMemoryCacheNumCumulativeWaitExclusive,
memoryCacheStats.numWaitExclusive);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterMemoryCacheNumCumulativeAllocClocks,
memoryCacheStats.allocClocks);
if (memoryCacheStats.ssdStats != nullptr) {
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterSsdCacheCumulativeReadEntries,
memoryCacheStats.ssdStats->entriesRead)
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterSsdCacheCumulativeReadBytes,
memoryCacheStats.ssdStats->bytesRead);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterSsdCacheCumulativeWrittenEntries,
memoryCacheStats.ssdStats->entriesWritten);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterSsdCacheCumulativeWrittenBytes,
memoryCacheStats.ssdStats->bytesWritten);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterSsdCacheCumulativeCachedEntries,
memoryCacheStats.ssdStats->entriesCached);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterSsdCacheCumulativeCachedBytes,
memoryCacheStats.ssdStats->bytesCached);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterSsdCacheCumulativeOpenSsdErrors,
memoryCacheStats.ssdStats->openFileErrors);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterSsdCacheCumulativeOpenCheckpointErrors,
memoryCacheStats.ssdStats->openCheckpointErrors);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterSsdCacheCumulativeOpenLogErrors,
memoryCacheStats.ssdStats->openLogErrors);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterSsdCacheCumulativeDeleteCheckpointErrors,
memoryCacheStats.ssdStats->deleteCheckpointErrors);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterSsdCacheCumulativeGrowFileErrors,
memoryCacheStats.ssdStats->growFileErrors);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterSsdCacheCumulativeWriteSsdErrors,
memoryCacheStats.ssdStats->writeSsdErrors);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterSsdCacheCumulativeWriteCheckpointErrors,
memoryCacheStats.ssdStats->writeCheckpointErrors);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterSsdCacheCumulativeReadSsdErrors,
memoryCacheStats.ssdStats->readSsdErrors);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterSsdCacheCumulativeReadCheckpointErrors,
memoryCacheStats.ssdStats->readCheckpointErrors);
}
Expand Down Expand Up @@ -411,20 +411,15 @@ void PeriodicTaskManager::addConnectorStatsTask() {
oldValues[kNumLookupsMetricName] = 0;

// Exporting metrics types here since the metrics key is dynamic
REPORT_ADD_STAT_EXPORT_TYPE(
kNumElementsMetricName, facebook::velox::StatType::AVG);
REPORT_ADD_STAT_EXPORT_TYPE(
kPinnedSizeMetricName, facebook::velox::StatType::AVG);
REPORT_ADD_STAT_EXPORT_TYPE(
kCurSizeMetricName, facebook::velox::StatType::AVG);
REPORT_ADD_STAT_EXPORT_TYPE(
DEFINE_METRIC(kNumElementsMetricName, facebook::velox::StatType::AVG);
DEFINE_METRIC(kPinnedSizeMetricName, facebook::velox::StatType::AVG);
DEFINE_METRIC(kCurSizeMetricName, facebook::velox::StatType::AVG);
DEFINE_METRIC(
kNumAccumulativeHitsMetricName, facebook::velox::StatType::AVG);
REPORT_ADD_STAT_EXPORT_TYPE(
DEFINE_METRIC(
kNumAccumulativeLookupsMetricName, facebook::velox::StatType::AVG);
REPORT_ADD_STAT_EXPORT_TYPE(
kNumHitsMetricName, facebook::velox::StatType::AVG);
REPORT_ADD_STAT_EXPORT_TYPE(
kNumLookupsMetricName, facebook::velox::StatType::AVG);
DEFINE_METRIC(kNumHitsMetricName, facebook::velox::StatType::AVG);
DEFINE_METRIC(kNumLookupsMetricName, facebook::velox::StatType::AVG);

addTask(
[hiveConnector,
Expand All @@ -437,22 +432,22 @@ void PeriodicTaskManager::addConnectorStatsTask() {
kNumHitsMetricName,
kNumLookupsMetricName]() {
auto fileHandleCacheStats = hiveConnector->fileHandleCacheStats();
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kNumElementsMetricName, fileHandleCacheStats.numElements);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kPinnedSizeMetricName, fileHandleCacheStats.pinnedSize);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCurSizeMetricName, fileHandleCacheStats.curSize);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kNumAccumulativeHitsMetricName, fileHandleCacheStats.numHits);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kNumAccumulativeLookupsMetricName,
fileHandleCacheStats.numLookups);
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kNumHitsMetricName,
fileHandleCacheStats.numHits - oldValues[kNumHitsMetricName]);
oldValues[kNumHitsMetricName] = fileHandleCacheStats.numHits;
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kNumLookupsMetricName,
fileHandleCacheStats.numLookups -
oldValues[kNumLookupsMetricName]);
Expand All @@ -472,35 +467,35 @@ void PeriodicTaskManager::updateOperatingSystemStats() {
const int64_t userCpuTimeUs{
(int64_t)usage.ru_utime.tv_sec * 1'000'000 +
(int64_t)usage.ru_utime.tv_usec};
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterOsUserCpuTimeMicros, userCpuTimeUs - lastUserCpuTimeUs_);
lastUserCpuTimeUs_ = userCpuTimeUs;

const int64_t systemCpuTimeUs{
(int64_t)usage.ru_stime.tv_sec * 1'000'000 +
(int64_t)usage.ru_stime.tv_usec};
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterOsSystemCpuTimeMicros, systemCpuTimeUs - lastSystemCpuTimeUs_);
lastSystemCpuTimeUs_ = systemCpuTimeUs;

const int64_t softPageFaults{usage.ru_minflt};
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterOsNumSoftPageFaults, softPageFaults - lastSoftPageFaults_);
lastSoftPageFaults_ = softPageFaults;

const int64_t hardPageFaults{usage.ru_majflt};
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterOsNumHardPageFaults, hardPageFaults - lastHardPageFaults_);
lastHardPageFaults_ = hardPageFaults;

const int64_t voluntaryContextSwitches{usage.ru_nvcsw};
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterOsNumVoluntaryContextSwitches,
voluntaryContextSwitches - lastVoluntaryContextSwitches_);
lastVoluntaryContextSwitches_ = voluntaryContextSwitches;

const int64_t forcedContextSwitches{usage.ru_nivcsw};
REPORT_ADD_STAT_VALUE(
RECORD_METRIC_VALUE(
kCounterOsNumForcedContextSwitches,
forcedContextSwitches - lastForcedContextSwitches_);
lastForcedContextSwitches_ = forcedContextSwitches;
Expand Down Expand Up @@ -595,10 +590,8 @@ void PeriodicTaskManager::updateSpillStatsTask() {
LOG(INFO) << "Spill memory usage: current["
<< velox::succinctBytes(spillMemoryStats.currentBytes) << "] peak["
<< velox::succinctBytes(spillMemoryStats.peakBytes) << "]";
REPORT_ADD_STAT_VALUE(
kCounterSpillMemoryBytes, spillMemoryStats.currentBytes);
REPORT_ADD_STAT_VALUE(
kCounterSpillPeakMemoryBytes, spillMemoryStats.peakBytes);
RECORD_METRIC_VALUE(kCounterSpillMemoryBytes, spillMemoryStats.currentBytes);
RECORD_METRIC_VALUE(kCounterSpillPeakMemoryBytes, spillMemoryStats.peakBytes);

lastSpillStats_ = updatedSpillStats;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ PrestoExchangeSource::PrestoExchangeSource(
clientCertAndKeyPath_,
ciphers_,
[](size_t bufferBytes) {
REPORT_ADD_STAT_VALUE(kCounterHttpClientPrestoExchangeNumOnBody);
REPORT_ADD_HISTOGRAM_VALUE(
RECORD_METRIC_VALUE(kCounterHttpClientPrestoExchangeNumOnBody);
RECORD_HISTOGRAM_METRIC_VALUE(
kCounterHttpClientPrestoExchangeOnBodyBytes, bufferBytes);
});
}
Expand Down Expand Up @@ -292,7 +292,7 @@ void PrestoExchangeSource::processDataResponse(

const int64_t pageSize = empty ? 0 : page->size();

REPORT_ADD_HISTOGRAM_VALUE(
RECORD_HISTOGRAM_METRIC_VALUE(
kCounterPrestoExchangeSerializedPageSize, pageSize);

{
Expand Down
Loading

0 comments on commit b435c29

Please sign in to comment.