diff --git a/e2e/README.md b/e2e/README.md
new file mode 100644
index 000000000..18088da90
--- /dev/null
+++ b/e2e/README.md
@@ -0,0 +1,8 @@
+# End-to-end tests for Kafka tiered storage
+
+## Usage
+
+Docker is required to run the tests.
+
+1. Build the image with < TBD >.
+2. `./gradlew test`
diff --git a/e2e/build.gradle b/e2e/build.gradle
new file mode 100644
index 000000000..67119e9f2
--- /dev/null
+++ b/e2e/build.gradle
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2023 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ext {
+    kafkaVersion = "3.4.0"
+    junitVersion = "5.9.2"
+    testcontainersVersion = "1.18.0"
+    slf4jVersion = "2.0.7"
+}
+
+dependencies {
+    testImplementation "org.apache.kafka:kafka-clients:$kafkaVersion"
+    testImplementation "org.apache.kafka:kafka-storage:$kafkaVersion"
+    testImplementation "org.apache.kafka:kafka-storage-api:$kafkaVersion"
+
+    testImplementation "commons-io:commons-io:2.11.0"
+    testImplementation "com.amazonaws:aws-java-sdk-s3:1.12.418"
+    testImplementation "org.slf4j:slf4j-api:$slf4jVersion"
+
+    testImplementation "org.junit.jupiter:junit-jupiter-api:$junitVersion"
+    testImplementation "org.junit.jupiter:junit-jupiter-params:$junitVersion"
+    testImplementation "org.assertj:assertj-core:3.24.2"
+    testImplementation "org.testcontainers:junit-jupiter:$testcontainersVersion"
+    testImplementation "org.testcontainers:localstack:$testcontainersVersion"
+    testImplementation "org.testcontainers:kafka:$testcontainersVersion"
+
+    testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:$junitVersion"
+    testRuntimeOnly "org.slf4j:slf4j-log4j12:$slf4jVersion"
+}
+
+test {
+    testLogging {
+        exceptionFormat "full"
+
+        info {
+            showStackTraces = true
+        }
+    }
+}
diff --git a/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/LocalSystemSingleBrokerTest.java b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/LocalSystemSingleBrokerTest.java
new file mode 100644
index 000000000..b2d1e92fb
--- /dev/null
+++ b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/LocalSystemSingleBrokerTest.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2023 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.tieredstorage.e2e;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class LocalSystemSingleBrokerTest extends SingleBrokerTest {
+    // File system backend.
+    static final String TS_DATA_SUBDIR_HOST = "ts-data";
+    static final String TS_DATA_DIR_CONTAINER = "/home/appuser/kafka-tiered-storage";
+
+    // File system backend.
+    static Path tempDirTsData;
+
+    @BeforeAll
+    static void init() {
+        preInit();
+        // File system backend.
+        tempDirTsData = tempDir.resolve(TS_DATA_SUBDIR_HOST);
+        tempDirTsData.toFile().mkdirs();
+        tempDirTsData.toFile().setWritable(true, false);
+
+        // File system backend.
+        kafka
+            .withEnv("KAFKA_RSM_CONFIG_STORAGE_BACKEND_CLASS_NAME",
+                "io.aiven.kafka.tieredstorage.storage.filesystem.FileSystemStorage")
+            .withEnv("KAFKA_RSM_CONFIG_STORAGE_ROOT", TS_DATA_DIR_CONTAINER)
+            .withFileSystemBind(tempDirTsData.toString(), TS_DATA_DIR_CONTAINER);
+
+        SingleBrokerTest.init();
+    }
+
+    @AfterAll
+    static void cleanup() {
+        stopKafka();
+        cleanupStorage();
+    }
+
+    static void cleanupStorage() {
+        final Path tempDir;
+        if (tempDirData != null) {
+            tempDir = tempDirData.getParent();
+        } else if (tempDirTsData != null) {
+            // File system backend.
+            tempDir = tempDirTsData.getParent();
+        } else {
+            tempDir = null;
+        }
+        if (tempDir != null) {
+            FileUtils.deleteQuietly(tempDir.toFile());
+        }
+    }
+
+    @Override
+    void assertNoTopicDataOnTierStorage(final String topicName, final Uuid topicId) {
+        final String prefix = String.format("%s-%s", topicName, topicId.toString());
+        // File system backend.
+        assertThat(tempDirTsData.toFile().listFiles())
+            .doesNotContain(tempDirTsData.resolve(prefix).toFile());
+    }
+
+    @Override
+    List<String> remotePartitionFiles(final TopicIdPartition topicIdPartition) {
+        // File system backend.
+        final Path dir = tempDirTsData.resolve(
+            String.format("%s-%s/%s", topicIdPartition.topic(), topicIdPartition.topicId().toString(),
+                topicIdPartition.partition()));
+        try (final var paths = Files.list(dir)) {
+            return paths.map(Path::getFileName)
+                .map(Path::toString)
+                .sorted()
+                .collect(Collectors.toList());
+        } catch (final IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/S3LocalstackSingleBrokerTest.java b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/S3LocalstackSingleBrokerTest.java
new file mode 100644
index 000000000..8bb3bfce0
--- /dev/null
+++ b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/S3LocalstackSingleBrokerTest.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2023 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.tieredstorage.e2e;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.ListObjectsV2Request;
+import com.amazonaws.services.s3.model.ListObjectsV2Result;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.testcontainers.containers.localstack.LocalStackContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class S3LocalstackSingleBrokerTest extends SingleBrokerTest {
+
+    static final LocalStackContainer LOCALSTACK = new LocalStackContainer(
+        DockerImageName.parse("localstack/localstack:2.0.2")
+    )
+        .withServices(LocalStackContainer.Service.S3)
+        .withNetwork(NETWORK);
+
+    @BeforeAll
+    static void init() {
+        preInit();
+        // Localstack backend.
+        LOCALSTACK.start();
+
+        // Localstack backend.
+        kafka.withEnv("KAFKA_RSM_CONFIG_STORAGE_BACKEND_CLASS_NAME",
+                "io.aiven.kafka.tieredstorage.storage.s3.S3Storage")
+            .withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_BUCKET_NAME", BUCKET)
+            .withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_REGION", LOCALSTACK.getRegion())
+            .withEnv("KAFKA_RSM_CONFIG_STORAGE_S3_PATH_STYLE_ACCESS_ENABLED", "true")
+            .withEnv("KAFKA_RSM_CONFIG_STORAGE_AWS_ACCESS_KEY_ID", LOCALSTACK.getAccessKey())
+            .withEnv("KAFKA_RSM_CONFIG_STORAGE_AWS_SECRET_ACCESS_KEY", LOCALSTACK.getSecretKey())
+            .dependsOn(LOCALSTACK);
+        kafka.withEnv(
+            "KAFKA_RSM_CONFIG_STORAGE_S3_ENDPOINT_URL",
+            "http://" + LOCALSTACK.getContainerInfo().getConfig().getHostName() + ":4566");
+
+        // Localstack backend.
+        s3Client = AmazonS3ClientBuilder
+            .standard()
+            .withEndpointConfiguration(
+                new AwsClientBuilder.EndpointConfiguration(
+                    LOCALSTACK.getEndpointOverride(LocalStackContainer.Service.S3).toString(),
+                    LOCALSTACK.getRegion()
+                )
+            )
+            .withCredentials(
+                new AWSStaticCredentialsProvider(
+                    new BasicAWSCredentials(LOCALSTACK.getAccessKey(), LOCALSTACK.getSecretKey())
+                )
+            )
+            .build();
+        s3Client.createBucket(BUCKET);
+
+        SingleBrokerTest.init();
+    }
+
+
+    @AfterAll
+    static void cleanup() {
+        // Localstack backend.
+        LOCALSTACK.stop();
+
+        stopKafka();
+        cleanupStorage();
+    }
+
+    @Override
+    void assertNoTopicDataOnTierStorage(final String topicName, final Uuid topicId) {
+        final String prefix = String.format("%s-%s", topicName, topicId.toString());
+
+        // Localstack backend.
+        final var summaries = s3Client.listObjectsV2(BUCKET, prefix).getObjectSummaries();
+        assertThat(summaries).isEmpty();
+    }
+
+    @Override
+    List<String> remotePartitionFiles(final TopicIdPartition topicIdPartition) {
+        // Localstack backend.
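+        // List the whole bucket page by page and reduce each object key to its file name (the part after the last '/').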
+        ListObjectsV2Request request = new ListObjectsV2Request()
+            .withBucketName(BUCKET);
+        final List<S3ObjectSummary> summaries = new ArrayList<>();
+        ListObjectsV2Result result;
+        while ((result = s3Client.listObjectsV2(request)).isTruncated()) {
+            summaries.addAll(result.getObjectSummaries());
+            request = request.withContinuationToken(result.getNextContinuationToken());
+        }
+        summaries.addAll(result.getObjectSummaries());
+
+        return summaries.stream()
+            .map(S3ObjectSummary::getKey)
+            .map(k -> k.substring(k.lastIndexOf('/') + 1))
+            .sorted()
+            .collect(Collectors.toList());
+    }
+}
diff --git a/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/SingleBrokerTest.java b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/SingleBrokerTest.java
new file mode 100644
index 000000000..bd63b9b38
--- /dev/null
+++ b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/SingleBrokerTest.java
@@ -0,0 +1,521 @@
+/*
+ * Copyright 2023 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.tieredstorage.e2e;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+
+import io.aiven.kafka.tieredstorage.e2e.internal.RemoteLogMetadataTracker;
+import io.aiven.kafka.tieredstorage.e2e.internal.RemoteSegment;
+import io.aiven.kafka.tieredstorage.e2e.internal.TestUtils;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.github.dockerjava.api.model.Ulimit;
+import org.apache.commons.io.FileUtils;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import static java.util.stream.Collectors.groupingBy;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+
+@Testcontainers
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+abstract class SingleBrokerTest {
+
+    static final Logger LOG = LoggerFactory.getLogger(SingleBrokerTest.class);
+
+    static final String KAFKA_DATA_SUBDIR_HOST = "data";
+    static final String KAFKA_DATA_DIR_CONTAINER = "/var/lib/kafka/data";
+
+    static final String BUCKET = "test-bucket";
+
+    static final String TOPIC_0 = "topic0";
+    static final String TOPIC_1 = "topic1";
+    static final TopicPartition TP_0_0 = new TopicPartition(TOPIC_0, 0);
+    static final TopicPartition TP_0_1 = new TopicPartition(TOPIC_0, 1);
+    static final TopicPartition TP_1_0 = new TopicPartition(TOPIC_1, 0);
+
+    static final int CHUNK_SIZE = 1024; // TODO something more reasonable?
+    static final int SEGMENT_SIZE_0 = 256 * CHUNK_SIZE + CHUNK_SIZE / 2;
+    static final int SEGMENT_SIZE_1 = 123 * CHUNK_SIZE + 123;
+
+    static final int VALUE_SIZE_MIN = CHUNK_SIZE / 4 - 3;
+    static final int VALUE_SIZE_MAX = CHUNK_SIZE * 2 + 5;
+
+    static final int RECORDS_TO_PRODUCE = 10_000;
+
+    static Path tempDir;
+    // Can't use @TempDir, because it's initialized too late.
+    static Path tempDirData;
+
+    static final Network NETWORK = Network.newNetwork();
+
+    static KafkaContainer kafka;
+
+    static AmazonS3 s3Client;
+    static AdminClient adminClient;
+
+    static RemoteLogMetadataTracker remoteLogMetadataTracker;
+
+    static TopicIdPartition t0p0;
+    static TopicIdPartition t0p1;
+    static TopicIdPartition t1p0;
+
+    static void init() {
+        kafka.start();
+
+        adminClient = AdminClient.create(Map.of(
+            "bootstrap.servers", kafka.getBootstrapServers()
+        ));
+
+        remoteLogMetadataTracker = new RemoteLogMetadataTracker(kafka.getBootstrapServers());
+    }
+
+    static void preInit() {
+        try {
+            tempDir = Files.createTempDirectory("junit");
+            tempDir.toFile().deleteOnExit(); // works only if empty
+            tempDirData = tempDir.resolve(KAFKA_DATA_SUBDIR_HOST);
+            tempDirData.toFile().mkdirs();
+            tempDirData.toFile().setWritable(true, false);
+        } catch (final IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        kafka = new KafkaContainer(DockerImageName.parse("aivenoy/kafka:3.3-2022-10-06-tiered-storage-1-ts-1")
+            .asCompatibleSubstituteFor("confluentinc/cp-kafka")
+        )
+            .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false")
+            .withEnv("KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS", "10000")
+            .withEnv("KAFKA_REMOTE_LOG_STORAGE_SYSTEM_ENABLE", "true")
+            .withEnv("KAFKA_REMOTE_LOG_MANAGER_TASK_INTERVAL_MS", "5000")
+            .withEnv("KAFKA_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME",
+                "org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager")
+            .withEnv("KAFKA_RLMM_CONFIG_REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR", "1")
+            .withEnv("KAFKA_REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME", "BROKER")
+            .withEnv("KAFKA_REMOTE_LOG_STORAGE_MANAGER_CLASS_PATH", "/tiered-storage-for-apache-kafka/*")
+            .withEnv("KAFKA_REMOTE_LOG_STORAGE_MANAGER_CLASS_NAME", "io.aiven.kafka.tieredstorage.RemoteStorageManager")
+            .withEnv("KAFKA_RSM_CONFIG_CHUNK_SIZE", "102400")
+            .withEnv("KAFKA_OPTS", "") // disable JMX exporter
+            .withCreateContainerCmdModifier(
+                cmd -> cmd.getHostConfig().withUlimits(List.of(new Ulimit("nofile", 30_000L, 30_000L))))
+            .withFileSystemBind(tempDirData.toString(), KAFKA_DATA_DIR_CONTAINER)
+            .withNetwork(NETWORK);
+    }
+
+    static void stopKafka() {
+        LOG.info("Kafka container logs:");
+        LOG.info(kafka.getLogs());
+
+        kafka.stop();
+    }
+
+    static void cleanupStorage() {
+        final Path tempDir;
+        if (tempDirData != null) {
+            tempDir = tempDirData.getParent();
+        } else {
+            tempDir = null;
+        }
+        if (tempDir != null) {
+            FileUtils.deleteQuietly(tempDir.toFile());
+        }
+    }
+
+    @Test
+    @Order(0)
+    void prepare() throws ExecutionException, InterruptedException, TimeoutException {
+        createTopics();
+        fillTopics();
+        waitUntilSegmentsAreTransferredToRemoteTier();
+    }
+
+    @Test
+    @Order(1)
+    void read() {
+        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(Map.of(
+            "bootstrap.servers", kafka.getBootstrapServers(),
+            "fetch.max.bytes", "1"
+        ), new ByteArrayDeserializer(), new ByteArrayDeserializer());
+
+        // Check the beginning and end offsets.
+        final Map<TopicPartition, Long> startOffsets = consumer.beginningOffsets(List.of(TP_0_0, TP_0_1, TP_1_0));
+        assertThat(startOffsets.get(TP_0_0)).isEqualTo(0);
+        assertThat(startOffsets.get(TP_0_1)).isEqualTo(0);
+        assertThat(startOffsets.get(TP_1_0)).isEqualTo(0);
+        final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(List.of(TP_0_0, TP_0_1, TP_1_0));
+        assertThat(endOffsets.get(TP_0_0)).isEqualTo(RECORDS_TO_PRODUCE + 1);
+        assertThat(endOffsets.get(TP_0_1)).isEqualTo(RECORDS_TO_PRODUCE + 1);
+        assertThat(endOffsets.get(TP_1_0)).isEqualTo(RECORDS_TO_PRODUCE + 1);
+
+        // TODO check for EARLIEST_LOCAL_TIMESTAMP when available in client
+
+        // Read by record.
+        for (final TopicPartition tp : List.of(TP_0_0, TP_0_1, TP_1_0)) {
+            consumer.assign(List.of(tp));
+            for (long offset = 0; offset < RECORDS_TO_PRODUCE; offset++) {
+                consumer.seek(tp, offset);
+                final ConsumerRecord<byte[], byte[]> record = consumer.poll(Duration.ofSeconds(1)).records(tp).get(0);
+                final var expectedRecord = createProducerRecord(tp.topic(), tp.partition(), offset);
+                checkRecord(record, offset, expectedRecord);
+            }
+        }
+
+        // Read by batches.
+        for (final TopicPartition tp : List.of(TP_0_0, TP_0_1, TP_1_0)) {
+            consumer.assign(List.of(tp));
+            long offset = 0;
+            while (offset < RECORDS_TO_PRODUCE) {
+                consumer.seek(tp, offset);
+                final List<ConsumerRecord<byte[], byte[]>> records = consumer.poll(Duration.ofSeconds(1)).records(tp);
+                assertThat(records.size()).isEqualTo(batchSize(offset));
+                for (final ConsumerRecord<byte[], byte[]> record : records) {
+                    final var expectedRecord = createProducerRecord(tp.topic(), tp.partition(), offset);
+                    checkRecord(record, offset, expectedRecord);
+                    offset += 1;
+                }
+            }
+        }
+        consumer.close();
+
+        // Read over batch borders.
+        final ArrayList<Long> batchBorders = new ArrayList<>();
+        // Skip the first and last batches because we can't read "over" their left and right border.
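+        // batchSize(offset) is deterministic, so the batch borders can be recomputed by walking the offsets.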
+        for (long offset = 1; offset < RECORDS_TO_PRODUCE - 1; ) {
+            batchBorders.add(offset);
+            final int batchSize = batchSize(offset);
+            offset += batchSize;
+        }
+        consumer = new KafkaConsumer<>(Map.of(
+            "bootstrap.servers", kafka.getBootstrapServers(),
+            "key.deserializer", ByteArrayDeserializer.class,
+            "value.deserializer", ByteArrayDeserializer.class,
+            "fetch.max.bytes", Integer.toString(VALUE_SIZE_MAX * 50)
+        ));
+        for (final TopicPartition tp : List.of(TP_0_0, TP_0_1, TP_1_0)) {
+            consumer.assign(List.of(tp));
+            for (final long batchBorder : batchBorders) {
+                consumer.seek(tp, batchBorder - 1);
+                List<ConsumerRecord<byte[], byte[]>> records = consumer.poll(Duration.ofSeconds(1)).records(tp);
+                checkRecord(
+                    records.get(0),
+                    batchBorder - 1,
+                    createProducerRecord(tp.topic(), tp.partition(), batchBorder - 1));
+                if (records.size() > 1) {
+                    checkRecord(
+                        records.get(1),
+                        batchBorder,
+                        createProducerRecord(tp.topic(), tp.partition(), batchBorder));
+                } else {
+                    // It's possible when the batch is the last in the segment:
+                    // the broker won't return records over a segment border.
+                    records = consumer.poll(Duration.ofSeconds(1)).records(tp);
+                    checkRecord(
+                        records.get(0),
+                        batchBorder,
+                        createProducerRecord(tp.topic(), tp.partition(), batchBorder));
+                }
+            }
+        }
+        consumer.close();
+    }
+
+    @Test
+    @Disabled("https://github.com/aiven/kafka/issues/20")
+    @Order(2)
+    void remoteManualDelete() throws ExecutionException, InterruptedException, TimeoutException {
+        final int newStartOffset = RECORDS_TO_PRODUCE / 2;
+
+        final List<RemoteSegment> remoteSegmentsBefore = remoteLogMetadataTracker.remoteSegments()
+            .get(t0p0);
+        final List<RemoteSegment> segmentsToBeDeleted = remoteSegmentsBefore.stream()
+            .filter(rs -> rs.endOffset() < newStartOffset)
+            .collect(Collectors.toList());
+
+        adminClient.deleteRecords(Map.of(TP_0_0, RecordsToDelete.beforeOffset(newStartOffset)))
+            .all().get(5, TimeUnit.SECONDS);
+
+        remoteLogMetadataTracker.waitUntilSegmentsAreDeleted(segmentsToBeDeleted);
+
+        final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(Map.of(
+            "bootstrap.servers", kafka.getBootstrapServers(),
+            "auto.offset.reset", "earliest"
+        ), new ByteArrayDeserializer(), new ByteArrayDeserializer());
+
+        // Check the beginning and end offsets.
+        final Map<TopicPartition, Long> startOffsets = consumer.beginningOffsets(List.of(TP_0_0));
+        assertThat(startOffsets.get(TP_0_0)).isEqualTo(newStartOffset);
+        final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(List.of(TP_0_0));
+        assertThat(endOffsets.get(TP_0_0)).isEqualTo(RECORDS_TO_PRODUCE + 1);
+        // TODO check for EARLIEST_LOCAL_TIMESTAMP when available in client
+
+        // TODO check segments deleted on the remote
+
+        // Check what we can now consume.
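+        // Seeking to offset 0 should land the consumer on the new start offset, since earlier records were deleted.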
+        consumer.assign(List.of(TP_0_0));
+        consumer.seek(TP_0_0, 0);
+        final ConsumerRecord<byte[], byte[]> record = consumer.poll(Duration.ofSeconds(1)).records(TP_0_0).get(0);
+        assertThat(record.offset()).isEqualTo(newStartOffset);
+    }
+
+    @Test
+    @Order(3)
+    void remoteCleanupDueToRetention() throws ExecutionException, InterruptedException, TimeoutException, IOException {
+        final var remoteSegmentsBefore = remoteLogMetadataTracker.remoteSegments();
+        final var segmentsToBeDeleted = Stream.concat(
+            remoteSegmentsBefore.get(t0p0).stream(),
+            remoteSegmentsBefore.get(t0p1).stream()
+        ).collect(Collectors.toList());
+
+        final AlterConfigOp alterConfigOp = new AlterConfigOp(
+            new ConfigEntry("retention.bytes", "1"), AlterConfigOp.OpType.SET);
+        adminClient.incrementalAlterConfigs(Map.of(
+            new ConfigResource(ConfigResource.Type.TOPIC, TP_0_1.topic()), List.of(alterConfigOp)
+        )).all().get(5, TimeUnit.SECONDS);
+
+        final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(Map.of(
+            "bootstrap.servers", kafka.getBootstrapServers(),
+            "auto.offset.reset", "earliest"
+        ), new ByteArrayDeserializer(), new ByteArrayDeserializer());
+
+        final long newStartOffset = localPartitionFiles(TP_0_0).stream()
+            .filter(f -> f.getName().endsWith(".log"))
+            .mapToLong(f -> Long.parseLong(f.getName().replace(".log", "")))
+            .max()
+            .getAsLong();
+        TestUtils.waitUntilTrue(
+            () -> consumer.beginningOffsets(List.of(TP_0_0)).get(TP_0_0).equals(newStartOffset),
+            "Start offset is not " + newStartOffset,
+            Duration.ofSeconds(30)
+        );
+        final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(List.of(TP_0_0));
+        assertThat(endOffsets.get(TP_0_0)).isEqualTo(RECORDS_TO_PRODUCE + 1);
+        // TODO check for EARLIEST_LOCAL_TIMESTAMP when available in client
+
+        consumer.assign(List.of(TP_0_0));
+        consumer.seek(TP_0_0, 0);
+        final ConsumerRecord<byte[], byte[]> record = consumer.poll(Duration.ofSeconds(1)).records(TP_0_0).get(0);
+        assertThat(record.offset()).isEqualTo(newStartOffset);
+
+        remoteLogMetadataTracker.waitUntilSegmentsAreDeleted(segmentsToBeDeleted);
+        assertNoTopicDataOnTierStorage(t0p0.topic(), t0p0.topicId());
+    }
+
+    @Test
+    @Order(4)
+    void topicDelete() throws ExecutionException, InterruptedException, TimeoutException {
+        final var remoteSegmentsBefore = remoteLogMetadataTracker.remoteSegments();
+        final var segmentsToBeDeleted = remoteSegmentsBefore.get(t1p0);
+
+        adminClient.deleteTopics(List.of(TP_1_0.topic()))
+            .all().get(30, TimeUnit.SECONDS);
+
+        remoteLogMetadataTracker.waitUntilSegmentsAreDeleted(segmentsToBeDeleted);
+        assertNoTopicDataOnTierStorage(t1p0.topic(), t1p0.topicId());
+    }
+
+    void createTopics() throws ExecutionException, InterruptedException, TimeoutException {
+        final NewTopic newTopic0 = new NewTopic(TOPIC_0, 2, (short) 1)
+            .configs(Map.of(
+                "remote.storage.enable", "true",
+                "segment.bytes", Integer.toString(SEGMENT_SIZE_0),
+                "retention.ms", "3600000",
+                "local.retention.ms", "5000"
+            ));
+        final NewTopic newTopic1 = new NewTopic(TOPIC_1, 1, (short) 1)
+            .configs(Map.of(
+                "remote.storage.enable", "true",
+                "segment.bytes", Integer.toString(SEGMENT_SIZE_1),
+                "retention.ms", "3600000",
+                "local.retention.ms", "5000"
+            ));
+        adminClient.createTopics(List.of(newTopic0, newTopic1))
+            .all().get(30, TimeUnit.SECONDS);
+
+        adminClient.describeTopics(List.of(TOPIC_0, TOPIC_1))
+            .allTopicNames().get(30, TimeUnit.SECONDS)
+            .values().forEach(td -> {
+                if (td.name().equals(TOPIC_0)) {
+                    t0p0 = new TopicIdPartition(td.topicId(), TP_0_0);
+                    t0p1 = new TopicIdPartition(td.topicId(), TP_0_1);
+                } else if (td.name().equals(TOPIC_1)) {
+                    t1p0 = new TopicIdPartition(td.topicId(), TP_1_0);
+                } else {
+                    fail("Unknown topic %s", td);
+                }
+            });
+    }
+
+    void fillTopics() throws ExecutionException, InterruptedException, TimeoutException {
+        final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(Map.of(
+            "bootstrap.servers", kafka.getBootstrapServers(),
+            "linger.ms", "10000",
+            "batch.size", "10000000"
+        ), new ByteArraySerializer(), new ByteArraySerializer());
+
+        for (final TopicPartition topicPartition : List.of(TP_0_0, TP_0_1, TP_1_0)) {
+            long offset = 0;
+            while (offset < RECORDS_TO_PRODUCE) {
+                final int batchSize = batchSize(offset);
+                final ArrayList<Future<RecordMetadata>> sendFutures = new ArrayList<>(batchSize);
+                for (int i = 0; i < batchSize; i++) {
+                    final var record = createProducerRecord(
+                        topicPartition.topic(),
+                        topicPartition.partition(),
+                        offset++
+                    );
+                    final Future<RecordMetadata> sendFuture = producer.send(record);
+                    sendFutures.add(sendFuture);
+                }
+                producer.flush();
+                for (final Future<RecordMetadata> f : sendFutures) {
+                    f.get(30, TimeUnit.SECONDS);
+                }
+            }
+        }
+    }
+
+    void waitUntilSegmentsAreTransferredToRemoteTier() {
+        remoteLogMetadataTracker.initialize();
+
+        // Check remote segments are present.
+        final var remoteSegments = remoteLogMetadataTracker.remoteSegments();
+        assertThat(remoteSegments.keySet().stream().map(TopicIdPartition::topicPartition))
+            .containsExactlyInAnyOrder(TP_0_0, TP_0_1, TP_1_0);
+        for (final Map.Entry<TopicIdPartition, List<RemoteSegment>> entry : remoteSegments.entrySet()) {
+            final Map<String, List<String>> segmentFiles = remotePartitionFiles(entry.getKey()).stream()
+                .collect(groupingBy(SingleBrokerTest::extractSegmentId));
+            for (final RemoteSegment remoteLogSegment : entry.getValue()) {
+                final String key = remoteLogSegment.remoteLogSegmentId().id().toString();
+                assertThat(segmentFiles).containsKey(key);
+                assertThat(segmentFiles.get(key).stream().map(SingleBrokerTest::extractSuffix))
+                    .containsExactlyInAnyOrder(
+                        "index", "leader-epoch-checkpoint", "log", "rsm-manifest", "snapshot", "timeindex");
+            }
+        }
+
+        // Check local segments are deleted.
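+        // Local retention should leave only the active segment, i.e. a single .log file per partition directory.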
+        for (final TopicPartition tp : List.of(TP_0_0, TP_0_1, TP_1_0)) {
+            TestUtils.waitUntilTrue(
+                () -> {
+                    try {
+                        return localPartitionFiles(tp).stream()
+                            .filter(f -> f.getName().endsWith(".log")).count() == 1;
+                    } catch (final IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                },
+                "Segments are still present locally",
+                Duration.ofSeconds(60)
+            );
+        }
+    }
+
+    private static String extractSegmentId(final String fileName) {
+        return fileName.substring(21, fileName.lastIndexOf('.'));
+    }
+
+    private static String extractSuffix(final String fileName) {
+        return fileName.substring(fileName.lastIndexOf('.') + 1);
+    }
+
+    private static int batchSize(final long offset) {
+        return (int) offset % 10 + 1;
+    }
+
+    private static ProducerRecord<byte[], byte[]> createProducerRecord(final String topic,
+                                                                       final int partition,
+                                                                       final long offset) {
+        final int d = (int) ((offset + partition) % 10);
+        final int keySize = d * 2 + 1;
+        final ByteBuffer key = ByteBuffer.allocate(keySize);
+        final byte[] keyPattern = (topic + "-" + partition + "-" + offset).getBytes();
+        while (key.remaining() >= keyPattern.length) {
+            key.put(keyPattern);
+        }
+        key.put(keyPattern, 0, key.remaining());
+        assert key.remaining() == 0;
+
+        final int valueSize = VALUE_SIZE_MIN + (VALUE_SIZE_MAX - VALUE_SIZE_MIN) / 10 * d;
+        final ByteBuffer value = ByteBuffer.allocate(valueSize);
+
+        return new ProducerRecord<>(topic, partition, key.array(), value.array());
+    }
+
+    private void checkRecord(final ConsumerRecord<byte[], byte[]> actual,
+                             final long offset,
+                             final ProducerRecord<byte[], byte[]> expected) {
+        assertThat(actual.offset()).isEqualTo(offset);
+        assertThat(actual.key()).isEqualTo(expected.key());
+        assertThat(actual.value()).isEqualTo(expected.value());
+    }
+
+    private static List<File> localPartitionFiles(final TopicPartition tp) throws IOException {
+        final Path dir = tempDirData.resolve(String.format("%s-%d", tp.topic(), tp.partition()));
+        try (final var paths = Files.list(dir)) {
+            return paths
+                .map(Path::toFile)
+                .sorted(Comparator.comparing(File::getName))
+                .collect(Collectors.toList());
+        }
+    }
+
+    abstract List<String> remotePartitionFiles(final TopicIdPartition topicIdPartition);
+
+    abstract void assertNoTopicDataOnTierStorage(final String topicName, final Uuid topicId);
+}
diff --git a/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/RemoteLogMetadataDeserializer.java b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/RemoteLogMetadataDeserializer.java
new file mode 100644
index 000000000..232099762
--- /dev/null
+++ b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/RemoteLogMetadataDeserializer.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2023 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.tieredstorage.e2e.internal;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+
+public class RemoteLogMetadataDeserializer implements Deserializer<RemoteLogMetadata> {
+    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+
+    @Override
+    public RemoteLogMetadata deserialize(final String topic, final byte[] data) {
+        return serde.deserialize(data);
+    }
+}
diff --git a/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/RemoteLogMetadataTracker.java b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/RemoteLogMetadataTracker.java
new file mode 100644
index 000000000..77cc5603a
--- /dev/null
+++ b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/RemoteLogMetadataTracker.java
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2023 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.tieredstorage.e2e.internal;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+
+import org.assertj.core.api.Assertions;
+
+import static org.assertj.core.api.Assertions.fail;
+
+public class RemoteLogMetadataTracker {
+    private static final String REMOTE_LOG_METADATA_TOPIC = "__remote_log_metadata";
+
+    private final KafkaConsumer<byte[], RemoteLogMetadata> consumer;
+    private List<TopicPartition> partitions;
+
+    private final Map<TopicIdPartition, Set<RemoteSegment>> remoteSegments = new HashMap<>();
+    private final Map<RemoteLogSegmentId, RemoteLogSegmentState> remoteSegmentStates = new HashMap<>();
+
+    public RemoteLogMetadataTracker(final String bootstrapServers) {
+        consumer = new KafkaConsumer<>(Map.of(
+            "bootstrap.servers", bootstrapServers,
+            "auto.offset.reset", "earliest"
+        ), new ByteArrayDeserializer(), new RemoteLogMetadataDeserializer());
+    }
+
+    public Map<TopicIdPartition, List<RemoteSegment>> remoteSegments() {
+        final Map<TopicIdPartition, List<RemoteSegment>> result = new HashMap<>();
+        for (final Map.Entry<TopicIdPartition, Set<RemoteSegment>> entry : remoteSegments.entrySet()) {
+            final List<RemoteSegment> list = entry.getValue().stream()
+                .sorted(Comparator.comparing(RemoteSegment::startOffset))
+                .collect(Collectors.toList());
+            result.put(entry.getKey(), list);
+        }
+        return result;
+    }
+
+    /**
+     * Initializes the tracker.
+     *
+     * It expects at least one record to be present in __remote_log_metadata
+     * and that all remote segments are in {@code COPY_SEGMENT_FINISHED} state.
+     */
+    public void initialize() {
+        // TODO set some max wait time
+
+        partitions = consumer.partitionsFor(REMOTE_LOG_METADATA_TOPIC).stream()
+            .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
+            .collect(Collectors.toList());
+
+        TestUtils.waitUntilTrue(
+            () -> consumer.endOffsets(partitions).values().stream().mapToLong(Long::longValue).sum() > 0,
+            "No remote log metadata",
+            Duration.ofSeconds(60)
+        );
+
+        consumer.assign(partitions);
+        consumer.seekToBeginning(partitions);
+
+        ConsumerRecords<byte[], RemoteLogMetadata> records;
+        while (!(records = consumer.poll(Duration.ofSeconds(5))).isEmpty() || !this.allRemoteSegmentsAreFinished()) {
+            records.forEach(r -> {
+                final RemoteLogMetadata metadata = r.value();
+                remoteSegments.putIfAbsent(metadata.topicIdPartition(), new HashSet<>());
+                if (metadata instanceof RemoteLogSegmentMetadata) {
+                    final var rlsm = (RemoteLogSegmentMetadata) metadata;
+                    remoteSegmentStates.put(rlsm.remoteLogSegmentId(), rlsm.state());
+                    remoteSegments.get(metadata.topicIdPartition()).add(
+                        new RemoteSegment(rlsm.remoteLogSegmentId(), rlsm.startOffset(), rlsm.endOffset())
+                    );
+                } else if (metadata instanceof RemoteLogSegmentMetadataUpdate) {
+                    final var rlsmu = (RemoteLogSegmentMetadataUpdate) metadata;
+                    remoteSegmentStates.put(rlsmu.remoteLogSegmentId(), rlsmu.state());
+
+                    // Sanity check: if we see an update, the original record should be already taken into account.
+                    Assertions.assertThat(remoteSegments.get(metadata.topicIdPartition()).stream()
+                        .filter(rs -> rs.remoteLogSegmentId().equals(rlsmu.remoteLogSegmentId()))
+                        .findFirst()).isNotEmpty();
+                } else {
+                    fail("Unexpected metadata: %s", metadata);
+                }
+            });
+        }
+    }
+
+    private boolean allRemoteSegmentsAreFinished() {
+        return remoteSegmentStates.values().stream()
+            .allMatch(s -> s.equals(RemoteLogSegmentState.COPY_SEGMENT_FINISHED));
+    }
+
+    public void waitUntilSegmentsAreDeleted(final List<RemoteSegment> segmentsToBeDeleted) {
+        final Supplier<Boolean> segmentsDeleted = () -> segmentsToBeDeleted.stream()
+            .allMatch(rs ->
+                remoteSegmentStates.get(rs.remoteLogSegmentId()).equals(RemoteLogSegmentState.DELETE_SEGMENT_FINISHED));
+        ConsumerRecords<byte[], RemoteLogMetadata> records;
+
+        final var metadataRecords = new ArrayList<RemoteLogMetadata>();
+        final var timeout = Duration.ofSeconds(30);
+
+        final var startAt = System.currentTimeMillis();
+
+        while (!(records = consumer.poll(Duration.ofSeconds(5))).isEmpty() || !segmentsDeleted.get()) {
+            records.forEach(r -> {
+                final RemoteLogMetadata metadata = r.value();
+                metadataRecords.add(metadata);
+                if (metadata instanceof RemoteLogSegmentMetadataUpdate) {
+                    final var metadataUpdate = (RemoteLogSegmentMetadataUpdate) metadata;
+                    remoteSegmentStates.put(metadataUpdate.remoteLogSegmentId(), metadataUpdate.state());
+                    if (metadataUpdate.state().equals(RemoteLogSegmentState.DELETE_SEGMENT_FINISHED)) {
+                        remoteSegments.get(metadata.topicIdPartition())
+                            .removeIf(
+                                segment -> segment.remoteLogSegmentId().equals(metadataUpdate.remoteLogSegmentId()));
+                    }
+                }
+            });
+            if (System.currentTimeMillis() - startAt > timeout.toMillis()) {
+                break;
+            }
+        }
+
+        if (!segmentsDeleted.get()) {
+            fail("Failed to receive delete metadata records. Records received: %s", metadataRecords);
+        }
+    }
+}
diff --git a/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/RemoteSegment.java b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/RemoteSegment.java
new file mode 100644
index 000000000..aa00b0eae
--- /dev/null
+++ b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/RemoteSegment.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2023 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.tieredstorage.e2e.internal;
+
+import java.util.Objects;
+
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+
+public final class RemoteSegment {
+    private final RemoteLogSegmentId remoteLogSegmentId;
+    private final long startOffset;
+    private final long endOffset;
+
+    RemoteSegment(final RemoteLogSegmentId remoteLogSegmentId,
+                  final long startOffset,
+                  final long endOffset) {
+        this.remoteLogSegmentId = remoteLogSegmentId;
+        this.startOffset = startOffset;
+        this.endOffset = endOffset;
+    }
+
+    public RemoteLogSegmentId remoteLogSegmentId() {
+        return remoteLogSegmentId;
+    }
+
+    public long startOffset() {
+        return startOffset;
+    }
+
+    public long endOffset() {
+        return endOffset;
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (obj == null || obj.getClass() != this.getClass()) {
+            return false;
+        }
+        final var that = (RemoteSegment) obj;
+        return Objects.equals(this.remoteLogSegmentId, that.remoteLogSegmentId)
+            && this.startOffset == that.startOffset
+            && this.endOffset == that.endOffset;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(remoteLogSegmentId, startOffset, endOffset);
+    }
+
+    @Override
+    public String toString() {
+        return "RemoteSegment["
+            + "remoteLogSegmentId=" + remoteLogSegmentId + ", "
+            + "startOffset=" + startOffset + ", "
+            + "endOffset=" + endOffset + ']';
+    }
+
+}
diff --git a/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/TestUtils.java b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/TestUtils.java
new file mode 100644
index 000000000..d4c258f57
--- /dev/null
+++ b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/TestUtils.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2023 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.tieredstorage.e2e.internal;
+
+import java.time.Duration;
+import java.util.function.Supplier;
+
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class TestUtils {
+    public static void waitUntilTrue(final Supplier<Boolean> condition,
+                                     final String failMsg,
+                                     final Duration waitTime) {
+        final long pause = 100L;
+        final long waitTimeMs = waitTime.toMillis();
+        final long startTime = System.currentTimeMillis();
+        while (true) {
+            if (condition.get()) {
+                return;
+            } else if (System.currentTimeMillis() > startTime + waitTimeMs) {
+                fail(failMsg);
+            }
+            try {
+                Thread.sleep(Math.min(waitTimeMs, pause));
+            } catch (final InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}
diff --git a/e2e/src/test/resources/log4j.properties b/e2e/src/test/resources/log4j.properties
new file mode 100644
index 000000000..a0b872fb6
--- /dev/null
+++ b/e2e/src/test/resources/log4j.properties
@@ -0,0 +1,23 @@
+##
+# Copyright 2023 Aiven Oy
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.logger.org.apache.kafka.clients.consumer.KafkaConsumer=WARN
diff --git a/settings.gradle b/settings.gradle
index a901fae08..daf6bd823 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -20,3 +20,4 @@ include 'storage'
 include 'storage:core'
 include 'storage:filesystem'
 include 'storage:s3'
+include 'e2e'