Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memory Mode support: Adding memory mode, and implementing it for Asynchronous Instruments #5709

Merged
merged 45 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
cb35698
With pool that is based on array
asafm Jul 23, 2023
cd5a0f7
Async now work at 99.8% less memory allocations
asafm Jul 27, 2023
216a6f5
Working code, with beautified code (almost all) and less code in Pool…
asafm Aug 1, 2023
8639239
Code looks good. Next are tests
asafm Aug 2, 2023
7c7507d
Unit tests done
asafm Aug 3, 2023
01d2215
jmh test almost ready with prototype
asafm Aug 6, 2023
e8ac068
test/code is ready
asafm Aug 9, 2023
bcc1a73
Merge remote-tracking branch 'origin/main' into memory-allocations-async
asafm Aug 9, 2023
d667889
Fixed all Gradle check task errors
asafm Aug 10, 2023
8cce0c4
Small rename
asafm Aug 13, 2023
501f819
Removed JOL. Added javadocs.
asafm Aug 13, 2023
0ecb4ec
Removed draft code
asafm Aug 13, 2023
93f25eb
Update sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/…
asafm Aug 27, 2023
4aea7c9
Update sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/…
asafm Aug 27, 2023
01513f3
Update sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/…
asafm Aug 27, 2023
25ced28
Update sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/interna…
asafm Aug 27, 2023
4a35ccd
Added internal warning
asafm Aug 27, 2023
52278e6
Change naming of ImmutableMeasurement factory method to be consistent…
asafm Aug 27, 2023
d14f275
Making code more readable with 2 with* methods on Measurement
asafm Aug 27, 2023
62a71bd
Making code more readable with replacing switch with if for memory mode
asafm Aug 27, 2023
6648c7c
PR fixes, including perf improvement.
asafm Aug 27, 2023
031f2c2
PR fixes
asafm Aug 29, 2023
6093c38
More PR fixes
asafm Aug 29, 2023
e2a9007
Merging
asafm Aug 30, 2023
098ca71
Merge remote-tracking branch 'origin/main' into memory-allocations-async
asafm Aug 30, 2023
a7a481e
spotless fixes
asafm Aug 30, 2023
f7c1608
Removed nullable from MutableDoublePointData
asafm Aug 30, 2023
ed02001
Merge remote-tracking branch 'origin/main' into memory-allocations-async
asafm Aug 30, 2023
0ea62f1
Fix tiny imports issue
asafm Aug 31, 2023
4c32a26
Merge remote-tracking branch 'origin/main' into memory-allocations-async
asafm Aug 31, 2023
16d4713
checkstyle
asafm Aug 31, 2023
69ecf76
More PR fixes
asafm Sep 3, 2023
8e52652
More PR fixes
asafm Sep 6, 2023
cfbf50c
More PR fixes
asafm Sep 21, 2023
59c713e
Trying to see if this fixes failure of AsynchronousMetricStorageGarba…
asafm Sep 21, 2023
ea04e14
Merge remote-tracking branch 'origin/main' into memory-allocations-async
asafm Sep 21, 2023
0e3f1c1
Update sdk/common/src/main/java/io/opentelemetry/sdk/common/export/Me…
asafm Sep 26, 2023
5b2f3ba
Update sdk/common/src/main/java/io/opentelemetry/sdk/common/export/Me…
asafm Sep 26, 2023
87c9073
Update sdk/common/src/main/java/io/opentelemetry/sdk/common/export/Me…
asafm Sep 26, 2023
4964d6a
Update sdk/common/src/main/java/io/opentelemetry/sdk/common/export/Me…
asafm Sep 26, 2023
b6f1ef6
Update sdk/common/src/main/java/io/opentelemetry/sdk/common/export/Me…
asafm Sep 26, 2023
52582d8
Update sdk/common/src/main/java/io/opentelemetry/sdk/common/export/Me…
asafm Sep 26, 2023
8cf00ba
style check fixes
asafm Sep 26, 2023
0d09946
Add API changes and improved instructions a bit
asafm Sep 26, 2023
b5378c5
MD fixes
asafm Sep 26, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dependencyManagement/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ val DEPENDENCIES = listOf(
"io.opencensus:opencensus-contrib-exemplar-util:${opencensusVersion}",
"org.openjdk.jmh:jmh-core:${jmhVersion}",
"org.openjdk.jmh:jmh-generator-bytecode:${jmhVersion}",
"org.openjdk.jmh:jmh-generator-annprocess:${jmhVersion}",
"org.mockito:mockito-core:${mockitoVersion}",
"org.mockito:mockito-junit-jupiter:${mockitoVersion}",
"org.slf4j:slf4j-simple:${slf4jVersion}",
Expand Down
9 changes: 9 additions & 0 deletions sdk/metrics/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ dependencies {

jmh(project(":sdk:trace"))
jmh(project(":sdk:testing"))
jmh(project(":sdk:metrics:testing-internal"))
}

testing {
Expand All @@ -41,6 +42,14 @@ testing {
}
}
}
register<JvmTestSuite>("jmhBasedTest") {
dependencies {
implementation("org.openjdk.jmh:jmh-core")
implementation("org.openjdk.jmh:jmh-generator-bytecode")
annotationProcessor("org.openjdk.jmh:jmh-generator-annprocess")
implementation(project(":sdk:metrics:testing-internal"))
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,19 @@

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
Expand All @@ -37,6 +33,7 @@
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jol.info.GraphLayout;

/**
* Measures the collection path for various histogram aggregations. Should be used primarily to
Expand All @@ -48,6 +45,7 @@
@Warmup(iterations = 2, batchSize = 10)
@Fork(1)
public class HistogramCollectBenchmark {
private static final Logger logger = Logger.getLogger(HistogramCollectBenchmark.class.getName());

private static final int cardinality = 100;
private static final int measurementsPerSeries = 10_000;
Expand All @@ -64,31 +62,24 @@ public static class ThreadState {

@Setup
public void setup() {
SdkMeterProviderBuilder builder =
SdkMeterProvider.builder()
.registerMetricReader(
PeriodicMetricReader.builder(
// Configure an exporter that configures the temporality and aggregation
// for the test case, but otherwise drops the data on export
new NoopMetricExporter(
aggregationTemporality, aggregationGenerator.aggregation))
// Effectively disable periodic reading so reading is only done on #flush()
.setInterval(Duration.ofSeconds(Integer.MAX_VALUE))
.build());
PeriodicMetricReader metricReader =
PeriodicMetricReader.builder(
// Configure an exporter that configures the temporality and aggregation
// for the test case, but otherwise drops the data on export
new NoopMetricExporter(aggregationTemporality, aggregationGenerator.aggregation))
// Effectively disable periodic reading so reading is only done on #flush()
.setInterval(Duration.ofSeconds(Integer.MAX_VALUE))
.build();
SdkMeterProviderBuilder builder = SdkMeterProvider.builder();
SdkMeterProviderUtil.registerMetricReaderWithCardinalitySelector(
builder, metricReader, unused -> cardinality);
// Disable examplars
SdkMeterProviderUtil.setExemplarFilter(builder, ExemplarFilter.alwaysOff());
sdkMeterProvider = builder.build();
histogram = sdkMeterProvider.get("meter").histogramBuilder("histogram").build();

random = new Random();
attributesList = new ArrayList<>(cardinality);
String last = "aaaaaaaaaaaaaaaaaaaaaaaaaa";
for (int i = 0; i < cardinality; i++) {
char[] chars = last.toCharArray();
chars[random.nextInt(last.length())] = (char) (random.nextInt(26) + 'a');
last = new String(chars);
attributesList.add(Attributes.builder().put("key", last).build());
}
attributesList = AttributesGenerator.generate(cardinality);
}

@TearDown
Expand Down Expand Up @@ -122,40 +113,4 @@ public enum AggregationGenerator {
this.aggregation = aggregation;
}
}

private static class NoopMetricExporter implements MetricExporter {
private final AggregationTemporality aggregationTemporality;
private final Aggregation aggregation;

private NoopMetricExporter(
AggregationTemporality aggregationTemporality, Aggregation aggregation) {
this.aggregationTemporality = aggregationTemporality;
this.aggregation = aggregation;
}

@Override
public CompletableResultCode export(Collection<MetricData> metrics) {
return CompletableResultCode.ofSuccess();
}

@Override
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
}

@Override
public CompletableResultCode shutdown() {
return CompletableResultCode.ofSuccess();
}

@Override
public Aggregation getDefaultAggregation(InstrumentType instrumentType) {
return aggregation;
}

@Override
public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) {
return aggregationTemporality;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.internal.state;

import static org.assertj.core.api.Fail.fail;

import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.export.MemoryMode;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

/**
* Benchmarks the memory usage of different memory modes and aggregation temporalities
*
* <p>This benchmark helps to see the difference in heap usage between the memory modes. You can
* change the aggregation temporality, number of counters and cardinality (attribute sets count).
* You have to run it manually for each parameter, since it's not JMH-based, and we need a clean
* heap before we start.
*
* <p>This benchmark class has additional usage: You can use it to see memory allocation frame graphs for a
* single run.
*
* <p>Steps:
* <ol>
* <li> Follow download instructions for async-profiler, located at
* https://github.com/async-profiler/async-profiler
* <li>Assuming you have extracted it at
* /tmp/async-profiler-2.9-macos, add the following to your JVM arguments of your run configuration:
*
* <p>-agentpath:/tmp/async-profiler-2.9-macos/build/libasyncProfiler.so=start,event=alloc,flamegraph,file=/tmp/profiled_data.html
*
* <li>Tune the parameters as you see fit
* <li>Run the class
* <li>Open /tmp/profiled_data.html with your browser
* </ol>
*/
@SuppressWarnings("SystemOut")
public class AsynchronousCounterMemoryUsageBenchmark {
jack-berg marked this conversation as resolved.
Show resolved Hide resolved

ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
AtomicLong maxUsedMemory = new AtomicLong(0);

private AsynchronousCounterMemoryUsageBenchmark() {}

/**
* Runs the memory usage benchmark.
*
* <p>Set the parameters in {@link AsynchronousCounterMemoryUsageBenchmark#measure()}
*
* @param args Unused
*/
public static void main(String[] args)
throws ExecutionException, InterruptedException, TimeoutException {
AsynchronousCounterMemoryUsageBenchmark asynchronousCounterMemoryUsageBenchmark =
new AsynchronousCounterMemoryUsageBenchmark();
asynchronousCounterMemoryUsageBenchmark.measure();
}

@SuppressWarnings("DefaultCharset")
private void measure()
throws ExecutionException, InterruptedException, TimeoutException {
// Parameters
AggregationTemporality aggregationTemporality = AggregationTemporality.CUMULATIVE;
MemoryMode memoryMode = MemoryMode.IMMUTABLE_DATA;
int countersCount = 1;
int cardinality = 100_000;

AsynchronousMetricStorageGarbageCollectionBenchmark.ThreadState benchmarkSetup =
new AsynchronousMetricStorageGarbageCollectionBenchmark.ThreadState(
countersCount, cardinality);

benchmarkSetup.aggregationTemporality = aggregationTemporality;
benchmarkSetup.memoryMode = memoryMode;

AsynchronousMetricStorageGarbageCollectionBenchmark benchmark =
new AsynchronousMetricStorageGarbageCollectionBenchmark();

benchmarkSetup.setup();

waitForGarbageCollection();
long usedMemoryBefore = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();

ScheduledFuture<?> scheduledFuture = startMeasuringUsedMemoryInBackground();
try {
runBenchmark(benchmark, benchmarkSetup);
} finally {
executorService.shutdown();
}

try {
scheduledFuture.get(10, TimeUnit.SECONDS);
} catch (CancellationException e) {
// Due to the shutdown, ignore
}

waitForGarbageCollection();
long usedMemoryAfter = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
long memoryUsedButFreed = maxUsedMemory.get() - usedMemoryAfter;

System.out.printf(
""
+ "\nCounters = %d, Cardinality = %,d"
+ "\n%s, %s: "
+ "\nmemoryUsedButFreed = %,15d [bytes]"
+ "\nmaxMemoryUsedDuringCollection = %,15d [bytes]"
+ "\nmemoryUsedBeforeCollectionStart = %,15d [bytes]"
+ "\nmemoryUsedAfterCollectionFinished = %,15d [bytes]%n",
countersCount,
cardinality,
benchmarkSetup.aggregationTemporality,
benchmarkSetup.memoryMode,
memoryUsedButFreed,
maxUsedMemory.get(),
usedMemoryBefore,
usedMemoryAfter);
}

private static void waitForGarbageCollection() throws InterruptedException {
List<Long> collectionCountBefore = getGarbageCollectorsCollectionCount();
boolean oneGcCountIncreased;
int attempts = 0;
do {
System.gc();
Thread.sleep(TimeUnit.SECONDS.toMillis(2));
} while ((oneGcCountIncreased = oneOfGarbageCollectorsCountIncreased(collectionCountBefore))
&& ++attempts < 3);
if (!oneGcCountIncreased) {
fail("Failed to get GC in " + attempts + " attempts");
}
}

private static boolean oneOfGarbageCollectorsCountIncreased(List<Long> collectionCountBefore) {
List<Long> gcCollectionCountNow = getGarbageCollectorsCollectionCount();
for (int i = 0; i < gcCollectionCountNow.size(); i++) {
Long now = gcCollectionCountNow.get(i);
if (now > collectionCountBefore.get(i)) {
return true;
}
}
return false;
}

private static List<Long> getGarbageCollectorsCollectionCount() {
List<Long> collectionCountBefore = new ArrayList<>();
ManagementFactory.getGarbageCollectorMXBeans()
.forEach(
garbageCollectorMXBean ->
collectionCountBefore.add(garbageCollectorMXBean.getCollectionCount()));
return collectionCountBefore;
}

private ScheduledFuture<?> startMeasuringUsedMemoryInBackground() {
Runnable measureMaxMemory =
() -> {
try {
long used = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
if (used > maxUsedMemory.get()) {
maxUsedMemory.set(used);
}
} catch (Throwable t) {
t.printStackTrace();
throw t;
}
};

return executorService.scheduleWithFixedDelay(measureMaxMemory, 0, 2, TimeUnit.MILLISECONDS);
}

private static void runBenchmark(
AsynchronousMetricStorageGarbageCollectionBenchmark benchmark,
AsynchronousMetricStorageGarbageCollectionBenchmark.ThreadState threadState) {
benchmark.recordAndCollect(threadState);
}
}
Loading