Skip to content

Commit

Permalink
feat(e2e): move e2e tests from [1]
Browse files Browse the repository at this point in the history
  • Loading branch information
jeqo committed Aug 23, 2023
1 parent ef37b03 commit c4f2b46
Show file tree
Hide file tree
Showing 14 changed files with 1,292 additions and 12 deletions.
10 changes: 9 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,19 @@ subprojects {
slf4jVersion = "1.7.36"

// Don't bump this version without need, as this is the min supported version for the plugin.
kafkaVersion = "3.0.0"
kafkaVersion = "3.3.2"

assertJVersion = "3.24.2"

apacheCommonsIOVersion = "2.13.0"

jacksonVersion = "2.15.2"

awaitilityVersion = "4.2.0"

awsSdkVersion = "1.12.520"

testcontainersVersion = "1.18.3"
}

dependencies {
Expand All @@ -131,6 +137,8 @@ subprojects {
testImplementation "org.mockito:mockito-core:$mockitoVersion"
testImplementation "org.mockito:mockito-junit-jupiter:$mockitoVersion"

testImplementation "org.awaitility:awaitility:$awaitilityVersion"

testRuntimeOnly "org.slf4j:slf4j-log4j12:$slf4jVersion"
}

Expand Down
3 changes: 3 additions & 0 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
<suppress checks="ClassDataAbstractionCoupling" files="RemoteStorageManager.java"/>
<suppress checks="ClassDataAbstractionCoupling" files="Metrics.java"/>
<suppress checks="CyclomaticComplexity" files="MetricCollector.java"/>
<suppress checks="CyclomaticComplexity" files="SingleBrokerTest.java"/>
<suppress checks="NPathComplexity" files="SingleBrokerTest.java"/>
<suppress checks="JavaNCSSCheck" files="MetricsRegistry.java"/>
<suppress checks="JavaNCSSCheck" files="RemoteStorageManagerMetricsTest.java"/>
<suppress checks="JavaNCSSCheck" files="SingleBrokerTest.java"/>
</suppressions>
3 changes: 0 additions & 3 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ ext {
caffeineVersion = "3.1.7"

zstdVersion = "1.5.5-5"

awaitilityVersion = "4.2.0"
}

dependencies {
Expand All @@ -51,6 +49,5 @@ dependencies {
testImplementation(project(":storage:filesystem"))

testImplementation "com.github.luben:zstd-jni:$zstdVersion"
testImplementation "org.awaitility:awaitility:$awaitilityVersion"
integrationTestImplementation sourceSets.test.output
}
8 changes: 8 additions & 0 deletions e2e/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# End-to-end tests for Kafka tiered storage

## Usage

Docker is needed for running the tests.

1. Build the image with < TBD >.
2. `./gradlew test`
30 changes: 30 additions & 0 deletions e2e/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2023 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

dependencies {
testImplementation "org.apache.kafka:kafka-clients:$kafkaVersion"
testImplementation "org.apache.kafka:kafka-storage:$kafkaVersion"
testImplementation "org.apache.kafka:kafka-storage-api:$kafkaVersion"

testImplementation "commons-io:commons-io:$apacheCommonsIOVersion"
testImplementation "com.amazonaws:aws-java-sdk-s3:$awsSdkVersion"

testImplementation "org.testcontainers:junit-jupiter:$testcontainersVersion"
testImplementation "org.testcontainers:kafka:$testcontainersVersion"

testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:$junitVersion"
testRuntimeOnly "org.slf4j:slf4j-log4j12:$slf4jVersion"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright 2023 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.e2e;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;

import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;

import static org.assertj.core.api.Assertions.assertThat;

public class LocalSystemSingleBrokerTest extends SingleBrokerTest {
// File system backend.
static final String TS_DATA_SUBDIR_HOST = "ts-data";
static final String TS_DATA_DIR_CONTAINER = "/home/appuser/kafka-tiered-storage";

// File system backend.
static Path tempDirTsData;

@BeforeAll
static void init() {
preInit();
// File system backend.
tempDirTsData = tempDir.resolve(TS_DATA_SUBDIR_HOST);
tempDirTsData.toFile().mkdirs();
tempDirTsData.toFile().setWritable(true, false);

// File system backend.
kafka
.withEnv("KAFKA_RSM_CONFIG_STORAGE_BACKEND_CLASS_NAME",
"io.aiven.kafka.tieredstorage.storage.filesystem.FileSystemStorage")
.withEnv("KAFKA_RSM_CONFIG_STORAGE_ROOT", TS_DATA_DIR_CONTAINER)
.withFileSystemBind(tempDirTsData.toString(), TS_DATA_DIR_CONTAINER);

SingleBrokerTest.init();
}

@AfterAll
static void cleanup() {
stopKafka();
cleanupStorage();
}

static void cleanupStorage() {
final Path tempDir;
if (tempDirData != null) {
tempDir = tempDirData.getParent();
} else if (tempDirTsData != null) {
// File system backend.
tempDir = tempDirTsData.getParent();
} else {
tempDir = null;
}
if (tempDir != null) {
FileUtils.deleteQuietly(tempDir.toFile());
}
}

@Override
void assertNoTopicDataOnTierStorage(final String topicName, final Uuid topicId) {
final String prefix = String.format("%s-%s", topicName, topicId.toString());
// File system backend.
assertThat(tempDirTsData.toFile().listFiles())
.doesNotContain(tempDirTsData.resolve(prefix).toFile());
}

@Override
List<String> remotePartitionFiles(final TopicIdPartition topicIdPartition) {
// File system backend.
final Path dir = tempDirTsData.resolve(
String.format("%s-%s/%s", topicIdPartition.topic(), topicIdPartition.topicId().toString(),
topicIdPartition.partition()));
try (final var paths = Files.list(dir)) {
return paths.map(Path::getFileName)
.map(Path::toString)
.sorted()
.collect(Collectors.toList());
} catch (final IOException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* Copyright 2023 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.e2e;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.testcontainers.Testcontainers;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.startupcheck.OneShotStartupCheckStrategy;
import org.testcontainers.utility.DockerImageName;

import static org.assertj.core.api.Assertions.assertThat;

public class S3MinioSingleBrokerTest extends SingleBrokerTest {

public static final int MINIO_PORT = 9000;
static final GenericContainer<?> MINIO = new GenericContainer<>(DockerImageName.parse("minio/minio"))
.withCommand("server", "/data", "--console-address", ":9090")
.withExposedPorts(MINIO_PORT)
.withNetwork(NETWORK)
.withNetworkAliases("minio");

static AmazonS3 s3Client;

@BeforeAll
static void init() {
preInit();
// Localstack backend.
MINIO.start();

// Localstack backend.
final String minioServerUrl = String.format("http://minio:%s", MINIO_PORT);

// Localstack backend.
kafka.withEnv("KAFKA_RSM_CONFIG_STORAGE_BACKEND_CLASS_NAME",
"io.aiven.kafka.tieredstorage.storage.s3.S3Storage")
.withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_BUCKET_NAME", BUCKET)
.withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_REGION", "us-east-1")
.withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_PATH_STYLE_ACCESS_ENABLED", "true")
.withEnv("KAFKA_RSM_CONFIG_STORAGE_AWS_ACCESS_KEY_ID", "minioadmin")
.withEnv("KAFKA_RSM_CONFIG_STORAGE_AWS_SECRET_ACCESS_KEY", "minioadmin")
.dependsOn(MINIO);
kafka.withEnv(
"KAFKA_RSM_CONFIG_STORAGE_S3_ENDPOINT_URL",
minioServerUrl);


final String cmd =
"/usr/bin/mc config host add local "
+ minioServerUrl + " minioadmin minioadmin --api s3v4 &&"
+ "/usr/bin/mc mb local/test-bucket;\n";

final GenericContainer<?> mcContainer = new GenericContainer<>("minio/mc")
.withNetwork(NETWORK)
.withStartupCheckStrategy(new OneShotStartupCheckStrategy())
.withCreateContainerCmdModifier(containerCommand -> containerCommand
.withTty(true)
.withEntrypoint("/bin/sh", "-c", cmd));
mcContainer.start();


final Integer mappedPort = MINIO.getFirstMappedPort();
Testcontainers.exposeHostPorts(mappedPort);
s3Client = AmazonS3ClientBuilder
.standard()
.withEndpointConfiguration(
new AwsClientBuilder.EndpointConfiguration(
"http://localhost:" + mappedPort,
"us-east-1"
)
)
.withCredentials(
new AWSStaticCredentialsProvider(
new BasicAWSCredentials("minioadmin", "minioadmin")
)
)
.withPathStyleAccessEnabled(true)
.build();

s3Client.listBuckets()
.forEach(bucket -> System.out.println("Buckets: " + bucket.getName()));

SingleBrokerTest.init();
}


@AfterAll
static void cleanup() {
stopKafka();

// Localstack backend.
MINIO.stop();

cleanupStorage();
}

@Override
void assertNoTopicDataOnTierStorage(final String topicName, final Uuid topicId) {
final String prefix = String.format("%s-%s", topicName, topicId.toString());

// Localstack backend.
final var summaries = s3Client.listObjectsV2(BUCKET, prefix).getObjectSummaries();
assertThat(summaries).isEmpty();
}

@Override
List<String> remotePartitionFiles(final TopicIdPartition topicIdPartition) {
// Localstack backend.
ListObjectsV2Request request = new ListObjectsV2Request()
.withBucketName(BUCKET);
final List<S3ObjectSummary> summaries = new ArrayList<>();
ListObjectsV2Result result;
while ((result = s3Client.listObjectsV2(request)).isTruncated()) {
summaries.addAll(result.getObjectSummaries());
request = request.withContinuationToken(result.getNextContinuationToken());
}
summaries.addAll(result.getObjectSummaries());

return summaries.stream()
.map(S3ObjectSummary::getKey)
.map(k -> k.substring(k.lastIndexOf('/') + 1))
.sorted()
.collect(Collectors.toList());
}
}
Loading

0 comments on commit c4f2b46

Please sign in to comment.