diff --git a/.github/workflows/main_push_and_pull_request_workflow.yml b/.github/workflows/main_push_and_pull_request_workflow.yml index 3ec679be0..307acb9b7 100644 --- a/.github/workflows/main_push_and_pull_request_workflow.yml +++ b/.github/workflows/main_push_and_pull_request_workflow.yml @@ -35,7 +35,7 @@ jobs: strategy: matrix: java-version: [ 11 ] - test: [ 'LocalSystem', 'S3', 'Gcs' ] + test: [ 'LocalSystem', 'S3', 'Gcs', 'Azure' ] name: E2E tests for ${{ matrix.test }} with jdk ${{ matrix.java-version }} runs-on: ubuntu-latest steps: diff --git a/Makefile b/Makefile index b5d9deaad..3dcce492a 100644 --- a/Makefile +++ b/Makefile @@ -28,7 +28,7 @@ clean: checkstyle: ./gradlew checkstyleMain checkstyleTest checkstyleIntegrationTest -build: build/distributions/tiered-storage-for-apache-kafka-$(VERSION).tgz storage/s3/build/distributions/s3-$(VERSION).tgz storage/gcs/build/distributions/gcs-$(VERSION).tgz +build: build/distributions/tiered-storage-for-apache-kafka-$(VERSION).tgz storage/s3/build/distributions/s3-$(VERSION).tgz storage/gcs/build/distributions/gcs-$(VERSION).tgz storage/azure/build/distributions/azure-$(VERSION).tgz build/distributions/tiered-storage-for-apache-kafka-$(VERSION).tgz: ./gradlew build distTar -x test -x integrationTest -x e2e:test @@ -39,6 +39,9 @@ storage/s3/build/distributions/s3-$(VERSION).tgz: storage/gcs/build/distributions/gcs-$(VERSION).tgz: ./gradlew build :storage:gcs:distTar -x test -x integrationTest -x e2e:test +storage/azure/build/distributions/azure-$(VERSION).tgz: + ./gradlew build :storage:azure:distTar -x test -x integrationTest -x e2e:test + test: build ./gradlew test -x e2e:test diff --git a/build.gradle b/build.gradle index 244d22dca..ea7004954 100644 --- a/build.gradle +++ b/build.gradle @@ -119,6 +119,8 @@ subprojects { gcpSdkVersion = "2.26.1" + azureSdkVersion = "1.2.17" + testcontainersVersion = "1.19.0" testcontainersFakeGcsServerVersion = "0.2.0" diff --git a/checkstyle/suppressions.xml 
b/checkstyle/suppressions.xml index df303d627..ed2d23d9d 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -28,6 +28,7 @@ + diff --git a/demo/Makefile b/demo/Makefile index dfd30a0c6..e5f880ae2 100644 --- a/demo/Makefile +++ b/demo/Makefile @@ -111,12 +111,17 @@ run_s3_minio: run_gcs_fake_gcs_server: docker compose -f compose-gcs-fake-gcs-server.yml up +.PHONY: run_azure_blob_azurite +run_azure_blob_azurite: + docker compose -f compose-azure-blob-azurite.yml up + .PHONY: clean clean: docker compose -f compose-local-fs.yml down docker compose -f compose-s3-aws.yml down docker compose -f compose-s3-minio.yml down docker compose -f compose-gcs-fake-gcs-server.yml down + docker compose -f compose-azure-blob-azurite.yml down .PHONY: show_local_data show_local_data: @@ -141,6 +146,14 @@ show_remote_data_s3_minio: show_remote_data_gcs_fake_gcs_server: curl http://localhost:4443/storage/v1/b/test-bucket/o | jq -r '.items | map(.name) | .[]' +.PHONY: show_remote_data_azurite +show_remote_data_azurite: + docker run --rm --network=host mcr.microsoft.com/azure-cli \ + az storage blob list --container-name test-container \ + --account-name devstoreaccount1 \ + --account-key Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw== \ + --blob-endpoint http://127.0.0.1:10000/devstoreaccount1 --output table | grep $(topic) + offset = 0 consume = 10 .PHONY: consume diff --git a/demo/compose-azure-blob-azurite.yml b/demo/compose-azure-blob-azurite.yml new file mode 100644 index 000000000..3987c0973 --- /dev/null +++ b/demo/compose-azure-blob-azurite.yml @@ -0,0 +1,89 @@ +## +# Copyright 2023 Aiven Oy +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +## +version: '3.8' +services: + zookeeper: + image: "confluentinc/cp-zookeeper:7.3.3" + ports: + - "2181:2181" + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + + kafka: + image: "aivenoy/kafka-with-ts-plugin" + container_name: "kafka-ts" + depends_on: + - zookeeper + - azurite + ports: + - "9092:9092" + - "7000:7000" #prometheus metrics + environment: + KAFKA_BROKER_ID: 0 + KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" + KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:9092,BROKER://0.0.0.0:29092" + KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:9092,BROKER://kafka:29092" + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,BROKER:PLAINTEXT" + KAFKA_INTER_BROKER_LISTENER_NAME: "BROKER" + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_AUTO_CREATE_TOPICS_ENABLE: false + # Increase Tiered Storage log level + KAFKA_LOG4J_LOGGERS: "io.aiven.kafka.tieredstorage=DEBUG" + # Tweak retention checking + KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 10000 + # Enable Tiered Storage + KAFKA_REMOTE_LOG_STORAGE_SYSTEM_ENABLE: true + KAFKA_REMOTE_LOG_MANAGER_TASK_INTERVAL_MS: 5000 + # Remote metadata manager + KAFKA_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME: "org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager" + KAFKA_REMOTE_LOG_METADATA_MANAGER_IMPL_PREFIX: "rlmm.config." 
+ KAFKA_REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME: "BROKER" + KAFKA_RLMM_CONFIG_REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR: 1 + # Remote storage manager + KAFKA_REMOTE_LOG_STORAGE_MANAGER_CLASS_PATH: "/tiered-storage-for-apache-kafka/core/*:/tiered-storage-for-apache-kafka/azure/*" + KAFKA_REMOTE_LOG_STORAGE_MANAGER_CLASS_NAME: "io.aiven.kafka.tieredstorage.RemoteStorageManager" + KAFKA_REMOTE_LOG_STORAGE_MANAGER_IMPL_PREFIX: "rsm.config." + KAFKA_RSM_CONFIG_CHUNK_SIZE: 5242880 # 5MiB + KAFKA_RSM_CONFIG_CHUNK_CACHE_CLASS: "io.aiven.kafka.tieredstorage.chunkmanager.cache.InMemoryChunkCache" + KAFKA_RSM_CONFIG_CHUNK_CACHE_SIZE: -1 + KAFKA_RSM_CONFIG_CUSTOM_METADATA_FIELDS_INCLUDE: "REMOTE_SIZE" + # Storage backend + KAFKA_RSM_CONFIG_KEY_PREFIX: "tiered-storage-demo/" + KAFKA_RSM_CONFIG_STORAGE_BACKEND_CLASS: "io.aiven.kafka.tieredstorage.storage.azure.AzureBlobStorage" + KAFKA_RSM_CONFIG_STORAGE_AZURE_CONTAINER_NAME: "test-container" + KAFKA_RSM_CONFIG_STORAGE_AZURE_ACCOUNT_NAME: "devstoreaccount1" + KAFKA_RSM_CONFIG_STORAGE_AZURE_ACCOUNT_KEY: "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==" + KAFKA_RSM_CONFIG_STORAGE_AZURE_ENDPOINT_URL: "http://azurite:10000/devstoreaccount1" + + azurite: + image: mcr.microsoft.com/azure-storage/azurite + ports: + - "10000:10000" + command: azurite-blob --blobHost 0.0.0.0 + + azurite-create-container: + image: mcr.microsoft.com/azure-cli + restart: "no" + depends_on: + - azurite + command: > + az storage container create --name test-container + --account-name devstoreaccount1 + --account-key Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw== + --blob-endpoint http://azurite:10000/devstoreaccount1 diff --git a/docker/Dockerfile b/docker/Dockerfile index ffda0574f..05230ecbc 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -21,7 +21,8 @@ USER root RUN mkdir -p /tiered-storage-for-apache-kafka/core \ && mkdir -p 
/tiered-storage-for-apache-kafka/s3 \ - && mkdir -p /tiered-storage-for-apache-kafka/gcs + && mkdir -p /tiered-storage-for-apache-kafka/gcs \ + && mkdir -p /tiered-storage-for-apache-kafka/azure COPY build/distributions/tiered-storage-for-apache-kafka-${_VERSION}.tgz /tiered-storage-for-apache-kafka/core RUN cd /tiered-storage-for-apache-kafka/core \ @@ -38,6 +39,11 @@ RUN cd /tiered-storage-for-apache-kafka/gcs \ && tar -xf gcs-${_VERSION}.tgz --strip-components=1 \ && rm gcs-${_VERSION}.tgz +COPY storage/azure/build/distributions/azure-${_VERSION}.tgz /tiered-storage-for-apache-kafka/azure +RUN cd /tiered-storage-for-apache-kafka/azure \ + && tar -xf azure-${_VERSION}.tgz --strip-components=1 \ + && rm azure-${_VERSION}.tgz + # Installing JMX exporter agent ARG JMX_EXPORTER_VERSION=0.18.0 RUN mkdir -p /opt/prometheus/jmx-exporter diff --git a/e2e/build.gradle b/e2e/build.gradle index e3903c67e..582904b2a 100644 --- a/e2e/build.gradle +++ b/e2e/build.gradle @@ -31,6 +31,9 @@ dependencies { testImplementation "io.aiven:testcontainers-fake-gcs-server:$testcontainersFakeGcsServerVersion" + testImplementation platform("com.azure:azure-sdk-bom:$azureSdkVersion") + testImplementation "com.azure:azure-storage-blob" + testRuntimeOnly "org.slf4j:slf4j-log4j12:$slf4jVersion" } diff --git a/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/AzureSingleBrokerTest.java b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/AzureSingleBrokerTest.java new file mode 100644 index 000000000..93d738540 --- /dev/null +++ b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/AzureSingleBrokerTest.java @@ -0,0 +1,106 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.e2e; + +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.Uuid; + +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClientBuilder; +import com.azure.storage.blob.models.BlobItem; +import com.azure.storage.blob.models.ListBlobsOptions; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.utility.DockerImageName; + +public class AzureSingleBrokerTest extends SingleBrokerTest { + private static final int BLOB_STORAGE_PORT = 10000; + static final String NETWORK_ALIAS = "blob-storage"; + + static final GenericContainer AZURITE_SERVER = + new GenericContainer<>(DockerImageName.parse("mcr.microsoft.com/azure-storage/azurite")) + .withExposedPorts(BLOB_STORAGE_PORT) + .withCommand("azurite-blob --blobHost 0.0.0.0") + .withNetwork(NETWORK) + .withNetworkAliases(NETWORK_ALIAS); + + static final String AZURE_CONTAINER = "test-container"; + + static BlobContainerClient blobContainerClient; + + @BeforeAll + static void init() throws Exception { + AZURITE_SERVER.start(); + + // The well-known Azurite account name and key. 
+ final String accountName = "devstoreaccount1"; + final String accountKey = + "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="; + + final String endpointForTestCode = + "http://127.0.0.1:" + AZURITE_SERVER.getMappedPort(BLOB_STORAGE_PORT) + "/devstoreaccount1"; + final String connectionString = "DefaultEndpointsProtocol=http;" + + "AccountName=" + accountName + ";" + + "AccountKey=" + accountKey + ";" + + "BlobEndpoint=" + endpointForTestCode + ";"; + final var blobServiceClient = new BlobServiceClientBuilder() + .connectionString(connectionString) + .buildClient(); + blobContainerClient = blobServiceClient.createBlobContainer(AZURE_CONTAINER); + + final String endpointForKafka = "http://" + NETWORK_ALIAS + ":" + BLOB_STORAGE_PORT + "/devstoreaccount1"; + setupKafka(kafka -> kafka.withEnv("KAFKA_RSM_CONFIG_STORAGE_BACKEND_CLASS", + "io.aiven.kafka.tieredstorage.storage.azure.AzureBlobStorage") + .withEnv("KAFKA_REMOTE_LOG_STORAGE_MANAGER_CLASS_PATH", + "/tiered-storage-for-apache-kafka/core/*:/tiered-storage-for-apache-kafka/azure/*") + .withEnv("KAFKA_RSM_CONFIG_STORAGE_AZURE_CONTAINER_NAME", AZURE_CONTAINER) + .withEnv("KAFKA_RSM_CONFIG_STORAGE_AZURE_ACCOUNT_NAME", accountName) + .withEnv("KAFKA_RSM_CONFIG_STORAGE_AZURE_ACCOUNT_KEY", accountKey) + .withEnv("KAFKA_RSM_CONFIG_STORAGE_AZURE_ENDPOINT_URL", endpointForKafka) + .dependsOn(AZURITE_SERVER)); + } + + @AfterAll + static void cleanup() { + stopKafka(); + + AZURITE_SERVER.stop(); + + cleanupStorage(); + } + + @Override + boolean assertNoTopicDataOnTierStorage(final String topicName, final Uuid topicId) { + final String prefix = String.format("%s-%s", topicName, topicId.toString()); + + final var list = blobContainerClient.listBlobs(new ListBlobsOptions().setPrefix(prefix), null); + return list.stream().findAny().isEmpty(); + } + + @Override + List remotePartitionFiles(final TopicIdPartition topicIdPartition) { + return blobContainerClient.listBlobs().stream() + 
.map(BlobItem::getName) + .map(k -> k.substring(k.lastIndexOf('/') + 1)) + .sorted() + .collect(Collectors.toList()); + } +} diff --git a/settings.gradle b/settings.gradle index 425fbe138..1d6840abc 100644 --- a/settings.gradle +++ b/settings.gradle @@ -19,6 +19,7 @@ include 'core' include 'storage' include 'storage:core' include 'storage:filesystem' +include 'storage:azure' include 'storage:gcs' include 'storage:s3' include 'e2e' diff --git a/storage/azure/build.gradle b/storage/azure/build.gradle new file mode 100644 index 000000000..0d5b46f88 --- /dev/null +++ b/storage/azure/build.gradle @@ -0,0 +1,35 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +archivesBaseName = "storage-azure" + +dependencies { + implementation project(":storage:core") + + implementation platform("com.azure:azure-sdk-bom:$azureSdkVersion") + implementation ("com.azure:azure-identity") { + exclude group: "org.slf4j" + } + implementation ("com.azure:azure-storage-blob") { + exclude group: "org.slf4j" + } + + implementation project(":commons") + + testImplementation(testFixtures(project(":storage:core"))) + + testImplementation "org.testcontainers:junit-jupiter:$testcontainersVersion" +} diff --git a/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageAccountKeyTest.java b/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageAccountKeyTest.java new file mode 100644 index 000000000..f1a6dfdfa --- /dev/null +++ b/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageAccountKeyTest.java @@ -0,0 +1,40 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.storage.azure; + +import java.util.Map; + +import io.aiven.kafka.tieredstorage.storage.StorageBackend; + +class AzureBlobStorageAccountKeyTest extends AzureBlobStorageTest { + @Override + protected StorageBackend storage() { + final AzureBlobStorage azureBlobStorage = new AzureBlobStorage(); + // The well-known Azurite account name and key. 
+ final String accountName = "devstoreaccount1"; + final String accountKey = + "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="; + final Map configs = Map.of( + "azure.container.name", azureContainerName, + "azure.account.name", accountName, + "azure.account.key", accountKey, + "azure.endpoint.url", endpoint() + ); + azureBlobStorage.configure(configs); + return azureBlobStorage; + } +} diff --git a/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageConnectionStringTest.java b/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageConnectionStringTest.java new file mode 100644 index 000000000..6ab5e2b73 --- /dev/null +++ b/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageConnectionStringTest.java @@ -0,0 +1,34 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.aiven.kafka.tieredstorage.storage.azure; + +import java.util.Map; + +import io.aiven.kafka.tieredstorage.storage.StorageBackend; + +class AzureBlobStorageConnectionStringTest extends AzureBlobStorageTest { + @Override + protected StorageBackend storage() { + final AzureBlobStorage azureBlobStorage = new AzureBlobStorage(); + final Map configs = Map.of( + "azure.container.name", azureContainerName, + "azure.connection.string", connectionString() + ); + azureBlobStorage.configure(configs); + return azureBlobStorage; + } +} diff --git a/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageTest.java b/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageTest.java new file mode 100644 index 000000000..55f72746f --- /dev/null +++ b/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageTest.java @@ -0,0 +1,92 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.aiven.kafka.tieredstorage.storage.azure; + +import java.io.ByteArrayInputStream; + +import io.aiven.kafka.tieredstorage.storage.BaseStorageTest; +import io.aiven.kafka.tieredstorage.storage.BytesRange; +import io.aiven.kafka.tieredstorage.storage.InvalidRangeException; +import io.aiven.kafka.tieredstorage.storage.StorageBackendException; + +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.BlobServiceClientBuilder; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestInfo; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +@Testcontainers +abstract class AzureBlobStorageTest extends BaseStorageTest { + private static final int BLOB_STORAGE_PORT = 10000; + @Container + static final GenericContainer AZURITE_SERVER = + new GenericContainer<>(DockerImageName.parse("mcr.microsoft.com/azure-storage/azurite")) + .withExposedPorts(BLOB_STORAGE_PORT) + .withCommand("azurite-blob --blobHost 0.0.0.0"); + + static BlobServiceClient blobServiceClient; + + protected String azureContainerName; + + protected static String endpoint() { + return "http://127.0.0.1:" + AZURITE_SERVER.getMappedPort(BLOB_STORAGE_PORT) + "/devstoreaccount1"; + } + + protected static String connectionString() { + // The well-known Azurite connection string. 
+ return "DefaultEndpointsProtocol=http;" + + "AccountName=devstoreaccount1;" + + "AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;" + + "BlobEndpoint=" + endpoint() + ";"; + } + + @BeforeAll + static void setUpClass() { + blobServiceClient = new BlobServiceClientBuilder() + .connectionString(connectionString()) + .buildClient(); + } + + @BeforeEach + void setUp(final TestInfo testInfo) { + azureContainerName = testInfo.getDisplayName() + .toLowerCase() + .replace("(", "") + .replace(")", ""); + while (azureContainerName.length() < 3) { + azureContainerName += azureContainerName; + } + blobServiceClient.createBlobContainer(azureContainerName); + } + + @Override + protected void testFetchWithRangeOutsideFileSize() throws StorageBackendException { + // For some reason, Azure (or only Azurite) considers range 3-5 valid for a 3-byte blob, + // so the generic test fails. + final String content = "ABC"; + storage().upload(new ByteArrayInputStream(content.getBytes()), TOPIC_PARTITION_SEGMENT_KEY); + + assertThatThrownBy(() -> storage().fetch(TOPIC_PARTITION_SEGMENT_KEY, BytesRange.of(4, 6))) + .isInstanceOf(InvalidRangeException.class); + } +} diff --git a/storage/azure/src/integration-test/resources/log4j.properties b/storage/azure/src/integration-test/resources/log4j.properties new file mode 100644 index 000000000..3e86cfa01 --- /dev/null +++ b/storage/azure/src/integration-test/resources/log4j.properties @@ -0,0 +1,21 @@ +# +# Copyright 2023 Aiven Oy +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# + +log4j.rootLogger=INFO, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n diff --git a/storage/azure/src/main/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorage.java b/storage/azure/src/main/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorage.java new file mode 100644 index 000000000..bf04e0878 --- /dev/null +++ b/storage/azure/src/main/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorage.java @@ -0,0 +1,149 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.aiven.kafka.tieredstorage.storage.azure; + +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Map; + +import io.aiven.kafka.tieredstorage.storage.BytesRange; +import io.aiven.kafka.tieredstorage.storage.InvalidRangeException; +import io.aiven.kafka.tieredstorage.storage.KeyNotFoundException; +import io.aiven.kafka.tieredstorage.storage.ObjectKey; +import io.aiven.kafka.tieredstorage.storage.StorageBackend; +import io.aiven.kafka.tieredstorage.storage.StorageBackendException; + +import com.azure.identity.DefaultAzureCredentialBuilder; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClientBuilder; +import com.azure.storage.blob.models.BlobRange; +import com.azure.storage.blob.models.BlobStorageException; +import com.azure.storage.blob.specialized.BlockBlobClient; +import com.azure.storage.blob.specialized.SpecializedBlobClientBuilder; +import com.azure.storage.common.StorageSharedKeyCredential; + +public class AzureBlobStorage implements StorageBackend { + private AzureStorageConfig config; + private BlobContainerClient blobContainerClient; + + @Override + public void configure(final Map configs) { + final AzureStorageConfig config = new AzureStorageConfig(configs); + this.config = config; + + final BlobServiceClientBuilder blobServiceClientBuilder = new BlobServiceClientBuilder(); + if (config.connectionString() != null) { + blobServiceClientBuilder.connectionString(config.connectionString()); + } else { + blobServiceClientBuilder.endpoint(endpointUrl()); + + if (config.accountKey() == null) { + blobServiceClientBuilder.credential( + new DefaultAzureCredentialBuilder().build()); + } else { + blobServiceClientBuilder.credential( + new StorageSharedKeyCredential(config.accountName(), config.accountKey())); + } + } + + blobContainerClient = blobServiceClientBuilder.buildClient() + 
.getBlobContainerClient(config.containerName()); + } + + private String endpointUrl() { + if (config.endpointUrl() != null) { + return config.endpointUrl(); + } else { + return "https://" + config.accountName() + ".blob.core.windows.net"; + } + } + + @Override + public long upload(final InputStream inputStream, final ObjectKey key) throws StorageBackendException { + final var specializedBlobClientBuilder = new SpecializedBlobClientBuilder(); + if (config.connectionString() != null) { + specializedBlobClientBuilder.connectionString(config.connectionString()); + } else { + specializedBlobClientBuilder.endpoint(endpointUrl()); + + if (config.accountKey() == null) { + specializedBlobClientBuilder.credential( + new DefaultAzureCredentialBuilder().build()); + } else { + specializedBlobClientBuilder.credential( + new StorageSharedKeyCredential(config.accountName(), config.accountKey())); + } + } + final BlockBlobClient blockBlobClient = specializedBlobClientBuilder + .containerName(config.containerName()) + .blobName(key.value()) + .buildBlockBlobClient(); + + try (OutputStream os = new BufferedOutputStream(blockBlobClient.getBlobOutputStream(true))) { + return inputStream.transferTo(os); + } catch (final IOException e) { + throw new StorageBackendException("Failed to upload " + key, e); + } + } + + @Override + public InputStream fetch(final ObjectKey key) throws StorageBackendException { + try { + return blobContainerClient.getBlobClient(key.value()).openInputStream(); + } catch (final BlobStorageException e) { + if (e.getStatusCode() == 404) { + throw new KeyNotFoundException(this, key, e); + } else { + throw new StorageBackendException("Failed to fetch " + key, e); + } + } + } + + @Override + public InputStream fetch(final ObjectKey key, final BytesRange range) throws StorageBackendException { + try { + return blobContainerClient.getBlobClient(key.value()).openInputStream( + new BlobRange(range.from, (long) range.size()), null); + } catch (final BlobStorageException e) 
{ + if (e.getStatusCode() == 404) { + throw new KeyNotFoundException(this, key, e); + } else if (e.getStatusCode() == 416) { + throw new InvalidRangeException("Invalid range " + range, e); + } else { + throw new StorageBackendException("Failed to fetch " + key, e); + } + } + } + + @Override + public void delete(final ObjectKey key) throws StorageBackendException { + try { + blobContainerClient.getBlobClient(key.value()).deleteIfExists(); + } catch (final BlobStorageException e) { + throw new StorageBackendException("Failed to delete " + key, e); + } + } + + @Override + public String toString() { + return "AzureStorage{" + + "containerName='" + config.containerName() + '\'' + + '}'; + } +} diff --git a/storage/azure/src/main/java/io/aiven/kafka/tieredstorage/storage/azure/AzureStorageConfig.java b/storage/azure/src/main/java/io/aiven/kafka/tieredstorage/storage/azure/AzureStorageConfig.java new file mode 100644 index 000000000..79bcd0496 --- /dev/null +++ b/storage/azure/src/main/java/io/aiven/kafka/tieredstorage/storage/azure/AzureStorageConfig.java @@ -0,0 +1,136 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package io.aiven.kafka.tieredstorage.storage.azure;

import java.util.Map;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.types.Password;

import io.aiven.kafka.tieredstorage.config.validators.NonEmptyPassword;
import io.aiven.kafka.tieredstorage.config.validators.Null;
import io.aiven.kafka.tieredstorage.config.validators.ValidUrl;

/**
 * Configuration for the Azure Blob Storage backend.
 *
 * <p>Two mutually exclusive authentication setups are supported:
 * <ul>
 *   <li>{@code azure.connection.string} alone, or</li>
 *   <li>{@code azure.account.name}, optionally with {@code azure.account.key}
 *       and/or {@code azure.endpoint.url}.</li>
 * </ul>
 * {@code azure.container.name} is always required. The constructor rejects
 * any combination that mixes the connection string with the individual
 * account settings, and requires the account name when no connection string
 * is given.
 */
public class AzureStorageConfig extends AbstractConfig {
    static final String AZURE_ACCOUNT_NAME_CONFIG = "azure.account.name";
    private static final String AZURE_ACCOUNT_NAME_DOC = "Azure account name";

    static final String AZURE_ACCOUNT_KEY_CONFIG = "azure.account.key";
    private static final String AZURE_ACCOUNT_KEY_DOC = "Azure account key";

    static final String AZURE_CONTAINER_NAME_CONFIG = "azure.container.name";
    private static final String AZURE_CONTAINER_NAME_DOC = "Azure container to store log segments";

    static final String AZURE_ENDPOINT_URL_CONFIG = "azure.endpoint.url";
    private static final String AZURE_ENDPOINT_URL_DOC = "Custom Azure Blob Storage endpoint URL";

    static final String AZURE_CONNECTION_STRING_CONFIG = "azure.connection.string";
    private static final String AZURE_CONNECTION_STRING_DOC = "Azure connection string. "
        + "Cannot be used together with azure.account.name, azure.account.key, and azure.endpoint.url";

    private static final ConfigDef CONFIG;

    static {
        CONFIG = new ConfigDef()
            .define(
                AZURE_ACCOUNT_NAME_CONFIG,
                ConfigDef.Type.STRING,
                null,
                // Optional: may be null, but must be non-empty when present.
                Null.or(new ConfigDef.NonEmptyString()),
                ConfigDef.Importance.HIGH,
                AZURE_ACCOUNT_NAME_DOC)
            .define(
                AZURE_ACCOUNT_KEY_CONFIG,
                ConfigDef.Type.PASSWORD,
                null,
                Null.or(new NonEmptyPassword()),
                ConfigDef.Importance.MEDIUM,
                AZURE_ACCOUNT_KEY_DOC)
            .define(
                AZURE_CONTAINER_NAME_CONFIG,
                ConfigDef.Type.STRING,
                // Required: no default, non-empty.
                ConfigDef.NO_DEFAULT_VALUE,
                new ConfigDef.NonEmptyString(),
                ConfigDef.Importance.HIGH,
                AZURE_CONTAINER_NAME_DOC)
            .define(
                AZURE_ENDPOINT_URL_CONFIG,
                ConfigDef.Type.STRING,
                null,
                Null.or(new ValidUrl()),
                ConfigDef.Importance.LOW,
                AZURE_ENDPOINT_URL_DOC)
            .define(
                AZURE_CONNECTION_STRING_CONFIG,
                ConfigDef.Type.PASSWORD,
                null,
                Null.or(new NonEmptyPassword()),
                ConfigDef.Importance.MEDIUM,
                AZURE_CONNECTION_STRING_DOC);
    }

    /**
     * Creates and validates the configuration.
     *
     * @param props raw configuration entries keyed by config name
     * @throws ConfigException if a required setting is missing or mutually
     *                         exclusive settings are combined
     */
    // Typed as Map<String, ?> (instead of a raw Map) to avoid unchecked
    // generics; this matches AbstractConfig's own constructor signature and
    // is source-compatible for existing callers.
    public AzureStorageConfig(final Map<String, ?> props) {
        super(CONFIG, props);
        validate();
    }

    // Cross-field validation that ConfigDef cannot express on its own:
    // the connection string is an all-in-one credential and must not be
    // combined with the individual account settings.
    private void validate() {
        if (connectionString() != null) {
            if (accountName() != null) {
                throw new ConfigException(
                    "\"azure.connection.string\" cannot be set together with \"azure.account.name\".");
            }
            if (accountKey() != null) {
                throw new ConfigException(
                    "\"azure.connection.string\" cannot be set together with \"azure.account.key\".");
            }
            if (endpointUrl() != null) {
                throw new ConfigException(
                    "\"azure.connection.string\" cannot be set together with \"azure.endpoint.url\".");
            }
        } else {
            if (accountName() == null) {
                throw new ConfigException(
                    "\"azure.account.name\" must be set if \"azure.connection.string\" is not set.");
            }
        }
    }

    /** @return the configured account name, or {@code null} if unset. */
    String accountName() {
        return getString(AZURE_ACCOUNT_NAME_CONFIG);
    }

    /** @return the account key as a plain string, or {@code null} if unset. */
    String accountKey() {
        final Password key = getPassword(AZURE_ACCOUNT_KEY_CONFIG);
        return key == null ? null : key.value();
    }

    /** @return the container name; always present (required setting). */
    String containerName() {
        return getString(AZURE_CONTAINER_NAME_CONFIG);
    }

    /** @return the custom endpoint URL, or {@code null} if unset. */
    String endpointUrl() {
        return getString(AZURE_ENDPOINT_URL_CONFIG);
    }

    /** @return the connection string as a plain string, or {@code null} if unset. */
    String connectionString() {
        final Password connectionString = getPassword(AZURE_CONNECTION_STRING_CONFIG);
        return connectionString == null ? null : connectionString.value();
    }
}
package io.aiven.kafka.tieredstorage.storage.azure;

import java.util.Map;

import org.apache.kafka.common.config.ConfigException;

import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/**
 * Unit tests for {@link AzureStorageConfig}: required settings, the supported
 * authentication combinations, and the mutual exclusion between the
 * connection string and the individual account settings.
 */
class AzureBlobStorageConfigTest {
    private static final String ACCOUNT_NAME = "account1";
    private static final String ACCOUNT_KEY = "account_key";
    private static final String CONTAINER_NAME = "c1";
    private static final String ENDPOINT = "http://localhost:10000/";
    // The well-known Azurite connection string.
    private static final String CONNECTION_STRING = "DefaultEndpointsProtocol=http;"
        + "AccountName=devstoreaccount1;"
        + "AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
        + "BlobEndpoint=http://localhost:10000/devstoreaccount1;";

    // Small helper so each test reads as "build config from these props".
    private static AzureStorageConfig configFrom(final Map<String, ?> props) {
        return new AzureStorageConfig(props);
    }

    @Test
    void minimalConfig() {
        // Account name + container name is the smallest valid configuration.
        final var cfg = configFrom(Map.of(
            "azure.account.name", ACCOUNT_NAME,
            "azure.container.name", CONTAINER_NAME
        ));
        assertThat(cfg.containerName()).isEqualTo(CONTAINER_NAME);
        assertThat(cfg.accountName()).isEqualTo(ACCOUNT_NAME);
        assertThat(cfg.accountKey()).isNull();
        assertThat(cfg.endpointUrl()).isNull();
        assertThat(cfg.connectionString()).isNull();
    }

    @Test
    void shouldRequireContainerName() {
        // Container name has no default, so omitting it fails in ConfigDef.
        final var props = Map.of(
            "azure.account.name", ACCOUNT_NAME
        );
        assertThatThrownBy(() -> configFrom(props))
            .isInstanceOf(ConfigException.class)
            .hasMessage("Missing required configuration \"azure.container.name\" which has no default value.");
    }

    @Test
    void shouldRequireAccountNameIfNoConnectionString() {
        // Without a connection string, the account name becomes mandatory.
        final var props = Map.of(
            "azure.container.name", CONTAINER_NAME
        );
        assertThatThrownBy(() -> configFrom(props))
            .isInstanceOf(ConfigException.class)
            .hasMessage("\"azure.account.name\" must be set if \"azure.connection.string\" is not set.");
    }

    @Test
    void authAccountName() {
        final var cfg = configFrom(Map.of(
            "azure.account.name", ACCOUNT_NAME,
            "azure.container.name", CONTAINER_NAME
        ));
        assertThat(cfg.accountName()).isEqualTo(ACCOUNT_NAME);
        assertThat(cfg.accountKey()).isNull();
        assertThat(cfg.endpointUrl()).isNull();
        assertThat(cfg.connectionString()).isNull();
    }

    @Test
    void authAccountNameAndEndpoint() {
        final var cfg = configFrom(Map.of(
            "azure.account.name", ACCOUNT_NAME,
            "azure.container.name", CONTAINER_NAME,
            "azure.endpoint.url", ENDPOINT
        ));
        assertThat(cfg.accountName()).isEqualTo(ACCOUNT_NAME);
        assertThat(cfg.accountKey()).isNull();
        assertThat(cfg.endpointUrl()).isEqualTo(ENDPOINT);
        assertThat(cfg.connectionString()).isNull();
    }

    @Test
    void authAccountNameAndKey() {
        final var cfg = configFrom(Map.of(
            "azure.account.name", ACCOUNT_NAME,
            "azure.account.key", ACCOUNT_KEY,
            "azure.container.name", CONTAINER_NAME
        ));
        assertThat(cfg.accountName()).isEqualTo(ACCOUNT_NAME);
        assertThat(cfg.accountKey()).isEqualTo(ACCOUNT_KEY);
        assertThat(cfg.endpointUrl()).isNull();
        assertThat(cfg.connectionString()).isNull();
    }

    @Test
    void authAccountNameAndKeyAndEndpoint() {
        final var cfg = configFrom(Map.of(
            "azure.account.name", ACCOUNT_NAME,
            "azure.account.key", ACCOUNT_KEY,
            "azure.container.name", CONTAINER_NAME,
            "azure.endpoint.url", ENDPOINT
        ));
        assertThat(cfg.accountName()).isEqualTo(ACCOUNT_NAME);
        assertThat(cfg.accountKey()).isEqualTo(ACCOUNT_KEY);
        assertThat(cfg.endpointUrl()).isEqualTo(ENDPOINT);
        assertThat(cfg.connectionString()).isNull();
    }

    @Test
    void authConnectionString() {
        // A connection string alone is a complete credential.
        final var cfg = configFrom(Map.of(
            "azure.connection.string", CONNECTION_STRING,
            "azure.container.name", CONTAINER_NAME
        ));
        assertThat(cfg.accountName()).isNull();
        assertThat(cfg.accountKey()).isNull();
        assertThat(cfg.endpointUrl()).isNull();
        assertThat(cfg.connectionString()).isEqualTo(CONNECTION_STRING);
    }

    @Test
    void connectionStringAndAccountNameClash() {
        final var props = Map.of(
            "azure.connection.string", CONNECTION_STRING,
            "azure.container.name", CONTAINER_NAME,
            "azure.account.name", ACCOUNT_NAME
        );
        assertThatThrownBy(() -> configFrom(props))
            .isInstanceOf(ConfigException.class)
            .hasMessage("\"azure.connection.string\" cannot be set together with \"azure.account.name\".");
    }

    @Test
    void connectionStringAndAccountKeyClash() {
        final var props = Map.of(
            "azure.connection.string", CONNECTION_STRING,
            "azure.container.name", CONTAINER_NAME,
            "azure.account.key", ACCOUNT_KEY
        );
        assertThatThrownBy(() -> configFrom(props))
            .isInstanceOf(ConfigException.class)
            .hasMessage("\"azure.connection.string\" cannot be set together with \"azure.account.key\".");
    }

    @Test
    void connectionStringAndEndpointClash() {
        final var props = Map.of(
            "azure.connection.string", CONNECTION_STRING,
            "azure.container.name", CONTAINER_NAME,
            "azure.endpoint.url", ENDPOINT
        );
        assertThatThrownBy(() -> configFrom(props))
            .isInstanceOf(ConfigException.class)
            .hasMessage("\"azure.connection.string\" cannot be set together with \"azure.endpoint.url\".");
    }
}
compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +log4j.rootLogger=INFO, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n diff --git a/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/BaseStorageTest.java b/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/BaseStorageTest.java index 5a57fab46..54d02cf90 100644 --- a/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/BaseStorageTest.java +++ b/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/BaseStorageTest.java @@ -135,7 +135,7 @@ void testFetchWithOffsetRangeLargerThanFileSize() throws IOException, StorageBac } @Test - void testFetchWithRangeOutsideFileSize() throws StorageBackendException { + protected void testFetchWithRangeOutsideFileSize() throws StorageBackendException { final String content = "ABC"; storage().upload(new ByteArrayInputStream(content.getBytes()), TOPIC_PARTITION_SEGMENT_KEY);