diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/BlobCacheMetrics.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/BlobCacheMetrics.java index e92aa89022f35..075621e8cdccb 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/BlobCacheMetrics.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/BlobCacheMetrics.java @@ -7,15 +7,43 @@ package org.elasticsearch.blobcache; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.telemetry.TelemetryProvider; +import org.elasticsearch.telemetry.metric.DoubleHistogram; import org.elasticsearch.telemetry.metric.LongCounter; import org.elasticsearch.telemetry.metric.LongHistogram; import org.elasticsearch.telemetry.metric.MeterRegistry; +import java.util.Map; +import java.util.concurrent.TimeUnit; + public class BlobCacheMetrics { + private static final Logger logger = LogManager.getLogger(BlobCacheMetrics.class); + + private static final double BYTES_PER_NANOSECONDS_TO_MEBIBYTES_PER_SECOND = 1e9D / (1 << 20); + public static final String CACHE_POPULATION_REASON_ATTRIBUTE_KEY = "reason"; + public static final String CACHE_POPULATION_SOURCE_ATTRIBUTE_KEY = "source"; + public static final String SHARD_ID_ATTRIBUTE_KEY = "shard_id"; + public static final String INDEX_ATTRIBUTE_KEY = "index_name"; + private final LongCounter cacheMissCounter; private final LongCounter evictedCountNonZeroFrequency; private final LongHistogram cacheMissLoadTimes; + private final DoubleHistogram cachePopulationThroughput; + private final LongCounter cachePopulationBytes; + private final LongCounter cachePopulationTime; + + public enum CachePopulationReason { + /** + * When warming the cache + */ + Warming, + /** + * When the data we need is not in the cache + */ + CacheMiss + } public BlobCacheMetrics(MeterRegistry meterRegistry) { this( @@ -33,14 +61,39 @@ public BlobCacheMetrics(MeterRegistry meterRegistry) { "es.blob_cache.cache_miss_load_times.histogram", "The time in milliseconds for populating entries in the blob store resulting from a cache miss, expressed as a histogram.", "ms" + ), + meterRegistry.registerDoubleHistogram( + "es.blob_cache.population.throughput.histogram", + "The throughput observed when populating the the cache", + "MiB/second" + ), + meterRegistry.registerLongCounter( + "es.blob_cache.population.bytes.total", + "The number of bytes that have been copied into the cache", + "bytes" + ), + meterRegistry.registerLongCounter( + "es.blob_cache.population.time.total", + "The time spent copying data into the cache", + "milliseconds" ) ); } - BlobCacheMetrics(LongCounter cacheMissCounter, LongCounter evictedCountNonZeroFrequency, LongHistogram cacheMissLoadTimes) { + BlobCacheMetrics( + LongCounter cacheMissCounter, + LongCounter evictedCountNonZeroFrequency, + LongHistogram cacheMissLoadTimes, + DoubleHistogram cachePopulationThroughput, + LongCounter cachePopulationBytes, + LongCounter cachePopulationTime + ) { this.cacheMissCounter = cacheMissCounter; this.evictedCountNonZeroFrequency = evictedCountNonZeroFrequency; this.cacheMissLoadTimes = cacheMissLoadTimes; + this.cachePopulationThroughput = cachePopulationThroughput; + this.cachePopulationBytes = cachePopulationBytes; + this.cachePopulationTime = cachePopulationTime; } public static BlobCacheMetrics NOOP = new BlobCacheMetrics(TelemetryProvider.NOOP.getMeterRegistry()); @@ -56,4 +109,55 @@ public LongCounter getEvictedCountNonZeroFrequency() { public LongHistogram getCacheMissLoadTimes() { return cacheMissLoadTimes; } + + /** + * Record the various cache population metrics after a chunk is copied to the cache + * + * @param bytesCopied The number of bytes copied + * @param copyTimeNanos The time taken to copy the bytes in nanoseconds + * @param index The index being loaded + * @param shardId The ID of the shard being loaded + * @param cachePopulationReason The reason for the cache being populated + * @param cachePopulationSource The source from which the data is being loaded + */ + public void recordCachePopulationMetrics( + int bytesCopied, + long copyTimeNanos, + String index, + int shardId, + CachePopulationReason cachePopulationReason, + CachePopulationSource cachePopulationSource + ) { + Map metricAttributes = Map.of( + INDEX_ATTRIBUTE_KEY, + index, + SHARD_ID_ATTRIBUTE_KEY, + shardId, + CACHE_POPULATION_REASON_ATTRIBUTE_KEY, + cachePopulationReason.name(), + CACHE_POPULATION_SOURCE_ATTRIBUTE_KEY, + cachePopulationSource.name() + ); + assert bytesCopied > 0 : "We shouldn't be recording zero-sized copies"; + cachePopulationBytes.incrementBy(bytesCopied, metricAttributes); + + // This is almost certainly paranoid, but if we had a very fast/small copy with a very coarse nanosecond timer it might happen? + if (copyTimeNanos > 0) { + cachePopulationThroughput.record(toMebibytesPerSecond(bytesCopied, copyTimeNanos), metricAttributes); + cachePopulationTime.incrementBy(TimeUnit.NANOSECONDS.toMillis(copyTimeNanos), metricAttributes); + } else { + logger.warn("Zero-time copy being reported, ignoring"); + } + } + + /** + * Calculate throughput as MiB/second + * + * @param numberOfBytes The number of bytes transferred + * @param timeInNanoseconds The time taken to transfer in nanoseconds + * @return The throughput as MiB/second + */ + private double toMebibytesPerSecond(int numberOfBytes, long timeInNanoseconds) { + return ((double) numberOfBytes / timeInNanoseconds) * BYTES_PER_NANOSECONDS_TO_MEBIBYTES_PER_SECOND; + } } diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/CachePopulationSource.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/CachePopulationSource.java new file mode 100644 index 0000000000000..8cf4b1b548f7d --- /dev/null +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/CachePopulationSource.java @@ -0,0 +1,26 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.blobcache; + +/** + * The places we populate the cache from + */ +public enum CachePopulationSource { + /** + * When loading data from the blob-store + */ + BlobStore, + /** + * When fetching data from a peer node + */ + Peer, + /** + * We cannot determine the source (should not be used except in exceptional cases) + */ + Unknown +} diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java index 28a5eb164d049..8ca62a3b95023 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java @@ -398,6 +398,10 @@ public static long calculateCacheSize(Settings settings, long totalFsSize) { .getBytes(); } + public BlobCacheMetrics getBlobCacheMetrics() { + return blobCacheMetrics; + } + public int getRangeSize() { return rangeSize; } diff --git a/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/BlobCacheMetricsTests.java b/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/BlobCacheMetricsTests.java new file mode 100644 index 0000000000000..ea9d0b7356f0e --- /dev/null +++ b/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/BlobCacheMetricsTests.java @@ -0,0 +1,80 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.blobcache; + +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.telemetry.InstrumentType; +import org.elasticsearch.telemetry.Measurement; +import org.elasticsearch.telemetry.RecordingMeterRegistry; +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; + +import java.util.concurrent.TimeUnit; + +public class BlobCacheMetricsTests extends ESTestCase { + + private RecordingMeterRegistry recordingMeterRegistry; + private BlobCacheMetrics metrics; + + @Before + public void createMetrics() { + recordingMeterRegistry = new RecordingMeterRegistry(); + metrics = new BlobCacheMetrics(recordingMeterRegistry); + } + + public void testRecordCachePopulationMetricsRecordsThroughput() { + int mebiBytesSent = randomIntBetween(1, 4); + int secondsTaken = randomIntBetween(1, 5); + String indexName = randomIdentifier(); + int shardId = randomIntBetween(0, 10); + BlobCacheMetrics.CachePopulationReason cachePopulationReason = randomFrom(BlobCacheMetrics.CachePopulationReason.values()); + CachePopulationSource cachePopulationSource = randomFrom(CachePopulationSource.values()); + metrics.recordCachePopulationMetrics( + Math.toIntExact(ByteSizeValue.ofMb(mebiBytesSent).getBytes()), + TimeUnit.SECONDS.toNanos(secondsTaken), + indexName, + shardId, + cachePopulationReason, + cachePopulationSource + ); + + // throughput histogram + Measurement throughputMeasurement = recordingMeterRegistry.getRecorder() + .getMeasurements(InstrumentType.DOUBLE_HISTOGRAM, "es.blob_cache.population.throughput.histogram") + .get(0); + assertEquals(throughputMeasurement.getDouble(), (double) mebiBytesSent / secondsTaken, 0.0); + assertExpectedAttributesPresent(throughputMeasurement, shardId, indexName, cachePopulationReason, cachePopulationSource); + + // bytes counter + Measurement totalBytesMeasurement = recordingMeterRegistry.getRecorder() + .getMeasurements(InstrumentType.LONG_COUNTER, "es.blob_cache.population.bytes.total") + .get(0); + assertEquals(totalBytesMeasurement.getLong(), ByteSizeValue.ofMb(mebiBytesSent).getBytes()); + assertExpectedAttributesPresent(totalBytesMeasurement, shardId, indexName, cachePopulationReason, cachePopulationSource); + + // time counter + Measurement totalTimeMeasurement = recordingMeterRegistry.getRecorder() + .getMeasurements(InstrumentType.LONG_COUNTER, "es.blob_cache.population.time.total") + .get(0); + assertEquals(totalTimeMeasurement.getLong(), TimeUnit.SECONDS.toMillis(secondsTaken)); + assertExpectedAttributesPresent(totalTimeMeasurement, shardId, indexName, cachePopulationReason, cachePopulationSource); + } + + private static void assertExpectedAttributesPresent( + Measurement measurement, + int shardId, + String indexName, + BlobCacheMetrics.CachePopulationReason cachePopulationReason, + CachePopulationSource cachePopulationSource + ) { + assertEquals(measurement.attributes().get(BlobCacheMetrics.SHARD_ID_ATTRIBUTE_KEY), shardId); + assertEquals(measurement.attributes().get(BlobCacheMetrics.INDEX_ATTRIBUTE_KEY), indexName); + assertEquals(measurement.attributes().get(BlobCacheMetrics.CACHE_POPULATION_REASON_ATTRIBUTE_KEY), cachePopulationReason.name()); + assertEquals(measurement.attributes().get(BlobCacheMetrics.CACHE_POPULATION_SOURCE_ATTRIBUTE_KEY), cachePopulationSource.name()); + } +}