From 6f57cc9faaf928db0c9be774a8935a8f5e9e7da6 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Fri, 22 Sep 2023 15:08:32 +0300 Subject: [PATCH 1/2] chore: add benchmark for RSM fetch --- benchmarks/.gitignore | 2 + benchmarks/build.gradle | 44 +++++++ ...RemoteStorageManagerFetchLogBenchmark.java | 122 ++++++++++++++++++ .../src/main/resources/log4j.properties | 21 +++ checkstyle/suppressions.xml | 1 + settings.gradle | 1 + 6 files changed, 191 insertions(+) create mode 100644 benchmarks/.gitignore create mode 100644 benchmarks/build.gradle create mode 100644 benchmarks/src/main/java/io/aiven/kafka/tieredstorage/benchmark/RemoteStorageManagerFetchLogBenchmark.java create mode 100644 benchmarks/src/main/resources/log4j.properties diff --git a/benchmarks/.gitignore b/benchmarks/.gitignore new file mode 100644 index 000000000..b4920767f --- /dev/null +++ b/benchmarks/.gitignore @@ -0,0 +1,2 @@ +*.properties +*.pem \ No newline at end of file diff --git a/benchmarks/build.gradle b/benchmarks/build.gradle new file mode 100644 index 000000000..73dfb76c3 --- /dev/null +++ b/benchmarks/build.gradle @@ -0,0 +1,44 @@ +/* + * Copyright 2021 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+plugins {
+    id 'java'
+}
+
+ext {
+    jmhVersion = "1.36"
+}
+
+dependencies {
+    implementation project(':core')
+    implementation group: "org.apache.kafka", name: "kafka-storage-api", version: kafkaVersion
+    implementation group: "org.apache.kafka", name: "kafka-clients", version: kafkaVersion
+
+    implementation project(":storage:s3")
+    implementation "org.testcontainers:localstack:$testcontainersVersion"
+    implementation ("software.amazon.awssdk:s3:$awsSdkVersion") {
+        exclude group: "com.fasterxml.jackson.core"
+        exclude group: "com.fasterxml.jackson.dataformat"
+        exclude group: "org.slf4j"
+    }
+    implementation "javax.xml.bind:jaxb-api:2.3.1"
+
+    implementation "org.openjdk.jmh:jmh-core:$jmhVersion"
+    implementation "org.openjdk.jmh:jmh-core-benchmarks:$jmhVersion"
+    annotationProcessor "org.openjdk.jmh:jmh-generator-annprocess:$jmhVersion"
+
+    runtimeOnly "org.slf4j:slf4j-log4j12:$slf4jVersion"
+}
\ No newline at end of file
diff --git a/benchmarks/src/main/java/io/aiven/kafka/tieredstorage/benchmark/RemoteStorageManagerFetchLogBenchmark.java b/benchmarks/src/main/java/io/aiven/kafka/tieredstorage/benchmark/RemoteStorageManagerFetchLogBenchmark.java
new file mode 100644
index 000000000..3efeab49d
--- /dev/null
+++ b/benchmarks/src/main/java/io/aiven/kafka/tieredstorage/benchmark/RemoteStorageManagerFetchLogBenchmark.java
@@ -0,0 +1,183 @@
+/*
+ * Copyright 2023 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.tieredstorage.benchmark;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+
+import io.aiven.kafka.tieredstorage.RemoteStorageManager;
+import io.aiven.kafka.tieredstorage.chunkmanager.cache.DiskBasedChunkCache;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.profile.AsyncProfiler;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/**
+ * JMH benchmark for reading a remote log segment through
+ * {@code RemoteStorageManager.fetchLogSegment}.
+ *
+ * <p>Storage settings are loaded from an {@code rsm.properties} file resolved against the working
+ * directory; {@link #setup()} then overrides the chunking, compression, encryption and chunk cache
+ * settings. Nothing in this class uploads data, so the segment described by the hard-coded metadata
+ * presumably has to exist in the configured storage before the benchmark is started.
+ */
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 2, time = 10)
+@Measurement(iterations = 3, time = 30)
+@BenchmarkMode({Mode.SampleTime, Mode.Throughput})
+@OutputTimeUnit(TimeUnit.SECONDS)
+public class RemoteStorageManagerFetchLogBenchmark {
+    /** Number of bytes requested from the segment stream per invocation (50 MiB). */
+    private static final int READ_CAPACITY = 50 * 1024 * 1024;
+
+    private final RemoteStorageManager rsm = new RemoteStorageManager();
+
+    /** Segment to fetch; built once so that constructing it is not part of the measured code. */
+    private final RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(
+        new RemoteLogSegmentId(
+            new TopicIdPartition(Uuid.fromString("xPDCxWn7SdC5HGRVktllxg"), 0, "test-topic-0000000-1riOu0I"),
+            Uuid.fromString("i7VvlUbeQR-ncOPMXpILYg")),
+        0, 2185L, 0, 0, 0, 200 * 1024 * 1024, Map.of(0, 0L));
+
+    /** Scratch directory created by {@link #setup()} and removed by {@link #tearDown()}. */
+    private Path tmpDir;
+
+    /**
+     * Configures the {@link RemoteStorageManager} once per trial.
+     *
+     * <p>All entries of {@code rsm.properties} are copied into the configuration first; the settings
+     * hard-coded here are put afterwards and replace same-named entries from the file.
+     *
+     * @throws IOException if the scratch directories cannot be created or {@code rsm.properties}
+     *     cannot be read
+     */
+    @Setup(Level.Trial)
+    public void setup() throws IOException {
+        tmpDir = Files.createTempDirectory("rsm-cache");
+        final var compression = false;
+        final var encryption = true;
+        final var cacheClass = DiskBasedChunkCache.class.getCanonicalName();
+        // Configure the RSM.
+        final var cacheDir = tmpDir.resolve("cache");
+        Files.createDirectories(cacheDir);
+
+        final var props = new Properties();
+        // try-with-resources: release the file handle as soon as the properties are parsed.
+        try (InputStream propsStream = Files.newInputStream(Path.of("rsm.properties"))) {
+            props.load(propsStream);
+        }
+        final Map<String, String> config = new HashMap<>();
+        props.forEach((k, v) -> config.put((String) k, (String) v));
+        // 5MiB
+        final int chunkSize = 5 * 1024 * 1024;
+        config.putAll(Map.of(
+            "chunk.size", Integer.toString(chunkSize),
+            "compression.enabled", Boolean.toString(compression),
+            "encryption.enabled", Boolean.toString(encryption),
+            "chunk.cache.class", cacheClass,
+            "chunk.cache.path", cacheDir.toString(),
+            "chunk.cache.size", Integer.toString(100 * 1024 * 1024),
+            "custom.metadata.fields.include", "REMOTE_SIZE,OBJECT_PREFIX,OBJECT_KEY"
+        ));
+
+        rsm.configure(config);
+    }
+
+    /**
+     * Releases what {@link #setup()} acquired: closes the {@link RemoteStorageManager} and deletes
+     * the scratch directory, so that repeated runs do not leave cache directories behind.
+     *
+     * @throws IOException if closing the manager or deleting the directory fails
+     */
+    @TearDown(Level.Trial)
+    public void tearDown() throws IOException {
+        try {
+            rsm.close();
+        } finally {
+            if (tmpDir != null) {
+                Utils.delete(tmpDir.toFile());
+            }
+        }
+    }
+
+    /**
+     * Fetches the segment from start position 0 and reads up to {@link #READ_CAPACITY} bytes of it.
+     *
+     * @param blackhole receives the filled buffer so that the bytes read count as used
+     * @return {@link #READ_CAPACITY}
+     * @throws RuntimeException wrapping any exception raised while fetching or reading
+     */
+    @Benchmark
+    public int read(final Blackhole blackhole) {
+        InputStream content = null;
+        try {
+            content = rsm.fetchLogSegment(segmentMetadata, 0);
+            final var buffer = ByteBuffer.allocate(READ_CAPACITY);
+            Utils.readFully(content, buffer);
+            blackhole.consume(buffer);
+            return READ_CAPACITY;
+        } catch (final Exception e) {
+            throw new RuntimeException(e);
+        } finally {
+            if (content != null) {
+                // The second argument only names the resource in the log entry written when close() fails.
+                Utils.closeQuietly(content, "log segment stream");
+            }
+        }
+    }
+
+    /**
+     * Runs this benchmark with the JMH {@link Runner}, attaching {@link AsyncProfiler} with the
+     * options {@code -output=flamegraph;event=cpu}.
+     *
+     * @param args not used
+     * @throws Exception if the JMH run fails
+     */
+    public static void main(final String[] args) throws Exception {
+        new Runner(new OptionsBuilder()
+            .include(RemoteStorageManagerFetchLogBenchmark.class.getSimpleName())
+            .addProfiler(AsyncProfiler.class, "-output=flamegraph;event=cpu")
+            .build()).run();
+    }
+}
diff --git a/benchmarks/src/main/resources/log4j.properties b/benchmarks/src/main/resources/log4j.properties
new file mode 100644
index 000000000..3e86cfa01 --- /dev/null +++ b/benchmarks/src/main/resources/log4j.properties @@ -0,0 +1,21 @@ +# +# Copyright 2023 Aiven Oy +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +log4j.rootLogger=INFO, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index e2e0a11e6..4bb841b73 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -28,6 +28,7 @@ + diff --git a/settings.gradle b/settings.gradle index 425fbe138..4e138ae1a 100644 --- a/settings.gradle +++ b/settings.gradle @@ -23,3 +23,4 @@ include 'storage:gcs' include 'storage:s3' include 'e2e' include 'commons' +include 'benchmarks' From aecf79f586dc1853275485a301d92e5387cd934f Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Mon, 25 Sep 2023 14:59:14 +0300 Subject: [PATCH 2/2] fix: remove caching --- .../benchmark/RemoteStorageManagerFetchLogBenchmark.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/src/main/java/io/aiven/kafka/tieredstorage/benchmark/RemoteStorageManagerFetchLogBenchmark.java b/benchmarks/src/main/java/io/aiven/kafka/tieredstorage/benchmark/RemoteStorageManagerFetchLogBenchmark.java index 3efeab49d..740cb2db7 100644 --- 
a/benchmarks/src/main/java/io/aiven/kafka/tieredstorage/benchmark/RemoteStorageManagerFetchLogBenchmark.java +++ b/benchmarks/src/main/java/io/aiven/kafka/tieredstorage/benchmark/RemoteStorageManagerFetchLogBenchmark.java @@ -83,7 +83,7 @@ public void setup() throws IOException { "encryption.enabled", Boolean.toString(encryption), "chunk.cache.class", cacheClass, "chunk.cache.path", cacheDir.toString(), - "chunk.cache.size", Integer.toString(100 * 1024 * 1024), + "chunk.cache.size", Integer.toString(100), "custom.metadata.fields.include", "REMOTE_SIZE,OBJECT_PREFIX,OBJECT_KEY" ));