From c4f2b46a73be24bff684b8107f4e2821d0276ace Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Wed, 23 Aug 2023 00:41:52 +0300 Subject: [PATCH] feat(e2e): move e2e tests from [1] [1]: https://github.com/aiven/kafka-tiered-storage-test --- build.gradle | 10 +- checkstyle/suppressions.xml | 3 + core/build.gradle | 3 - e2e/README.md | 8 + e2e/build.gradle | 30 + .../e2e/LocalSystemSingleBrokerTest.java | 104 +++ .../e2e/S3MinioSingleBrokerTest.java | 154 +++++ .../tieredstorage/e2e/SingleBrokerTest.java | 619 ++++++++++++++++++ .../RemoteLogMetadataDeserializer.java | 30 + .../internal/RemoteLogMetadataTracker.java | 231 +++++++ .../e2e/internal/RemoteSegment.java | 75 +++ e2e/src/test/resources/log4j.properties | 28 + settings.gradle | 1 + storage/s3/build.gradle | 8 - 14 files changed, 1292 insertions(+), 12 deletions(-) create mode 100644 e2e/README.md create mode 100644 e2e/build.gradle create mode 100644 e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/LocalSystemSingleBrokerTest.java create mode 100644 e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/S3MinioSingleBrokerTest.java create mode 100644 e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/SingleBrokerTest.java create mode 100644 e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/RemoteLogMetadataDeserializer.java create mode 100644 e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/RemoteLogMetadataTracker.java create mode 100644 e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/RemoteSegment.java create mode 100644 e2e/src/test/resources/log4j.properties diff --git a/build.gradle b/build.gradle index e27f09801..ef4ff6922 100644 --- a/build.gradle +++ b/build.gradle @@ -103,13 +103,19 @@ subprojects { slf4jVersion = "1.7.36" // Don't bump this version without need, as this is the min supported version for the plugin. - kafkaVersion = "3.0.0" + kafkaVersion = "3.3.2" assertJVersion = "3.24.2" apacheCommonsIOVersion = "2.13.0" jacksonVersion = "2.15.2" + + awaitilityVersion = "4.2.0" + + awsSdkVersion = "1.12.520" + + testcontainersVersion = "1.18.3" } dependencies { @@ -131,6 +137,8 @@ subprojects { testImplementation "org.mockito:mockito-core:$mockitoVersion" testImplementation "org.mockito:mockito-junit-jupiter:$mockitoVersion" + testImplementation "org.awaitility:awaitility:$awaitilityVersion" + testRuntimeOnly "org.slf4j:slf4j-log4j12:$slf4jVersion" } diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index e633a30e1..f78191257 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -28,6 +28,9 @@ + + + diff --git a/core/build.gradle b/core/build.gradle index 7757c71e1..d711652eb 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -27,8 +27,6 @@ ext { caffeineVersion = "3.1.7" zstdVersion = "1.5.5-5" - - awaitilityVersion = "4.2.0" } dependencies { @@ -51,6 +49,5 @@ dependencies { testImplementation(project(":storage:filesystem")) testImplementation "com.github.luben:zstd-jni:$zstdVersion" - testImplementation "org.awaitility:awaitility:$awaitilityVersion" integrationTestImplementation sourceSets.test.output } diff --git a/e2e/README.md b/e2e/README.md new file mode 100644 index 000000000..18088da90 --- /dev/null +++ b/e2e/README.md @@ -0,0 +1,8 @@ +# End-to-end tests for Kafka tiered storage + +## Usage + +Docker is needed for running the tests. + +1. Build the image with < TBD >. +2. `./gradlew test` diff --git a/e2e/build.gradle b/e2e/build.gradle new file mode 100644 index 000000000..e79e91764 --- /dev/null +++ b/e2e/build.gradle @@ -0,0 +1,30 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +dependencies { + testImplementation "org.apache.kafka:kafka-clients:$kafkaVersion" + testImplementation "org.apache.kafka:kafka-storage:$kafkaVersion" + testImplementation "org.apache.kafka:kafka-storage-api:$kafkaVersion" + + testImplementation "commons-io:commons-io:$apacheCommonsIOVersion" + testImplementation "com.amazonaws:aws-java-sdk-s3:$awsSdkVersion" + + testImplementation "org.testcontainers:junit-jupiter:$testcontainersVersion" + testImplementation "org.testcontainers:kafka:$testcontainersVersion" + + testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:$junitVersion" + testRuntimeOnly "org.slf4j:slf4j-log4j12:$slf4jVersion" +} diff --git a/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/LocalSystemSingleBrokerTest.java b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/LocalSystemSingleBrokerTest.java new file mode 100644 index 000000000..b2d1e92fb --- /dev/null +++ b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/LocalSystemSingleBrokerTest.java @@ -0,0 +1,104 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.e2e; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.Uuid; + +import org.apache.commons.io.FileUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; + +import static org.assertj.core.api.Assertions.assertThat; + +public class LocalSystemSingleBrokerTest extends SingleBrokerTest { + // File system backend. + static final String TS_DATA_SUBDIR_HOST = "ts-data"; + static final String TS_DATA_DIR_CONTAINER = "/home/appuser/kafka-tiered-storage"; + + // File system backend. + static Path tempDirTsData; + + @BeforeAll + static void init() { + preInit(); + // File system backend. + tempDirTsData = tempDir.resolve(TS_DATA_SUBDIR_HOST); + tempDirTsData.toFile().mkdirs(); + tempDirTsData.toFile().setWritable(true, false); + + // File system backend. + kafka + .withEnv("KAFKA_RSM_CONFIG_STORAGE_BACKEND_CLASS_NAME", + "io.aiven.kafka.tieredstorage.storage.filesystem.FileSystemStorage") + .withEnv("KAFKA_RSM_CONFIG_STORAGE_ROOT", TS_DATA_DIR_CONTAINER) + .withFileSystemBind(tempDirTsData.toString(), TS_DATA_DIR_CONTAINER); + + SingleBrokerTest.init(); + } + + @AfterAll + static void cleanup() { + stopKafka(); + cleanupStorage(); + } + + static void cleanupStorage() { + final Path tempDir; + if (tempDirData != null) { + tempDir = tempDirData.getParent(); + } else if (tempDirTsData != null) { + // File system backend. + tempDir = tempDirTsData.getParent(); + } else { + tempDir = null; + } + if (tempDir != null) { + FileUtils.deleteQuietly(tempDir.toFile()); + } + } + + @Override + void assertNoTopicDataOnTierStorage(final String topicName, final Uuid topicId) { + final String prefix = String.format("%s-%s", topicName, topicId.toString()); + // File system backend. + assertThat(tempDirTsData.toFile().listFiles()) + .doesNotContain(tempDirTsData.resolve(prefix).toFile()); + } + + @Override + List remotePartitionFiles(final TopicIdPartition topicIdPartition) { + // File system backend. + final Path dir = tempDirTsData.resolve( + String.format("%s-%s/%s", topicIdPartition.topic(), topicIdPartition.topicId().toString(), + topicIdPartition.partition())); + try (final var paths = Files.list(dir)) { + return paths.map(Path::getFileName) + .map(Path::toString) + .sorted() + .collect(Collectors.toList()); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/S3MinioSingleBrokerTest.java b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/S3MinioSingleBrokerTest.java new file mode 100644 index 000000000..d7c6a8d0a --- /dev/null +++ b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/S3MinioSingleBrokerTest.java @@ -0,0 +1,154 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.e2e; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.Uuid; + +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.ListObjectsV2Request; +import com.amazonaws.services.s3.model.ListObjectsV2Result; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.testcontainers.Testcontainers; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.startupcheck.OneShotStartupCheckStrategy; +import org.testcontainers.utility.DockerImageName; + +import static org.assertj.core.api.Assertions.assertThat; + +public class S3MinioSingleBrokerTest extends SingleBrokerTest { + + public static final int MINIO_PORT = 9000; + static final GenericContainer MINIO = new GenericContainer<>(DockerImageName.parse("minio/minio")) + .withCommand("server", "/data", "--console-address", ":9090") + .withExposedPorts(MINIO_PORT) + .withNetwork(NETWORK) + .withNetworkAliases("minio"); + + static AmazonS3 s3Client; + + @BeforeAll + static void init() { + preInit(); + // Localstack backend. + MINIO.start(); + + // Localstack backend. + final String minioServerUrl = String.format("http://minio:%s", MINIO_PORT); + + // Localstack backend. + kafka.withEnv("KAFKA_RSM_CONFIG_STORAGE_BACKEND_CLASS_NAME", + "io.aiven.kafka.tieredstorage.storage.s3.S3Storage") + .withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_BUCKET_NAME", BUCKET) + .withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_REGION", "us-east-1") + .withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_PATH_STYLE_ACCESS_ENABLED", "true") + .withEnv("KAFKA_RSM_CONFIG_STORAGE_AWS_ACCESS_KEY_ID", "minioadmin") + .withEnv("KAFKA_RSM_CONFIG_STORAGE_AWS_SECRET_ACCESS_KEY", "minioadmin") + .dependsOn(MINIO); + kafka.withEnv( + "KAFKA_RSM_CONFIG_STORAGE_S3_ENDPOINT_URL", + minioServerUrl); + + + final String cmd = + "/usr/bin/mc config host add local " + + minioServerUrl + " minioadmin minioadmin --api s3v4 &&" + + "/usr/bin/mc mb local/test-bucket;\n"; + + final GenericContainer mcContainer = new GenericContainer<>("minio/mc") + .withNetwork(NETWORK) + .withStartupCheckStrategy(new OneShotStartupCheckStrategy()) + .withCreateContainerCmdModifier(containerCommand -> containerCommand + .withTty(true) + .withEntrypoint("/bin/sh", "-c", cmd)); + mcContainer.start(); + + + final Integer mappedPort = MINIO.getFirstMappedPort(); + Testcontainers.exposeHostPorts(mappedPort); + s3Client = AmazonS3ClientBuilder + .standard() + .withEndpointConfiguration( + new AwsClientBuilder.EndpointConfiguration( + "http://localhost:" + mappedPort, + "us-east-1" + ) + ) + .withCredentials( + new AWSStaticCredentialsProvider( + new BasicAWSCredentials("minioadmin", "minioadmin") + ) + ) + .withPathStyleAccessEnabled(true) + .build(); + + s3Client.listBuckets() + .forEach(bucket -> System.out.println("Buckets: " + bucket.getName())); + + SingleBrokerTest.init(); + } + + + @AfterAll + static void cleanup() { + stopKafka(); + + // Localstack backend. + MINIO.stop(); + + cleanupStorage(); + } + + @Override + void assertNoTopicDataOnTierStorage(final String topicName, final Uuid topicId) { + final String prefix = String.format("%s-%s", topicName, topicId.toString()); + + // Localstack backend. + final var summaries = s3Client.listObjectsV2(BUCKET, prefix).getObjectSummaries(); + assertThat(summaries).isEmpty(); + } + + @Override + List remotePartitionFiles(final TopicIdPartition topicIdPartition) { + // Localstack backend. + ListObjectsV2Request request = new ListObjectsV2Request() + .withBucketName(BUCKET); + final List summaries = new ArrayList<>(); + ListObjectsV2Result result; + while ((result = s3Client.listObjectsV2(request)).isTruncated()) { + summaries.addAll(result.getObjectSummaries()); + request = request.withContinuationToken(result.getNextContinuationToken()); + } + summaries.addAll(result.getObjectSummaries()); + + return summaries.stream() + .map(S3ObjectSummary::getKey) + .map(k -> k.substring(k.lastIndexOf('/') + 1)) + .sorted() + .collect(Collectors.toList()); + } +} diff --git a/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/SingleBrokerTest.java b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/SingleBrokerTest.java new file mode 100644 index 000000000..662dfc643 --- /dev/null +++ b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/SingleBrokerTest.java @@ -0,0 +1,619 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.e2e; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.RecordsToDelete; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; + +import io.aiven.kafka.tieredstorage.e2e.internal.RemoteLogMetadataTracker; +import io.aiven.kafka.tieredstorage.e2e.internal.RemoteSegment; + +import com.github.dockerjava.api.model.Ulimit; +import org.apache.commons.io.FileUtils; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.TestWatcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import static java.util.stream.Collectors.groupingBy; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; +import static org.awaitility.Awaitility.await; + +@Testcontainers +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +@ExtendWith(SingleBrokerTest.FailFastTestWatcher.class) +abstract class SingleBrokerTest { + + static final Logger LOG = LoggerFactory.getLogger(SingleBrokerTest.class); + + static final String KAFKA_DATA_SUBDIR_HOST = "data"; + static final String KAFKA_DATA_DIR_CONTAINER = "/var/lib/kafka/data"; + + static final String BUCKET = "test-bucket"; + + static final String TOPIC_0 = "topic0"; + static final String TOPIC_1 = "topic1"; + static final TopicPartition TP_0_0 = new TopicPartition(TOPIC_0, 0); + static final TopicPartition TP_0_1 = new TopicPartition(TOPIC_0, 1); + static final TopicPartition TP_1_0 = new TopicPartition(TOPIC_1, 0); + static final List TOPIC_PARTITIONS = List.of(TP_0_0, TP_0_1, TP_1_0); + + static final int CHUNK_SIZE = 1024; // TODO something more reasonable? + static final int SEGMENT_SIZE_T0 = 256 * CHUNK_SIZE + CHUNK_SIZE / 2; + static final int SEGMENT_SIZE_T1 = 123 * CHUNK_SIZE + 123; + + static final int VALUE_SIZE_MIN = CHUNK_SIZE / 4 - 3; + static final int VALUE_SIZE_MAX = CHUNK_SIZE * 2 + 5; + + static final int RECORDS_TO_PRODUCE = 10_000; + + static final Duration TOTAL_RETENTION = Duration.ofHours(1); + static final Duration LOCAL_RETENTION = Duration.ofSeconds(5); + static final Network NETWORK = Network.newNetwork(); + + static Path tempDir; + // Can't use @TempDir, because it's initialized too late. + static Path tempDirData; + + static KafkaContainer kafka; + + static AdminClient adminClient; + + static RemoteLogMetadataTracker remoteLogMetadataTracker; + + static TopicIdPartition t0p0; + static TopicIdPartition t0p1; + static TopicIdPartition t1p0; + + static boolean failed; + + static void init() { + failed = false; + + kafka.start(); + + adminClient = AdminClient.create(Map.of( + "bootstrap.servers", kafka.getBootstrapServers() + )); + + remoteLogMetadataTracker = new RemoteLogMetadataTracker(kafka.getBootstrapServers()); + } + + static void preInit() { + try { + tempDir = Files.createTempDirectory("junit"); + tempDir.toFile().deleteOnExit(); // works only if empty + tempDirData = tempDir.resolve(KAFKA_DATA_SUBDIR_HOST); + tempDirData.toFile().mkdirs(); + tempDirData.toFile().setWritable(true, false); + } catch (final IOException e) { + throw new RuntimeException(e); + } + + kafka = new KafkaContainer(DockerImageName.parse("aivenoy/kafka:3.3-2022-10-06-tiered-storage-1-ts-1") + .asCompatibleSubstituteFor("confluentinc/cp-kafka") + ) + .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false") + .withEnv("KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS", "10000") + .withEnv("KAFKA_REMOTE_LOG_STORAGE_SYSTEM_ENABLE", "true") + .withEnv("KAFKA_REMOTE_LOG_MANAGER_TASK_INTERVAL_MS", "5000") + .withEnv("KAFKA_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME", + "org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager") + .withEnv("KAFKA_RLMM_CONFIG_REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR", "1") + .withEnv("KAFKA_REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME", "BROKER") + .withEnv("KAFKA_REMOTE_LOG_STORAGE_MANAGER_CLASS_PATH", "/tiered-storage-for-apache-kafka/*") + .withEnv("KAFKA_REMOTE_LOG_STORAGE_MANAGER_CLASS_NAME", "io.aiven.kafka.tieredstorage.RemoteStorageManager") + .withEnv("KAFKA_RSM_CONFIG_CHUNK_SIZE", Integer.toString(CHUNK_SIZE)) + .withEnv("KAFKA_OPTS", "") // disable JMX exporter + .withEnv("KAFKA_LOG4J_LOGGERS", "io.aiven.kafka.tieredstorage=DEBUG," + + "org.apache.kafka.server.log.remote.metadata.storage=DEBUG," + + "state.change.logger=INFO") + .withCreateContainerCmdModifier( + cmd -> cmd.getHostConfig().withUlimits(List.of(new Ulimit("nofile", 30_000L, 30_000L)))) + .withFileSystemBind(tempDirData.toString(), KAFKA_DATA_DIR_CONTAINER) + .withNetwork(NETWORK); + } + + static void stopKafka() { + if (adminClient != null) { + adminClient.close(); + } + + if (failed) { + LOG.info("Kafka container logs:"); + LOG.info(kafka.getLogs()); + } + + kafka.stop(); + } + + static void cleanupStorage() { + final Path tempDir; + if (tempDirData != null) { + tempDir = tempDirData.getParent(); + } else { + tempDir = null; + } + if (tempDir != null) { + FileUtils.deleteQuietly(tempDir.toFile()); + } + } + + @Test + @Order(0) + void createTopics() throws Exception { + final NewTopic newTopic0 = new NewTopic(TOPIC_0, 2, (short) 1) + .configs(Map.of( + "remote.storage.enable", "true", + "segment.bytes", Integer.toString(SEGMENT_SIZE_T0), + "retention.ms", Long.toString(TOTAL_RETENTION.toMillis()), + "local.retention.ms", Long.toString(LOCAL_RETENTION.toMillis()) + )); + final NewTopic newTopic1 = new NewTopic(TOPIC_1, 1, (short) 1) + .configs(Map.of( + "remote.storage.enable", "true", + "segment.bytes", Integer.toString(SEGMENT_SIZE_T1), + "retention.ms", Long.toString(TOTAL_RETENTION.toMillis()), + "local.retention.ms", Long.toString(LOCAL_RETENTION.toMillis()) + )); + final var topicsToCreate = List.of(newTopic0, newTopic1); + adminClient.createTopics(topicsToCreate) + .all().get(30, TimeUnit.SECONDS); + + adminClient.describeTopics(List.of(TOPIC_0, TOPIC_1)) + .allTopicNames().get(30, TimeUnit.SECONDS) + .values().forEach(td -> { + if (td.name().equals(TOPIC_0)) { + t0p0 = new TopicIdPartition(td.topicId(), TP_0_0); + t0p1 = new TopicIdPartition(td.topicId(), TP_0_1); + } else if (td.name().equals(TOPIC_1)) { + t1p0 = new TopicIdPartition(td.topicId(), TP_1_0); + } else { + fail("Unknown topic %s" + td); + } + }); + + LOG.info("Topics {} created successfully", topicsToCreate); + } + + @Test + @Order(1) + void remoteCopy() throws Exception { + maybeFailFast(); + + fillTopics(); + + remoteLogMetadataTracker.initialize(List.of(t0p0, t0p1, t1p0)); + + // Check remote segments are present. + final var allRemoteSegments = remoteLogMetadataTracker.remoteSegments(); + + for (final Map.Entry> entry : allRemoteSegments.entrySet()) { + final Map> segmentFiles = remotePartitionFiles(entry.getKey()).stream() + .collect(groupingBy(SingleBrokerTest::extractSegmentId)); + + for (final RemoteSegment remoteLogSegment : entry.getValue()) { + final String key = remoteLogSegment.remoteLogSegmentId().id().toString(); + assertThat(segmentFiles).containsKey(key); + assertThat(segmentFiles.get(key).stream() + .map(SingleBrokerTest::extractSuffix)) + .containsExactlyInAnyOrder( + "index", "leader-epoch-checkpoint", "log", "rsm-manifest", "snapshot", "timeindex"); + } + } + + // Check local segments are deleted. + for (final TopicIdPartition tp : List.of(t0p0, t0p1, t1p0)) { + await().atMost(Duration.ofSeconds(60)) + .pollInterval(Duration.ofMillis(100)) + .until(() -> localLogFiles(tp.topicPartition()).size(), size -> size == 1); + + final var localLogs = localLogFiles(tp.topicPartition()); + LOG.info("Local logs for {} [{}]: \n{}", tp, localLogs.size(), localLogs); + final var remoteSegments = allRemoteSegments.get(tp); + LOG.info("Remote logs for {} [{}]: \n{}", tp, remoteSegments.size(), remoteSegments); + } + } + + void fillTopics() throws Exception { + try (final var producer = new KafkaProducer<>(Map.of( + "bootstrap.servers", kafka.getBootstrapServers(), + "linger.ms", Long.toString(Duration.ofSeconds(1).toMillis()), + "batch.size", Integer.toString(1_000_000) + ), new ByteArraySerializer(), new ByteArraySerializer())) { + + for (final TopicPartition topicPartition : TOPIC_PARTITIONS) { + long offset = 0; + while (offset < RECORDS_TO_PRODUCE) { + final int batchSize = batchSize(offset); + final ArrayList> sendFutures = new ArrayList<>(batchSize); + for (int i = 0; i < batchSize; i++) { + final var record = createProducerRecord( + topicPartition.topic(), + topicPartition.partition(), + offset++ + ); + final Future sendFuture = producer.send(record); + sendFutures.add(sendFuture); + } + producer.flush(); + for (final Future f : sendFutures) { + f.get(30, TimeUnit.SECONDS); + } + } + + LOG.info("{} records produced to {}", RECORDS_TO_PRODUCE, topicPartition); + } + } + } + + private static List localLogFiles(final TopicPartition tp) { + return localPartitionFiles(tp).stream() + .filter(f -> f.getName().endsWith(".log")) + .collect(Collectors.toList()); + } + + @Test + @Order(2) + void remoteRead() { + maybeFailFast(); + + try (final var consumer = new KafkaConsumer<>(Map.of( + "bootstrap.servers", kafka.getBootstrapServers(), + "fetch.max.bytes", "1" + ), new ByteArrayDeserializer(), new ByteArrayDeserializer())) { + + // Check the beginning and end offsets. + final Map startOffsets = consumer.beginningOffsets(TOPIC_PARTITIONS); + assertThat(startOffsets.get(TP_0_0)).isEqualTo(0); + assertThat(startOffsets.get(TP_0_1)).isEqualTo(0); + assertThat(startOffsets.get(TP_1_0)).isEqualTo(0); + final Map endOffsets = consumer.endOffsets(TOPIC_PARTITIONS); + assertThat(endOffsets.get(TP_0_0)).isEqualTo(RECORDS_TO_PRODUCE + 1); + assertThat(endOffsets.get(TP_0_1)).isEqualTo(RECORDS_TO_PRODUCE + 1); + assertThat(endOffsets.get(TP_1_0)).isEqualTo(RECORDS_TO_PRODUCE + 1); + + LOG.info("start and end offsets are as expected"); + + // TODO check for EARLIEST_LOCAL_TIMESTAMP when available in client + + // Read by record. + LOG.info("Starting validation per record"); + + for (final TopicPartition tp : TOPIC_PARTITIONS) { + consumer.assign(List.of(tp)); + LOG.info("Checking records from partition {}", tp); + for (long offset = 0; offset < RECORDS_TO_PRODUCE; offset++) { + consumer.seek(tp, offset); + final var record = consumer.poll(Duration.ofSeconds(5)).records(tp).get(0); + final var expectedRecord = createProducerRecord(tp.topic(), tp.partition(), offset); + checkRecord(record, offset, expectedRecord); + if (offset % 500 == 0) { + LOG.info("{} of {} records checked", offset, RECORDS_TO_PRODUCE); + } + } + LOG.info("Records from partition {} checked", tp); + } + + LOG.info("Validation per record completed"); + + // Read by batches. + LOG.info("Starting validation per batch"); + + for (final TopicPartition tp : TOPIC_PARTITIONS) { + consumer.assign(List.of(tp)); + long offset = 0; + while (offset < RECORDS_TO_PRODUCE) { + consumer.seek(tp, offset); + final var records = consumer.poll(Duration.ofSeconds(1)).records(tp); + assertThat(records.size()).isEqualTo(batchSize(offset)); + for (final var record : records) { + final var expectedRecord = createProducerRecord(tp.topic(), tp.partition(), offset); + checkRecord(record, offset, expectedRecord); + offset += 1; + } + } + } + + LOG.info("Validation per batch completed"); + } + + // Read over batch borders. + LOG.info("Starting validation over batch borders"); + + final ArrayList batchBorders = new ArrayList<>(); + // Skip the first and last batches because we can't read "over" their left and right border. + for (long offset = 1; offset < RECORDS_TO_PRODUCE - 1; ) { + batchBorders.add(offset); + final int batchSize = batchSize(offset); + offset += batchSize; + } + try (final var consumer = new KafkaConsumer<>(Map.of( + "bootstrap.servers", kafka.getBootstrapServers(), + "fetch.max.bytes", Integer.toString(VALUE_SIZE_MAX * 50) + ), new ByteArrayDeserializer(), new ByteArrayDeserializer())) { + for ( + final TopicPartition tp : TOPIC_PARTITIONS) { + consumer.assign(List.of(tp)); + for (final long batchBorder : batchBorders) { + consumer.seek(tp, batchBorder - 1); + List> records = consumer.poll(Duration.ofSeconds(1)).records(tp); + checkRecord( + records.get(0), + batchBorder - 1, + createProducerRecord(tp.topic(), tp.partition(), batchBorder - 1)); + if (records.size() > 1) { + checkRecord( + records.get(1), + batchBorder, + createProducerRecord(tp.topic(), tp.partition(), batchBorder)); + } else { + // It's possible when the batch is the last in the segment: + // the broker won't return records over a segment border. + records = consumer.poll(Duration.ofSeconds(1)).records(tp); + checkRecord( + records.get(0), + batchBorder, + createProducerRecord(tp.topic(), tp.partition(), batchBorder)); + } + } + } + } + + LOG.info("Validation over batch borders completed"); + } + + @Test + @Disabled("https://github.com/aiven/kafka/issues/20") + @Order(3) + void remoteManualDelete() throws Exception { + maybeFailFast(); + + final int newStartOffset = RECORDS_TO_PRODUCE / 2; + + final List remoteSegmentsBefore = remoteLogMetadataTracker.remoteSegments() + .get(t0p0); + final List segmentsToBeDeleted = remoteSegmentsBefore.stream() + .filter(rs -> rs.endOffset() < newStartOffset) + .collect(Collectors.toList()); + + adminClient.deleteRecords(Map.of(TP_0_0, RecordsToDelete.beforeOffset(newStartOffset))) + .all().get(5, TimeUnit.SECONDS); + + remoteLogMetadataTracker.waitUntilSegmentsAreDeleted(segmentsToBeDeleted); + + try (final var consumer = new KafkaConsumer<>(Map.of( + "bootstrap.servers", kafka.getBootstrapServers(), + "auto.offset.reset", "earliest" + ), new ByteArrayDeserializer(), new ByteArrayDeserializer())) { + + // Check the beginning and end offsets. + final Map startOffsets = consumer.beginningOffsets(List.of(TP_0_0)); + assertThat(startOffsets.get(TP_0_0)).isEqualTo(newStartOffset); + final Map endOffsets = consumer.endOffsets(List.of(TP_0_0)); + assertThat(endOffsets.get(TP_0_0)).isEqualTo(RECORDS_TO_PRODUCE + 1); + // TODO check for EARLIEST_LOCAL_TIMESTAMP when available in client + + // TODO check segments deleted on the remote + + // Check what we can now consume. + consumer.assign(List.of(TP_0_0)); + consumer.seek(TP_0_0, 0); + final ConsumerRecord record = consumer.poll(Duration.ofSeconds(1)).records(TP_0_0).get(0); + assertThat(record.offset()).isEqualTo(newStartOffset); + } + } + + @Test + @Order(4) + void remoteCleanupDueToRetention() throws Exception { + maybeFailFast(); + + // Collect all remote segments, as after changing retention, all should be deleted. + final var remoteSegmentsBefore = remoteLogMetadataTracker.remoteSegments(); + final var segmentsToBeDeleted = Stream.concat( + remoteSegmentsBefore.get(t0p0).stream(), + remoteSegmentsBefore.get(t0p1).stream() + ).collect(Collectors.toList()); + + LOG.info("Forcing cleanup by setting bytes retention to 1"); + + final var alterConfigs = List.of(new AlterConfigOp( + new ConfigEntry("retention.bytes", "1"), AlterConfigOp.OpType.SET)); + adminClient.incrementalAlterConfigs(Map.of( + new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_0), alterConfigs + )).all().get(5, TimeUnit.SECONDS); + + LOG.info("Starting cleanup validation"); + + try (final var consumer = new KafkaConsumer<>(Map.of( + "bootstrap.servers", kafka.getBootstrapServers(), + "auto.offset.reset", "earliest" + ), new ByteArrayDeserializer(), new ByteArrayDeserializer())) { + + // Get earliest offset available locally + final long newStartOffset = localLogFiles(TP_0_0).stream() + .mapToLong(f -> Long.parseLong(f.getName().replace(".log", ""))) + .max() + .getAsLong(); + // and wait til expected earliest offset is in place + await() + .pollInterval(Duration.ofSeconds(5)) + .atMost(Duration.ofSeconds(30)) + .until(() -> { + final var beginningOffset = consumer.beginningOffsets(List.of(TP_0_0)).get(TP_0_0); + LOG.info("Beginning offset found {}, expecting {}", beginningOffset, newStartOffset); + return beginningOffset.equals(newStartOffset); + }); + + final Map endOffsets = consumer.endOffsets(List.of(TP_0_0)); + assertThat(endOffsets.get(TP_0_0)).isEqualTo(RECORDS_TO_PRODUCE + 1); + + // TODO check for EARLIEST_LOCAL_TIMESTAMP when available in client + + consumer.assign(List.of(TP_0_0)); + consumer.seek(TP_0_0, 0); + + final ConsumerRecord record = consumer.poll(Duration.ofSeconds(1)).records(TP_0_0).get(0); + assertThat(record.offset()).isEqualTo(newStartOffset); + + remoteLogMetadataTracker.waitUntilSegmentsAreDeleted(segmentsToBeDeleted); + assertNoTopicDataOnTierStorage(t0p0.topic(), t0p0.topicId()); + } + + LOG.info("Cleanup validation completed"); + } + + @Test + @Disabled("https://github.com/aiven/kafka/issues/35") + @Order(5) + void topicDelete() throws Exception { + maybeFailFast(); + + LOG.info("Starting topic delete test"); + + final var remoteSegmentsBefore = remoteLogMetadataTracker.remoteSegments(); + final var segmentsToBeDeleted = remoteSegmentsBefore.get(t1p0); + + adminClient.deleteTopics(List.of(TP_1_0.topic())) + .all().get(30, TimeUnit.SECONDS); + + remoteLogMetadataTracker.waitUntilSegmentsAreDeleted(segmentsToBeDeleted); + assertNoTopicDataOnTierStorage(t1p0.topic(), t1p0.topicId()); + + LOG.info("Topic delete test completed"); + } + + private static void maybeFailFast() { + if (failed) { // fail-fast + throw new IllegalStateException("Already failed"); + } + } + + private static String extractSegmentId(final String fileName) { + return fileName.substring(21, fileName.lastIndexOf('.')); + } + + private static String extractSuffix(final String fileName) { + return fileName.substring(fileName.lastIndexOf('.') + 1); + } + + /** + * Variable batch size based on the offset received + */ + private static int batchSize(final long offset) { + return (int) offset % 10 + 1; + } + + private static ProducerRecord createProducerRecord(final String topic, + final int partition, + final long offset) { + final int d = (int) ((offset + partition) % 10); + + final int keySize = d * 2 + 1; + final var key = ByteBuffer.allocate(keySize); + final var keyPattern = (topic + "-" + partition + "-" + offset).getBytes(); + while (key.remaining() >= keyPattern.length) { + key.put(keyPattern); + } + key.put(keyPattern, 0, key.remaining()); + assertThat(key.remaining()).isEqualTo(0); + + final int valueSize = VALUE_SIZE_MIN + (VALUE_SIZE_MAX - VALUE_SIZE_MIN) / 10 * d; + final var value = ByteBuffer.allocate(valueSize); + + return new ProducerRecord<>(topic, partition, key.array(), value.array()); + } + + private void checkRecord(final ConsumerRecord actual, + final long offset, + final ProducerRecord expected) { + assertThat(actual.offset()).isEqualTo(offset); + assertThat(actual.key()).isEqualTo(expected.key()); + assertThat(actual.value()).isEqualTo(expected.value()); + } + + private static List localPartitionFiles(final TopicPartition tp) { + final Path dir = tempDirData.resolve(String.format("%s-%d", tp.topic(), tp.partition())); + try (final var paths = Files.list(dir)) { + return paths + .map(Path::toFile) + .sorted(Comparator.comparing(File::getName)) + .collect(Collectors.toList()); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + + abstract List remotePartitionFiles(final TopicIdPartition topicIdPartition); + + abstract void assertNoTopicDataOnTierStorage(final String topicName, final Uuid topicId); + + /** + * Flag when a step has failed so next steps fail-fast. + * + *

Needed to allow running {@code SingleBrokerTest#stopKafka} after all tests. + */ + public static class FailFastTestWatcher implements TestWatcher { + + @Override + public void testFailed(final ExtensionContext context, final Throwable cause) { + System.out.println("Test failed: " + context.getDisplayName()); + failed = true; + } + } +} diff --git a/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/RemoteLogMetadataDeserializer.java b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/RemoteLogMetadataDeserializer.java new file mode 100644 index 000000000..232099762 --- /dev/null +++ b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/RemoteLogMetadataDeserializer.java @@ -0,0 +1,30 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.e2e.internal; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata; + +public class RemoteLogMetadataDeserializer implements Deserializer { + private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde(); + + @Override + public RemoteLogMetadata deserialize(final String topic, final byte[] data) { + return serde.deserialize(data); + } +} diff --git a/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/RemoteLogMetadataTracker.java b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/RemoteLogMetadataTracker.java new file mode 100644 index 000000000..097275b10 --- /dev/null +++ b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/RemoteLogMetadataTracker.java @@ -0,0 +1,231 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.e2e.internal; + +import java.time.Duration; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; +import static org.awaitility.Awaitility.await; + +public class RemoteLogMetadataTracker { + private static final Logger LOG = LoggerFactory.getLogger(RemoteLogMetadataTracker.class); + private static final String REMOTE_LOG_METADATA_TOPIC = "__remote_log_metadata"; + + private final KafkaConsumer consumer; + private List partitions; + + private final Map> remoteSegments = new HashMap<>(); + private final Map remoteSegmentStates = new HashMap<>(); + + public RemoteLogMetadataTracker(final String bootstrapServers) { + consumer = new KafkaConsumer<>(Map.of( + "bootstrap.servers", bootstrapServers, + "auto.offset.reset", "earliest" + ), new ByteArrayDeserializer(), new RemoteLogMetadataDeserializer()); + } + + public Map> remoteSegments() { + final Map> result = new HashMap<>(); + for (final Map.Entry> entry : remoteSegments.entrySet()) { + final List list = entry.getValue().stream() + .sorted(Comparator.comparing(RemoteSegment::startOffset)) + .collect(Collectors.toList()); + result.put(entry.getKey(), list); + } + return result; + } + + /** + * Initializes the tracker. + * + *

It expects at least one record to be present in __remote_log_metadata + * and that all remote segments are in {@code COPY_SEGMENT_FINISHED} state. + */ + public void initialize(final List expectedPartitions) { + partitions = consumer.partitionsFor(REMOTE_LOG_METADATA_TOPIC).stream() + .map(pi -> new TopicPartition(pi.topic(), pi.partition())) + .collect(Collectors.toList()); + + await().atMost(Duration.ofSeconds(60)) + .pollInterval(Duration.ofMillis(100)) + .until(() -> + consumer.endOffsets(partitions).values().stream() + .mapToLong(Long::longValue) + .sum() > 0); + + // supply segment states where copy has not finished + final Supplier> segmentsStillCopying = () -> + remoteSegmentStates.entrySet().stream() + .filter(s -> !s.getValue().equals(RemoteLogSegmentState.COPY_SEGMENT_FINISHED)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + consumer.assign(partitions); + consumer.seekToBeginning(partitions); + + LOG.info("Initializing remote segments"); + + final var timeout = Duration.ofMinutes(1); + final var startAt = System.currentTimeMillis(); + + final var metadataRecords = new LinkedHashMap, RemoteLogMetadata>(); + boolean isReady = false; + while (!isReady) { + consumer.poll(Duration.ofSeconds(5)).forEach(r -> { + final RemoteLogMetadata metadata = r.value(); + remoteSegments.putIfAbsent(metadata.topicIdPartition(), new HashSet<>()); + metadataRecords.put(Map.entry(new TopicPartition(r.topic(), r.partition()), r.offset()), metadata); + if (metadata instanceof RemoteLogSegmentMetadata) { + final var segmentMetadata = (RemoteLogSegmentMetadata) metadata; + remoteSegmentStates.put(segmentMetadata.remoteLogSegmentId(), segmentMetadata.state()); + remoteSegments.get(metadata.topicIdPartition()).add( + new RemoteSegment( + segmentMetadata.remoteLogSegmentId(), + segmentMetadata.startOffset(), + segmentMetadata.endOffset() + )); + } else if (metadata instanceof RemoteLogSegmentMetadataUpdate) { + final var update = (RemoteLogSegmentMetadataUpdate) metadata; + remoteSegmentStates.put(update.remoteLogSegmentId(), update.state()); + + // Sanity check: if we see an update, the original record should be already taken into account. + assertThat(remoteSegments.get(metadata.topicIdPartition()).stream() + .filter(rs -> rs.remoteLogSegmentId().equals(update.remoteLogSegmentId())) + .findFirst()).isNotEmpty(); + } else { + fail("Unexpected metadata: %s", metadata); + } + }); + + isReady = segmentsStillCopying.get().isEmpty() // copies have not finished + && expectedPartitions.stream().allMatch(remoteSegments::containsKey); // AND not all segments present + + // check for timeout + final var running = System.currentTimeMillis() - startAt; + if (running > timeout.toMillis()) { + LOG.warn("Timeout waiting for segments copy finished events to arrive after {} running", running); + break; + } + } + + if (!isReady) { // if finished because of timeout + final var notCopied = segmentsStillCopying.get(); + fail("Fail to receive copy metadata records for %s out of %s segments." + + "%nSegments missing: %n%s" + + "%nMetadata events received: %n%s", + notCopied.size(), remoteSegments.size(), + notCopied.entrySet().stream() + .map(e -> e.getKey().toString() + + " => " + + e.getValue()) + .collect(Collectors.joining("\n")), + metadataRecords.entrySet().stream() + .map(e -> e.getKey().getKey() + + "-" + e.getKey().getValue() + ":" + e.getValue() + " => " + + e.getValue()) + .collect(Collectors.joining("\n")) + ); + } + + assertThat(remoteSegments.keySet()).containsExactlyInAnyOrderElementsOf(expectedPartitions); + + LOG.info("Remote Log Metadata Tracker initialized"); + } + + public void waitUntilSegmentsAreDeleted(final List segmentsToBeDeleted) { + final Supplier> segmentsNotDeleted = + () -> segmentsToBeDeleted.stream() + .map(rs -> Map.entry(rs, remoteSegmentStates.get(rs.remoteLogSegmentId()))) + .filter(rs -> !rs.getValue().equals(RemoteLogSegmentState.DELETE_SEGMENT_FINISHED)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + LOG.info("Starting validation of remote segments deleted"); + + final var timeout = Duration.ofSeconds(30); + final var startAt = System.currentTimeMillis(); + + final var metadataRecords = new LinkedHashMap, RemoteLogMetadata>(); + + boolean isReady = false; + while (!isReady) { + (consumer.poll(Duration.ofSeconds(5))).forEach(r -> { + final RemoteLogMetadata metadata = r.value(); + metadataRecords.put(Map.entry(new TopicPartition(r.topic(), r.partition()), r.offset()), metadata); + if (metadata instanceof RemoteLogSegmentMetadataUpdate) { + final var metadataUpdate = (RemoteLogSegmentMetadataUpdate) metadata; + remoteSegmentStates.put(metadataUpdate.remoteLogSegmentId(), metadataUpdate.state()); + if (metadataUpdate.state().equals(RemoteLogSegmentState.DELETE_SEGMENT_FINISHED)) { + remoteSegments.get(metadata.topicIdPartition()) + .removeIf( + segment -> segment.remoteLogSegmentId().equals(metadataUpdate.remoteLogSegmentId())); + } + } + }); + + isReady = segmentsNotDeleted.get().isEmpty(); + + final var running = System.currentTimeMillis() - startAt; + if (running > timeout.toMillis()) { + LOG.warn("Timeout waiting for segments delete finished events to arrive after {} running", running); + break; + } + } + + if (!segmentsNotDeleted.get().isEmpty()) { + final var notDeleted = segmentsNotDeleted.get(); + fail("Fail to receive delete metadata records for %s out of %s segments." + + "%nSegments missing: %n%s" + + "%nMetadata events received: %n%s", + notDeleted.size(), segmentsToBeDeleted.size(), + notDeleted.entrySet().stream() + .map(e -> e.getKey().remoteLogSegmentId().toString() + + "[" + e.getKey().startOffset() + "-" + e.getKey().endOffset() + "] => " + + e.getValue()) + .collect(Collectors.joining("\n")), + metadataRecords.entrySet().stream() + .map(e -> e.getKey().getKey() + + "-" + e.getKey().getValue() + ":" + e.getValue() + " => " + + e.getValue()) + .collect(Collectors.joining("\n")) + ); + } + + LOG.info("Remote segments deleted validation complete"); + } +} diff --git a/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/RemoteSegment.java b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/RemoteSegment.java new file mode 100644 index 000000000..aa00b0eae --- /dev/null +++ b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/RemoteSegment.java @@ -0,0 +1,75 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.e2e.internal; + +import java.util.Objects; + +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; + +public final class RemoteSegment { + private final RemoteLogSegmentId remoteLogSegmentId; + private final long startOffset; + private final long endOffset; + + RemoteSegment(final RemoteLogSegmentId remoteLogSegmentId, + final long startOffset, + final long endOffset) { + this.remoteLogSegmentId = remoteLogSegmentId; + this.startOffset = startOffset; + this.endOffset = endOffset; + } + + public RemoteLogSegmentId remoteLogSegmentId() { + return remoteLogSegmentId; + } + + public long startOffset() { + return startOffset; + } + + public long endOffset() { + return endOffset; + } + + @Override + public boolean equals(final Object obj) { + if (obj == this) { + return true; + } + if (obj == null || obj.getClass() != this.getClass()) { + return false; + } + final var that = (RemoteSegment) obj; + return Objects.equals(this.remoteLogSegmentId, that.remoteLogSegmentId) + && this.startOffset == that.startOffset + && this.endOffset == that.endOffset; + } + + @Override + public int hashCode() { + return Objects.hash(remoteLogSegmentId, startOffset, endOffset); + } + + @Override + public String toString() { + return "RemoteSegment[" + + "remoteLogSegmentId=" + remoteLogSegmentId + ", " + + "startOffset=" + startOffset + ", " + + "endOffset=" + endOffset + ']'; + } + +} diff --git a/e2e/src/test/resources/log4j.properties b/e2e/src/test/resources/log4j.properties new file mode 100644 index 000000000..9fed516a4 --- /dev/null +++ b/e2e/src/test/resources/log4j.properties @@ -0,0 +1,28 @@ +## +# Copyright 2023 Aiven Oy +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +## + +log4j.rootLogger=INFO, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.logger.org.apache.kafka.clients.consumer.KafkaConsumer=WARN + +org.testcontainers=INFO +tc=INFO +com.github.dockerjava=WARN +com.github.dockerjava.zerodep.shaded.org.apache.hc.client5.http.wire=OFF diff --git a/settings.gradle b/settings.gradle index a901fae08..daf6bd823 100644 --- a/settings.gradle +++ b/settings.gradle @@ -20,3 +20,4 @@ include 'storage' include 'storage:core' include 'storage:filesystem' include 'storage:s3' +include 'e2e' diff --git a/storage/s3/build.gradle b/storage/s3/build.gradle index 550fed279..50001e516 100644 --- a/storage/s3/build.gradle +++ b/storage/s3/build.gradle @@ -16,14 +16,6 @@ archivesBaseName = "storage-s3" -ext { - // Keep empty lines between versions to avoid conflicts on mass update (e.g. Dependabot). - - awsSdkVersion = "1.12.520" - - testcontainersVersion = "1.18.3" -} - dependencies { implementation project(":storage:core")