Skip to content

Commit

Permalink
[ISSUE apache#6154] Support Amazon S3 backend in TieredStorage (apach…
Browse files Browse the repository at this point in the history
…e#6495)

* support s3 backend in tiered storage

* refactor(tieredstorage): Unify all object storage configuration properties

1. Unify all object storage configuration properties

* refactor(tieredstorage): replace some lambda function with more simple expression

1. replace some lambda function with more simple expression

* style(tieredstorage): perfect comments on ChunkMetadata

1. perfect comments on ChunkMetadata

* refactor(tieredstorage): perfect lambda expression

1. perfect lambda expression

* fix(tieredstorage): fix unmatched config attributes in brokerS3.conf

1. fix unmatched config attributes in brokerS3.conf

* feat(tieredstorage): More context in logging output

1. More context in logging output

* fix(tieredstorage): fix wrong concurrently put

1. fix wrong concurrently put

* test(tieredstorage): add UT to verify TieredFileSegmentInputStream

1. add UT to verify TieredFileSegmentInputStream

* refactor(tieredstorage): better code placement

1. better code placement

* refactor(tieredstorage): refactor TieredFileSegmentInputStream for better understandability

1. refactor TieredFileSegmentInputStream for better understandability

* feat(tieredstorage): support `reset` of TieredFileSegmentInputStream

1. support `reset` of TieredFileSegmentInputStream

* fix(tieredstorage): fix wrong position when failed in `S3FileSegment#commit0`

1. fix wrong position when failed in `S3FileSegment#commit0`

* fix(tieredstorage): fix still have upload buffer when already seal the segment

1. fix still have upload buffer when already seal the segment

* test(tieredstorage): fix wrong assertion

1. fix wrong assertion

* feat(tieredstorage): support switch to enable merge chunks into segment

1. support switch to enable merge chunks into segment

* feat(tieredstorage): add more debug log in TieredMessageStore

1. add more debug log in TieredMessageStore

* style(tieredstorage): use rmq code style

1. use rmq code style

* feat(tieredstorage): add metrics for S3 provider

1. add metrics for S3 provider

* fix(tieredstorage): resolve conflicts after rebasing master

1. resolve conflicts after rebasing master

Closes apache#6624

* style(tieredstorage): change log level

1. change log level

Closes apache#6154

* build(controller): build tieredstorage with bazel

1. build tieredstorage with bazel

* build(controller): build tieredstorage with bazel

1. build tieredstorage with bazel

* style(tieredstorage): change log level

1. change log level

Closes apache#6154

* test(tieredstorage): ignore tests about S3Mock

1. ignore tests about S3Mock

* test(tieredstorage): ignore tests about S3Mock

1. ignore tests about S3Mock
  • Loading branch information
TheR1sing3un authored Jun 7, 2023
1 parent a325d14 commit 450d0d6
Show file tree
Hide file tree
Showing 44 changed files with 2,422 additions and 65 deletions.
3 changes: 3 additions & 0 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ maven_install(
"org.slf4j:jul-to-slf4j:2.0.6",
"org.jetbrains:annotations:23.1.0",
"io.github.aliyunmq:rocketmq-shaded-slf4j-api-bridge:1.0.0",
"software.amazon.awssdk:s3:2.20.29",
"com.fasterxml.jackson.core:jackson-databind:2.13.4.2",
"com.adobe.testing:s3mock-junit4:2.11.0",
],
fetch_sources = True,
repositories = [
Expand Down
31 changes: 31 additions & 0 deletions distribution/conf/tieredstorage/brokerS3.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
messageStorePlugIn = org.apache.rocketmq.tieredstore.TieredMessageStore
tieredBackendServiceProvider = org.apache.rocketmq.tieredstore.provider.s3.S3FileSegment
tieredStorageLevel = FORCE
tieredStoreGroupCommitCount = 1
tieredStoreGroupCommitSize = 50
objectStoreRegion = ""
objectStoreBucket = ""
objectStoreAccessKey = ""
objectStoreSecretKey = ""
57 changes: 57 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@
<opentelemetry.version>1.26.0</opentelemetry.version>
<opentelemetry-exporter-prometheus.version>1.26.0-alpha</opentelemetry-exporter-prometheus.version>
<jul-to-slf4j.version>2.0.6</jul-to-slf4j.version>
<s3.version>2.20.29</s3.version>
<jackson-databind.version>2.13.4.2</jackson-databind.version>

<!-- Test dependencies -->
<junit.version>4.13.2</junit.version>
Expand All @@ -144,6 +146,7 @@
<powermock-version>2.0.9</powermock-version>
<awaitility.version>4.1.0</awaitility.version>
<truth.version>0.30</truth.version>
<s3mock-junit4.version>2.11.0</s3mock-junit4.version>

<!-- Build plugin dependencies -->
<versions-maven-plugin.version>2.2</versions-maven-plugin.version>
Expand Down Expand Up @@ -964,6 +967,60 @@
<artifactId>jul-to-slf4j</artifactId>
<version>${jul-to-slf4j.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<version>${s3.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson-databind.version}</version>
</dependency>
<dependency>
<groupId>com.adobe.testing</groupId>
<artifactId>s3mock-junit4</artifactId>
<version>${s3mock-junit4.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<artifactId>annotations</artifactId>
<groupId>software.amazon.awssdk</groupId>
</exclusion>
<exclusion>
<artifactId>commons-logging</artifactId>
<groupId>commons-logging</groupId>
</exclusion>
<exclusion>
<artifactId>http-client-spi</artifactId>
<groupId>software.amazon.awssdk</groupId>
</exclusion>
<exclusion>
<artifactId>json-utils</artifactId>
<groupId>software.amazon.awssdk</groupId>
</exclusion>
<exclusion>
<artifactId>profiles</artifactId>
<groupId>software.amazon.awssdk</groupId>
</exclusion>
<exclusion>
<artifactId>regions</artifactId>
<groupId>software.amazon.awssdk</groupId>
</exclusion>
<exclusion>
<artifactId>sdk-core</artifactId>
<groupId>software.amazon.awssdk</groupId>
</exclusion>
<exclusion>
<artifactId>utils</artifactId>
<groupId>software.amazon.awssdk</groupId>
</exclusion>
<exclusion>
<artifactId>jackson-dataformat-cbor</artifactId>
<groupId>com.fasterxml.jackson.dataformat</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down
2 changes: 2 additions & 0 deletions tieredstore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ java_library(
"@maven//:org_apache_commons_commons_lang3",
"@maven//:org_apache_tomcat_annotations_api",
"@maven//:com_alibaba_fastjson",
"@maven//:software_amazon_awssdk_s3",
],
)

Expand Down Expand Up @@ -66,6 +67,7 @@ java_library(
"@maven//:com_google_guava_guava",
"@maven//:io_github_aliyunmq_rocketmq_slf4j_api",
"@maven//:io_github_aliyunmq_rocketmq_shaded_slf4j_api_bridge",
"@maven//:com_adobe_testing_s3mock_junit4",
],
)

Expand Down
10 changes: 10 additions & 0 deletions tieredstore/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,20 @@
</exclusions>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.adobe.testing</groupId>
<artifactId>s3mock-junit4</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ public CompletableFuture<GetMessageResult> getMessageAsync(String group, String
return next.getMessage(group, topic, queueId, offset, maxMsgNums, messageFilter);
});
}
logger.debug("TieredMessageStore#getMessageAsync: get message from next store: topic: {}, queue: {}, queue offset: {}",
topic, queueId, offset);
return next.getMessageAsync(group, topic, queueId, offset, maxMsgNums, messageFilter);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,15 @@ public boolean check(TieredStorageLevel targetLevel) {

private String tieredStoreFilepath = "";

// only for oss storage provider
private String ossEndpoint = "";
private String ossBucket = "";
private String ossAccessKey = "";
private String ossSecretKey = "";
private String objectStoreRegion = "";

private String objectStoreBucket = "";

private String objectStoreAccessKey = "";

private String objectStoreSecretKey = "";

private boolean enableMerge = false;

public static String localHostName() {
try {
Expand Down Expand Up @@ -356,35 +360,43 @@ public void setTieredStoreFilepath(String tieredStoreFilepath) {
this.tieredStoreFilepath = tieredStoreFilepath;
}

public String getOssEndpoint() {
return ossEndpoint;
public void setObjectStoreRegion(String objectStoreRegion) {
this.objectStoreRegion = objectStoreRegion;
}

public String getObjectStoreBucket() {
return objectStoreBucket;
}

public void setObjectStoreBucket(String objectStoreBucket) {
this.objectStoreBucket = objectStoreBucket;
}

public void setOssEndpoint(String ossEndpoint) {
this.ossEndpoint = ossEndpoint;
public String getObjectStoreAccessKey() {
return objectStoreAccessKey;
}

public String getOssBucket() {
return ossBucket;
public void setObjectStoreAccessKey(String objectStoreAccessKey) {
this.objectStoreAccessKey = objectStoreAccessKey;
}

public void setOssBucket(String ossBucket) {
this.ossBucket = ossBucket;
public String getObjectStoreSecretKey() {
return objectStoreSecretKey;
}

public String getOssAccessKey() {
return ossAccessKey;
public void setObjectStoreSecretKey(String objectStoreSecretKey) {
this.objectStoreSecretKey = objectStoreSecretKey;
}

public void setOssAccessKey(String ossAccessKey) {
this.ossAccessKey = ossAccessKey;
public String getObjectStoreRegion() {
return objectStoreRegion;
}

public String getOssSecretKey() {
return ossSecretKey;
public boolean isEnableMerge() {
return enableMerge;
}

public void setOssSecretKey(String ossSecretKey) {
this.ossSecretKey = ossSecretKey;
public void setEnableMerge(boolean enableMerge) {
this.enableMerge = enableMerge;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ protected void loadFromMetadata() {
segment.setEndTimestamp(metadata.getEndTimestamp());
if (metadata.getStatus() == FileSegmentMetadata.STATUS_SEALED) {
segment.setFull(false);
segment.sealFile();
}
// TODO check coda/size
fileSegmentList.add(segment);
Expand Down Expand Up @@ -254,7 +255,7 @@ protected TieredFileSegment getFileToWrite() {
if (!segment.isFull()) {
return segment;
}
if (segment.commit()) {
if (segment.commitAndSealFile()) {
try {
metadataStore.updateFileSegment(segment);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@ public enum TieredStoreErrorCode {
NO_NEW_DATA,
STORAGE_PROVIDER_ERROR,
IO_ERROR,
SEGMENT_SEALED,
UNKNOWN
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

public class TieredStoreException extends RuntimeException {
private TieredStoreErrorCode errorCode;
private int position = -1;
private long position = -1;

private String requestId;

Expand All @@ -41,11 +41,11 @@ public void setErrorCode(TieredStoreErrorCode errorCode) {
this.errorCode = errorCode;
}

public int getPosition() {
public long getPosition() {
return position;
}

public void setPosition(int position) {
public void setPosition(long position) {
this.position = position;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public abstract class TieredFileSegment implements Comparable<TieredFileSegment>
private final ReentrantLock bufferLock = new ReentrantLock();
private final Semaphore commitLock = new Semaphore(1);
private List<ByteBuffer> uploadBufferList = new ArrayList<>();
private boolean full;
private volatile boolean full;
protected final FileSegmentType fileType;
protected final MessageQueue messageQueue;
protected final TieredMessageStoreConfig storeConfig;
Expand Down Expand Up @@ -312,6 +312,31 @@ public boolean commit() {
return result;
}

public boolean commitAndSealFile() {
if (closed) {
return false;
}
if (!this.isFull()) {
logger.error("Failed to commitAndSealFile, file is not full, file: {}, appendPosition: {}, commitPosition: {}, maxSize: {}", getPath(), appendPosition, commitPosition, maxSize);
return false;
}
// first time to commit, try to wait inflight commit request to be completed
inflightCommitRequest.join();
boolean success = false;
for (int i = 0; i < 3; i++) {
if (!needCommit() || commit()) {
success = true;
break;
}
}
if (!success) {
logger.error("Failed to commit all data, file: {}, appendPosition: {}, commitPosition: {}, maxSize: {}", getPath(), appendPosition, commitPosition, maxSize);
return false;
}
sealFile();
return true;
}

public CompletableFuture<Boolean> commitAsync() {
if (closed) {
return CompletableFuture.completedFuture(false);
Expand Down Expand Up @@ -425,6 +450,8 @@ public static FileSegmentType valueOf(int type) {
throw new IllegalStateException("Unexpected value: " + type);
}
}


}

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public interface TieredStoreProvider {
*/
void createFile();

/**
* Seal file with given path in backend file system
*/
void sealFile();

/**
* Destroy file with given path in backend file system
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public TieredFileSegmentInputStream(TieredFileSegment.FileSegmentType fileType,
this.fileType = fileType;
this.contentLength = contentLength;
this.uploadBufferList = uploadBufferList;
if (uploadBufferList.size() > 0) {
if (uploadBufferList != null && uploadBufferList.size() > 0) {
this.curBuffer = uploadBufferList.get(curReadBufferIndex);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ public void createFile() {
}
}

@Override
public void sealFile() {

}

@Override
public void destroyFile() {
try {
Expand Down
Loading

0 comments on commit 450d0d6

Please sign in to comment.