Skip to content

Commit

Permalink
Merge pull request #424 from Aiven-Open/ivanyu/azure
Browse files Browse the repository at this point in the history
Add support for Azure Blob Storage
  • Loading branch information
jeqo authored Oct 22, 2023
2 parents ccce813 + 483c8ba commit 3b44bd9
Show file tree
Hide file tree
Showing 22 changed files with 967 additions and 5 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main_push_and_pull_request_workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ jobs:
strategy:
matrix:
java-version: [ 11 ]
test: [ 'LocalSystem', 'S3', 'Gcs' ]
test: [ 'LocalSystem', 'S3', 'Gcs', 'Azure' ]
name: E2E tests for ${{ matrix.test }} with jdk ${{ matrix.java-version }}
runs-on: ubuntu-latest
steps:
Expand Down
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ clean:
checkstyle:
./gradlew checkstyleMain checkstyleTest checkstyleIntegrationTest

build: build/distributions/tiered-storage-for-apache-kafka-$(VERSION).tgz storage/s3/build/distributions/s3-$(VERSION).tgz storage/gcs/build/distributions/gcs-$(VERSION).tgz
build: build/distributions/tiered-storage-for-apache-kafka-$(VERSION).tgz storage/s3/build/distributions/s3-$(VERSION).tgz storage/gcs/build/distributions/gcs-$(VERSION).tgz storage/gcs/build/distributions/azure-$(VERSION).tgz

build/distributions/tiered-storage-for-apache-kafka-$(VERSION).tgz:
./gradlew build distTar -x test -x integrationTest -x e2e:test
Expand All @@ -39,6 +39,9 @@ storage/s3/build/distributions/s3-$(VERSION).tgz:
storage/gcs/build/distributions/gcs-$(VERSION).tgz:
./gradlew build :storage:gcs:distTar -x test -x integrationTest -x e2e:test

storage/gcs/build/distributions/azure-$(VERSION).tgz:
./gradlew build :storage:azure:distTar -x test -x integrationTest -x e2e:test

test: build
./gradlew test -x e2e:test

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

This project is an implementation of `RemoteStorageManager` for Apache Kafka tiered storage.

The implementation will have multiple configurable storage backends. Currently, AWS S3 and Google Cloud Storage are supported. We intend to support Azure Blob Storage in the near future.
The implementation will have multiple configurable storage backends. Currently, AWS S3, Google Cloud Storage, and Azure Blob Storage are supported.

The project follows the API specifications according to the latest version of [KIP-405: Kafka Tiered Storage](https://cwiki.apache.org/confluence/x/KJDQBQ).

Expand Down
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ subprojects {

gcpSdkVersion = "2.26.1"

azureSdkVersion = "1.2.17"

testcontainersVersion = "1.19.0"

testcontainersFakeGcsServerVersion = "0.2.0"
Expand Down
1 change: 1 addition & 0 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
<suppress checks="ClassDataAbstractionCoupling" files="RemoteStorageManager.java"/>
<suppress checks="ClassDataAbstractionCoupling" files="Metrics.java"/>
<suppress checks="ClassDataAbstractionCoupling" files="MetricCollector.java"/>
<suppress checks="ClassDataAbstractionCoupling" files="AzureBlobStorage.java"/>
<suppress checks="CyclomaticComplexity" files="MetricCollector.java"/>
<suppress checks="CyclomaticComplexity" files="SegmentIndexesV1Builder.java"/>
<suppress checks="CyclomaticComplexity" files="SingleBrokerTest.java"/>
Expand Down
13 changes: 13 additions & 0 deletions demo/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,17 @@ run_s3_minio:
run_gcs_fake_gcs_server:
docker compose -f compose-gcs-fake-gcs-server.yml up

.PHONY: run_azure_blob_azurite
run_azure_blob_azurite:
docker compose -f compose-azure-blob-azurite.yml up

.PHONY: clean
clean:
docker compose -f compose-local-fs.yml down
docker compose -f compose-s3-aws.yml down
docker compose -f compose-s3-minio.yml down
docker compose -f compose-gcs-fake-gcs-server.yml down
docker compose -f compose-azure-blob-azurite.yml down

.PHONY: show_local_data
show_local_data:
Expand All @@ -141,6 +146,14 @@ show_remote_data_s3_minio:
show_remote_data_gcs_fake_gcs_server:
curl http://localhost:4443/storage/v1/b/test-bucket/o | jq -r '.items | map(.name) | .[]'

.PHONY: show_remote_data_azurite
show_remote_data_azurite:
docker run --rm --network=host mcr.microsoft.com/azure-cli \
az storage blob list --container-name test-container \
--account-name devstoreaccount1 \
--account-key Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw== \
--blob-endpoint http://127.0.0.1:10000/devstoreaccount1 --output table | grep $(topic)

offset = 0
consume = 10
.PHONY: consume
Expand Down
31 changes: 31 additions & 0 deletions demo/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,37 @@ make show_local_data
make consume
```

### Azurite as remote storage: `compose-azure-blob-azurite.yml`

This scenario uses `AzureBlobStorage` with [Azurite](https://learn.microsoft.com/en-us/azure/storage/common/storage-use-azurite) as the remote storage.

```bash
# Start the compose
make run_azure_blob_azurite

# Create the topic with any variation
make create_topic_by_size_ts
# or
# make create_topic_by_time_ts
# or with TS disabled
# make create_topic_*_no_ts

# Fill the topic
make fill_topic

# See that segments are uploaded to the remote storage
# (this may take several seconds)
make show_remote_data_azurite

# Check that early segments are deleted
# (completely or renamed with `.deleted` suffix)
# from the local storage (this may take several seconds)
make show_local_data

# Check the data is consumable
make consume
```

## Additional features

### Encryption
Expand Down
90 changes: 90 additions & 0 deletions demo/compose-azure-blob-azurite.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
##
# Copyright 2023 Aiven Oy
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
version: '3.8'
services:
zookeeper:
image: "confluentinc/cp-zookeeper:7.3.3"
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181

kafka:
image: "aivenoy/kafka-with-ts-plugin"
container_name: "kafka-ts"
depends_on:
- zookeeper
- azurite
ports:
- "9092:9092"
- "7000:7000" #prometheus metrics
environment:
KAFKA_BROKER_ID: 0
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:9092,BROKER://0.0.0.0:29092"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:9092,BROKER://kafka:29092"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,BROKER:PLAINTEXT"
KAFKA_INTER_BROKER_LISTENER_NAME: "BROKER"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: false
# Increase Tiered Storage log level
KAFKA_LOG4J_LOGGERS: "io.aiven.kafka.tieredstorage=DEBUG"
# Tweak retention checking
KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 10000
# Enable Tiered Storage
KAFKA_REMOTE_LOG_STORAGE_SYSTEM_ENABLE: true
KAFKA_REMOTE_LOG_MANAGER_TASK_INTERVAL_MS: 5000
# Remote metadata manager
KAFKA_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME: "org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager"
KAFKA_REMOTE_LOG_METADATA_MANAGER_IMPL_PREFIX: "rlmm.config."
KAFKA_REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME: "BROKER"
KAFKA_RLMM_CONFIG_REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR: 1
# Remote storage manager
KAFKA_REMOTE_LOG_STORAGE_MANAGER_CLASS_PATH: "/tiered-storage-for-apache-kafka/core/*:/tiered-storage-for-apache-kafka/azure/*"
KAFKA_REMOTE_LOG_STORAGE_MANAGER_CLASS_NAME: "io.aiven.kafka.tieredstorage.RemoteStorageManager"
KAFKA_REMOTE_LOG_STORAGE_MANAGER_IMPL_PREFIX: "rsm.config."
KAFKA_RSM_CONFIG_CHUNK_SIZE: 5242880 # 5MiB
KAFKA_RSM_CONFIG_CHUNK_CACHE_CLASS: "io.aiven.kafka.tieredstorage.chunkmanager.cache.InMemoryChunkCache"
KAFKA_RSM_CONFIG_CHUNK_CACHE_SIZE: -1
KAFKA_RSM_CONFIG_CUSTOM_METADATA_FIELDS_INCLUDE: "REMOTE_SIZE"
# Storage backend
KAFKA_RSM_CONFIG_KEY_PREFIX: "tiered-storage-demo/"
KAFKA_RSM_CONFIG_STORAGE_BACKEND_CLASS: "io.aiven.kafka.tieredstorage.storage.azure.AzureBlobStorage"
KAFKA_RSM_CONFIG_STORAGE_AZURE_CONTAINER_NAME: "test-container"
# The well-known account Azurite name and key.
KAFKA_RSM_CONFIG_STORAGE_AZURE_ACCOUNT_NAME: "devstoreaccount1"
KAFKA_RSM_CONFIG_STORAGE_AZURE_ACCOUNT_KEY: "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
KAFKA_RSM_CONFIG_STORAGE_AZURE_ENDPOINT_URL: "http://azurite:10000/devstoreaccount1"

azurite:
image: mcr.microsoft.com/azure-storage/azurite
ports:
- "10000:10000"
command: azurite-blob --blobHost 0.0.0.0

azurite-create-container:
image: mcr.microsoft.com/azure-cli
restart: "no"
depends_on:
- azurite
command: >
az storage container create --name test-container
--account-name devstoreaccount1
--account-key Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==
--blob-endpoint http://azurite:10000/devstoreaccount1
8 changes: 7 additions & 1 deletion docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ USER root

RUN mkdir -p /tiered-storage-for-apache-kafka/core \
&& mkdir -p /tiered-storage-for-apache-kafka/s3 \
&& mkdir -p /tiered-storage-for-apache-kafka/gcs
&& mkdir -p /tiered-storage-for-apache-kafka/gcs \
&& mkdir -p /tiered-storage-for-apache-kafka/azure

COPY build/distributions/tiered-storage-for-apache-kafka-${_VERSION}.tgz /tiered-storage-for-apache-kafka/core
RUN cd /tiered-storage-for-apache-kafka/core \
Expand All @@ -38,6 +39,11 @@ RUN cd /tiered-storage-for-apache-kafka/gcs \
&& tar -xf gcs-${_VERSION}.tgz --strip-components=1 \
&& rm gcs-${_VERSION}.tgz

COPY storage/azure/build/distributions/azure-${_VERSION}.tgz /tiered-storage-for-apache-kafka/azure
RUN cd /tiered-storage-for-apache-kafka/azure \
&& tar -xf azure-${_VERSION}.tgz --strip-components=1 \
&& rm azure-${_VERSION}.tgz

# Installing JMX exporter agent
ARG JMX_EXPORTER_VERSION=0.18.0
RUN mkdir -p /opt/prometheus/jmx-exporter
Expand Down
3 changes: 3 additions & 0 deletions e2e/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ dependencies {

testImplementation "io.aiven:testcontainers-fake-gcs-server:$testcontainersFakeGcsServerVersion"

testImplementation platform("com.azure:azure-sdk-bom:$azureSdkVersion")
testImplementation "com.azure:azure-storage-blob"

testRuntimeOnly "org.slf4j:slf4j-log4j12:$slf4jVersion"
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright 2023 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.e2e;

import java.util.List;
import java.util.stream.Collectors;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;

import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.ListBlobsOptions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.DockerImageName;

public class AzureSingleBrokerTest extends SingleBrokerTest {
private static final int BLOB_STORAGE_PORT = 10000;
static final String NETWORK_ALIAS = "blob-storage";

static final GenericContainer<?> AZURITE_SERVER =
new GenericContainer<>(DockerImageName.parse("mcr.microsoft.com/azure-storage/azurite"))
.withExposedPorts(BLOB_STORAGE_PORT)
.withCommand("azurite-blob --blobHost 0.0.0.0")
.withNetwork(NETWORK)
.withNetworkAliases(NETWORK_ALIAS);

static final String AZURE_CONTAINER = "test-container";

static BlobContainerClient blobContainerClient;

@BeforeAll
static void init() throws Exception {
AZURITE_SERVER.start();

// The well-known Azurite account name and key.
final String accountName = "devstoreaccount1";
final String accountKey =
"Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";

final String endpointForTestCode =
"http://127.0.0.1:" + AZURITE_SERVER.getMappedPort(BLOB_STORAGE_PORT) + "/devstoreaccount1";
final String connectionString = "DefaultEndpointsProtocol=http;"
+ "AccountName=" + accountName + ";"
+ "AccountKey=" + accountKey + ";"
+ "BlobEndpoint=" + endpointForTestCode + ";";
final var blobServiceClient = new BlobServiceClientBuilder()
.connectionString(connectionString)
.buildClient();
blobContainerClient = blobServiceClient.createBlobContainer(AZURE_CONTAINER);

final String endpointForKafka = "http://" + NETWORK_ALIAS + ":" + BLOB_STORAGE_PORT + "/devstoreaccount1";
setupKafka(kafka -> kafka.withEnv("KAFKA_RSM_CONFIG_STORAGE_BACKEND_CLASS",
"io.aiven.kafka.tieredstorage.storage.azure.AzureBlobStorage")
.withEnv("KAFKA_REMOTE_LOG_STORAGE_MANAGER_CLASS_PATH",
"/tiered-storage-for-apache-kafka/core/*:/tiered-storage-for-apache-kafka/azure/*")
.withEnv("KAFKA_RSM_CONFIG_STORAGE_AZURE_CONTAINER_NAME", AZURE_CONTAINER)
.withEnv("KAFKA_RSM_CONFIG_STORAGE_AZURE_ACCOUNT_NAME", accountName)
.withEnv("KAFKA_RSM_CONFIG_STORAGE_AZURE_ACCOUNT_KEY", accountKey)
.withEnv("KAFKA_RSM_CONFIG_STORAGE_AZURE_ENDPOINT_URL", endpointForKafka)
.dependsOn(AZURITE_SERVER));
}

@AfterAll
static void cleanup() {
stopKafka();

AZURITE_SERVER.stop();

cleanupStorage();
}

@Override
boolean assertNoTopicDataOnTierStorage(final String topicName, final Uuid topicId) {
final String prefix = String.format("%s-%s", topicName, topicId.toString());

final var list = blobContainerClient.listBlobs(new ListBlobsOptions().setPrefix(prefix), null);
return list.stream().findAny().isEmpty();
}

@Override
List<String> remotePartitionFiles(final TopicIdPartition topicIdPartition) {
return blobContainerClient.listBlobs().stream()
.map(BlobItem::getName)
.map(k -> k.substring(k.lastIndexOf('/') + 1))
.sorted()
.collect(Collectors.toList());
}
}
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ include 'core'
include 'storage'
include 'storage:core'
include 'storage:filesystem'
include 'storage:azure'
include 'storage:gcs'
include 'storage:s3'
include 'e2e'
Expand Down
35 changes: 35 additions & 0 deletions storage/azure/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2023 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

archivesBaseName = "storage-azure"

dependencies {
implementation project(":storage:core")

implementation platform("com.azure:azure-sdk-bom:$azureSdkVersion")
implementation ("com.azure:azure-identity") {
exclude group: "org.slf4j"
}
implementation ("com.azure:azure-storage-blob") {
exclude group: "org.slf4j"
}

implementation project(":commons")

testImplementation(testFixtures(project(":storage:core")))

testImplementation "org.testcontainers:junit-jupiter:$testcontainersVersion"
}
Loading

0 comments on commit 3b44bd9

Please sign in to comment.