Add support for Azure Blob Storage
Metrics are not included in this PR; they will be added in a follow-up.

Related to #421
ivanyu committed Oct 19, 2023
1 parent 3f4a2d4 commit 61f4cc0
Showing 19 changed files with 932 additions and 3 deletions.
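
For orientation, the broker-side settings that wire in the new backend reduce to a handful of properties. A minimal sketch, assuming the usual mapping between the KAFKA_* environment variables in the demo compose file below and server.properties entries, and using Azurite's well-known development account only as a placeholder:

    remote.log.storage.manager.class.path=/tiered-storage-for-apache-kafka/core/*:/tiered-storage-for-apache-kafka/azure/*
    rsm.config.storage.backend.class=io.aiven.kafka.tieredstorage.storage.azure.AzureBlobStorage
    rsm.config.storage.azure.container.name=test-container
    rsm.config.storage.azure.account.name=devstoreaccount1
    rsm.config.storage.azure.account.key=<account key>
    rsm.config.storage.azure.endpoint.url=http://azurite:10000/devstoreaccount1

Against a real storage account, the account name, key, and endpoint URL would come from Azure rather than Azurite.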
5 changes: 4 additions & 1 deletion Makefile
@@ -28,7 +28,7 @@ clean:
checkstyle:
./gradlew checkstyleMain checkstyleTest checkstyleIntegrationTest

build: build/distributions/tiered-storage-for-apache-kafka-$(VERSION).tgz storage/s3/build/distributions/s3-$(VERSION).tgz storage/gcs/build/distributions/gcs-$(VERSION).tgz
build: build/distributions/tiered-storage-for-apache-kafka-$(VERSION).tgz storage/s3/build/distributions/s3-$(VERSION).tgz storage/gcs/build/distributions/gcs-$(VERSION).tgz storage/azure/build/distributions/azure-$(VERSION).tgz

build/distributions/tiered-storage-for-apache-kafka-$(VERSION).tgz:
./gradlew build distTar -x test -x integrationTest -x e2e:test
@@ -39,6 +39,9 @@ storage/s3/build/distributions/s3-$(VERSION).tgz:
storage/gcs/build/distributions/gcs-$(VERSION).tgz:
./gradlew build :storage:gcs:distTar -x test -x integrationTest -x e2e:test

storage/azure/build/distributions/azure-$(VERSION).tgz:
./gradlew build :storage:azure:distTar -x test -x integrationTest -x e2e:test

test: build
./gradlew test -x e2e:test

2 changes: 2 additions & 0 deletions build.gradle
@@ -119,6 +119,8 @@ subprojects {

gcpSdkVersion = "2.26.1"

azureSdkVersion = "1.2.17"

testcontainersVersion = "1.19.0"

testcontainersFakeGcsServerVersion = "0.2.0"
1 change: 1 addition & 0 deletions checkstyle/suppressions.xml
@@ -28,6 +28,7 @@
<suppress checks="ClassDataAbstractionCoupling" files="RemoteStorageManager.java"/>
<suppress checks="ClassDataAbstractionCoupling" files="Metrics.java"/>
<suppress checks="ClassDataAbstractionCoupling" files="MetricCollector.java"/>
<suppress checks="ClassDataAbstractionCoupling" files="AzureBlobStorage.java"/>
<suppress checks="CyclomaticComplexity" files="MetricCollector.java"/>
<suppress checks="CyclomaticComplexity" files="SegmentIndexesV1Builder.java"/>
<suppress checks="CyclomaticComplexity" files="SingleBrokerTest.java"/>
13 changes: 13 additions & 0 deletions demo/Makefile
@@ -111,12 +111,17 @@ run_s3_minio:
run_gcs_fake_gcs_server:
docker compose -f compose-gcs-fake-gcs-server.yml up

.PHONY: run_azure_blob_azurite
run_azure_blob_azurite:
docker compose -f compose-azure-blob-azurite.yml up

.PHONY: clean
clean:
docker compose -f compose-local-fs.yml down
docker compose -f compose-s3-aws.yml down
docker compose -f compose-s3-minio.yml down
docker compose -f compose-gcs-fake-gcs-server.yml down
docker compose -f compose-azure-blob-azurite.yml down

.PHONY: show_local_data
show_local_data:
@@ -141,6 +146,14 @@ show_remote_data_s3_minio:
show_remote_data_gcs_fake_gcs_server:
curl http://localhost:4443/storage/v1/b/test-bucket/o | jq -r '.items | map(.name) | .[]'

.PHONY: show_remote_data_azurite
show_remote_data_azurite:
docker run --rm --network=host mcr.microsoft.com/azure-cli \
az storage blob list --container-name test-container \
--account-name devstoreaccount1 \
--account-key Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw== \
--blob-endpoint http://127.0.0.1:10000/devstoreaccount1 --output table | grep $(topic)

offset = 0
consume = 10
.PHONY: consume
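A usage sketch for the new demo targets (the topic name is only an example; topic is the same make variable the other show_remote_data_* targets grep for):

    make run_azure_blob_azurite
    # in another terminal, after creating a tiered topic and producing to it:
    make show_remote_data_azurite topic=topic1
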
89 changes: 89 additions & 0 deletions demo/compose-azure-blob-azurite.yml
@@ -0,0 +1,89 @@
##
# Copyright 2023 Aiven Oy
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
version: '3.8'
services:
zookeeper:
image: "confluentinc/cp-zookeeper:7.3.3"
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181

kafka:
image: "aivenoy/kafka-with-ts-plugin"
container_name: "kafka-ts"
depends_on:
- zookeeper
- azurite
ports:
- "9092:9092"
- "7000:7000" #prometheus metrics
environment:
KAFKA_BROKER_ID: 0
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:9092,BROKER://0.0.0.0:29092"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:9092,BROKER://kafka:29092"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,BROKER:PLAINTEXT"
KAFKA_INTER_BROKER_LISTENER_NAME: "BROKER"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: false
# Increase Tiered Storage log level
KAFKA_LOG4J_LOGGERS: "io.aiven.kafka.tieredstorage=DEBUG"
# Tweak retention checking
KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 10000
# Enable Tiered Storage
KAFKA_REMOTE_LOG_STORAGE_SYSTEM_ENABLE: true
KAFKA_REMOTE_LOG_MANAGER_TASK_INTERVAL_MS: 5000
# Remote metadata manager
KAFKA_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME: "org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager"
KAFKA_REMOTE_LOG_METADATA_MANAGER_IMPL_PREFIX: "rlmm.config."
KAFKA_REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME: "BROKER"
KAFKA_RLMM_CONFIG_REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR: 1
# Remote storage manager
KAFKA_REMOTE_LOG_STORAGE_MANAGER_CLASS_PATH: "/tiered-storage-for-apache-kafka/core/*:/tiered-storage-for-apache-kafka/azure/*"
KAFKA_REMOTE_LOG_STORAGE_MANAGER_CLASS_NAME: "io.aiven.kafka.tieredstorage.RemoteStorageManager"
KAFKA_REMOTE_LOG_STORAGE_MANAGER_IMPL_PREFIX: "rsm.config."
KAFKA_RSM_CONFIG_CHUNK_SIZE: 5242880 # 5MiB
KAFKA_RSM_CONFIG_CHUNK_CACHE_CLASS: "io.aiven.kafka.tieredstorage.chunkmanager.cache.InMemoryChunkCache"
KAFKA_RSM_CONFIG_CHUNK_CACHE_SIZE: -1
KAFKA_RSM_CONFIG_CUSTOM_METADATA_FIELDS_INCLUDE: "REMOTE_SIZE"
# Storage backend
KAFKA_RSM_CONFIG_KEY_PREFIX: "tiered-storage-demo/"
KAFKA_RSM_CONFIG_STORAGE_BACKEND_CLASS: "io.aiven.kafka.tieredstorage.storage.azure.AzureBlobStorage"
KAFKA_RSM_CONFIG_STORAGE_AZURE_CONTAINER_NAME: "test-container"
KAFKA_RSM_CONFIG_STORAGE_AZURE_ACCOUNT_NAME: "devstoreaccount1"
KAFKA_RSM_CONFIG_STORAGE_AZURE_ACCOUNT_KEY: "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
KAFKA_RSM_CONFIG_STORAGE_AZURE_ENDPOINT_URL: "http://azurite:10000/devstoreaccount1"

azurite:
image: mcr.microsoft.com/azure-storage/azurite
ports:
- "10000:10000"
command: azurite-blob --blobHost 0.0.0.0

azurite-create-container:
image: mcr.microsoft.com/azure-cli
restart: "no"
depends_on:
- azurite
command: >
az storage container create --name test-container
--account-name devstoreaccount1
--account-key Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==
--blob-endpoint http://azurite:10000/devstoreaccount1
8 changes: 7 additions & 1 deletion docker/Dockerfile
@@ -21,7 +21,8 @@ USER root

RUN mkdir -p /tiered-storage-for-apache-kafka/core \
&& mkdir -p /tiered-storage-for-apache-kafka/s3 \
&& mkdir -p /tiered-storage-for-apache-kafka/gcs
&& mkdir -p /tiered-storage-for-apache-kafka/gcs \
&& mkdir -p /tiered-storage-for-apache-kafka/azure

COPY build/distributions/tiered-storage-for-apache-kafka-${_VERSION}.tgz /tiered-storage-for-apache-kafka/core
RUN cd /tiered-storage-for-apache-kafka/core \
@@ -38,6 +39,11 @@ RUN cd /tiered-storage-for-apache-kafka/gcs \
&& tar -xf gcs-${_VERSION}.tgz --strip-components=1 \
&& rm gcs-${_VERSION}.tgz

COPY storage/azure/build/distributions/azure-${_VERSION}.tgz /tiered-storage-for-apache-kafka/azure
RUN cd /tiered-storage-for-apache-kafka/azure \
&& tar -xf azure-${_VERSION}.tgz --strip-components=1 \
&& rm azure-${_VERSION}.tgz

# Installing JMX exporter agent
ARG JMX_EXPORTER_VERSION=0.18.0
RUN mkdir -p /opt/prometheus/jmx-exporter
3 changes: 3 additions & 0 deletions e2e/build.gradle
@@ -31,6 +31,9 @@ dependencies {

testImplementation "io.aiven:testcontainers-fake-gcs-server:$testcontainersFakeGcsServerVersion"

testImplementation platform("com.azure:azure-sdk-bom:$azureSdkVersion")
testImplementation "com.azure:azure-storage-blob"

testRuntimeOnly "org.slf4j:slf4j-log4j12:$slf4jVersion"
}

@@ -0,0 +1,106 @@
/*
* Copyright 2023 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.e2e;

import java.util.List;
import java.util.stream.Collectors;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;

import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.ListBlobsOptions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.DockerImageName;

public class AzureSingleBrokerTest extends SingleBrokerTest {
private static final int BLOB_STORAGE_PORT = 10000;
static final String NETWORK_ALIAS = "blob-storage";

static final GenericContainer<?> AZURITE_SERVER =
new GenericContainer<>(DockerImageName.parse("mcr.microsoft.com/azure-storage/azurite"))
.withExposedPorts(BLOB_STORAGE_PORT)
.withCommand("azurite-blob --blobHost 0.0.0.0")
.withNetwork(NETWORK)
.withNetworkAliases(NETWORK_ALIAS);

static final String AZURE_CONTAINER = "test-container";

static BlobContainerClient blobContainerClient;

@BeforeAll
static void init() throws Exception {
AZURITE_SERVER.start();

// The well-known Azurite account name and key.
final String accountName = "devstoreaccount1";
final String accountKey =
"Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";

final String endpointForTestCode =
"http://127.0.0.1:" + AZURITE_SERVER.getMappedPort(BLOB_STORAGE_PORT) + "/devstoreaccount1";
final String connectionString = "DefaultEndpointsProtocol=http;"
+ "AccountName=" + accountName + ";"
+ "AccountKey=" + accountKey + ";"
+ "BlobEndpoint=" + endpointForTestCode + ";";
final var blobServiceClient = new BlobServiceClientBuilder()
.connectionString(connectionString)
.buildClient();
blobContainerClient = blobServiceClient.createBlobContainer(AZURE_CONTAINER);

final String endpointForKafka = "http://" + NETWORK_ALIAS + ":" + BLOB_STORAGE_PORT + "/devstoreaccount1";
setupKafka(kafka -> kafka.withEnv("KAFKA_RSM_CONFIG_STORAGE_BACKEND_CLASS",
"io.aiven.kafka.tieredstorage.storage.azure.AzureBlobStorage")
.withEnv("KAFKA_REMOTE_LOG_STORAGE_MANAGER_CLASS_PATH",
"/tiered-storage-for-apache-kafka/core/*:/tiered-storage-for-apache-kafka/azure/*")
.withEnv("KAFKA_RSM_CONFIG_STORAGE_AZURE_CONTAINER_NAME", AZURE_CONTAINER)
.withEnv("KAFKA_RSM_CONFIG_STORAGE_AZURE_ACCOUNT_NAME", accountName)
.withEnv("KAFKA_RSM_CONFIG_STORAGE_AZURE_ACCOUNT_KEY", accountKey)
.withEnv("KAFKA_RSM_CONFIG_STORAGE_AZURE_ENDPOINT_URL", endpointForKafka)
.dependsOn(AZURITE_SERVER));
}

@AfterAll
static void cleanup() {
stopKafka();

AZURITE_SERVER.stop();

cleanupStorage();
}

@Override
boolean assertNoTopicDataOnTierStorage(final String topicName, final Uuid topicId) {
final String prefix = String.format("%s-%s", topicName, topicId.toString());

final var list = blobContainerClient.listBlobs(new ListBlobsOptions().setPrefix(prefix), null);
return list.stream().findAny().isEmpty();
}

@Override
List<String> remotePartitionFiles(final TopicIdPartition topicIdPartition) {
return blobContainerClient.listBlobs().stream()
.map(BlobItem::getName)
.map(k -> k.substring(k.lastIndexOf('/') + 1))
.sorted()
.collect(Collectors.toList());
}
}
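
A plausible way to run only this end-to-end test locally, assuming the plugin Docker image used by the e2e setup has already been built:

    ./gradlew :e2e:test --tests "io.aiven.kafka.tieredstorage.e2e.AzureSingleBrokerTest"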
1 change: 1 addition & 0 deletions settings.gradle
@@ -19,6 +19,7 @@ include 'core'
include 'storage'
include 'storage:core'
include 'storage:filesystem'
include 'storage:azure'
include 'storage:gcs'
include 'storage:s3'
include 'e2e'
35 changes: 35 additions & 0 deletions storage/azure/build.gradle
@@ -0,0 +1,35 @@
/*
* Copyright 2023 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

archivesBaseName = "storage-azure"

dependencies {
implementation project(":storage:core")

implementation platform("com.azure:azure-sdk-bom:$azureSdkVersion")
implementation ("com.azure:azure-identity") {
exclude group: "org.slf4j"
}
implementation ("com.azure:azure-storage-blob") {
exclude group: "org.slf4j"
}

implementation project(":commons")

testImplementation(testFixtures(project(":storage:core")))

testImplementation "org.testcontainers:junit-jupiter:$testcontainersVersion"
}
@@ -0,0 +1,40 @@
/*
* Copyright 2023 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.storage.azure;

import java.util.Map;

import io.aiven.kafka.tieredstorage.storage.StorageBackend;

class AzureBlobStorageAccountKeyTest extends AzureBlobStorageTest {
@Override
protected StorageBackend storage() {
final AzureBlobStorage azureBlobStorage = new AzureBlobStorage();
// The well-known Azurite account name and key.
final String accountName = "devstoreaccount1";
final String accountKey =
"Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
final Map<String, Object> configs = Map.of(
"azure.container.name", azureContainerName,
"azure.account.name", accountName,
"azure.account.key", accountKey,
"azure.endpoint.url", endpoint()
);
azureBlobStorage.configure(configs);
return azureBlobStorage;
}
}