Skip to content

Commit

Permalink
Add additional BlobCacheMetrics, expose BlobCacheMetrics via SharedBl…
Browse files Browse the repository at this point in the history
…obCacheService (elastic#111730)

Relates: ES-9067
  • Loading branch information
nicktindall authored Aug 15, 2024
1 parent 15890e1 commit 5934190
Show file tree
Hide file tree
Showing 4 changed files with 215 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,43 @@

package org.elasticsearch.blobcache;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.telemetry.metric.DoubleHistogram;
import org.elasticsearch.telemetry.metric.LongCounter;
import org.elasticsearch.telemetry.metric.LongHistogram;
import org.elasticsearch.telemetry.metric.MeterRegistry;

import java.util.Map;
import java.util.concurrent.TimeUnit;

public class BlobCacheMetrics {
private static final Logger logger = LogManager.getLogger(BlobCacheMetrics.class);

private static final double BYTES_PER_NANOSECONDS_TO_MEBIBYTES_PER_SECOND = 1e9D / (1 << 20);
public static final String CACHE_POPULATION_REASON_ATTRIBUTE_KEY = "reason";
public static final String CACHE_POPULATION_SOURCE_ATTRIBUTE_KEY = "source";
public static final String SHARD_ID_ATTRIBUTE_KEY = "shard_id";
public static final String INDEX_ATTRIBUTE_KEY = "index_name";

private final LongCounter cacheMissCounter;
private final LongCounter evictedCountNonZeroFrequency;
private final LongHistogram cacheMissLoadTimes;
private final DoubleHistogram cachePopulationThroughput;
private final LongCounter cachePopulationBytes;
private final LongCounter cachePopulationTime;

public enum CachePopulationReason {
/**
* When warming the cache
*/
Warming,
/**
* When the data we need is not in the cache
*/
CacheMiss
}

public BlobCacheMetrics(MeterRegistry meterRegistry) {
this(
Expand All @@ -33,14 +61,39 @@ public BlobCacheMetrics(MeterRegistry meterRegistry) {
"es.blob_cache.cache_miss_load_times.histogram",
"The time in milliseconds for populating entries in the blob store resulting from a cache miss, expressed as a histogram.",
"ms"
),
meterRegistry.registerDoubleHistogram(
"es.blob_cache.population.throughput.histogram",
"The throughput observed when populating the the cache",
"MiB/second"
),
meterRegistry.registerLongCounter(
"es.blob_cache.population.bytes.total",
"The number of bytes that have been copied into the cache",
"bytes"
),
meterRegistry.registerLongCounter(
"es.blob_cache.population.time.total",
"The time spent copying data into the cache",
"milliseconds"
)
);
}

BlobCacheMetrics(LongCounter cacheMissCounter, LongCounter evictedCountNonZeroFrequency, LongHistogram cacheMissLoadTimes) {
BlobCacheMetrics(
LongCounter cacheMissCounter,
LongCounter evictedCountNonZeroFrequency,
LongHistogram cacheMissLoadTimes,
DoubleHistogram cachePopulationThroughput,
LongCounter cachePopulationBytes,
LongCounter cachePopulationTime
) {
this.cacheMissCounter = cacheMissCounter;
this.evictedCountNonZeroFrequency = evictedCountNonZeroFrequency;
this.cacheMissLoadTimes = cacheMissLoadTimes;
this.cachePopulationThroughput = cachePopulationThroughput;
this.cachePopulationBytes = cachePopulationBytes;
this.cachePopulationTime = cachePopulationTime;
}

public static BlobCacheMetrics NOOP = new BlobCacheMetrics(TelemetryProvider.NOOP.getMeterRegistry());
Expand All @@ -56,4 +109,55 @@ public LongCounter getEvictedCountNonZeroFrequency() {
public LongHistogram getCacheMissLoadTimes() {
return cacheMissLoadTimes;
}

/**
* Record the various cache population metrics after a chunk is copied to the cache
*
* @param bytesCopied The number of bytes copied
* @param copyTimeNanos The time taken to copy the bytes in nanoseconds
* @param index The index being loaded
* @param shardId The ID of the shard being loaded
* @param cachePopulationReason The reason for the cache being populated
* @param cachePopulationSource The source from which the data is being loaded
*/
public void recordCachePopulationMetrics(
int bytesCopied,
long copyTimeNanos,
String index,
int shardId,
CachePopulationReason cachePopulationReason,
CachePopulationSource cachePopulationSource
) {
Map<String, Object> metricAttributes = Map.of(
INDEX_ATTRIBUTE_KEY,
index,
SHARD_ID_ATTRIBUTE_KEY,
shardId,
CACHE_POPULATION_REASON_ATTRIBUTE_KEY,
cachePopulationReason.name(),
CACHE_POPULATION_SOURCE_ATTRIBUTE_KEY,
cachePopulationSource.name()
);
assert bytesCopied > 0 : "We shouldn't be recording zero-sized copies";
cachePopulationBytes.incrementBy(bytesCopied, metricAttributes);

// This is almost certainly paranoid, but if we had a very fast/small copy with a very coarse nanosecond timer it might happen?
if (copyTimeNanos > 0) {
cachePopulationThroughput.record(toMebibytesPerSecond(bytesCopied, copyTimeNanos), metricAttributes);
cachePopulationTime.incrementBy(TimeUnit.NANOSECONDS.toMillis(copyTimeNanos), metricAttributes);
} else {
logger.warn("Zero-time copy being reported, ignoring");
}
}

/**
* Calculate throughput as MiB/second
*
* @param numberOfBytes The number of bytes transferred
* @param timeInNanoseconds The time taken to transfer in nanoseconds
* @return The throughput as MiB/second
*/
private double toMebibytesPerSecond(int numberOfBytes, long timeInNanoseconds) {
return ((double) numberOfBytes / timeInNanoseconds) * BYTES_PER_NANOSECONDS_TO_MEBIBYTES_PER_SECOND;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.blobcache;

/**
* The places we populate the cache from
*/
public enum CachePopulationSource {
/**
* When loading data from the blob-store
*/
BlobStore,
/**
* When fetching data from a peer node
*/
Peer,
/**
* We cannot determine the source (should not be used except in exceptional cases)
*/
Unknown
}
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,10 @@ public static long calculateCacheSize(Settings settings, long totalFsSize) {
.getBytes();
}

public BlobCacheMetrics getBlobCacheMetrics() {
return blobCacheMetrics;
}

public int getRangeSize() {
return rangeSize;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.blobcache;

import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.telemetry.InstrumentType;
import org.elasticsearch.telemetry.Measurement;
import org.elasticsearch.telemetry.RecordingMeterRegistry;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;

import java.util.concurrent.TimeUnit;

public class BlobCacheMetricsTests extends ESTestCase {

private RecordingMeterRegistry recordingMeterRegistry;
private BlobCacheMetrics metrics;

@Before
public void createMetrics() {
recordingMeterRegistry = new RecordingMeterRegistry();
metrics = new BlobCacheMetrics(recordingMeterRegistry);
}

public void testRecordCachePopulationMetricsRecordsThroughput() {
int mebiBytesSent = randomIntBetween(1, 4);
int secondsTaken = randomIntBetween(1, 5);
String indexName = randomIdentifier();
int shardId = randomIntBetween(0, 10);
BlobCacheMetrics.CachePopulationReason cachePopulationReason = randomFrom(BlobCacheMetrics.CachePopulationReason.values());
CachePopulationSource cachePopulationSource = randomFrom(CachePopulationSource.values());
metrics.recordCachePopulationMetrics(
Math.toIntExact(ByteSizeValue.ofMb(mebiBytesSent).getBytes()),
TimeUnit.SECONDS.toNanos(secondsTaken),
indexName,
shardId,
cachePopulationReason,
cachePopulationSource
);

// throughput histogram
Measurement throughputMeasurement = recordingMeterRegistry.getRecorder()
.getMeasurements(InstrumentType.DOUBLE_HISTOGRAM, "es.blob_cache.population.throughput.histogram")
.get(0);
assertEquals(throughputMeasurement.getDouble(), (double) mebiBytesSent / secondsTaken, 0.0);
assertExpectedAttributesPresent(throughputMeasurement, shardId, indexName, cachePopulationReason, cachePopulationSource);

// bytes counter
Measurement totalBytesMeasurement = recordingMeterRegistry.getRecorder()
.getMeasurements(InstrumentType.LONG_COUNTER, "es.blob_cache.population.bytes.total")
.get(0);
assertEquals(totalBytesMeasurement.getLong(), ByteSizeValue.ofMb(mebiBytesSent).getBytes());
assertExpectedAttributesPresent(totalBytesMeasurement, shardId, indexName, cachePopulationReason, cachePopulationSource);

// time counter
Measurement totalTimeMeasurement = recordingMeterRegistry.getRecorder()
.getMeasurements(InstrumentType.LONG_COUNTER, "es.blob_cache.population.time.total")
.get(0);
assertEquals(totalTimeMeasurement.getLong(), TimeUnit.SECONDS.toMillis(secondsTaken));
assertExpectedAttributesPresent(totalTimeMeasurement, shardId, indexName, cachePopulationReason, cachePopulationSource);
}

private static void assertExpectedAttributesPresent(
Measurement measurement,
int shardId,
String indexName,
BlobCacheMetrics.CachePopulationReason cachePopulationReason,
CachePopulationSource cachePopulationSource
) {
assertEquals(measurement.attributes().get(BlobCacheMetrics.SHARD_ID_ATTRIBUTE_KEY), shardId);
assertEquals(measurement.attributes().get(BlobCacheMetrics.INDEX_ATTRIBUTE_KEY), indexName);
assertEquals(measurement.attributes().get(BlobCacheMetrics.CACHE_POPULATION_REASON_ATTRIBUTE_KEY), cachePopulationReason.name());
assertEquals(measurement.attributes().get(BlobCacheMetrics.CACHE_POPULATION_SOURCE_ATTRIBUTE_KEY), cachePopulationSource.name());
}
}

0 comments on commit 5934190

Please sign in to comment.