Skip to content

Commit

Permalink
chore: add benchmark for RSM fetch
Browse files Browse the repository at this point in the history
  • Loading branch information
jeqo committed Sep 22, 2023
1 parent 994fd3f commit 1d200a0
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 0 deletions.
2 changes: 2 additions & 0 deletions benchmarks/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
*.properties
*.pem
44 changes: 44 additions & 0 deletions benchmarks/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2021 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

plugins {
id 'java'
}

ext {
jmhVersion = "1.36"
}

dependencies {
implementation project(':core')
implementation group: "org.apache.kafka", name: "kafka-storage-api", version: kafkaVersion
implementation group: "org.apache.kafka", name: "kafka-clients", version: kafkaVersion

implementation project(":storage:s3")
implementation "org.testcontainers:localstack:$testcontainersVersion"
implementation ("software.amazon.awssdk:s3:$awsSdkVersion") {
exclude group: "com.fasterxml.jackson.core"
exclude group: "com.fasterxml.jackson.dataformat"
exclude group: "org.slf4j"
}
implementation "javax.xml.bind:jaxb-api:2.3.1"

implementation "org.openjdk.jmh:jmh-core:$jmhVersion"
implementation "org.openjdk.jmh:jmh-core-benchmarks:$jmhVersion"
annotationProcessor "org.openjdk.jmh:jmh-generator-annprocess:$jmhVersion"

runtimeOnly "org.slf4j:slf4j-log4j12:$slf4jVersion"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright 2023 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.benchmark;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;

import io.aiven.kafka.tieredstorage.RemoteStorageManager;
import io.aiven.kafka.tieredstorage.chunkmanager.cache.DiskBasedChunkCache;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.profile.AsyncProfiler;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.OptionsBuilder;

@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 2, time = 10)
@Measurement(iterations = 3, time = 30)
@BenchmarkMode({Mode.SampleTime, Mode.Throughput})
@OutputTimeUnit(TimeUnit.SECONDS)
public class RemoteStorageManagerFetchLogBenchmark {
RemoteStorageManager rsm = new RemoteStorageManager();

@Setup(Level.Trial)
public void setup() throws IOException {

final var tmpDir = Files.createTempDirectory("rsm-cache");
final var compression = false;
final var encryption = true;
final var cacheClass = DiskBasedChunkCache.class.getCanonicalName();
// Configure the RSM.
final var cacheDir = tmpDir.resolve("cache");
Files.createDirectories(cacheDir);

final var props = new Properties();
props.load(Files.newInputStream(Path.of("rsm.properties")));
final Map<String, String> config = new HashMap<>();
props.forEach((k, v) -> config.put((String) k, (String) v));
// 5MiB
final int chunkSize = 5 * 1024 * 1024;
config.putAll(Map.of(
"chunk.size", Integer.toString(chunkSize),
"compression.enabled", Boolean.toString(compression),
"encryption.enabled", Boolean.toString(encryption),
"chunk.cache.class", cacheClass,
"chunk.cache.path", cacheDir.toString(),
"chunk.cache.size", Integer.toString(100 * 1024 * 1024),
"custom.metadata.fields.include", "REMOTE_SIZE,OBJECT_PREFIX,OBJECT_KEY"
));

rsm.configure(config);
}

@Benchmark
public int read(final Blackhole blackhole) {
final var tip =
new TopicIdPartition(Uuid.fromString("xPDCxWn7SdC5HGRVktllxg"), 0, "test-topic-0000000-1riOu0I");
final RemoteLogSegmentMetadata meta = new RemoteLogSegmentMetadata(
new RemoteLogSegmentId(tip, Uuid.fromString("i7VvlUbeQR-ncOPMXpILYg")), 0, 2185L,
0, 0, 0, 200 * 1024 * 1024, Map.of(0, 0L));
InputStream content = null;
try {
content = rsm.fetchLogSegment(meta, 0);
final var capacity = 50 * 1024 * 1024;
Utils.readFully(content, ByteBuffer.allocate(capacity));
blackhole.consume(capacity);
return capacity;
} catch (final Exception e) {
throw new RuntimeException(e);
} finally {
if (content != null) {
Utils.closeQuietly(content, "yay");
}
}
}

public static void main(final String[] args) throws Exception {
new Runner(new OptionsBuilder()
.include(RemoteStorageManagerFetchLogBenchmark.class.getSimpleName())
.addProfiler(AsyncProfiler.class, "-output=flamegraph;event=cpu")
.build()).run();
}
}
21 changes: 21 additions & 0 deletions benchmarks/src/main/resources/log4j.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#
# Copyright 2023 Aiven Oy
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
1 change: 1 addition & 0 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
<suppress checks="ClassDataAbstractionCoupling" files="RemoteStorageManager.java"/>
<suppress checks="ClassDataAbstractionCoupling" files="Metrics.java"/>
<suppress checks="ClassDataAbstractionCoupling" files="MetricCollector.java"/>
<suppress checks="ClassDataAbstractionCoupling" files=".*Benchmark\.java"/>
<suppress checks="CyclomaticComplexity" files="MetricCollector.java"/>
<suppress checks="CyclomaticComplexity" files="SingleBrokerTest.java"/>
<suppress checks="CyclomaticComplexity" files="FetchChunkEnumeration.java"/>
Expand Down
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ include 'storage:gcs'
include 'storage:s3'
include 'e2e'
include 'commons'
include 'benchmarks'

0 comments on commit 1d200a0

Please sign in to comment.