Add additional BlobCacheMetrics, expose BlobCacheMetrics via SharedBlobCacheService #111730

Merged

Changes from 23 commits (36 commits total)
5dacb69
Add callback for copy-to-cache metrics, additional BlobCacheMetrics
nicktindall Aug 9, 2024
c4b6487
Update docs/changelog/111730.yaml
nicktindall Aug 9, 2024
ad68e99
Shorten metric names (exceeded max length)
nicktindall Aug 9, 2024
181b958
Align attribute key name
nicktindall Aug 9, 2024
07dfc5a
Remove callback from method not used by stateless
nicktindall Aug 9, 2024
9c8ee42
Add CachePopulationReason.CacheMiss
nicktindall Aug 9, 2024
752b1ef
Revert "Remove callback from method not used by stateless"
nicktindall Aug 9, 2024
c0189f4
NO_OP -> NOOP
nicktindall Aug 9, 2024
7c34720
Remove metrics from copyToCacheFileAligned not used
nicktindall Aug 12, 2024
69d58ec
Merge remote-tracking branch 'origin/main' into feature/ES-9067_expos…
nicktindall Aug 12, 2024
c823f75
Add todo
nicktindall Aug 12, 2024
10a32f6
Protect against divide-by-zero
nicktindall Aug 12, 2024
439152c
Remove unused CachePopulationReason#LoadCommit
nicktindall Aug 12, 2024
f8dfabc
Merge branch 'main' into feature/ES-9067_expose_cache_copy_metrics
nicktindall Aug 13, 2024
5c38dbf
Simplify blob-cache population notifications, extract interface
nicktindall Aug 13, 2024
d504094
Add time/bytes counter, make throughput MB/s
nicktindall Aug 13, 2024
c8e6d28
Fix metric names
nicktindall Aug 13, 2024
03a8f1d
Fix javadocs
nicktindall Aug 13, 2024
47a0a35
Be more defensive when handling metrics
nicktindall Aug 13, 2024
a71d70b
Don't notify when no bytes were copied
nicktindall Aug 13, 2024
95d2e5f
Add test for SharedBytes#copyToCacheFileAligned
nicktindall Aug 14, 2024
e18a283
Fix comment, ensure room for at least a byte
nicktindall Aug 14, 2024
168efe1
Use strings for metric values
nicktindall Aug 14, 2024
f60d341
Take shard ID as a string, to reduce string concatenation
nicktindall Aug 14, 2024
c938e89
Remove BlobCachePopulationListener
nicktindall Aug 15, 2024
8f5967a
Add source to BlobCacheMetrics
nicktindall Aug 15, 2024
45fb734
Add test, extract CachePopulationSource
nicktindall Aug 15, 2024
71c0b2c
Expose BlobCacheMetrics from SharedBlobCacheService
nicktindall Aug 15, 2024
91405f2
Fix test name
nicktindall Aug 15, 2024
03a0140
Delete docs/changelog/111730.yaml
nicktindall Aug 15, 2024
df97c59
Improve metric and attribute names
nicktindall Aug 15, 2024
8295d35
Randomise BlobCacheMetricsTests
nicktindall Aug 15, 2024
a903534
De-duplicate
nicktindall Aug 15, 2024
e8edc63
Fix spotless
nicktindall Aug 15, 2024
81e2776
Add Unknown CachePopulationSource
nicktindall Aug 15, 2024
b17f316
Apply feedback
nicktindall Aug 15, 2024
5 changes: 5 additions & 0 deletions docs/changelog/111730.yaml
@@ -0,0 +1,5 @@
pr: 111730
summary: "Add callback for copy-to-cache metrics, additional `BlobCacheMetrics`"
area: Store
type: enhancement
issues: []
@@ -7,15 +7,37 @@

package org.elasticsearch.blobcache;

import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.telemetry.metric.DoubleHistogram;
import org.elasticsearch.telemetry.metric.LongCounter;
import org.elasticsearch.telemetry.metric.LongHistogram;
import org.elasticsearch.telemetry.metric.MeterRegistry;

import java.util.Map;
import java.util.concurrent.TimeUnit;

public class BlobCacheMetrics {
public static final String CACHE_POPULATION_REASON_ATTRIBUTE_KEY = "cachePopulationReason";
public static final String SHARD_ID_ATTRIBUTE_KEY = "shardId";

private final LongCounter cacheMissCounter;
private final LongCounter evictedCountNonZeroFrequency;
private final LongHistogram cacheMissLoadTimes;
private final DoubleHistogram cachePopulateThroughput;
private final LongCounter cachePopulationBytes;
private final LongCounter cachePopulationTime;

public enum CachePopulationReason {
/**
* When warming the cache
*/
Warming,
/**
* When the data we need is not in the cache
*/
CacheMiss
}


public BlobCacheMetrics(MeterRegistry meterRegistry) {
this(
@@ -33,14 +55,39 @@ public BlobCacheMetrics(MeterRegistry meterRegistry) {
"es.blob_cache.cache_miss_load_times.histogram",
"The time in milliseconds for populating entries in the blob store resulting from a cache miss, expressed as a histogram.",
"ms"
),
meterRegistry.registerDoubleHistogram(
"es.blob_cache.populate_throughput.histogram",
"The throughput when populating the blob store from the cache",
"MB/second"
),
[Review comment — nicktindall, Aug 13, 2024] Use MiB/s to work around LongHistogram limit of 131,072

meterRegistry.registerLongCounter(
"es.blob_cache.populate_bytes.total",
"The number of bytes that have been loaded into the cache",
"bytes"
),
meterRegistry.registerLongCounter(
"es.blob_cache.populate_time.total",
"The time spent copying data into the cache",
"milliseconds"
[Review comment — Member] Maybe I missed the discussion somewhere: I thought this should be a histogram, similar to the s3 http request time?

[Reply — nicktindall, Aug 15, 2024] We did discuss this. The feeling was that, because we've got the throughput distribution, recording population bytes and time as raw totals might give us more flexibility. Raw totals leave more options for aggregation in the charts (e.g. how much did we download when warming shard X, how long did we spend warming index Y, how much did we download due to warming when that node failed). I don't think you can answer those questions with bytes/time histograms; (I think) they can only tell us the distribution of chunk sizes or chunk download times in some window.

)
);
}

BlobCacheMetrics(LongCounter cacheMissCounter, LongCounter evictedCountNonZeroFrequency, LongHistogram cacheMissLoadTimes) {
BlobCacheMetrics(
LongCounter cacheMissCounter,
LongCounter evictedCountNonZeroFrequency,
LongHistogram cacheMissLoadTimes,
DoubleHistogram cachePopulateThroughput,
LongCounter cachePopulationBytes,
LongCounter cachePopulationTime
) {
this.cacheMissCounter = cacheMissCounter;
this.evictedCountNonZeroFrequency = evictedCountNonZeroFrequency;
this.cacheMissLoadTimes = cacheMissLoadTimes;
this.cachePopulateThroughput = cachePopulateThroughput;
this.cachePopulationBytes = cachePopulationBytes;
this.cachePopulationTime = cachePopulationTime;
}

public static BlobCacheMetrics NOOP = new BlobCacheMetrics(TelemetryProvider.NOOP.getMeterRegistry());
@@ -56,4 +103,47 @@ public LongCounter getEvictedCountNonZeroFrequency() {
public LongHistogram getCacheMissLoadTimes() {
return cacheMissLoadTimes;
}

/**
* Record the various cache population metrics after a chunk is copied to the cache
*
* @param totalBytesCopied The total number of bytes copied
* @param totalCopyTimeNanos The time taken to copy the bytes in nanoseconds
* @param shardId The shard ID to which the chunk belonged
* @param cachePopulationReason The reason for the cache being populated
*/
public void recordCachePopulationMetrics(
int totalBytesCopied,
long totalCopyTimeNanos,
ShardId shardId,
CachePopulationReason cachePopulationReason
) {
Map<String, Object> metricAttributes = Map.of(
SHARD_ID_ATTRIBUTE_KEY,
shardId.toString(),
CACHE_POPULATION_REASON_ATTRIBUTE_KEY,
cachePopulationReason.name()
);
assert totalBytesCopied > 0 : "We shouldn't be recording zero-sized copies";
cachePopulationBytes.incrementBy(totalBytesCopied, metricAttributes);

// This is almost certainly paranoid, but if we had a very fast/small copy with a very coarse nanosecond timer it might happen?
if (totalCopyTimeNanos > 0) {
[Review comment — Member] I think we could add a warning log in the else branch, similar to how we log a warning if an s3 metric does not have a valid request time metric.

[Reply — nicktindall] Addressed in b17f316. I couldn't find the warning you were referring to, but I did add one.

[Reply — Member] logger.warn("Expected HttpRequestTime to be tracked for request [{}] but found no count.", request);

cachePopulateThroughput.record(toMegabytesPerSecond(totalBytesCopied, totalCopyTimeNanos), metricAttributes);
cachePopulationTime.incrementBy(TimeUnit.NANOSECONDS.toMillis(totalCopyTimeNanos), metricAttributes);
}
}

/**
* Calculate throughput as megabytes/second
*
* @param totalBytes The total number of bytes transferred
* @param totalNanoseconds The time to transfer in nanoseconds
* @return The throughput as megabytes/second
*/
private double toMegabytesPerSecond(int totalBytes, long totalNanoseconds) {
double totalSeconds = totalNanoseconds / 1_000_000_000.0;
double totalMegabytes = totalBytes / 1_000_000.0;
return totalMegabytes / totalSeconds;
}
[Review comment — nicktindall] I used Mebibytes because that's what ByteSizeValue#ofMb uses

}
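The `toMegabytesPerSecond` conversion above can be exercised in isolation. A minimal standalone sketch of the same arithmetic (the class name and the sample values are illustrative, not from the PR):

```java
public class ThroughputSketch {

    // Mirrors the PR's conversion: nanoseconds -> seconds, bytes -> decimal megabytes.
    static double toMegabytesPerSecond(int totalBytes, long totalNanoseconds) {
        double totalSeconds = totalNanoseconds / 1_000_000_000.0;
        double totalMegabytes = totalBytes / 1_000_000.0;
        return totalMegabytes / totalSeconds;
    }

    public static void main(String[] args) {
        // 8,000,000 bytes copied in 2 seconds -> 4.0 MB/s
        System.out.println(toMegabytesPerSecond(8_000_000, 2_000_000_000L));
    }
}
```

Note that, as in the PR, callers should guard against `totalNanoseconds == 0` before calling this, since the division would otherwise produce infinity.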
@@ -0,0 +1,26 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.blobcache.shared;

public interface BlobCachePopulationListener {

BlobCachePopulationListener NOOP = (bc, ctn) -> {};

/**
* Notify of a blob cache population that occurred
*
* Note that <code>bytesCopied</code> is approximate, there are cases where we write
* more than we read, due to page alignment, and there are times when we read more
* than we write, e.g. when filling multiple gaps. Notifiers should try and record
* the larger of those two numbers when invoking this method.
*
* @param bytesCopied The number of bytes copied into the cache
* @param copyTimeNanos The time in nanoseconds taken to copy those bytes
*/
void onCachePopulation(int bytesCopied, long copyTimeNanos);
}
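To illustrate how a `BlobCachePopulationListener` might be consumed, here is a hedged sketch with a local copy of the PR's single-method interface and a simple accumulating implementation (the `RecordingListener` class is hypothetical, not part of the PR):

```java
public class ListenerSketch {

    // Local copy of the PR's listener interface, including its no-op constant.
    interface BlobCachePopulationListener {
        BlobCachePopulationListener NOOP = (bc, ctn) -> {};

        void onCachePopulation(int bytesCopied, long copyTimeNanos);
    }

    // Hypothetical implementation that accumulates raw totals, matching the
    // "counters, not histograms" approach discussed in the review thread.
    static class RecordingListener implements BlobCachePopulationListener {
        long totalBytes;
        long totalNanos;

        @Override
        public void onCachePopulation(int bytesCopied, long copyTimeNanos) {
            totalBytes += bytesCopied;
            totalNanos += copyTimeNanos;
        }
    }

    public static void main(String[] args) {
        RecordingListener listener = new RecordingListener();
        listener.onCachePopulation(4096, 1_000_000L);
        listener.onCachePopulation(8192, 3_000_000L);
        System.out.println(listener.totalBytes + " bytes in " + listener.totalNanos + " ns");
    }
}
```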
@@ -172,6 +172,7 @@ static void preallocate(Path cacheFile, long fileSize) throws IOException {
* @param buf bytebuffer to use for writing
* @throws IOException on failure
*/
// TODO: Support BlobCachePopulationListener when required
public static void copyToCacheFileAligned(
IO fc,
InputStream input,
@@ -212,11 +213,19 @@ public static void copyToCacheFileAligned(
* @param fileChannelPos position in {@code fc} to write to
* @param progressUpdater callback to invoke with the number of copied bytes as they are copied
* @param buffer bytebuffer to use for writing
* @param populationListener a listener that will be notified upon completion of the copy
* @return the number of bytes copied
* @throws IOException on failure
*/
public static int copyToCacheFileAligned(IO fc, InputStream input, int fileChannelPos, IntConsumer progressUpdater, ByteBuffer buffer)
throws IOException {
public static int copyToCacheFileAligned(
IO fc,
InputStream input,
int fileChannelPos,
IntConsumer progressUpdater,
ByteBuffer buffer,
BlobCachePopulationListener populationListener
) throws IOException {
final long copyStartTime = System.nanoTime();
int bytesCopied = 0;
while (true) {
final int bytesRead = Streams.read(input, buffer, buffer.remaining());
@@ -226,6 +235,10 @@ public static int copyToCacheFileAligned(IO fc, InputStream input, int fileChann
bytesCopied += copyBufferToCacheFileAligned(fc, fileChannelPos + bytesCopied, buffer);
progressUpdater.accept(bytesCopied);
}
if (bytesCopied > 0) {
long elapsedTimeNanos = System.nanoTime() - copyStartTime;
populationListener.onCachePopulation(bytesCopied, elapsedTimeNanos);
}
return bytesCopied;
}

Expand Down
@@ -8,15 +8,24 @@
package org.elasticsearch.blobcache.shared;

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.ESTestCase;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;

import static org.elasticsearch.blobcache.shared.SharedBytes.PAGE_SIZE;
import static org.hamcrest.Matchers.greaterThan;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verifyNoInteractions;

public class SharedBytesTests extends ESTestCase {

public void testReleasesFileCorrectly() throws Exception {
@@ -41,4 +50,78 @@ public void testReleasesFileCorrectly() throws Exception {
assertFalse(Files.exists(sharedBytesPath));
}
}

public void testPopulationListenerIsCalledOnSuccess() throws IOException {
doWithSharedBytes(sharedBytes -> {
// position + length must be <= region size
final int position = randomIntBetween(0, sharedBytes.regionSize - 1);
final int alignedPosition = position - position % PAGE_SIZE;
final int streamLength = randomIntBetween(1, sharedBytes.regionSize - alignedPosition);
logger.info("Copying {} bytes to position {} (region size={})", streamLength, alignedPosition, sharedBytes.regionSize);
final int[] bytesCopiedHolder = new int[1];
final long[] copyTimeNanosHolder = new long[1];
try (ByteArrayInputStream inputStream = new ByteArrayInputStream(randomByteArrayOfLength(streamLength))) {
SharedBytes.copyToCacheFileAligned(
sharedBytes.getFileChannel(0),
inputStream,
alignedPosition,
p -> {},
ByteBuffer.allocate(randomIntBetween(1, 4) * PAGE_SIZE),
(bytesCopied, copyTimeNanos) -> {
bytesCopiedHolder[0] = bytesCopied;
copyTimeNanosHolder[0] = copyTimeNanos;
}
);
final int partialPage = streamLength % PAGE_SIZE;
final int pageAlignedCopySize = partialPage == 0 ? streamLength : streamLength + PAGE_SIZE - partialPage;
assertEquals(pageAlignedCopySize, bytesCopiedHolder[0]);
assertThat(copyTimeNanosHolder[0], greaterThan(0L));
}
});
}

public void testPopulationListenerIsNotCalledForZeroLengthStream() throws IOException {
doWithSharedBytes(sharedBytes -> {
final BlobCachePopulationListener listener = mock(BlobCachePopulationListener.class);
final int position = randomIntBetween(0, sharedBytes.regionSize);
final int alignedPosition = position - position % PAGE_SIZE;
logger.info("Copying nothing to position {} (region size={})", alignedPosition, sharedBytes.regionSize);
try (ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[] {})) {
SharedBytes.copyToCacheFileAligned(
sharedBytes.getFileChannel(0),
inputStream,
alignedPosition,
p -> {},
ByteBuffer.allocate(randomIntBetween(1, 4) * PAGE_SIZE),
listener
);
verifyNoInteractions(listener);
}
});
}

private <E extends Exception> void doWithSharedBytes(CheckedConsumer<SharedBytes, E> consumer) throws E, IOException {
int regions = randomIntBetween(1, 10);
var nodeSettings = Settings.builder()
.put(Node.NODE_NAME_SETTING.getKey(), "node")
.put("path.home", createTempDir())
.putList(Environment.PATH_DATA_SETTING.getKey(), createTempDir().toString())
.build();
try (var nodeEnv = new NodeEnvironment(nodeSettings, TestEnvironment.newEnvironment(nodeSettings))) {
SharedBytes sharedBytes = null;
try {
sharedBytes = new SharedBytes(
regions,
randomIntBetween(1, 16) * 4096,
nodeEnv,
ignored -> {},
ignored -> {},
IOUtils.WINDOWS == false && randomBoolean()
);
consumer.accept(sharedBytes);
} finally {
sharedBytes.decRef();
}
}
}
}
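The page-alignment arithmetic asserted in `testPopulationListenerIsCalledOnSuccess` (rounding the copied length up to the next page boundary) can be sketched standalone; the class name is illustrative and `PAGE_SIZE` is assumed to match `SharedBytes.PAGE_SIZE`:

```java
public class PageAlignSketch {
    static final int PAGE_SIZE = 4096; // assumed to match SharedBytes.PAGE_SIZE

    // Rounds a copy length up to the next page boundary, as the aligned copy does:
    // a partial trailing page is padded out to a full page.
    static int pageAlignedCopySize(int streamLength) {
        int partialPage = streamLength % PAGE_SIZE;
        return partialPage == 0 ? streamLength : streamLength + PAGE_SIZE - partialPage;
    }

    public static void main(String[] args) {
        System.out.println(pageAlignedCopySize(1));    // 4096
        System.out.println(pageAlignedCopySize(4096)); // 4096
        System.out.println(pageAlignedCopySize(4097)); // 8192
    }
}
```

This is why the test expects `bytesCopied` reported by the listener to be the page-aligned size rather than the raw stream length.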