Skip to content

Commit

Permalink
feat(e2e): move e2e tests from [1]
Browse files Browse the repository at this point in the history
  • Loading branch information
jeqo committed Aug 18, 2023
1 parent c9fb0f3 commit c17be76
Show file tree
Hide file tree
Showing 11 changed files with 1,142 additions and 0 deletions.
8 changes: 8 additions & 0 deletions e2e/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# End-to-end tests for Kafka tiered storage

## Usage

Docker is needed for running the tests.

1. Build the image with < TBD >.
2. `./gradlew test`
52 changes: 52 additions & 0 deletions e2e/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2023 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

ext {
kafkaVersion = "3.4.0"
junitVersion = "5.9.2"
testcontainersVersion = "1.18.0"
slf4jVersion = "2.0.7"
}

dependencies {
testImplementation "org.apache.kafka:kafka-clients:$kafkaVersion"
testImplementation "org.apache.kafka:kafka-storage:$kafkaVersion"
testImplementation "org.apache.kafka:kafka-storage-api:$kafkaVersion"

testImplementation "commons-io:commons-io:2.11.0"
testImplementation "com.amazonaws:aws-java-sdk-s3:1.12.418"
testImplementation "org.slf4j:slf4j-api:$slf4jVersion"

testImplementation "org.junit.jupiter:junit-jupiter-api:$junitVersion"
testImplementation "org.junit.jupiter:junit-jupiter-params:$junitVersion"
testImplementation "org.assertj:assertj-core:3.24.2"
testImplementation "org.testcontainers:junit-jupiter:$testcontainersVersion"
testImplementation "org.testcontainers:localstack:$testcontainersVersion"
testImplementation "org.testcontainers:kafka:$testcontainersVersion"

testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:$junitVersion"
testRuntimeOnly "org.slf4j:slf4j-log4j12:$slf4jVersion"
}

test {
testLogging {
exceptionFormat "full"

info {
showStackTraces = true
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright 2023 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.e2e;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;

import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;

import static org.assertj.core.api.Assertions.assertThat;

public class LocalSystemSingleBrokerTest extends SingleBrokerTest {
// File system backend.
static final String TS_DATA_SUBDIR_HOST = "ts-data";
static final String TS_DATA_DIR_CONTAINER = "/home/appuser/kafka-tiered-storage";

// File system backend.
static Path tempDirTsData;

@BeforeAll
static void init() {
preInit();
// File system backend.
tempDirTsData = tempDir.resolve(TS_DATA_SUBDIR_HOST);
tempDirTsData.toFile().mkdirs();
tempDirTsData.toFile().setWritable(true, false);

// File system backend.
kafka
.withEnv("KAFKA_RSM_CONFIG_STORAGE_BACKEND_CLASS_NAME",
"io.aiven.kafka.tieredstorage.storage.filesystem.FileSystemStorage")
.withEnv("KAFKA_RSM_CONFIG_STORAGE_ROOT", TS_DATA_DIR_CONTAINER)
.withFileSystemBind(tempDirTsData.toString(), TS_DATA_DIR_CONTAINER);

SingleBrokerTest.init();
}

@AfterAll
static void cleanup() {
stopKafka();
cleanupStorage();
}

static void cleanupStorage() {
final Path tempDir;
if (tempDirData != null) {
tempDir = tempDirData.getParent();
} else if (tempDirTsData != null) {
// File system backend.
tempDir = tempDirTsData.getParent();
} else {
tempDir = null;
}
if (tempDir != null) {
FileUtils.deleteQuietly(tempDir.toFile());
}
}

@Override
void assertNoTopicDataOnTierStorage(final String topicName, final Uuid topicId) {
final String prefix = String.format("%s-%s", topicName, topicId.toString());
// File system backend.
assertThat(tempDirTsData.toFile().listFiles())
.doesNotContain(tempDirTsData.resolve(prefix).toFile());
}

@Override
List<String> remotePartitionFiles(final TopicIdPartition topicIdPartition) {
// File system backend.
final Path dir = tempDirTsData.resolve(
String.format("%s-%s/%s", topicIdPartition.topic(), topicIdPartition.topicId().toString(),
topicIdPartition.partition()));
try (final var paths = Files.list(dir)) {
return paths.map(Path::getFileName)
.map(Path::toString)
.sorted()
.collect(Collectors.toList());
} catch (final IOException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright 2023 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.e2e;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.utility.DockerImageName;

import static org.assertj.core.api.Assertions.assertThat;

public class S3LocalstackSingleBrokerTest extends SingleBrokerTest {

static final LocalStackContainer LOCALSTACK = new LocalStackContainer(
DockerImageName.parse("localstack/localstack:2.0.2")
)
.withServices(LocalStackContainer.Service.S3)
.withNetwork(NETWORK);

@BeforeAll
static void init() {
preInit();
// Localstack backend.
LOCALSTACK.start();

// Localstack backend.
kafka.withEnv("KAFKA_RSM_CONFIG_STORAGE_BACKEND_CLASS_NAME",
"io.aiven.kafka.tieredstorage.storage.s3.S3Storage")
.withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_BUCKET_NAME", BUCKET)
.withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_REGION", LOCALSTACK.getRegion())
.withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_PATH_STYLE_ACCESS_ENABLED", "true")
.withEnv("KAFKA_RSM_CONFIG_STORAGE_AWS_ACCESS_KEY_ID", LOCALSTACK.getAccessKey())
.withEnv("KAFKA_RSM_CONFIG_STORAGE_AWS_SECRET_ACCESS_KEY", LOCALSTACK.getSecretKey())
.dependsOn(LOCALSTACK);
kafka.withEnv(
"KAFKA_RSM_CONFIG_STORAGE_S3_ENDPOINT_URL",
"http://" + LOCALSTACK.getContainerInfo().getConfig().getHostName() + ":4566");

// Localstack backend.
s3Client = AmazonS3ClientBuilder
.standard()
.withEndpointConfiguration(
new AwsClientBuilder.EndpointConfiguration(
LOCALSTACK.getEndpointOverride(LocalStackContainer.Service.S3).toString(),
LOCALSTACK.getRegion()
)
)
.withCredentials(
new AWSStaticCredentialsProvider(
new BasicAWSCredentials(LOCALSTACK.getAccessKey(), LOCALSTACK.getSecretKey())
)
)
.build();
s3Client.createBucket(BUCKET);

SingleBrokerTest.init();
}


@AfterAll
static void cleanup() {
// Localstack backend.
LOCALSTACK.stop();

stopKafka();
cleanupStorage();
}

@Override
void assertNoTopicDataOnTierStorage(final String topicName, final Uuid topicId) {
final String prefix = String.format("%s-%s", topicName, topicId.toString());

// Localstack backend.
final var summaries = s3Client.listObjectsV2(BUCKET, prefix).getObjectSummaries();
assertThat(summaries).isEmpty();
}

@Override
List<String> remotePartitionFiles(final TopicIdPartition topicIdPartition) {
// Localstack backend.
ListObjectsV2Request request = new ListObjectsV2Request()
.withBucketName(BUCKET);
final List<S3ObjectSummary> summaries = new ArrayList<>();
ListObjectsV2Result result;
while ((result = s3Client.listObjectsV2(request)).isTruncated()) {
summaries.addAll(result.getObjectSummaries());
request = request.withContinuationToken(result.getNextContinuationToken());
}
summaries.addAll(result.getObjectSummaries());

return summaries.stream()
.map(S3ObjectSummary::getKey)
.map(k -> k.substring(k.lastIndexOf('/') + 1))
.sorted()
.collect(Collectors.toList());
}
}
Loading

0 comments on commit c17be76

Please sign in to comment.