diff --git a/.github/workflows/main_push_and_pull_request_workflow.yml b/.github/workflows/main_push_and_pull_request_workflow.yml index f640d47e6..3ec679be0 100644 --- a/.github/workflows/main_push_and_pull_request_workflow.yml +++ b/.github/workflows/main_push_and_pull_request_workflow.yml @@ -35,7 +35,7 @@ jobs: strategy: matrix: java-version: [ 11 ] - test: [ 'LocalSystem', 'S3' ] + test: [ 'LocalSystem', 'S3', 'Gcs' ] name: E2E tests for ${{ matrix.test }} with jdk ${{ matrix.java-version }} runs-on: ubuntu-latest steps: diff --git a/build.gradle b/build.gradle index f337f026d..67a337fb2 100644 --- a/build.gradle +++ b/build.gradle @@ -118,7 +118,11 @@ subprojects { awsSdkVersion = "2.20.140" + gcpSdkVersion = "2.26.1" + testcontainersVersion = "1.19.0" + + testcontainersFakeGcsServerVersion = "0.2.0" } dependencies { diff --git a/e2e/build.gradle b/e2e/build.gradle index f783e9e21..058c72ccf 100644 --- a/e2e/build.gradle +++ b/e2e/build.gradle @@ -24,10 +24,13 @@ dependencies { testImplementation("software.amazon.awssdk:s3:$awsSdkVersion") { exclude group: "org.slf4j" } + implementation "com.google.cloud:google-cloud-storage:$gcpSdkVersion" testImplementation "org.testcontainers:junit-jupiter:$testcontainersVersion" testImplementation "org.testcontainers:kafka:$testcontainersVersion" + testImplementation "io.aiven:testcontainers-fake-gcs-server:$testcontainersFakeGcsServerVersion" + testRuntimeOnly "org.slf4j:slf4j-log4j12:$slf4jVersion" } diff --git a/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/GcsSingleBrokerTest.java b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/GcsSingleBrokerTest.java new file mode 100644 index 000000000..72cb4ccd9 --- /dev/null +++ b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/GcsSingleBrokerTest.java @@ -0,0 +1,94 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.e2e; + +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.Uuid; + +import io.aiven.testcontainers.fakegcsserver.FakeGcsServerContainer; + +import com.google.cloud.NoCredentials; +import com.google.cloud.storage.BlobInfo; +import com.google.cloud.storage.BucketInfo; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageOptions; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; + +public class GcsSingleBrokerTest extends SingleBrokerTest { + static final String NETWORK_ALIAS = "fake-gcs-server"; + + static final FakeGcsServerContainer GCS_SERVER = new FakeGcsServerContainer() + .withNetwork(NETWORK) + .withNetworkAliases(NETWORK_ALIAS) + .withExternalURL(String.format("http://%s:%s", NETWORK_ALIAS, FakeGcsServerContainer.PORT)); + static final String BUCKET = "test-bucket"; + + static Storage gcsClient; + + @BeforeAll + static void init() throws Exception { + GCS_SERVER.start(); + + gcsClient = StorageOptions.newBuilder() + .setCredentials(NoCredentials.getInstance()) + .setHost(GCS_SERVER.url()) + .setProjectId("test-project") + .build() + .getService(); + + gcsClient.create(BucketInfo.newBuilder(BUCKET).build()); + + setupKafka(kafka -> kafka.withEnv("KAFKA_RSM_CONFIG_STORAGE_BACKEND_CLASS", + "io.aiven.kafka.tieredstorage.storage.gcs.GcsStorage") + .withEnv("KAFKA_REMOTE_LOG_STORAGE_MANAGER_CLASS_PATH", + "/tiered-storage-for-apache-kafka/core/*:/tiered-storage-for-apache-kafka/gcs/*") + .withEnv("KAFKA_RSM_CONFIG_STORAGE_GCS_BUCKET_NAME", BUCKET) + .withEnv("KAFKA_RSM_CONFIG_STORAGE_GCS_ENDPOINT_URL", GCS_SERVER.externalUrl()) + .withEnv("KAFKA_RSM_CONFIG_STORAGE_GCS_CREDENTIALS_DEFAULT", "false") + .dependsOn(GCS_SERVER)); + } + + @AfterAll + static void cleanup() { + stopKafka(); + + GCS_SERVER.stop(); + + cleanupStorage(); + } + + @Override + boolean assertNoTopicDataOnTierStorage(final String topicName, final Uuid topicId) { + final String prefix = String.format("%s-%s", topicName, topicId.toString()); + + final var list = gcsClient.list(BUCKET, Storage.BlobListOption.prefix(prefix)); + return list.streamAll().findAny().isEmpty(); + } + + @Override + List remotePartitionFiles(final TopicIdPartition topicIdPartition) { + return gcsClient.list(BUCKET).streamAll() + .map(BlobInfo::getName) + .map(k -> k.substring(k.lastIndexOf('/') + 1)) + .sorted() + .collect(Collectors.toList()); + } +} diff --git a/storage/gcs/build.gradle b/storage/gcs/build.gradle index f7492bb07..7bbc051ca 100644 --- a/storage/gcs/build.gradle +++ b/storage/gcs/build.gradle @@ -16,12 +16,6 @@ archivesBaseName = "storage-gcs" -ext { - // Keep empty lines between versions to avoid conflicts on mass update (e.g. Dependabot). - - gcpSdkVersion = "2.27.1" -} - dependencies { implementation project(":storage:core") @@ -32,5 +26,5 @@ dependencies { testImplementation "org.testcontainers:junit-jupiter:$testcontainersVersion" - testImplementation "io.aiven:testcontainers-fake-gcs-server:0.2.0" + testImplementation "io.aiven:testcontainers-fake-gcs-server:$testcontainersFakeGcsServerVersion" }