Skip to content

Commit

Permalink
feat(e2e): add e2e tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jeqo committed Aug 23, 2023
1 parent 0e8166f commit b60e6fe
Show file tree
Hide file tree
Showing 14 changed files with 1,309 additions and 12 deletions.
10 changes: 9 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,19 @@ subprojects {
slf4jVersion = "1.7.36"

// Don't bump this version without need, as this is the min supported version for the plugin.
kafkaVersion = "3.0.0"
kafkaVersion = "3.3.2"

assertJVersion = "3.24.2"

apacheCommonsIOVersion = "2.13.0"

jacksonVersion = "2.15.2"

awaitilityVersion = "4.2.0"

awsSdkVersion = "1.12.520"

testcontainersVersion = "1.18.3"
}

dependencies {
Expand All @@ -131,6 +137,8 @@ subprojects {
testImplementation "org.mockito:mockito-core:$mockitoVersion"
testImplementation "org.mockito:mockito-junit-jupiter:$mockitoVersion"

testImplementation "org.awaitility:awaitility:$awaitilityVersion"

testRuntimeOnly "org.slf4j:slf4j-log4j12:$slf4jVersion"
}

Expand Down
3 changes: 3 additions & 0 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
<suppress checks="ClassDataAbstractionCoupling" files="RemoteStorageManager.java"/>
<suppress checks="ClassDataAbstractionCoupling" files="Metrics.java"/>
<suppress checks="CyclomaticComplexity" files="MetricCollector.java"/>
<suppress checks="CyclomaticComplexity" files="SingleBrokerTest.java"/>
<suppress checks="NPathComplexity" files="SingleBrokerTest.java"/>
<suppress checks="JavaNCSSCheck" files="MetricsRegistry.java"/>
<suppress checks="JavaNCSSCheck" files="RemoteStorageManagerMetricsTest.java"/>
<suppress checks="JavaNCSSCheck" files="SingleBrokerTest.java"/>
</suppressions>
3 changes: 0 additions & 3 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ ext {
caffeineVersion = "3.1.7"

zstdVersion = "1.5.5-5"

awaitilityVersion = "4.2.0"
}

dependencies {
Expand All @@ -51,6 +49,5 @@ dependencies {
testImplementation(project(":storage:filesystem"))

testImplementation "com.github.luben:zstd-jni:$zstdVersion"
testImplementation "org.awaitility:awaitility:$awaitilityVersion"
integrationTestImplementation sourceSets.test.output
}
8 changes: 8 additions & 0 deletions e2e/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# End-to-end tests for Kafka tiered storage

## Usage

Docker is needed for running the tests.

1. Build the image with < TBD >.
2. `./gradlew test`
43 changes: 43 additions & 0 deletions e2e/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2023 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

dependencies {
testImplementation "org.apache.kafka:kafka-clients:$kafkaVersion"
testImplementation "org.apache.kafka:kafka-storage:$kafkaVersion"
testImplementation "org.apache.kafka:kafka-storage-api:$kafkaVersion"

testImplementation "commons-io:commons-io:$apacheCommonsIOVersion"
testImplementation "com.amazonaws:aws-java-sdk-s3:$awsSdkVersion"

testImplementation "org.testcontainers:junit-jupiter:$testcontainersVersion"
testImplementation "org.testcontainers:kafka:$testcontainersVersion"

testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:$junitVersion"
testRuntimeOnly "org.slf4j:slf4j-log4j12:$slf4jVersion"
}

tasks.named('test') {
// Use junit platform for unit tests.
useJUnitPlatform()
testLogging {
events 'passed', 'skipped', 'failed'
showStandardStreams = true
showExceptions = true
showStackTraces = true
showCauses = true
exceptionFormat "full"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright 2023 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.e2e;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;

import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;

public class LocalSystemSingleBrokerTest extends SingleBrokerTest {
static final String TS_DATA_SUBDIR_HOST = "ts-data";
static final String TS_DATA_DIR_CONTAINER = "/home/appuser/kafka-tiered-storage";

static Path tieredDataDir;

@BeforeAll
static void init() {
setupKafka(kafka -> {
tieredDataDir = baseDir.resolve(TS_DATA_SUBDIR_HOST);
tieredDataDir.toFile().mkdirs();
tieredDataDir.toFile().setWritable(true, false);

kafka
.withEnv("KAFKA_RSM_CONFIG_STORAGE_BACKEND_CLASS",
"io.aiven.kafka.tieredstorage.storage.filesystem.FileSystemStorage")
.withEnv("KAFKA_RSM_CONFIG_STORAGE_ROOT", TS_DATA_DIR_CONTAINER)
.withFileSystemBind(tieredDataDir.toString(), TS_DATA_DIR_CONTAINER);
});
}

@AfterAll
static void cleanup() {
stopKafka();
cleanupStorage();
}

static void cleanupStorage() {
final Path parentDir;
if (localDataDir != null) {
parentDir = localDataDir.getParent();
} else if (tieredDataDir != null) {
parentDir = tieredDataDir.getParent();
} else {
parentDir = null;
}
if (parentDir != null) {
FileUtils.deleteQuietly(parentDir.toFile());
}
}

@Override
boolean assertNoTopicDataOnTierStorage(final String topicName, final Uuid topicId) {
final String prefix = String.format("%s-%s", topicName, topicId.toString());
try (final var files = Files.list(tieredDataDir)) {
return files.noneMatch(path -> path.getFileName().startsWith(prefix));
} catch (final IOException e) {
throw new RuntimeException(e);
}
}

@Override
List<String> remotePartitionFiles(final TopicIdPartition topicIdPartition) {
final Path dir = tieredDataDir.resolve(
String.format("%s-%s/%s",
topicIdPartition.topic(),
topicIdPartition.topicId().toString(),
topicIdPartition.partition()));
try (final var paths = Files.list(dir)) {
return paths.map(Path::getFileName)
.map(Path::toString)
.sorted()
.collect(Collectors.toList());
} catch (final IOException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Copyright 2023 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.e2e;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.testcontainers.Testcontainers;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.startupcheck.OneShotStartupCheckStrategy;
import org.testcontainers.utility.DockerImageName;

public class S3MinioSingleBrokerTest extends SingleBrokerTest {

static final int MINIO_PORT = 9000;
static final GenericContainer<?> MINIO = new GenericContainer<>(DockerImageName.parse("minio/minio"))
.withCommand("server", "/data", "--console-address", ":9090")
.withExposedPorts(MINIO_PORT)
.withNetwork(NETWORK)
.withNetworkAliases("minio");
static final String ACCESS_KEY_ID = "minioadmin";
static final String SECRET_ACCESS_KEY = "minioadmin";
static final String REGION = "us-east-1";

static AmazonS3 s3Client;

@BeforeAll
static void init() {

MINIO.start();

final String minioServerUrl = String.format("http://minio:%s", MINIO_PORT);

createBucket(minioServerUrl);

initializeS3Client();

setupKafka(kafka -> kafka.withEnv("KAFKA_RSM_CONFIG_STORAGE_BACKEND_CLASS",
"io.aiven.kafka.tieredstorage.storage.s3.S3Storage")
.withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_BUCKET_NAME", BUCKET)
.withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_REGION", REGION)
.withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_PATH_STYLE_ACCESS_ENABLED", "true")
.withEnv("KAFKA_RSM_CONFIG_STORAGE_AWS_ACCESS_KEY_ID", ACCESS_KEY_ID)
.withEnv("KAFKA_RSM_CONFIG_STORAGE_AWS_SECRET_ACCESS_KEY", SECRET_ACCESS_KEY)
.withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_ENDPOINT_URL", minioServerUrl)
.dependsOn(MINIO));
}

private static void initializeS3Client() {
final Integer mappedPort = MINIO.getFirstMappedPort();
Testcontainers.exposeHostPorts(mappedPort);
s3Client = AmazonS3ClientBuilder
.standard()
.withEndpointConfiguration(
new AwsClientBuilder.EndpointConfiguration(
"http://localhost:" + mappedPort,
REGION
)
)
.withCredentials(
new AWSStaticCredentialsProvider(
new BasicAWSCredentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY)
)
)
.withPathStyleAccessEnabled(true)
.build();

s3Client.listBuckets()
.forEach(bucket -> System.out.println("Buckets: " + bucket.getName()));
}

private static void createBucket(final String minioServerUrl) {
final String cmd =
"/usr/bin/mc config host add local "
+ minioServerUrl + " " + ACCESS_KEY_ID + " " + SECRET_ACCESS_KEY + " --api s3v4 &&"
+ "/usr/bin/mc mb local/test-bucket;\n";

final GenericContainer<?> mcContainer = new GenericContainer<>("minio/mc")
.withNetwork(NETWORK)
.withStartupCheckStrategy(new OneShotStartupCheckStrategy())
.withCreateContainerCmdModifier(containerCommand -> containerCommand
.withTty(true)
.withEntrypoint("/bin/sh", "-c", cmd));
mcContainer.start();
}


@AfterAll
static void cleanup() {
stopKafka();

MINIO.stop();

cleanupStorage();
}

@Override
boolean assertNoTopicDataOnTierStorage(final String topicName, final Uuid topicId) {
final String prefix = String.format("%s-%s", topicName, topicId.toString());

final var summaries = s3Client.listObjectsV2(BUCKET, prefix).getObjectSummaries();
return summaries.isEmpty();
}

@Override
List<String> remotePartitionFiles(final TopicIdPartition topicIdPartition) {
ListObjectsV2Request request = new ListObjectsV2Request()
.withBucketName(BUCKET);
final List<S3ObjectSummary> summaries = new ArrayList<>();
ListObjectsV2Result result;
while ((result = s3Client.listObjectsV2(request)).isTruncated()) {
summaries.addAll(result.getObjectSummaries());
request = request.withContinuationToken(result.getNextContinuationToken());
}
summaries.addAll(result.getObjectSummaries());

return summaries.stream()
.map(S3ObjectSummary::getKey)
.map(k -> k.substring(k.lastIndexOf('/') + 1))
.sorted()
.collect(Collectors.toList());
}
}
Loading

0 comments on commit b60e6fe

Please sign in to comment.