Skip to content

Commit

Permalink
Add GCS e2e tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanyu committed Oct 3, 2023
1 parent d8f2823 commit d755e90
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 8 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main_push_and_pull_request_workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ jobs:
strategy:
matrix:
java-version: [ 11 ]
test: [ 'LocalSystem', 'S3' ]
test: [ 'LocalSystem', 'S3', 'GCS' ]
name: E2E tests for ${{ matrix.test }} with jdk ${{ matrix.java-version }}
runs-on: ubuntu-latest
steps:
Expand Down
4 changes: 4 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,11 @@ subprojects {

awsSdkVersion = "2.20.140"

gcpSdkVersion = "2.26.1"

testcontainersVersion = "1.19.0"

testcontainersFakeGcsServerVersion = "0.2.0"
}

dependencies {
Expand Down
3 changes: 3 additions & 0 deletions e2e/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@ dependencies {
testImplementation("software.amazon.awssdk:s3:$awsSdkVersion") {
exclude group: "org.slf4j"
}
implementation "com.google.cloud:google-cloud-storage:$gcpSdkVersion"

testImplementation "org.testcontainers:junit-jupiter:$testcontainersVersion"
testImplementation "org.testcontainers:kafka:$testcontainersVersion"

testImplementation "io.aiven:testcontainers-fake-gcs-server:$testcontainersFakeGcsServerVersion"

testRuntimeOnly "org.slf4j:slf4j-log4j12:$slf4jVersion"
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright 2023 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.e2e;

import java.util.List;
import java.util.stream.Collectors;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;

import io.aiven.testcontainers.fakegcsserver.FakeGcsServerContainer;

import com.google.cloud.NoCredentials;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;

public class GcsSingleBrokerTest extends SingleBrokerTest {
static final String NETWORK_ALIAS = "fake-gcs-server";

static final FakeGcsServerContainer GCS_SERVER = new FakeGcsServerContainer()
.withNetwork(NETWORK)
.withNetworkAliases(NETWORK_ALIAS)
.withExternalURL(String.format("http://%s:%s", NETWORK_ALIAS, FakeGcsServerContainer.PORT));
static final String BUCKET = "test-bucket";

static Storage gcsClient;

@BeforeAll
static void init() throws Exception {
GCS_SERVER.start();

gcsClient = StorageOptions.newBuilder()
.setCredentials(NoCredentials.getInstance())
.setHost(GCS_SERVER.url())
.setProjectId("test-project")
.build()
.getService();

gcsClient.create(BucketInfo.newBuilder(BUCKET).build());

setupKafka(kafka -> kafka.withEnv("KAFKA_RSM_CONFIG_STORAGE_BACKEND_CLASS",
"io.aiven.kafka.tieredstorage.storage.gcs.GcsStorage")
.withEnv("KAFKA_REMOTE_LOG_STORAGE_MANAGER_CLASS_PATH",
"/tiered-storage-for-apache-kafka/core/*:/tiered-storage-for-apache-kafka/gcs/*")
.withEnv("KAFKA_RSM_CONFIG_STORAGE_GCS_BUCKET_NAME", BUCKET)
.withEnv("KAFKA_RSM_CONFIG_STORAGE_GCS_ENDPOINT_URL", GCS_SERVER.externalUrl())
.withEnv("KAFKA_RSM_CONFIG_STORAGE_GCS_CREDENTIALS_DEFAULT", "false")
.dependsOn(GCS_SERVER));
}

@AfterAll
static void cleanup() {
stopKafka();

GCS_SERVER.stop();

cleanupStorage();
}

@Override
boolean assertNoTopicDataOnTierStorage(final String topicName, final Uuid topicId) {
final String prefix = String.format("%s-%s", topicName, topicId.toString());

final var list = gcsClient.list(BUCKET, Storage.BlobListOption.prefix(prefix));
return list.streamAll().findAny().isEmpty();
}

@Override
List<String> remotePartitionFiles(final TopicIdPartition topicIdPartition) {
return gcsClient.list(BUCKET).streamAll()
.map(BlobInfo::getName)
.map(k -> k.substring(k.lastIndexOf('/') + 1))
.sorted()
.collect(Collectors.toList());
}
}
8 changes: 1 addition & 7 deletions storage/gcs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,6 @@

archivesBaseName = "storage-gcs"

ext {
// Keep empty lines between versions to avoid conflicts on mass update (e.g. Dependabot).

gcpSdkVersion = "2.27.1"
}

dependencies {
implementation project(":storage:core")

Expand All @@ -32,5 +26,5 @@ dependencies {

testImplementation "org.testcontainers:junit-jupiter:$testcontainersVersion"

testImplementation "io.aiven:testcontainers-fake-gcs-server:0.2.0"
testImplementation "io.aiven:testcontainers-fake-gcs-server:$testcontainersFakeGcsServerVersion"
}

0 comments on commit d755e90

Please sign in to comment.