From d875630c86d9db62d58eebc233a90a2c8b9db585 Mon Sep 17 00:00:00 2001 From: Ivan Yurchenko Date: Wed, 18 Oct 2023 07:04:41 +0300 Subject: [PATCH] Add support for Azure Blob Storage Metrics are not supported in this PR and will be added in another one. Related to https://github.com/Aiven-Open/tiered-storage-for-apache-kafka/issues/421 --- .../main_push_and_pull_request_workflow.yml | 2 +- Makefile | 5 +- README.md | 2 +- build.gradle | 2 + checkstyle/suppressions.xml | 1 + demo/Makefile | 13 ++ demo/README.md | 31 +++ demo/compose-azure-blob-azurite.yml | 90 +++++++++ docker/Dockerfile | 8 +- e2e/build.gradle | 3 + .../e2e/AzureSingleBrokerTest.java | 106 +++++++++++ settings.gradle | 1 + storage/azure/build.gradle | 35 ++++ .../azure/AzureBlobStorageAccountKeyTest.java | 40 ++++ .../AzureBlobStorageConnectionStringTest.java | 34 ++++ .../storage/azure/AzureBlobStorageTest.java | 92 +++++++++ .../resources/log4j.properties | 21 +++ .../storage/azure/AzureBlobStorage.java | 149 +++++++++++++++ .../storage/azure/AzureStorageConfig.java | 136 ++++++++++++++ .../azure/AzureBlobStorageConfigTest.java | 177 ++++++++++++++++++ .../azure/src/test/resources/log4j.properties | 21 +++ .../storage/BaseStorageTest.java | 2 +- 22 files changed, 966 insertions(+), 5 deletions(-) create mode 100644 demo/compose-azure-blob-azurite.yml create mode 100644 e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/AzureSingleBrokerTest.java create mode 100644 storage/azure/build.gradle create mode 100644 storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageAccountKeyTest.java create mode 100644 storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageConnectionStringTest.java create mode 100644 storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageTest.java create mode 100644 storage/azure/src/integration-test/resources/log4j.properties create mode 100644 storage/azure/src/main/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorage.java create mode 100644 storage/azure/src/main/java/io/aiven/kafka/tieredstorage/storage/azure/AzureStorageConfig.java create mode 100644 storage/azure/src/test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageConfigTest.java create mode 100644 storage/azure/src/test/resources/log4j.properties diff --git a/.github/workflows/main_push_and_pull_request_workflow.yml b/.github/workflows/main_push_and_pull_request_workflow.yml index 3ec679be0..307acb9b7 100644 --- a/.github/workflows/main_push_and_pull_request_workflow.yml +++ b/.github/workflows/main_push_and_pull_request_workflow.yml @@ -35,7 +35,7 @@ jobs: strategy: matrix: java-version: [ 11 ] - test: [ 'LocalSystem', 'S3', 'Gcs' ] + test: [ 'LocalSystem', 'S3', 'Gcs', 'Azure' ] name: E2E tests for ${{ matrix.test }} with jdk ${{ matrix.java-version }} runs-on: ubuntu-latest steps: diff --git a/Makefile b/Makefile index b5d9deaad..3dcce492a 100644 --- a/Makefile +++ b/Makefile @@ -28,7 +28,7 @@ clean: checkstyle: ./gradlew checkstyleMain checkstyleTest checkstyleIntegrationTest -build: build/distributions/tiered-storage-for-apache-kafka-$(VERSION).tgz storage/s3/build/distributions/s3-$(VERSION).tgz storage/gcs/build/distributions/gcs-$(VERSION).tgz +build: build/distributions/tiered-storage-for-apache-kafka-$(VERSION).tgz storage/s3/build/distributions/s3-$(VERSION).tgz storage/gcs/build/distributions/gcs-$(VERSION).tgz storage/gcs/build/distributions/azure-$(VERSION).tgz build/distributions/tiered-storage-for-apache-kafka-$(VERSION).tgz: ./gradlew build distTar -x test -x integrationTest -x e2e:test @@ -39,6 +39,9 @@ storage/s3/build/distributions/s3-$(VERSION).tgz: storage/gcs/build/distributions/gcs-$(VERSION).tgz: ./gradlew build :storage:gcs:distTar -x test -x integrationTest -x e2e:test +storage/gcs/build/distributions/azure-$(VERSION).tgz: + ./gradlew build :storage:azure:distTar -x test -x integrationTest -x e2e:test + test: build ./gradlew test -x e2e:test diff --git a/README.md b/README.md index 3eb84b86f..81e705cb1 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ This project is an implementation of `RemoteStorageManager` for Apache Kafka tiered storage. -The implementation will have multiple configurable storage backends. Currently, AWS S3 and Google Cloud Storage are supported. We intend to support Azure Blob Storage in the near future. +The implementation will have multiple configurable storage backends. Currently, AWS S3, Google Cloud Storage, and Azure Blob Storage are supported. The project follows the API specifications according to the latest version of [KIP-405: Kafka Tiered Storage](https://cwiki.apache.org/confluence/x/KJDQBQ). diff --git a/build.gradle b/build.gradle index 244d22dca..ea7004954 100644 --- a/build.gradle +++ b/build.gradle @@ -119,6 +119,8 @@ subprojects { gcpSdkVersion = "2.26.1" + azureSdkVersion = "1.2.17" + testcontainersVersion = "1.19.0" testcontainersFakeGcsServerVersion = "0.2.0" diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index df303d627..ed2d23d9d 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -28,6 +28,7 @@ + diff --git a/demo/Makefile b/demo/Makefile index dfd30a0c6..e5f880ae2 100644 --- a/demo/Makefile +++ b/demo/Makefile @@ -111,12 +111,17 @@ run_s3_minio: run_gcs_fake_gcs_server: docker compose -f compose-gcs-fake-gcs-server.yml up +.PHONY: run_azure_blob_azurite +run_azure_blob_azurite: + docker compose -f compose-azure-blob-azurite.yml up + .PHONY: clean clean: docker compose -f compose-local-fs.yml down docker compose -f compose-s3-aws.yml down docker compose -f compose-s3-minio.yml down docker compose -f compose-gcs-fake-gcs-server.yml down + docker compose -f compose-azure-blob-azurite.yml down .PHONY: show_local_data show_local_data: @@ -141,6 +146,14 @@ show_remote_data_s3_minio: show_remote_data_gcs_fake_gcs_server: curl http://localhost:4443/storage/v1/b/test-bucket/o | jq -r '.items | map(.name) | .[]' +.PHONY: show_remote_data_azurite +show_remote_data_azurite: + docker run --rm --network=host mcr.microsoft.com/azure-cli \ + az storage blob list --container-name test-container \ + --account-name devstoreaccount1 \ + --account-key Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw== \ + --blob-endpoint http://127.0.0.1:10000/devstoreaccount1 --output table | grep $(topic) + offset = 0 consume = 10 .PHONY: consume diff --git a/demo/README.md b/demo/README.md index 0d23139fa..536b9ffe2 100644 --- a/demo/README.md +++ b/demo/README.md @@ -159,6 +159,37 @@ make show_local_data make consume ``` +### Azurite as remote storage: `compose-azure-blob-azurite.yml` + +This scenario uses `AzureBlobStorage` with [Azurite](https://learn.microsoft.com/en-us/azure/storage/common/storage-use-azurite) as the remote storage. + +```bash +# Start the compose +make run_azure_blob_azurite + +# Create the topic with any variation +make create_topic_by_size_ts +# or +# make create_topic_by_time_ts +# or with TS disabled +# make create_topic_*_no_ts + +# Fill the topic +make fill_topic + +# See that segments are uploaded to the remote storage +# (this may take several seconds) +make show_remote_data_azurite + +# Check that early segments are deleted +# (completely or renamed with `.deleted` suffix) +# from the local storage (this may take several seconds) +make show_local_data + +# Check the data is consumable +make consume +``` + ## Additional features ### Encryption diff --git a/demo/compose-azure-blob-azurite.yml b/demo/compose-azure-blob-azurite.yml new file mode 100644 index 000000000..696a0ddb8 --- /dev/null +++ b/demo/compose-azure-blob-azurite.yml @@ -0,0 +1,90 @@ +## +# Copyright 2023 Aiven Oy +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +## +version: '3.8' +services: + zookeeper: + image: "confluentinc/cp-zookeeper:7.3.3" + ports: + - "2181:2181" + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + + kafka: + image: "aivenoy/kafka-with-ts-plugin" + container_name: "kafka-ts" + depends_on: + - zookeeper + - azurite + ports: + - "9092:9092" + - "7000:7000" #prometheus metrics + environment: + KAFKA_BROKER_ID: 0 + KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" + KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:9092,BROKER://0.0.0.0:29092" + KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:9092,BROKER://kafka:29092" + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,BROKER:PLAINTEXT" + KAFKA_INTER_BROKER_LISTENER_NAME: "BROKER" + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_AUTO_CREATE_TOPICS_ENABLE: false + # Increase Tiered Storage log level + KAFKA_LOG4J_LOGGERS: "io.aiven.kafka.tieredstorage=DEBUG" + # Tweak retention checking + KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 10000 + # Enable Tiered Storage + KAFKA_REMOTE_LOG_STORAGE_SYSTEM_ENABLE: true + KAFKA_REMOTE_LOG_MANAGER_TASK_INTERVAL_MS: 5000 + # Remote metadata manager + KAFKA_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME: "org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager" + KAFKA_REMOTE_LOG_METADATA_MANAGER_IMPL_PREFIX: "rlmm.config." + KAFKA_REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME: "BROKER" + KAFKA_RLMM_CONFIG_REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR: 1 + # Remote storage manager + KAFKA_REMOTE_LOG_STORAGE_MANAGER_CLASS_PATH: "/tiered-storage-for-apache-kafka/core/*:/tiered-storage-for-apache-kafka/azure/*" + KAFKA_REMOTE_LOG_STORAGE_MANAGER_CLASS_NAME: "io.aiven.kafka.tieredstorage.RemoteStorageManager" + KAFKA_REMOTE_LOG_STORAGE_MANAGER_IMPL_PREFIX: "rsm.config." + KAFKA_RSM_CONFIG_CHUNK_SIZE: 5242880 # 5MiB + KAFKA_RSM_CONFIG_CHUNK_CACHE_CLASS: "io.aiven.kafka.tieredstorage.chunkmanager.cache.InMemoryChunkCache" + KAFKA_RSM_CONFIG_CHUNK_CACHE_SIZE: -1 + KAFKA_RSM_CONFIG_CUSTOM_METADATA_FIELDS_INCLUDE: "REMOTE_SIZE" + # Storage backend + KAFKA_RSM_CONFIG_KEY_PREFIX: "tiered-storage-demo/" + KAFKA_RSM_CONFIG_STORAGE_BACKEND_CLASS: "io.aiven.kafka.tieredstorage.storage.azure.AzureBlobStorage" + KAFKA_RSM_CONFIG_STORAGE_AZURE_CONTAINER_NAME: "test-container" + # The well-known account Azurite name and key. + KAFKA_RSM_CONFIG_STORAGE_AZURE_ACCOUNT_NAME: "devstoreaccount1" + KAFKA_RSM_CONFIG_STORAGE_AZURE_ACCOUNT_KEY: "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==" + KAFKA_RSM_CONFIG_STORAGE_AZURE_ENDPOINT_URL: "http://azurite:10000/devstoreaccount1" + + azurite: + image: mcr.microsoft.com/azure-storage/azurite + ports: + - "10000:10000" + command: azurite-blob --blobHost 0.0.0.0 + + azurite-create-container: + image: mcr.microsoft.com/azure-cli + restart: "no" + depends_on: + - azurite + command: > + az storage container create --name test-container + --account-name devstoreaccount1 + --account-key Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw== + --blob-endpoint http://azurite:10000/devstoreaccount1 diff --git a/docker/Dockerfile b/docker/Dockerfile index ffda0574f..05230ecbc 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -21,7 +21,8 @@ USER root RUN mkdir -p /tiered-storage-for-apache-kafka/core \ && mkdir -p /tiered-storage-for-apache-kafka/s3 \ - && mkdir -p /tiered-storage-for-apache-kafka/gcs + && mkdir -p /tiered-storage-for-apache-kafka/gcs \ + && mkdir -p /tiered-storage-for-apache-kafka/azure COPY build/distributions/tiered-storage-for-apache-kafka-${_VERSION}.tgz /tiered-storage-for-apache-kafka/core RUN cd /tiered-storage-for-apache-kafka/core \ @@ -38,6 +39,11 @@ RUN cd /tiered-storage-for-apache-kafka/gcs \ && tar -xf gcs-${_VERSION}.tgz --strip-components=1 \ && rm gcs-${_VERSION}.tgz +COPY storage/azure/build/distributions/azure-${_VERSION}.tgz /tiered-storage-for-apache-kafka/azure +RUN cd /tiered-storage-for-apache-kafka/azure \ + && tar -xf azure-${_VERSION}.tgz --strip-components=1 \ + && rm azure-${_VERSION}.tgz + # Installing JMX exporter agent ARG JMX_EXPORTER_VERSION=0.18.0 RUN mkdir -p /opt/prometheus/jmx-exporter diff --git a/e2e/build.gradle b/e2e/build.gradle index e3903c67e..582904b2a 100644 --- a/e2e/build.gradle +++ b/e2e/build.gradle @@ -31,6 +31,9 @@ dependencies { testImplementation "io.aiven:testcontainers-fake-gcs-server:$testcontainersFakeGcsServerVersion" + testImplementation platform("com.azure:azure-sdk-bom:$azureSdkVersion") + testImplementation "com.azure:azure-storage-blob" + testRuntimeOnly "org.slf4j:slf4j-log4j12:$slf4jVersion" } diff --git a/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/AzureSingleBrokerTest.java b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/AzureSingleBrokerTest.java new file mode 100644 index 000000000..93d738540 --- /dev/null +++ b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/AzureSingleBrokerTest.java @@ -0,0 +1,106 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.e2e; + +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.Uuid; + +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClientBuilder; +import com.azure.storage.blob.models.BlobItem; +import com.azure.storage.blob.models.ListBlobsOptions; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.utility.DockerImageName; + +public class AzureSingleBrokerTest extends SingleBrokerTest { + private static final int BLOB_STORAGE_PORT = 10000; + static final String NETWORK_ALIAS = "blob-storage"; + + static final GenericContainer AZURITE_SERVER = + new GenericContainer<>(DockerImageName.parse("mcr.microsoft.com/azure-storage/azurite")) + .withExposedPorts(BLOB_STORAGE_PORT) + .withCommand("azurite-blob --blobHost 0.0.0.0") + .withNetwork(NETWORK) + .withNetworkAliases(NETWORK_ALIAS); + + static final String AZURE_CONTAINER = "test-container"; + + static BlobContainerClient blobContainerClient; + + @BeforeAll + static void init() throws Exception { + AZURITE_SERVER.start(); + + // The well-known Azurite account name and key. + final String accountName = "devstoreaccount1"; + final String accountKey = + "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="; + + final String endpointForTestCode = + "http://127.0.0.1:" + AZURITE_SERVER.getMappedPort(BLOB_STORAGE_PORT) + "/devstoreaccount1"; + final String connectionString = "DefaultEndpointsProtocol=http;" + + "AccountName=" + accountName + ";" + + "AccountKey=" + accountKey + ";" + + "BlobEndpoint=" + endpointForTestCode + ";"; + final var blobServiceClient = new BlobServiceClientBuilder() + .connectionString(connectionString) + .buildClient(); + blobContainerClient = blobServiceClient.createBlobContainer(AZURE_CONTAINER); + + final String endpointForKafka = "http://" + NETWORK_ALIAS + ":" + BLOB_STORAGE_PORT + "/devstoreaccount1"; + setupKafka(kafka -> kafka.withEnv("KAFKA_RSM_CONFIG_STORAGE_BACKEND_CLASS", + "io.aiven.kafka.tieredstorage.storage.azure.AzureBlobStorage") + .withEnv("KAFKA_REMOTE_LOG_STORAGE_MANAGER_CLASS_PATH", + "/tiered-storage-for-apache-kafka/core/*:/tiered-storage-for-apache-kafka/azure/*") + .withEnv("KAFKA_RSM_CONFIG_STORAGE_AZURE_CONTAINER_NAME", AZURE_CONTAINER) + .withEnv("KAFKA_RSM_CONFIG_STORAGE_AZURE_ACCOUNT_NAME", accountName) + .withEnv("KAFKA_RSM_CONFIG_STORAGE_AZURE_ACCOUNT_KEY", accountKey) + .withEnv("KAFKA_RSM_CONFIG_STORAGE_AZURE_ENDPOINT_URL", endpointForKafka) + .dependsOn(AZURITE_SERVER)); + } + + @AfterAll + static void cleanup() { + stopKafka(); + + AZURITE_SERVER.stop(); + + cleanupStorage(); + } + + @Override + boolean assertNoTopicDataOnTierStorage(final String topicName, final Uuid topicId) { + final String prefix = String.format("%s-%s", topicName, topicId.toString()); + + final var list = blobContainerClient.listBlobs(new ListBlobsOptions().setPrefix(prefix), null); + return list.stream().findAny().isEmpty(); + } + + @Override + List remotePartitionFiles(final TopicIdPartition topicIdPartition) { + return blobContainerClient.listBlobs().stream() + .map(BlobItem::getName) + .map(k -> k.substring(k.lastIndexOf('/') + 1)) + .sorted() + .collect(Collectors.toList()); + } +} diff --git a/settings.gradle b/settings.gradle index 425fbe138..1d6840abc 100644 --- a/settings.gradle +++ b/settings.gradle @@ -19,6 +19,7 @@ include 'core' include 'storage' include 'storage:core' include 'storage:filesystem' +include 'storage:azure' include 'storage:gcs' include 'storage:s3' include 'e2e' diff --git a/storage/azure/build.gradle b/storage/azure/build.gradle new file mode 100644 index 000000000..0d5b46f88 --- /dev/null +++ b/storage/azure/build.gradle @@ -0,0 +1,35 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +archivesBaseName = "storage-azure" + +dependencies { + implementation project(":storage:core") + + implementation platform("com.azure:azure-sdk-bom:$azureSdkVersion") + implementation ("com.azure:azure-identity") { + exclude group: "org.slf4j" + } + implementation ("com.azure:azure-storage-blob") { + exclude group: "org.slf4j" + } + + implementation project(":commons") + + testImplementation(testFixtures(project(":storage:core"))) + + testImplementation "org.testcontainers:junit-jupiter:$testcontainersVersion" +} diff --git a/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageAccountKeyTest.java b/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageAccountKeyTest.java new file mode 100644 index 000000000..f1a6dfdfa --- /dev/null +++ b/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageAccountKeyTest.java @@ -0,0 +1,40 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.storage.azure; + +import java.util.Map; + +import io.aiven.kafka.tieredstorage.storage.StorageBackend; + +class AzureBlobStorageAccountKeyTest extends AzureBlobStorageTest { + @Override + protected StorageBackend storage() { + final AzureBlobStorage azureBlobStorage = new AzureBlobStorage(); + // The well-known Azurite account name and key. + final String accountName = "devstoreaccount1"; + final String accountKey = + "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="; + final Map configs = Map.of( + "azure.container.name", azureContainerName, + "azure.account.name", accountName, + "azure.account.key", accountKey, + "azure.endpoint.url", endpoint() + ); + azureBlobStorage.configure(configs); + return azureBlobStorage; + } +} diff --git a/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageConnectionStringTest.java b/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageConnectionStringTest.java new file mode 100644 index 000000000..6ab5e2b73 --- /dev/null +++ b/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageConnectionStringTest.java @@ -0,0 +1,34 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.storage.azure; + +import java.util.Map; + +import io.aiven.kafka.tieredstorage.storage.StorageBackend; + +class AzureBlobStorageConnectionStringTest extends AzureBlobStorageTest { + @Override + protected StorageBackend storage() { + final AzureBlobStorage azureBlobStorage = new AzureBlobStorage(); + final Map configs = Map.of( + "azure.container.name", azureContainerName, + "azure.connection.string", connectionString() + ); + azureBlobStorage.configure(configs); + return azureBlobStorage; + } +} diff --git a/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageTest.java b/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageTest.java new file mode 100644 index 000000000..55f72746f --- /dev/null +++ b/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageTest.java @@ -0,0 +1,92 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.storage.azure; + +import java.io.ByteArrayInputStream; + +import io.aiven.kafka.tieredstorage.storage.BaseStorageTest; +import io.aiven.kafka.tieredstorage.storage.BytesRange; +import io.aiven.kafka.tieredstorage.storage.InvalidRangeException; +import io.aiven.kafka.tieredstorage.storage.StorageBackendException; + +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.BlobServiceClientBuilder; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestInfo; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +@Testcontainers +abstract class AzureBlobStorageTest extends BaseStorageTest { + private static final int BLOB_STORAGE_PORT = 10000; + @Container + static final GenericContainer AZURITE_SERVER = + new GenericContainer<>(DockerImageName.parse("mcr.microsoft.com/azure-storage/azurite")) + .withExposedPorts(BLOB_STORAGE_PORT) + .withCommand("azurite-blob --blobHost 0.0.0.0"); + + static BlobServiceClient blobServiceClient; + + protected String azureContainerName; + + protected static String endpoint() { + return "http://127.0.0.1:" + AZURITE_SERVER.getMappedPort(BLOB_STORAGE_PORT) + "/devstoreaccount1"; + } + + protected static String connectionString() { + // The well-known Azurite connection string. + return "DefaultEndpointsProtocol=http;" + + "AccountName=devstoreaccount1;" + + "AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;" + + "BlobEndpoint=" + endpoint() + ";"; + } + + @BeforeAll + static void setUpClass() { + blobServiceClient = new BlobServiceClientBuilder() + .connectionString(connectionString()) + .buildClient(); + } + + @BeforeEach + void setUp(final TestInfo testInfo) { + azureContainerName = testInfo.getDisplayName() + .toLowerCase() + .replace("(", "") + .replace(")", ""); + while (azureContainerName.length() < 3) { + azureContainerName += azureContainerName; + } + blobServiceClient.createBlobContainer(azureContainerName); + } + + @Override + protected void testFetchWithRangeOutsideFileSize() throws StorageBackendException { + // For some reason, Azure (or only Azurite) considers range 3-5 valid for a 3-byte blob, + // so the generic test fails. + final String content = "ABC"; + storage().upload(new ByteArrayInputStream(content.getBytes()), TOPIC_PARTITION_SEGMENT_KEY); + + assertThatThrownBy(() -> storage().fetch(TOPIC_PARTITION_SEGMENT_KEY, BytesRange.of(4, 6))) + .isInstanceOf(InvalidRangeException.class); + } +} diff --git a/storage/azure/src/integration-test/resources/log4j.properties b/storage/azure/src/integration-test/resources/log4j.properties new file mode 100644 index 000000000..3e86cfa01 --- /dev/null +++ b/storage/azure/src/integration-test/resources/log4j.properties @@ -0,0 +1,21 @@ +# +# Copyright 2023 Aiven Oy +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +log4j.rootLogger=INFO, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n diff --git a/storage/azure/src/main/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorage.java b/storage/azure/src/main/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorage.java new file mode 100644 index 000000000..bf04e0878 --- /dev/null +++ b/storage/azure/src/main/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorage.java @@ -0,0 +1,149 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.storage.azure; + +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Map; + +import io.aiven.kafka.tieredstorage.storage.BytesRange; +import io.aiven.kafka.tieredstorage.storage.InvalidRangeException; +import io.aiven.kafka.tieredstorage.storage.KeyNotFoundException; +import io.aiven.kafka.tieredstorage.storage.ObjectKey; +import io.aiven.kafka.tieredstorage.storage.StorageBackend; +import io.aiven.kafka.tieredstorage.storage.StorageBackendException; + +import com.azure.identity.DefaultAzureCredentialBuilder; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClientBuilder; +import com.azure.storage.blob.models.BlobRange; +import com.azure.storage.blob.models.BlobStorageException; +import com.azure.storage.blob.specialized.BlockBlobClient; +import com.azure.storage.blob.specialized.SpecializedBlobClientBuilder; +import com.azure.storage.common.StorageSharedKeyCredential; + +public class AzureBlobStorage implements StorageBackend { + private AzureStorageConfig config; + private BlobContainerClient blobContainerClient; + + @Override + public void configure(final Map configs) { + final AzureStorageConfig config = new AzureStorageConfig(configs); + this.config = config; + + final BlobServiceClientBuilder blobServiceClientBuilder = new BlobServiceClientBuilder(); + if (config.connectionString() != null) { + blobServiceClientBuilder.connectionString(config.connectionString()); + } else { + blobServiceClientBuilder.endpoint(endpointUrl()); + + if (config.accountKey() == null) { + blobServiceClientBuilder.credential( + new DefaultAzureCredentialBuilder().build()); + } else { + blobServiceClientBuilder.credential( + new StorageSharedKeyCredential(config.accountName(), config.accountKey())); + } + } + + blobContainerClient = blobServiceClientBuilder.buildClient() + .getBlobContainerClient(config.containerName()); + } + + private String endpointUrl() { + if (config.endpointUrl() != null) { + return config.endpointUrl(); + } else { + return "https://" + config.accountName() + ".blob.core.windows.net"; + } + } + + @Override + public long upload(final InputStream inputStream, final ObjectKey key) throws StorageBackendException { + final var specializedBlobClientBuilder = new SpecializedBlobClientBuilder(); + if (config.connectionString() != null) { + specializedBlobClientBuilder.connectionString(config.connectionString()); + } else { + specializedBlobClientBuilder.endpoint(endpointUrl()); + + if (config.accountKey() == null) { + specializedBlobClientBuilder.credential( + new DefaultAzureCredentialBuilder().build()); + } else { + specializedBlobClientBuilder.credential( + new StorageSharedKeyCredential(config.accountName(), config.accountKey())); + } + } + final BlockBlobClient blockBlobClient = specializedBlobClientBuilder + .containerName(config.containerName()) + .blobName(key.value()) + .buildBlockBlobClient(); + + try (OutputStream os = new BufferedOutputStream(blockBlobClient.getBlobOutputStream(true))) { + return inputStream.transferTo(os); + } catch (final IOException e) { + throw new StorageBackendException("Failed to upload " + key, e); + } + } + + @Override + public InputStream fetch(final ObjectKey key) throws StorageBackendException { + try { + return blobContainerClient.getBlobClient(key.value()).openInputStream(); + } catch (final BlobStorageException e) { + if (e.getStatusCode() == 404) { + throw new KeyNotFoundException(this, key, e); + } else { + throw new StorageBackendException("Failed to fetch " + key, e); + } + } + } + + @Override + public InputStream fetch(final ObjectKey key, final BytesRange range) throws StorageBackendException { + try { + return blobContainerClient.getBlobClient(key.value()).openInputStream( + new BlobRange(range.from, (long) range.size()), null); + } catch (final BlobStorageException e) { + if (e.getStatusCode() == 404) { + throw new KeyNotFoundException(this, key, e); + } else if (e.getStatusCode() == 416) { + throw new InvalidRangeException("Invalid range " + range, e); + } else { + throw new StorageBackendException("Failed to fetch " + key, e); + } + } + } + + @Override + public void delete(final ObjectKey key) throws StorageBackendException { + try { + blobContainerClient.getBlobClient(key.value()).deleteIfExists(); + } catch (final BlobStorageException e) { + throw new StorageBackendException("Failed to delete " + key, e); + } + } + + @Override + public String toString() { + return "AzureStorage{" + + "containerName='" + config.containerName() + '\'' + + '}'; + } +} diff --git a/storage/azure/src/main/java/io/aiven/kafka/tieredstorage/storage/azure/AzureStorageConfig.java b/storage/azure/src/main/java/io/aiven/kafka/tieredstorage/storage/azure/AzureStorageConfig.java new file mode 100644 index 000000000..79bcd0496 --- /dev/null +++ b/storage/azure/src/main/java/io/aiven/kafka/tieredstorage/storage/azure/AzureStorageConfig.java @@ -0,0 +1,136 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.storage.azure; + +import java.util.Map; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.config.types.Password; + +import io.aiven.kafka.tieredstorage.config.validators.NonEmptyPassword; +import io.aiven.kafka.tieredstorage.config.validators.Null; +import io.aiven.kafka.tieredstorage.config.validators.ValidUrl; + +public class AzureStorageConfig extends AbstractConfig { + static final String AZURE_ACCOUNT_NAME_CONFIG = "azure.account.name"; + private static final String AZURE_ACCOUNT_NAME_DOC = "Azure account name"; + + static final String AZURE_ACCOUNT_KEY_CONFIG = "azure.account.key"; + private static final String AZURE_ACCOUNT_KEY_DOC = "Azure account key"; + + static final String AZURE_CONTAINER_NAME_CONFIG = "azure.container.name"; + private static final String AZURE_CONTAINER_NAME_DOC = "Azure container to store log segments"; + + static final String AZURE_ENDPOINT_URL_CONFIG = "azure.endpoint.url"; + private static final String AZURE_ENDPOINT_URL_DOC = "Custom Azure Blob Storage endpoint URL"; + + static final String AZURE_CONNECTION_STRING_CONFIG = "azure.connection.string"; + private static final String AZURE_CONNECTION_STRING_DOC = "Azure connection string. " + + "Cannot be used together with azure.account.name, azure.account.key, and azure.endpoint.url"; + + private static final ConfigDef CONFIG; + + static { + CONFIG = new ConfigDef() + .define( + AZURE_ACCOUNT_NAME_CONFIG, + ConfigDef.Type.STRING, + null, + Null.or(new ConfigDef.NonEmptyString()), + ConfigDef.Importance.HIGH, + AZURE_ACCOUNT_NAME_DOC) + .define( + AZURE_ACCOUNT_KEY_CONFIG, + ConfigDef.Type.PASSWORD, + null, + Null.or(new NonEmptyPassword()), + ConfigDef.Importance.MEDIUM, + AZURE_ACCOUNT_KEY_DOC) + .define( + AZURE_CONTAINER_NAME_CONFIG, + ConfigDef.Type.STRING, + ConfigDef.NO_DEFAULT_VALUE, + new ConfigDef.NonEmptyString(), + ConfigDef.Importance.HIGH, + AZURE_CONTAINER_NAME_DOC) + .define( + AZURE_ENDPOINT_URL_CONFIG, + ConfigDef.Type.STRING, + null, + Null.or(new ValidUrl()), + ConfigDef.Importance.LOW, + AZURE_ENDPOINT_URL_DOC) + .define( + AZURE_CONNECTION_STRING_CONFIG, + ConfigDef.Type.PASSWORD, + null, + Null.or(new NonEmptyPassword()), + ConfigDef.Importance.MEDIUM, + AZURE_CONNECTION_STRING_DOC); + } + + public AzureStorageConfig(final Map props) { + super(CONFIG, props); + validate(); + } + + private void validate() { + if (connectionString() != null) { + if (accountName() != null) { + throw new ConfigException( + "\"azure.connection.string\" cannot be set together with \"azure.account.name\"."); + } + if (accountKey() != null) { + throw new ConfigException( + "\"azure.connection.string\" cannot be set together with \"azure.account.key\"."); + } + if (endpointUrl() != null) { + throw new ConfigException( + "\"azure.connection.string\" cannot be set together with \"azure.endpoint.url\"."); + } + } else { + if (accountName() == null) { + throw new ConfigException( + "\"azure.account.name\" must be set if \"azure.connection.string\" is not set."); + } + } + } + + String accountName() { + return getString(AZURE_ACCOUNT_NAME_CONFIG); + } + + String accountKey() { + final Password key = getPassword(AZURE_ACCOUNT_KEY_CONFIG); + return key == null ? null : key.value(); + } + + String containerName() { + return getString(AZURE_CONTAINER_NAME_CONFIG); + } + + String endpointUrl() { + return getString(AZURE_ENDPOINT_URL_CONFIG); + } + + String connectionString() { + final Password connectionString = getPassword(AZURE_CONNECTION_STRING_CONFIG); + return connectionString == null ? null : connectionString.value(); + } +} diff --git a/storage/azure/src/test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageConfigTest.java b/storage/azure/src/test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageConfigTest.java new file mode 100644 index 000000000..2757b4bab --- /dev/null +++ b/storage/azure/src/test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageConfigTest.java @@ -0,0 +1,177 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.storage.azure; + +import java.util.Map; + +import org.apache.kafka.common.config.ConfigException; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class AzureBlobStorageConfigTest { + private static final String ACCOUNT_NAME = "account1"; + private static final String ACCOUNT_KEY = "account_key"; + private static final String CONTAINER_NAME = "c1"; + private static final String ENDPOINT = "http://localhost:10000/"; + // The well-known Azurite connection string. + private static final String CONNECTION_STRING = "DefaultEndpointsProtocol=http;" + + "AccountName=devstoreaccount1;" + + "AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;" + + "BlobEndpoint=http://localhost:10000/devstoreaccount1;"; + + @Test + void minimalConfig() { + final var configs = Map.of( + "azure.account.name", ACCOUNT_NAME, + "azure.container.name", CONTAINER_NAME + ); + final var config = new AzureStorageConfig(configs); + assertThat(config.containerName()).isEqualTo(CONTAINER_NAME); + assertThat(config.accountName()).isEqualTo(ACCOUNT_NAME); + assertThat(config.accountKey()).isNull(); + assertThat(config.endpointUrl()).isNull(); + assertThat(config.connectionString()).isNull(); + } + + @Test + void shouldRequireContainerName() { + final var configs = Map.of( + "azure.account.name", ACCOUNT_NAME + ); + assertThatThrownBy(() -> new AzureStorageConfig(configs)) + .isInstanceOf(ConfigException.class) + .hasMessage("Missing required configuration \"azure.container.name\" which has no default value."); + } + + @Test + void shouldRequireAccountNameIfNoConnectionString() { + final var configs = Map.of( + "azure.container.name", CONTAINER_NAME + ); + assertThatThrownBy(() -> new AzureStorageConfig(configs)) + .isInstanceOf(ConfigException.class) + .hasMessage("\"azure.account.name\" must be set if \"azure.connection.string\" is not set."); + } + + @Test + void authAccountName() { + final var configs = Map.of( + "azure.account.name", ACCOUNT_NAME, + "azure.container.name", CONTAINER_NAME + ); + final var config = new AzureStorageConfig(configs); + assertThat(config.accountName()).isEqualTo(ACCOUNT_NAME); + assertThat(config.accountKey()).isNull(); + assertThat(config.endpointUrl()).isNull(); + assertThat(config.connectionString()).isNull(); + } + + @Test + void authAccountNameAndEndpoint() { + final var configs = Map.of( + "azure.account.name", ACCOUNT_NAME, + "azure.container.name", CONTAINER_NAME, + "azure.endpoint.url", ENDPOINT + ); + final var config = new AzureStorageConfig(configs); + assertThat(config.accountName()).isEqualTo(ACCOUNT_NAME); + assertThat(config.accountKey()).isNull(); + assertThat(config.endpointUrl()).isEqualTo(ENDPOINT); + assertThat(config.connectionString()).isNull(); + } + + @Test + void authAccountNameAndKey() { + final var configs = Map.of( + "azure.account.name", ACCOUNT_NAME, + "azure.account.key", ACCOUNT_KEY, + "azure.container.name", CONTAINER_NAME + ); + final var config = new AzureStorageConfig(configs); + assertThat(config.accountName()).isEqualTo(ACCOUNT_NAME); + assertThat(config.accountKey()).isEqualTo(ACCOUNT_KEY); + assertThat(config.endpointUrl()).isNull(); + assertThat(config.connectionString()).isNull(); + } + + @Test + void authAccountNameAndKeyAndEndpoint() { + final var configs = Map.of( + "azure.account.name", ACCOUNT_NAME, + "azure.account.key", ACCOUNT_KEY, + "azure.container.name", CONTAINER_NAME, + "azure.endpoint.url", ENDPOINT + ); + final var config = new AzureStorageConfig(configs); + assertThat(config.accountName()).isEqualTo(ACCOUNT_NAME); + assertThat(config.accountKey()).isEqualTo(ACCOUNT_KEY); + assertThat(config.endpointUrl()).isEqualTo(ENDPOINT); + assertThat(config.connectionString()).isNull(); + } + + @Test + void authConnectionString() { + final var configs = Map.of( + "azure.connection.string", CONNECTION_STRING, + "azure.container.name", CONTAINER_NAME + ); + final var config = new AzureStorageConfig(configs); + assertThat(config.accountName()).isNull(); + assertThat(config.accountKey()).isNull(); + assertThat(config.endpointUrl()).isNull(); + assertThat(config.connectionString()).isEqualTo(CONNECTION_STRING); + } + + @Test + void connectionStringAndAccountNameClash() { + final var configs = Map.of( + "azure.connection.string", CONNECTION_STRING, + "azure.container.name", CONTAINER_NAME, + "azure.account.name", ACCOUNT_NAME + ); + assertThatThrownBy(() -> new AzureStorageConfig(configs)) + .isInstanceOf(ConfigException.class) + .hasMessage("\"azure.connection.string\" cannot be set together with \"azure.account.name\"."); + } + + @Test + void connectionStringAndAccountKeyClash() { + final var configs = Map.of( + "azure.connection.string", CONNECTION_STRING, + "azure.container.name", CONTAINER_NAME, + "azure.account.key", ACCOUNT_KEY + ); + assertThatThrownBy(() -> new AzureStorageConfig(configs)) + .isInstanceOf(ConfigException.class) + .hasMessage("\"azure.connection.string\" cannot be set together with \"azure.account.key\"."); + } + + @Test + void connectionStringAndEndpointClash() { + final var configs = Map.of( + "azure.connection.string", CONNECTION_STRING, + "azure.container.name", CONTAINER_NAME, + "azure.endpoint.url", ENDPOINT + ); + assertThatThrownBy(() -> new AzureStorageConfig(configs)) + .isInstanceOf(ConfigException.class) + .hasMessage("\"azure.connection.string\" cannot be set together with \"azure.endpoint.url\"."); + } +} diff --git a/storage/azure/src/test/resources/log4j.properties b/storage/azure/src/test/resources/log4j.properties new file mode 100644 index 000000000..3e86cfa01 --- /dev/null +++ b/storage/azure/src/test/resources/log4j.properties @@ -0,0 +1,21 @@ +# +# Copyright 2023 Aiven Oy +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +log4j.rootLogger=INFO, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n diff --git a/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/BaseStorageTest.java b/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/BaseStorageTest.java index 5a57fab46..54d02cf90 100644 --- a/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/BaseStorageTest.java +++ b/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/BaseStorageTest.java @@ -135,7 +135,7 @@ void testFetchWithOffsetRangeLargerThanFileSize() throws IOException, StorageBac } @Test - void testFetchWithRangeOutsideFileSize() throws StorageBackendException { + protected void testFetchWithRangeOutsideFileSize() throws StorageBackendException { final String content = "ABC"; storage().upload(new ByteArrayInputStream(content.getBytes()), TOPIC_PARTITION_SEGMENT_KEY);