diff --git a/.github/workflows/main_push_and_pull_request_workflow.yml b/.github/workflows/main_push_and_pull_request_workflow.yml
index 3ec679be0..307acb9b7 100644
--- a/.github/workflows/main_push_and_pull_request_workflow.yml
+++ b/.github/workflows/main_push_and_pull_request_workflow.yml
@@ -35,7 +35,7 @@ jobs:
strategy:
matrix:
java-version: [ 11 ]
- test: [ 'LocalSystem', 'S3', 'Gcs' ]
+ test: [ 'LocalSystem', 'S3', 'Gcs', 'Azure' ]
name: E2E tests for ${{ matrix.test }} with jdk ${{ matrix.java-version }}
runs-on: ubuntu-latest
steps:
diff --git a/Makefile b/Makefile
index b5d9deaad..3dcce492a 100644
--- a/Makefile
+++ b/Makefile
@@ -28,7 +28,7 @@ clean:
checkstyle:
./gradlew checkstyleMain checkstyleTest checkstyleIntegrationTest
-build: build/distributions/tiered-storage-for-apache-kafka-$(VERSION).tgz storage/s3/build/distributions/s3-$(VERSION).tgz storage/gcs/build/distributions/gcs-$(VERSION).tgz
+build: build/distributions/tiered-storage-for-apache-kafka-$(VERSION).tgz storage/s3/build/distributions/s3-$(VERSION).tgz storage/gcs/build/distributions/gcs-$(VERSION).tgz storage/gcs/build/distributions/azure-$(VERSION).tgz
build/distributions/tiered-storage-for-apache-kafka-$(VERSION).tgz:
./gradlew build distTar -x test -x integrationTest -x e2e:test
@@ -39,6 +39,9 @@ storage/s3/build/distributions/s3-$(VERSION).tgz:
storage/gcs/build/distributions/gcs-$(VERSION).tgz:
./gradlew build :storage:gcs:distTar -x test -x integrationTest -x e2e:test
+storage/gcs/build/distributions/azure-$(VERSION).tgz:
+ ./gradlew build :storage:azure:distTar -x test -x integrationTest -x e2e:test
+
test: build
./gradlew test -x e2e:test
diff --git a/README.md b/README.md
index 3eb84b86f..81e705cb1 100644
--- a/README.md
+++ b/README.md
@@ -2,7 +2,7 @@
This project is an implementation of `RemoteStorageManager` for Apache Kafka tiered storage.
-The implementation will have multiple configurable storage backends. Currently, AWS S3 and Google Cloud Storage are supported. We intend to support Azure Blob Storage in the near future.
+The implementation will have multiple configurable storage backends. Currently, AWS S3, Google Cloud Storage, and Azure Blob Storage are supported.
The project follows the API specifications according to the latest version of [KIP-405: Kafka Tiered Storage](https://cwiki.apache.org/confluence/x/KJDQBQ).
diff --git a/build.gradle b/build.gradle
index 244d22dca..ea7004954 100644
--- a/build.gradle
+++ b/build.gradle
@@ -119,6 +119,8 @@ subprojects {
gcpSdkVersion = "2.26.1"
+ azureSdkVersion = "1.2.17"
+
testcontainersVersion = "1.19.0"
testcontainersFakeGcsServerVersion = "0.2.0"
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index df303d627..ed2d23d9d 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -28,6 +28,7 @@
+
diff --git a/demo/Makefile b/demo/Makefile
index dfd30a0c6..e5f880ae2 100644
--- a/demo/Makefile
+++ b/demo/Makefile
@@ -111,12 +111,17 @@ run_s3_minio:
run_gcs_fake_gcs_server:
docker compose -f compose-gcs-fake-gcs-server.yml up
+.PHONY: run_azure_blob_azurite
+run_azure_blob_azurite:
+ docker compose -f compose-azure-blob-azurite.yml up
+
.PHONY: clean
clean:
docker compose -f compose-local-fs.yml down
docker compose -f compose-s3-aws.yml down
docker compose -f compose-s3-minio.yml down
docker compose -f compose-gcs-fake-gcs-server.yml down
+ docker compose -f compose-azure-blob-azurite.yml down
.PHONY: show_local_data
show_local_data:
@@ -141,6 +146,14 @@ show_remote_data_s3_minio:
show_remote_data_gcs_fake_gcs_server:
curl http://localhost:4443/storage/v1/b/test-bucket/o | jq -r '.items | map(.name) | .[]'
+.PHONY: show_remote_data_azurite
+show_remote_data_azurite:
+ docker run --rm --network=host mcr.microsoft.com/azure-cli \
+ az storage blob list --container-name test-container \
+ --account-name devstoreaccount1 \
+ --account-key Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw== \
+ --blob-endpoint http://127.0.0.1:10000/devstoreaccount1 --output table | grep $(topic)
+
offset = 0
consume = 10
.PHONY: consume
diff --git a/demo/README.md b/demo/README.md
index 0d23139fa..536b9ffe2 100644
--- a/demo/README.md
+++ b/demo/README.md
@@ -159,6 +159,37 @@ make show_local_data
make consume
```
+### Azurite as remote storage: `compose-azure-blob-azurite.yml`
+
+This scenario uses `AzureBlobStorage` with [Azurite](https://learn.microsoft.com/en-us/azure/storage/common/storage-use-azurite) as the remote storage.
+
+```bash
+# Start the compose
+make run_azure_blob_azurite
+
+# Create the topic with any variation
+make create_topic_by_size_ts
+# or
+# make create_topic_by_time_ts
+# or with TS disabled
+# make create_topic_*_no_ts
+
+# Fill the topic
+make fill_topic
+
+# See that segments are uploaded to the remote storage
+# (this may take several seconds)
+make show_remote_data_azurite
+
+# Check that early segments are deleted
+# (completely or renamed with `.deleted` suffix)
+# from the local storage (this may take several seconds)
+make show_local_data
+
+# Check the data is consumable
+make consume
+```
+
## Additional features
### Encryption
diff --git a/demo/compose-azure-blob-azurite.yml b/demo/compose-azure-blob-azurite.yml
new file mode 100644
index 000000000..696a0ddb8
--- /dev/null
+++ b/demo/compose-azure-blob-azurite.yml
@@ -0,0 +1,90 @@
+##
+# Copyright 2023 Aiven Oy
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+version: '3.8'
+services:
+ zookeeper:
+ image: "confluentinc/cp-zookeeper:7.3.3"
+ ports:
+ - "2181:2181"
+ environment:
+ ZOOKEEPER_CLIENT_PORT: 2181
+
+ kafka:
+ image: "aivenoy/kafka-with-ts-plugin"
+ container_name: "kafka-ts"
+ depends_on:
+ - zookeeper
+ - azurite
+ ports:
+ - "9092:9092"
+ - "7000:7000" #prometheus metrics
+ environment:
+ KAFKA_BROKER_ID: 0
+ KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
+ KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:9092,BROKER://0.0.0.0:29092"
+ KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:9092,BROKER://kafka:29092"
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,BROKER:PLAINTEXT"
+ KAFKA_INTER_BROKER_LISTENER_NAME: "BROKER"
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+ KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS: 1
+ KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+ KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+ KAFKA_AUTO_CREATE_TOPICS_ENABLE: false
+ # Increase Tiered Storage log level
+ KAFKA_LOG4J_LOGGERS: "io.aiven.kafka.tieredstorage=DEBUG"
+ # Tweak retention checking
+ KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 10000
+ # Enable Tiered Storage
+ KAFKA_REMOTE_LOG_STORAGE_SYSTEM_ENABLE: true
+ KAFKA_REMOTE_LOG_MANAGER_TASK_INTERVAL_MS: 5000
+ # Remote metadata manager
+ KAFKA_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME: "org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager"
+ KAFKA_REMOTE_LOG_METADATA_MANAGER_IMPL_PREFIX: "rlmm.config."
+ KAFKA_REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME: "BROKER"
+ KAFKA_RLMM_CONFIG_REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR: 1
+ # Remote storage manager
+ KAFKA_REMOTE_LOG_STORAGE_MANAGER_CLASS_PATH: "/tiered-storage-for-apache-kafka/core/*:/tiered-storage-for-apache-kafka/azure/*"
+ KAFKA_REMOTE_LOG_STORAGE_MANAGER_CLASS_NAME: "io.aiven.kafka.tieredstorage.RemoteStorageManager"
+ KAFKA_REMOTE_LOG_STORAGE_MANAGER_IMPL_PREFIX: "rsm.config."
+ KAFKA_RSM_CONFIG_CHUNK_SIZE: 5242880 # 5MiB
+ KAFKA_RSM_CONFIG_CHUNK_CACHE_CLASS: "io.aiven.kafka.tieredstorage.chunkmanager.cache.InMemoryChunkCache"
+ KAFKA_RSM_CONFIG_CHUNK_CACHE_SIZE: -1
+ KAFKA_RSM_CONFIG_CUSTOM_METADATA_FIELDS_INCLUDE: "REMOTE_SIZE"
+ # Storage backend
+ KAFKA_RSM_CONFIG_KEY_PREFIX: "tiered-storage-demo/"
+ KAFKA_RSM_CONFIG_STORAGE_BACKEND_CLASS: "io.aiven.kafka.tieredstorage.storage.azure.AzureBlobStorage"
+ KAFKA_RSM_CONFIG_STORAGE_AZURE_CONTAINER_NAME: "test-container"
+ # The well-known account Azurite name and key.
+ KAFKA_RSM_CONFIG_STORAGE_AZURE_ACCOUNT_NAME: "devstoreaccount1"
+ KAFKA_RSM_CONFIG_STORAGE_AZURE_ACCOUNT_KEY: "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
+ KAFKA_RSM_CONFIG_STORAGE_AZURE_ENDPOINT_URL: "http://azurite:10000/devstoreaccount1"
+
+ azurite:
+ image: mcr.microsoft.com/azure-storage/azurite
+ ports:
+ - "10000:10000"
+ command: azurite-blob --blobHost 0.0.0.0
+
+ azurite-create-container:
+ image: mcr.microsoft.com/azure-cli
+ restart: "no"
+ depends_on:
+ - azurite
+ command: >
+ az storage container create --name test-container
+ --account-name devstoreaccount1
+ --account-key Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==
+ --blob-endpoint http://azurite:10000/devstoreaccount1
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 7e150448a..2797b53f0 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -21,7 +21,8 @@ USER root
RUN mkdir -p /tiered-storage-for-apache-kafka/core \
&& mkdir -p /tiered-storage-for-apache-kafka/s3 \
- && mkdir -p /tiered-storage-for-apache-kafka/gcs
+ && mkdir -p /tiered-storage-for-apache-kafka/gcs \
+ && mkdir -p /tiered-storage-for-apache-kafka/azure
COPY build/distributions/tiered-storage-for-apache-kafka-${_VERSION}.tgz /tiered-storage-for-apache-kafka/core
RUN cd /tiered-storage-for-apache-kafka/core \
@@ -38,6 +39,11 @@ RUN cd /tiered-storage-for-apache-kafka/gcs \
&& tar -xf gcs-${_VERSION}.tgz --strip-components=1 \
&& rm gcs-${_VERSION}.tgz
+COPY storage/azure/build/distributions/azure-${_VERSION}.tgz /tiered-storage-for-apache-kafka/azure
+RUN cd /tiered-storage-for-apache-kafka/azure \
+ && tar -xf azure-${_VERSION}.tgz --strip-components=1 \
+ && rm azure-${_VERSION}.tgz
+
# Installing JMX exporter agent
ARG JMX_EXPORTER_VERSION=0.18.0
RUN mkdir -p /opt/prometheus/jmx-exporter
diff --git a/e2e/build.gradle b/e2e/build.gradle
index e3903c67e..582904b2a 100644
--- a/e2e/build.gradle
+++ b/e2e/build.gradle
@@ -31,6 +31,9 @@ dependencies {
testImplementation "io.aiven:testcontainers-fake-gcs-server:$testcontainersFakeGcsServerVersion"
+ testImplementation platform("com.azure:azure-sdk-bom:$azureSdkVersion")
+ testImplementation "com.azure:azure-storage-blob"
+
testRuntimeOnly "org.slf4j:slf4j-log4j12:$slf4jVersion"
}
diff --git a/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/AzureSingleBrokerTest.java b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/AzureSingleBrokerTest.java
new file mode 100644
index 000000000..93d738540
--- /dev/null
+++ b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/AzureSingleBrokerTest.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2023 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.tieredstorage.e2e;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.ListBlobsOptions;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.DockerImageName;
+
+public class AzureSingleBrokerTest extends SingleBrokerTest {
+ private static final int BLOB_STORAGE_PORT = 10000;
+ static final String NETWORK_ALIAS = "blob-storage";
+
+ static final GenericContainer> AZURITE_SERVER =
+ new GenericContainer<>(DockerImageName.parse("mcr.microsoft.com/azure-storage/azurite"))
+ .withExposedPorts(BLOB_STORAGE_PORT)
+ .withCommand("azurite-blob --blobHost 0.0.0.0")
+ .withNetwork(NETWORK)
+ .withNetworkAliases(NETWORK_ALIAS);
+
+ static final String AZURE_CONTAINER = "test-container";
+
+ static BlobContainerClient blobContainerClient;
+
+ @BeforeAll
+ static void init() throws Exception {
+ AZURITE_SERVER.start();
+
+ // The well-known Azurite account name and key.
+ final String accountName = "devstoreaccount1";
+ final String accountKey =
+ "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
+
+ final String endpointForTestCode =
+ "http://127.0.0.1:" + AZURITE_SERVER.getMappedPort(BLOB_STORAGE_PORT) + "/devstoreaccount1";
+ final String connectionString = "DefaultEndpointsProtocol=http;"
+ + "AccountName=" + accountName + ";"
+ + "AccountKey=" + accountKey + ";"
+ + "BlobEndpoint=" + endpointForTestCode + ";";
+ final var blobServiceClient = new BlobServiceClientBuilder()
+ .connectionString(connectionString)
+ .buildClient();
+ blobContainerClient = blobServiceClient.createBlobContainer(AZURE_CONTAINER);
+
+ final String endpointForKafka = "http://" + NETWORK_ALIAS + ":" + BLOB_STORAGE_PORT + "/devstoreaccount1";
+ setupKafka(kafka -> kafka.withEnv("KAFKA_RSM_CONFIG_STORAGE_BACKEND_CLASS",
+ "io.aiven.kafka.tieredstorage.storage.azure.AzureBlobStorage")
+ .withEnv("KAFKA_REMOTE_LOG_STORAGE_MANAGER_CLASS_PATH",
+ "/tiered-storage-for-apache-kafka/core/*:/tiered-storage-for-apache-kafka/azure/*")
+ .withEnv("KAFKA_RSM_CONFIG_STORAGE_AZURE_CONTAINER_NAME", AZURE_CONTAINER)
+ .withEnv("KAFKA_RSM_CONFIG_STORAGE_AZURE_ACCOUNT_NAME", accountName)
+ .withEnv("KAFKA_RSM_CONFIG_STORAGE_AZURE_ACCOUNT_KEY", accountKey)
+ .withEnv("KAFKA_RSM_CONFIG_STORAGE_AZURE_ENDPOINT_URL", endpointForKafka)
+ .dependsOn(AZURITE_SERVER));
+ }
+
+ @AfterAll
+ static void cleanup() {
+ stopKafka();
+
+ AZURITE_SERVER.stop();
+
+ cleanupStorage();
+ }
+
+ @Override
+ boolean assertNoTopicDataOnTierStorage(final String topicName, final Uuid topicId) {
+ final String prefix = String.format("%s-%s", topicName, topicId.toString());
+
+ final var list = blobContainerClient.listBlobs(new ListBlobsOptions().setPrefix(prefix), null);
+ return list.stream().findAny().isEmpty();
+ }
+
+ @Override
+ List remotePartitionFiles(final TopicIdPartition topicIdPartition) {
+ return blobContainerClient.listBlobs().stream()
+ .map(BlobItem::getName)
+ .map(k -> k.substring(k.lastIndexOf('/') + 1))
+ .sorted()
+ .collect(Collectors.toList());
+ }
+}
diff --git a/settings.gradle b/settings.gradle
index 425fbe138..1d6840abc 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -19,6 +19,7 @@ include 'core'
include 'storage'
include 'storage:core'
include 'storage:filesystem'
+include 'storage:azure'
include 'storage:gcs'
include 'storage:s3'
include 'e2e'
diff --git a/storage/azure/build.gradle b/storage/azure/build.gradle
new file mode 100644
index 000000000..0d5b46f88
--- /dev/null
+++ b/storage/azure/build.gradle
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2023 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+archivesBaseName = "storage-azure"
+
+dependencies {
+ implementation project(":storage:core")
+
+ implementation platform("com.azure:azure-sdk-bom:$azureSdkVersion")
+ implementation ("com.azure:azure-identity") {
+ exclude group: "org.slf4j"
+ }
+ implementation ("com.azure:azure-storage-blob") {
+ exclude group: "org.slf4j"
+ }
+
+ implementation project(":commons")
+
+ testImplementation(testFixtures(project(":storage:core")))
+
+ testImplementation "org.testcontainers:junit-jupiter:$testcontainersVersion"
+}
diff --git a/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageAccountKeyTest.java b/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageAccountKeyTest.java
new file mode 100644
index 000000000..f1a6dfdfa
--- /dev/null
+++ b/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageAccountKeyTest.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2023 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.tieredstorage.storage.azure;
+
+import java.util.Map;
+
+import io.aiven.kafka.tieredstorage.storage.StorageBackend;
+
+class AzureBlobStorageAccountKeyTest extends AzureBlobStorageTest {
+ @Override
+ protected StorageBackend storage() {
+ final AzureBlobStorage azureBlobStorage = new AzureBlobStorage();
+ // The well-known Azurite account name and key.
+ final String accountName = "devstoreaccount1";
+ final String accountKey =
+ "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
+ final Map configs = Map.of(
+ "azure.container.name", azureContainerName,
+ "azure.account.name", accountName,
+ "azure.account.key", accountKey,
+ "azure.endpoint.url", endpoint()
+ );
+ azureBlobStorage.configure(configs);
+ return azureBlobStorage;
+ }
+}
diff --git a/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageConnectionStringTest.java b/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageConnectionStringTest.java
new file mode 100644
index 000000000..6ab5e2b73
--- /dev/null
+++ b/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageConnectionStringTest.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2023 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.tieredstorage.storage.azure;
+
+import java.util.Map;
+
+import io.aiven.kafka.tieredstorage.storage.StorageBackend;
+
+class AzureBlobStorageConnectionStringTest extends AzureBlobStorageTest {
+ @Override
+ protected StorageBackend storage() {
+ final AzureBlobStorage azureBlobStorage = new AzureBlobStorage();
+ final Map configs = Map.of(
+ "azure.container.name", azureContainerName,
+ "azure.connection.string", connectionString()
+ );
+ azureBlobStorage.configure(configs);
+ return azureBlobStorage;
+ }
+}
diff --git a/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageTest.java b/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageTest.java
new file mode 100644
index 000000000..55f72746f
--- /dev/null
+++ b/storage/azure/src/integration-test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageTest.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2023 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.tieredstorage.storage.azure;
+
+import java.io.ByteArrayInputStream;
+
+import io.aiven.kafka.tieredstorage.storage.BaseStorageTest;
+import io.aiven.kafka.tieredstorage.storage.BytesRange;
+import io.aiven.kafka.tieredstorage.storage.InvalidRangeException;
+import io.aiven.kafka.tieredstorage.storage.StorageBackendException;
+
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestInfo;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+@Testcontainers
+abstract class AzureBlobStorageTest extends BaseStorageTest {
+ private static final int BLOB_STORAGE_PORT = 10000;
+ @Container
+ static final GenericContainer> AZURITE_SERVER =
+ new GenericContainer<>(DockerImageName.parse("mcr.microsoft.com/azure-storage/azurite"))
+ .withExposedPorts(BLOB_STORAGE_PORT)
+ .withCommand("azurite-blob --blobHost 0.0.0.0");
+
+ static BlobServiceClient blobServiceClient;
+
+ protected String azureContainerName;
+
+ protected static String endpoint() {
+ return "http://127.0.0.1:" + AZURITE_SERVER.getMappedPort(BLOB_STORAGE_PORT) + "/devstoreaccount1";
+ }
+
+ protected static String connectionString() {
+ // The well-known Azurite connection string.
+ return "DefaultEndpointsProtocol=http;"
+ + "AccountName=devstoreaccount1;"
+ + "AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
+ + "BlobEndpoint=" + endpoint() + ";";
+ }
+
+ @BeforeAll
+ static void setUpClass() {
+ blobServiceClient = new BlobServiceClientBuilder()
+ .connectionString(connectionString())
+ .buildClient();
+ }
+
+ @BeforeEach
+ void setUp(final TestInfo testInfo) {
+ azureContainerName = testInfo.getDisplayName()
+ .toLowerCase()
+ .replace("(", "")
+ .replace(")", "");
+ while (azureContainerName.length() < 3) {
+ azureContainerName += azureContainerName;
+ }
+ blobServiceClient.createBlobContainer(azureContainerName);
+ }
+
+ @Override
+ protected void testFetchWithRangeOutsideFileSize() throws StorageBackendException {
+ // For some reason, Azure (or only Azurite) considers range 3-5 valid for a 3-byte blob,
+ // so the generic test fails.
+ final String content = "ABC";
+ storage().upload(new ByteArrayInputStream(content.getBytes()), TOPIC_PARTITION_SEGMENT_KEY);
+
+ assertThatThrownBy(() -> storage().fetch(TOPIC_PARTITION_SEGMENT_KEY, BytesRange.of(4, 6)))
+ .isInstanceOf(InvalidRangeException.class);
+ }
+}
diff --git a/storage/azure/src/integration-test/resources/log4j.properties b/storage/azure/src/integration-test/resources/log4j.properties
new file mode 100644
index 000000000..3e86cfa01
--- /dev/null
+++ b/storage/azure/src/integration-test/resources/log4j.properties
@@ -0,0 +1,21 @@
+#
+# Copyright 2023 Aiven Oy
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
diff --git a/storage/azure/src/main/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorage.java b/storage/azure/src/main/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorage.java
new file mode 100644
index 000000000..d23fdd309
--- /dev/null
+++ b/storage/azure/src/main/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorage.java
@@ -0,0 +1,151 @@
+/*
+ * Copyright 2023 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.tieredstorage.storage.azure;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+
+import io.aiven.kafka.tieredstorage.storage.BytesRange;
+import io.aiven.kafka.tieredstorage.storage.InvalidRangeException;
+import io.aiven.kafka.tieredstorage.storage.KeyNotFoundException;
+import io.aiven.kafka.tieredstorage.storage.ObjectKey;
+import io.aiven.kafka.tieredstorage.storage.StorageBackend;
+import io.aiven.kafka.tieredstorage.storage.StorageBackendException;
+
+import com.azure.identity.DefaultAzureCredentialBuilder;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.blob.models.BlobRange;
+import com.azure.storage.blob.models.BlobStorageException;
+import com.azure.storage.blob.specialized.BlockBlobClient;
+import com.azure.storage.blob.specialized.SpecializedBlobClientBuilder;
+import com.azure.storage.common.StorageSharedKeyCredential;
+
+public class AzureBlobStorage implements StorageBackend {
+ private AzureBlobStorageConfig config;
+ private BlobContainerClient blobContainerClient;
+
+ // TODO make configurable
+ private final int blockSize = 5 * 1024 * 1024;
+
+ @Override
+ public void configure(final Map configs) {
+ this.config = new AzureBlobStorageConfig(configs);
+
+ final BlobServiceClientBuilder blobServiceClientBuilder = new BlobServiceClientBuilder();
+ if (config.connectionString() != null) {
+ blobServiceClientBuilder.connectionString(config.connectionString());
+ } else {
+ blobServiceClientBuilder.endpoint(endpointUrl());
+
+ if (config.accountKey() == null) {
+ blobServiceClientBuilder.credential(
+ new DefaultAzureCredentialBuilder().build());
+ } else {
+ blobServiceClientBuilder.credential(
+ new StorageSharedKeyCredential(config.accountName(), config.accountKey()));
+ }
+ }
+
+ blobContainerClient = blobServiceClientBuilder.buildClient()
+ .getBlobContainerClient(config.containerName());
+ }
+
+ private String endpointUrl() {
+ if (config.endpointUrl() != null) {
+ return config.endpointUrl();
+ } else {
+ return "https://" + config.accountName() + ".blob.core.windows.net";
+ }
+ }
+
+ @Override
+ public long upload(final InputStream inputStream, final ObjectKey key) throws StorageBackendException {
+ final var specializedBlobClientBuilder = new SpecializedBlobClientBuilder();
+ if (config.connectionString() != null) {
+ specializedBlobClientBuilder.connectionString(config.connectionString());
+ } else {
+ specializedBlobClientBuilder.endpoint(endpointUrl());
+
+ if (config.accountKey() == null) {
+ specializedBlobClientBuilder.credential(
+ new DefaultAzureCredentialBuilder().build());
+ } else {
+ specializedBlobClientBuilder.credential(
+ new StorageSharedKeyCredential(config.accountName(), config.accountKey()));
+ }
+ }
+ final BlockBlobClient blockBlobClient = specializedBlobClientBuilder
+ .containerName(config.containerName())
+ .blobName(key.value())
+ .buildBlockBlobClient();
+
+ try (OutputStream os = new BufferedOutputStream(blockBlobClient.getBlobOutputStream(true), blockSize)) {
+ return inputStream.transferTo(os);
+ } catch (final IOException e) {
+ throw new StorageBackendException("Failed to upload " + key, e);
+ }
+ }
+
+ @Override
+ public InputStream fetch(final ObjectKey key) throws StorageBackendException {
+ try {
+ return blobContainerClient.getBlobClient(key.value()).openInputStream();
+ } catch (final BlobStorageException e) {
+ if (e.getStatusCode() == 404) {
+ throw new KeyNotFoundException(this, key, e);
+ } else {
+ throw new StorageBackendException("Failed to fetch " + key, e);
+ }
+ }
+ }
+
+ @Override
+ public InputStream fetch(final ObjectKey key, final BytesRange range) throws StorageBackendException {
+ try {
+ return blobContainerClient.getBlobClient(key.value()).openInputStream(
+ new BlobRange(range.from, (long) range.size()), null);
+ } catch (final BlobStorageException e) {
+ if (e.getStatusCode() == 404) {
+ throw new KeyNotFoundException(this, key, e);
+ } else if (e.getStatusCode() == 416) {
+ throw new InvalidRangeException("Invalid range " + range, e);
+ } else {
+ throw new StorageBackendException("Failed to fetch " + key, e);
+ }
+ }
+ }
+
+ @Override
+ public void delete(final ObjectKey key) throws StorageBackendException {
+ try {
+ blobContainerClient.getBlobClient(key.value()).deleteIfExists();
+ } catch (final BlobStorageException e) {
+ throw new StorageBackendException("Failed to delete " + key, e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "AzureStorage{"
+ + "containerName='" + config.containerName() + '\''
+ + '}';
+ }
+}
diff --git a/storage/azure/src/main/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageConfig.java b/storage/azure/src/main/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageConfig.java
new file mode 100644
index 000000000..d87ab979f
--- /dev/null
+++ b/storage/azure/src/main/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageConfig.java
@@ -0,0 +1,136 @@
+/*
+ * Copyright 2023 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.tieredstorage.storage.azure;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.types.Password;
+
+import io.aiven.kafka.tieredstorage.config.validators.NonEmptyPassword;
+import io.aiven.kafka.tieredstorage.config.validators.Null;
+import io.aiven.kafka.tieredstorage.config.validators.ValidUrl;
+
+public class AzureBlobStorageConfig extends AbstractConfig {
+ static final String AZURE_ACCOUNT_NAME_CONFIG = "azure.account.name";
+ private static final String AZURE_ACCOUNT_NAME_DOC = "Azure account name";
+
+ static final String AZURE_ACCOUNT_KEY_CONFIG = "azure.account.key";
+ private static final String AZURE_ACCOUNT_KEY_DOC = "Azure account key";
+
+ static final String AZURE_CONTAINER_NAME_CONFIG = "azure.container.name";
+ private static final String AZURE_CONTAINER_NAME_DOC = "Azure container to store log segments";
+
+ static final String AZURE_ENDPOINT_URL_CONFIG = "azure.endpoint.url";
+ private static final String AZURE_ENDPOINT_URL_DOC = "Custom Azure Blob Storage endpoint URL";
+
+ static final String AZURE_CONNECTION_STRING_CONFIG = "azure.connection.string";
+ private static final String AZURE_CONNECTION_STRING_DOC = "Azure connection string. "
+ + "Cannot be used together with azure.account.name, azure.account.key, and azure.endpoint.url";
+
+ private static final ConfigDef CONFIG;
+
+ static {
+ CONFIG = new ConfigDef()
+ .define(
+ AZURE_ACCOUNT_NAME_CONFIG,
+ ConfigDef.Type.STRING,
+ null,
+ Null.or(new ConfigDef.NonEmptyString()),
+ ConfigDef.Importance.HIGH,
+ AZURE_ACCOUNT_NAME_DOC)
+ .define(
+ AZURE_ACCOUNT_KEY_CONFIG,
+ ConfigDef.Type.PASSWORD,
+ null,
+ Null.or(new NonEmptyPassword()),
+ ConfigDef.Importance.MEDIUM,
+ AZURE_ACCOUNT_KEY_DOC)
+ .define(
+ AZURE_CONTAINER_NAME_CONFIG,
+ ConfigDef.Type.STRING,
+ ConfigDef.NO_DEFAULT_VALUE,
+ new ConfigDef.NonEmptyString(),
+ ConfigDef.Importance.HIGH,
+ AZURE_CONTAINER_NAME_DOC)
+ .define(
+ AZURE_ENDPOINT_URL_CONFIG,
+ ConfigDef.Type.STRING,
+ null,
+ Null.or(new ValidUrl()),
+ ConfigDef.Importance.LOW,
+ AZURE_ENDPOINT_URL_DOC)
+ .define(
+ AZURE_CONNECTION_STRING_CONFIG,
+ ConfigDef.Type.PASSWORD,
+ null,
+ Null.or(new NonEmptyPassword()),
+ ConfigDef.Importance.MEDIUM,
+ AZURE_CONNECTION_STRING_DOC);
+ }
+
+ public AzureBlobStorageConfig(final Map props) {
+ super(CONFIG, props);
+ validate();
+ }
+
+ private void validate() {
+ if (connectionString() != null) {
+ if (accountName() != null) {
+ throw new ConfigException(
+ "\"azure.connection.string\" cannot be set together with \"azure.account.name\".");
+ }
+ if (accountKey() != null) {
+ throw new ConfigException(
+ "\"azure.connection.string\" cannot be set together with \"azure.account.key\".");
+ }
+ if (endpointUrl() != null) {
+ throw new ConfigException(
+ "\"azure.connection.string\" cannot be set together with \"azure.endpoint.url\".");
+ }
+ } else {
+ if (accountName() == null) {
+ throw new ConfigException(
+ "\"azure.account.name\" must be set if \"azure.connection.string\" is not set.");
+ }
+ }
+ }
+
+ String accountName() {
+ return getString(AZURE_ACCOUNT_NAME_CONFIG);
+ }
+
+ String accountKey() {
+ final Password key = getPassword(AZURE_ACCOUNT_KEY_CONFIG);
+ return key == null ? null : key.value();
+ }
+
+ String containerName() {
+ return getString(AZURE_CONTAINER_NAME_CONFIG);
+ }
+
+ String endpointUrl() {
+ return getString(AZURE_ENDPOINT_URL_CONFIG);
+ }
+
+ String connectionString() {
+ final Password connectionString = getPassword(AZURE_CONNECTION_STRING_CONFIG);
+ return connectionString == null ? null : connectionString.value();
+ }
+}
diff --git a/storage/azure/src/test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageConfigTest.java b/storage/azure/src/test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageConfigTest.java
new file mode 100644
index 000000000..dd42b6279
--- /dev/null
+++ b/storage/azure/src/test/java/io/aiven/kafka/tieredstorage/storage/azure/AzureBlobStorageConfigTest.java
@@ -0,0 +1,176 @@
+/*
+ * Copyright 2023 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.tieredstorage.storage.azure;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigException;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+class AzureBlobStorageConfigTest {
+ private static final String ACCOUNT_NAME = "account1";
+ private static final String ACCOUNT_KEY = "account_key";
+ private static final String CONTAINER_NAME = "c1";
+ private static final String ENDPOINT = "http://localhost:10000/";
+ private static final String CONNECTION_STRING = "DefaultEndpointsProtocol=http;"
+ + "AccountName=devstoreaccount1;"
+ + "AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
+ + "BlobEndpoint=http://localhost:10000/devstoreaccount1;";
+
+ @Test
+ void minimalConfig() {
+ final var configs = Map.of(
+ "azure.account.name", ACCOUNT_NAME,
+ "azure.container.name", CONTAINER_NAME
+ );
+ final var config = new AzureBlobStorageConfig(configs);
+ assertThat(config.containerName()).isEqualTo(CONTAINER_NAME);
+ assertThat(config.accountName()).isEqualTo(ACCOUNT_NAME);
+ assertThat(config.accountKey()).isNull();
+ assertThat(config.endpointUrl()).isNull();
+ assertThat(config.connectionString()).isNull();
+ }
+
+ @Test
+ void shouldRequireContainerName() {
+ final var configs = Map.of(
+ "azure.account.name", ACCOUNT_NAME
+ );
+ assertThatThrownBy(() -> new AzureBlobStorageConfig(configs))
+ .isInstanceOf(ConfigException.class)
+ .hasMessage("Missing required configuration \"azure.container.name\" which has no default value.");
+ }
+
+ @Test
+ void shouldRequireAccountNameIfNoConnectionString() {
+ final var configs = Map.of(
+ "azure.container.name", CONTAINER_NAME
+ );
+ assertThatThrownBy(() -> new AzureBlobStorageConfig(configs))
+ .isInstanceOf(ConfigException.class)
+ .hasMessage("\"azure.account.name\" must be set if \"azure.connection.string\" is not set.");
+ }
+
+ @Test
+ void authAccountName() {
+ final var configs = Map.of(
+ "azure.account.name", ACCOUNT_NAME,
+ "azure.container.name", CONTAINER_NAME
+ );
+ final var config = new AzureBlobStorageConfig(configs);
+ assertThat(config.accountName()).isEqualTo(ACCOUNT_NAME);
+ assertThat(config.accountKey()).isNull();
+ assertThat(config.endpointUrl()).isNull();
+ assertThat(config.connectionString()).isNull();
+ }
+
+ @Test
+ void authAccountNameAndEndpoint() {
+ final var configs = Map.of(
+ "azure.account.name", ACCOUNT_NAME,
+ "azure.container.name", CONTAINER_NAME,
+ "azure.endpoint.url", ENDPOINT
+ );
+ final var config = new AzureBlobStorageConfig(configs);
+ assertThat(config.accountName()).isEqualTo(ACCOUNT_NAME);
+ assertThat(config.accountKey()).isNull();
+ assertThat(config.endpointUrl()).isEqualTo(ENDPOINT);
+ assertThat(config.connectionString()).isNull();
+ }
+
+ @Test
+ void authAccountNameAndKey() {
+ final var configs = Map.of(
+ "azure.account.name", ACCOUNT_NAME,
+ "azure.account.key", ACCOUNT_KEY,
+ "azure.container.name", CONTAINER_NAME
+ );
+ final var config = new AzureBlobStorageConfig(configs);
+ assertThat(config.accountName()).isEqualTo(ACCOUNT_NAME);
+ assertThat(config.accountKey()).isEqualTo(ACCOUNT_KEY);
+ assertThat(config.endpointUrl()).isNull();
+ assertThat(config.connectionString()).isNull();
+ }
+
+ @Test
+ void authAccountNameAndKeyAndEndpoint() {
+ final var configs = Map.of(
+ "azure.account.name", ACCOUNT_NAME,
+ "azure.account.key", ACCOUNT_KEY,
+ "azure.container.name", CONTAINER_NAME,
+ "azure.endpoint.url", ENDPOINT
+ );
+ final var config = new AzureBlobStorageConfig(configs);
+ assertThat(config.accountName()).isEqualTo(ACCOUNT_NAME);
+ assertThat(config.accountKey()).isEqualTo(ACCOUNT_KEY);
+ assertThat(config.endpointUrl()).isEqualTo(ENDPOINT);
+ assertThat(config.connectionString()).isNull();
+ }
+
+ @Test
+ void authConnectionString() {
+ final var configs = Map.of(
+ "azure.connection.string", CONNECTION_STRING,
+ "azure.container.name", CONTAINER_NAME
+ );
+ final var config = new AzureBlobStorageConfig(configs);
+ assertThat(config.accountName()).isNull();
+ assertThat(config.accountKey()).isNull();
+ assertThat(config.endpointUrl()).isNull();
+ assertThat(config.connectionString()).isEqualTo(CONNECTION_STRING);
+ }
+
+ @Test
+ void connectionStringAndAccountNameClash() {
+ final var configs = Map.of(
+ "azure.connection.string", CONNECTION_STRING,
+ "azure.container.name", CONTAINER_NAME,
+ "azure.account.name", ACCOUNT_NAME
+ );
+ assertThatThrownBy(() -> new AzureBlobStorageConfig(configs))
+ .isInstanceOf(ConfigException.class)
+ .hasMessage("\"azure.connection.string\" cannot be set together with \"azure.account.name\".");
+ }
+
+ @Test
+ void connectionStringAndAccountKeyClash() {
+ final var configs = Map.of(
+ "azure.connection.string", CONNECTION_STRING,
+ "azure.container.name", CONTAINER_NAME,
+ "azure.account.key", ACCOUNT_KEY
+ );
+ assertThatThrownBy(() -> new AzureBlobStorageConfig(configs))
+ .isInstanceOf(ConfigException.class)
+ .hasMessage("\"azure.connection.string\" cannot be set together with \"azure.account.key\".");
+ }
+
+ @Test
+ void connectionStringAndEndpointClash() {
+ final var configs = Map.of(
+ "azure.connection.string", CONNECTION_STRING,
+ "azure.container.name", CONTAINER_NAME,
+ "azure.endpoint.url", ENDPOINT
+ );
+ assertThatThrownBy(() -> new AzureBlobStorageConfig(configs))
+ .isInstanceOf(ConfigException.class)
+ .hasMessage("\"azure.connection.string\" cannot be set together with \"azure.endpoint.url\".");
+ }
+}
diff --git a/storage/azure/src/test/resources/log4j.properties b/storage/azure/src/test/resources/log4j.properties
new file mode 100644
index 000000000..3e86cfa01
--- /dev/null
+++ b/storage/azure/src/test/resources/log4j.properties
@@ -0,0 +1,21 @@
+#
+# Copyright 2023 Aiven Oy
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
diff --git a/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/BaseStorageTest.java b/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/BaseStorageTest.java
index 5a57fab46..54d02cf90 100644
--- a/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/BaseStorageTest.java
+++ b/storage/core/src/testFixtures/java/io/aiven/kafka/tieredstorage/storage/BaseStorageTest.java
@@ -135,7 +135,7 @@ void testFetchWithOffsetRangeLargerThanFileSize() throws IOException, StorageBac
}
@Test
- void testFetchWithRangeOutsideFileSize() throws StorageBackendException {
+ protected void testFetchWithRangeOutsideFileSize() throws StorageBackendException {
final String content = "ABC";
storage().upload(new ByteArrayInputStream(content.getBytes()), TOPIC_PARTITION_SEGMENT_KEY);