commitAsync() {
if (closed) {
return CompletableFuture.completedFuture(false);
@@ -425,6 +450,8 @@ public static FileSegmentType valueOf(int type) {
throw new IllegalStateException("Unexpected value: " + type);
}
}
+
+
}
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java
index f043e07f394..e9f0926f1e3 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java
@@ -48,6 +48,11 @@ public interface TieredStoreProvider {
*/
void createFile();
+ /**
+ * Seal file with given path in backend file system
+ */
+ void sealFile();
+
/**
* Destroy file with given path in backend file system
*/
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStream.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStream.java
index d5118c1464e..f85ca68e1b3 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStream.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStream.java
@@ -60,7 +60,7 @@ public TieredFileSegmentInputStream(TieredFileSegment.FileSegmentType fileType,
this.fileType = fileType;
this.contentLength = contentLength;
this.uploadBufferList = uploadBufferList;
- if (uploadBufferList.size() > 0) {
+ if (uploadBufferList != null && uploadBufferList.size() > 0) {
this.curBuffer = uploadBufferList.get(curReadBufferIndex);
}
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
index 7032799eb23..4a31199e8ed 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
@@ -128,6 +128,11 @@ public void createFile() {
}
}
+ @Override
+ public void sealFile() {
+
+ }
+
@Override
public void destroyFile() {
try {
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/s3/ChunkMetadata.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/s3/ChunkMetadata.java
new file mode 100644
index 00000000000..e7e0f3f519a
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/s3/ChunkMetadata.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.provider.s3;
+
+import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
+
+/**
+ * Metadata of a chunk in S3.
+ *
+ *
+ * There are two types of chunks in S3:
+ *
+ * - Normal chunk, represents a normal chunk in S3, which size is usually less than {@link TieredMessageStoreConfig#getTieredStoreGroupCommitSize()} ()}
+ *
- Segment chunk, means that this all normal chunks in one logic segment have been merged into a single chunk, which is named as segment chunk,
+ * which size is usually equals to {@link TieredMessageStoreConfig#getTieredStoreCommitLogMaxSize()} or {@link TieredMessageStoreConfig#getTieredStoreConsumeQueueMaxSize()}
+ *
+ * Once a segment chunk is created, it will never be changed, and we should delete all normal chunks in this segment.
+ */
+public class ChunkMetadata {
+
+ /**
+ * Name of the chunk in S3. Format:
+ *
+ * Chunk:
+ *
+ * {@link S3FileSegment#getStorePath()}/chunk/chunk-${startPosition}
+ *
+ *
+ * Segment:
+ *
+ * {@link S3FileSegment#getStorePath()}/segment/segment-${startPosition}
+ *
+ */
+ private String chunkName;
+
+ private long startPosition;
+
+ private int chunkSize;
+
+ private boolean isSegmentType;
+
+ public ChunkMetadata() {
+
+ }
+
+ public ChunkMetadata(String chunkName, long startPosition, int chunkSize) {
+ this.startPosition = startPosition;
+ this.chunkName = chunkName;
+ this.chunkSize = chunkSize;
+ this.isSegmentType = this.chunkName.contains("segment");
+ }
+
+ public int getChunkSize() {
+ return chunkSize;
+ }
+
+ public String getChunkName() {
+ return chunkName;
+ }
+
+ public long getStartPosition() {
+ return startPosition;
+ }
+
+ public void setChunkName(String chunkName) {
+ this.chunkName = chunkName;
+ }
+
+ public void setStartPosition(long startPosition) {
+ this.startPosition = startPosition;
+ }
+
+ public void setChunkSize(int chunkSize) {
+ this.chunkSize = chunkSize;
+ }
+
+ public long getEndPosition() {
+ return startPosition + chunkSize - 1;
+ }
+
+ public boolean isSegmentType() {
+ return isSegmentType;
+ }
+
+ @Override
+ public String toString() {
+ return "ChunkMetadata{" +
+ "chunkName='" + chunkName + '\'' +
+ ", startPosition=" + startPosition +
+ ", endPosition=" + getEndPosition() +
+ '}';
+ }
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/s3/S3FileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/s3/S3FileSegment.java
new file mode 100644
index 00000000000..c02efef054e
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/s3/S3FileSegment.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.provider.s3;
+
+import io.opentelemetry.api.common.Attributes;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.tieredstore.exception.TieredStoreErrorCode;
+import org.apache.rocketmq.tieredstore.exception.TieredStoreException;
+import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager;
+import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
+import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
+import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
+import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_FILE_TYPE;
+import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_QUEUE_ID;
+import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_TOPIC;
+
+public class S3FileSegment extends TieredFileSegment {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
+
+ /**
+ * The path of the file segment in S3. Format:
+ *
+ * ${hash of clusterName}/${clusterName}/${brokerName}/${topicName}/${queueId}/${fileType}/seg-${baseOffset}
+ *
+ */
+ private final String storePath;
+
+ /**
+ * The path of the chunk file in S3. Format:
+ *
+ * {@link #storePath}/chunk
+ *
+ */
+ private final String chunkPath;
+
+ /**
+ * The path of the segment file in S3. Format:
+ *
+ * {@link #storePath}/segment
+ *
+ */
+ private final String segmentPath;
+
+ private final TieredStorageS3Client client;
+
+ private final S3FileSegmentMetadata metadata;
+
+ private final Attributes attributes = TieredStoreMetricsManager.newAttributesBuilder().put(LABEL_TOPIC, messageQueue.getTopic())
+ .put(LABEL_QUEUE_ID, messageQueue.getQueueId()).put(LABEL_FILE_TYPE, this.fileType.name().toLowerCase()).build();
+
+ /**
+ * Executor for merging chunks into segment or deleting chunks.
+ *
+ * TODO: Better to use a thread pool.
+ */
+ private static final ExecutorService MERGE_CHUNKS_INTO_SEGMENT_EXECUTOR = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("S3FileSegment_MergeChunksIntoSegmentExecutor"));
+
+ // TODO: Uses the specified asynchronous thread pool
+
+ public S3FileSegment(FileSegmentType fileType, MessageQueue messageQueue, long baseOffset,
+ TieredMessageStoreConfig storeConfig) {
+ super(fileType, messageQueue, baseOffset, storeConfig);
+ String clusterName = storeConfig.getBrokerClusterName();
+ String hash = String.valueOf(clusterName.hashCode());
+ this.storePath = hash + File.separator + clusterName + File.separator + messageQueue.getBrokerName() +
+ File.separator + messageQueue.getTopic() + File.separator + messageQueue.getQueueId() + File.separator + fileType + File.separator + "seg-" + baseOffset;
+ this.chunkPath = this.storePath + File.separator + "chunk";
+ this.segmentPath = this.storePath + File.separator + "segment";
+ this.client = TieredStorageS3Client.getInstance(storeConfig);
+ this.metadata = new S3FileSegmentMetadata();
+ this.initialize();
+ }
+
+ private void initialize() {
+ // check if the file segment exists
+ CompletableFuture> listSegments = this.client.listChunks(this.segmentPath);
+ CompletableFuture> listChunks = this.client.listChunks(this.chunkPath);
+ List segments = listSegments.join();
+ if (segments.size() > 1) {
+ throw new RuntimeException("The segment " + segmentPath + " should be only one, but now have " + segments.size() + " segments, please check it.");
+ }
+ List chunks = listChunks.join();
+ if (segments.size() == 1) {
+ // now segment exist
+ // add segment into metadata
+ ChunkMetadata segment = segments.get(0);
+ this.metadata.setSegment(segment);
+ // delete chunks
+ this.client.deleteObjets(chunks.stream().map(ChunkMetadata::getChunkName).collect(Collectors.toList())).join();
+ } else {
+ // now segment not exist
+ // add all chunks into metadata
+ checkAndLoadChunks(chunks);
+ }
+ }
+
+ private void checkAndLoadChunks(List chunks) {
+ if (chunks.size() == 0) {
+ return;
+ }
+ for (ChunkMetadata chunk : chunks) {
+ if (!this.metadata.addChunk(chunk)) {
+ // the chunk is not valid
+ LOGGER.error("Check and load chunks failed, the chunk: {} is not valid, now chunks last end position: {}, please check it.", chunk, this.metadata.getEndPosition());
+ throw new RuntimeException("The chunk: " + chunk + " is not valid, now chunks last end position: " + this.metadata.getEndPosition() + ", please check it.");
+ }
+ }
+ }
+
+ @Override
+ public String getPath() {
+ return this.storePath;
+ }
+
+ public String getSegmentPath() {
+ return segmentPath;
+ }
+
+ public String getChunkPath() {
+ return chunkPath;
+ }
+
+ @Override
+ public long getSize() {
+ return this.metadata.getSize();
+ }
+
+ @Override
+ public boolean exists() {
+ return this.client.exist(this.storePath).join();
+ }
+
+ @Override
+ public void createFile() {
+
+ }
+
+ /**
+ * Merges all normal chunks into a segment file.
+ */
+ @Override
+ public void sealFile() {
+ // check if the segment file exists
+ if (this.metadata.isSealed() && this.metadata.getChunkCount() == 0) {
+ return;
+ }
+ // merge all chunks into a segment file and delete all chunks
+ MERGE_CHUNKS_INTO_SEGMENT_EXECUTOR.submit(this::trySealFile);
+ }
+
+ private void trySealFile() {
+ while (true) {
+ if (this.metadata.isSealed() && this.metadata.getChunkCount() == 0)
+ return;
+
+ boolean success = true;
+
+ if (!this.storeConfig.isEnableMerge()) {
+ this.metadata.setSealed(true);
+ }
+
+ if (!this.metadata.isSealed()) {
+ // merge all chunks
+ String segmentName = this.segmentPath + File.separator + "segment-" + 0;
+ boolean merged = this.client.mergeAllChunksIntoSegment(this.metadata.getChunks(), segmentName).join();
+ if (merged) {
+ // set segment
+ this.metadata.setSegment(new ChunkMetadata(segmentName, 0, (int) this.metadata.getSize()));
+ } else {
+ LOGGER.error("Merge chunks into segment failed, chunk path is {}, segment path is {}.", this.chunkPath, this.segmentPath);
+ success = false;
+ }
+ }
+ if (this.storeConfig.isEnableMerge() && success) {
+ // old chunks still exist, keep deleting them
+ List chunkKeys = this.metadata.getChunks().stream().map(ChunkMetadata::getChunkName).collect(Collectors.toList());
+ List undeleteList = this.client.deleteObjets(chunkKeys).join();
+ if (undeleteList.isEmpty()) {
+ this.metadata.removeAllChunks();
+ } else {
+ success = false;
+ LOGGER.error("Delete chunks failed, chunk path is {}, undelete list is {}.", this.chunkPath, undeleteList);
+ }
+ }
+ if (success)
+ return;
+ // unsuccessful, retry
+ try {
+ Thread.sleep(1000);
+ } catch (Exception ignore) {
+
+ }
+
+ }
+ }
+
+ public boolean isSealed() {
+ return this.metadata.isSealed();
+ }
+
+ @Override
+ public void destroyFile() {
+ this.client.deleteObjects(this.storePath).join();
+ this.metadata.clear();
+ }
+
+ @Override
+ public CompletableFuture read0(long position, int length) {
+ CompletableFuture completableFuture = new CompletableFuture<>();
+ List chunks;
+ try {
+ chunks = this.metadata.seek(position, length);
+ } catch (IndexOutOfBoundsException e) {
+ LOGGER.error("Read position {} and length {} out of range, the file segment size is {}.", position, length, this.metadata.getSize());
+ completableFuture.completeExceptionally(new TieredStoreException(TieredStoreErrorCode.DOWNLOAD_LENGTH_NOT_CORRECT, "read data from segment error because of position or length not correct"));
+ return completableFuture;
+ }
+ long endPosition = position + length - 1;
+ ConcurrentByteBuffer concurrentByteBuffer = new ConcurrentByteBuffer(length);
+ List> subFutures = new ArrayList<>(chunks.size());
+ chunks.forEach(chunk -> {
+ long startPositionInChunk = position >= chunk.getStartPosition() ? position - chunk.getStartPosition() : 0;
+ long endPositionInChunk = endPosition <= chunk.getEndPosition() ? endPosition - chunk.getStartPosition() : chunk.getChunkSize() - 1;
+ CompletableFuture future = this.client.readChunk(chunk.getChunkName(), startPositionInChunk, endPositionInChunk);
+ CompletableFuture subFuture = future.whenComplete((bytes, throwable) -> {
+ if (throwable != null) {
+ LOGGER.error("Failed to read data from s3, chunk: {}, start position: {}, end position: {}", chunk, startPositionInChunk, endPositionInChunk, throwable);
+ TieredStoreException exception = new TieredStoreException(TieredStoreErrorCode.IO_ERROR, "read data from s3 error");
+ completableFuture.completeExceptionally(exception);
+ } else {
+ try {
+ concurrentByteBuffer.put(bytes, 0, bytes.length, (int) (chunk.getStartPosition() + startPositionInChunk - position));
+ } catch (Exception e) {
+ LOGGER.error("Failed to put data from s3 into buffer, chunk: {}, start position: {}, end position: {}", chunk, startPositionInChunk, endPositionInChunk, e);
+ TieredStoreException exception = new TieredStoreException(TieredStoreErrorCode.UNKNOWN, "put data from s3 into buffer error");
+ completableFuture.completeExceptionally(exception);
+ }
+ }
+ });
+ subFutures.add(subFuture);
+ });
+ CompletableFuture.allOf(subFutures.toArray(new CompletableFuture[chunks.size()])).whenComplete((v, throwable) -> {
+ if (throwable != null) {
+ LOGGER.error("Failed to read data from s3, position: {}, length: {}", position, length, throwable);
+ completableFuture.completeExceptionally(new TieredStoreException(TieredStoreErrorCode.IO_ERROR, "wait all sub download tasks complete error"));
+ } else {
+ ByteBuffer byteBuffer = concurrentByteBuffer.close();
+ byteBuffer.rewind();
+ completableFuture.complete(byteBuffer);
+ TieredStoreMetricsManager.downloadBytes.record(length, attributes);
+ }
+ });
+ return completableFuture;
+ }
+
+ @Override
+ public CompletableFuture commit0(TieredFileSegmentInputStream inputStream, long position, int length,
+ boolean append) {
+ // TODO: Deal with the case that the param: append is false
+ CompletableFuture completableFuture = new CompletableFuture<>();
+ // check if now the segment is sealed
+ if (this.metadata.isSealed()) {
+ LOGGER.error("The segment is sealed, the position: {}, the length: {}.", position, length);
+ TieredStoreException exception = new TieredStoreException(TieredStoreErrorCode.SEGMENT_SEALED, "the segment is sealed");
+ exception.setPosition(this.metadata.getEndPosition() + 1);
+ completableFuture.completeExceptionally(exception);
+ return completableFuture;
+ }
+ // check if the position is valid
+ if (length < 0 || position != this.metadata.getEndPosition() + 1) {
+ LOGGER.error("The position is invalid, the position: {}, the length: {}, now segment end position: {}.", position, length, this.metadata.getEndPosition());
+ TieredStoreException exception = new TieredStoreException(TieredStoreErrorCode.ILLEGAL_OFFSET, "the position is invalid");
+ exception.setPosition(this.metadata.getEndPosition() + 1);
+ completableFuture.completeExceptionally(exception);
+ return completableFuture;
+ }
+ // upload chunk
+ String chunkPath = this.chunkPath + File.separator + "chunk-" + position;
+
+ this.client.writeChunk(chunkPath, inputStream, length).whenComplete((result, throwable) -> {
+ if (throwable != null) {
+ LOGGER.error("Failed to write data to s3, position: {}, length: {}", position, length, throwable);
+ TieredStoreException exception = new TieredStoreException(TieredStoreErrorCode.IO_ERROR, "write data to s3 error");
+ exception.setPosition(position);
+ completableFuture.completeExceptionally(exception);
+ } else {
+ if (result) {
+ TieredStoreMetricsManager.uploadBytes.record(length, attributes);
+ ChunkMetadata chunk = new ChunkMetadata(chunkPath, position, length);
+ if (!this.metadata.addChunk(chunk)) {
+ // the chunk is not valid
+ LOGGER.error("Add chunk after uploading chunk to S3 failed, the chunk: {} is not valid, now chunks last end position: {}, please check it.", chunk, this.metadata.getEndPosition());
+ throw new RuntimeException("The chunk: " + chunk + " is not valid, now chunks last end position: " + this.metadata.getEndPosition() + ", please check it.");
+ }
+ completableFuture.complete(true);
+ } else {
+ completableFuture.complete(false);
+ }
+ }
+ });
+ return completableFuture;
+ }
+
+ public S3FileSegmentMetadata getMetadata() {
+ return metadata;
+ }
+
+ public String getStorePath() {
+ return storePath;
+ }
+
+ public TieredStorageS3Client getClient() {
+ return client;
+ }
+
+ static class ConcurrentByteBuffer {
+ private final ByteBuffer byteBuffer;
+ private final int length;
+
+ private final ReentrantLock reentrantLock;
+
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ public ConcurrentByteBuffer(int length) {
+ this.length = length;
+ this.byteBuffer = ByteBuffer.allocate(length);
+ this.byteBuffer.limit(this.length);
+ this.reentrantLock = new ReentrantLock();
+ }
+
+ public void put(byte[] bytes, int bytesIndex, int writeLength, int writePosition) throws Exception {
+ if (closed.get()) {
+ throw new RuntimeException("The ConcurrentByteBuffer has been closed");
+ }
+ this.reentrantLock.lock();
+ try {
+ this.byteBuffer.position(writePosition);
+ this.byteBuffer.put(bytes, bytesIndex, writeLength);
+ } catch (Exception e) {
+ LOGGER.error("Put bytes into byte buffer error. bytesIndex: {}, writeLength: {}, writePosition: {}, limit: {}", bytesIndex, writeLength, writePosition, this.byteBuffer.limit(), e);
+ throw e;
+ } finally {
+ this.reentrantLock.unlock();
+ }
+ }
+
+ public ByteBuffer close() {
+ this.closed.set(true);
+ this.reentrantLock.lock();
+ try {
+ this.byteBuffer.rewind();
+ return this.byteBuffer;
+ } finally {
+ this.reentrantLock.unlock();
+ }
+ }
+ }
+
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/s3/S3FileSegmentMetadata.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/s3/S3FileSegmentMetadata.java
new file mode 100644
index 00000000000..19c77a5a66b
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/s3/S3FileSegmentMetadata.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.provider.s3;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class S3FileSegmentMetadata {
+
+ private final LinkedList chunks = new LinkedList<>();
+
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+ private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
+
+ private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
+
+ private volatile boolean isSealed = false;
+
+ private ChunkMetadata segment;
+
+ public S3FileSegmentMetadata() {
+ }
+
+ /**
+ * Seek the chunks that need to be read, which is the intersection of the chunks and the range of [position, position + length)
+ *
+ * @param position start position
+ * @param length data length
+ * @return the chunks that need to be read
+ * @throws IndexOutOfBoundsException if position or length is negative or position
+ */
+ public List seek(long position, int length) throws IndexOutOfBoundsException {
+ readLock.lock();
+ try {
+ long endPosition = position + length - 1;
+ if (position < 0 || length < 0 || position < getStartPosition() || endPosition > getEndPosition()) {
+ throw new IndexOutOfBoundsException("position: " + position + ", length: " + length + ", Metadata: start: " + getStartPosition() + ", end: " + getEndPosition());
+ }
+ List needChunks = new LinkedList<>();
+ if (length == 0)
+ return needChunks;
+ if (segment != null) {
+ needChunks.add(segment);
+ return needChunks;
+ }
+ for (ChunkMetadata chunk : chunks) {
+ if (endPosition < chunk.getStartPosition())
+ break;
+ if (position > chunk.getEndPosition())
+ continue;
+ if (position <= chunk.getEndPosition() || endPosition >= chunk.getStartPosition()) {
+ needChunks.add(chunk);
+ }
+ }
+ return needChunks;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ public boolean addChunk(ChunkMetadata chunk) {
+ this.writeLock.lock();
+ try {
+ if (chunks.size() == 0 && chunk.getStartPosition() != 0) {
+ return false;
+ }
+ if (chunks.size() > 0 && chunks.getLast().getEndPosition() + 1 != chunk.getStartPosition()) {
+ return false;
+ }
+ chunks.addLast(chunk);
+ return true;
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ public void setSegment(ChunkMetadata segment) {
+ this.writeLock.lock();
+ try {
+ this.isSealed = true;
+ this.segment = segment;
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ public void removeAllChunks() {
+ this.writeLock.lock();
+ try {
+ this.chunks.clear();
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ public long getStartPosition() {
+ this.readLock.lock();
+ try {
+ if (segment != null)
+ return segment.getStartPosition();
+ if (chunks.size() == 0)
+ return -1;
+ return chunks.getFirst().getStartPosition();
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ public long getEndPosition() {
+ this.readLock.lock();
+ try {
+ if (segment != null)
+ return segment.getEndPosition();
+ if (chunks.size() == 0)
+ return -1;
+ return chunks.getLast().getEndPosition();
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ public long getSize() {
+ long start = getStartPosition();
+ long end = getEndPosition();
+ if (start == -1)
+ return 0;
+ return end - start + 1;
+ }
+
+ public void clear() {
+ this.writeLock.lock();
+ try {
+ chunks.clear();
+ segment = null;
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ public long getChunkCount() {
+ this.readLock.lock();
+ try {
+ return chunks.size();
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ public boolean isSealed() {
+ return isSealed;
+ }
+
+ public List getChunks() {
+ this.readLock.lock();
+ try {
+ return new ArrayList<>(chunks);
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ public void setSealed(boolean sealed) {
+ this.isSealed = sealed;
+ }
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/s3/TieredStorageS3Client.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/s3/TieredStorageS3Client.java
new file mode 100644
index 00000000000..912732076aa
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/s3/TieredStorageS3Client.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.provider.s3;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Stopwatch;
+import io.opentelemetry.api.common.AttributesBuilder;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager;
+import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.core.async.AsyncRequestBody;
+import software.amazon.awssdk.core.async.AsyncResponseTransformer;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
+import software.amazon.awssdk.services.s3.model.CompletedPart;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
+import software.amazon.awssdk.services.s3.model.Delete;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
+import software.amazon.awssdk.services.s3.model.DeletedObject;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.PutObjectResponse;
+import software.amazon.awssdk.services.s3.model.S3Object;
+import software.amazon.awssdk.services.s3.model.UploadPartCopyRequest;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_OPERATION;
+import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_SUCCESS;
+
+public class TieredStorageS3Client {
+
+ private static final String OPERATION_LIST_OBJECTS = "list_objects";
+
+ private static final String OPERATION_DELETE_OBJECTS = "delete_objects";
+
+ private static final String OPERATION_UPLOAD_OBJECT = "upload_object";
+
+ private static final String OPERATION_DOWNLOAD_OBJECT = "download_object";
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
+ private volatile static TieredStorageS3Client instance;
+
+ private final String region;
+
+ private final String bucket;
+
+ private final TieredMessageStoreConfig tieredMessageStoreConfig;
+
+ private final ExecutorService asyncRequestBodyExecutor;
+
+ private S3AsyncClient client;
+
+ public static TieredStorageS3Client getInstance(TieredMessageStoreConfig config) {
+ if (config == null) {
+ return instance;
+ }
+ if (instance == null) {
+ synchronized (TieredStorageS3Client.class) {
+ if (instance == null) {
+ instance = new TieredStorageS3Client(config, true);
+ }
+ }
+ }
+ return instance;
+ }
+
+ @VisibleForTesting
+ protected TieredStorageS3Client(TieredMessageStoreConfig config) {
+ this(config, false);
+ }
+
+ private TieredStorageS3Client(TieredMessageStoreConfig config, boolean createClient) {
+ this.tieredMessageStoreConfig = config;
+ this.region = config.getObjectStoreRegion();
+ this.bucket = config.getObjectStoreBucket();
+ if (createClient) {
+ AwsBasicCredentials basicCredentials = AwsBasicCredentials.create(this.tieredMessageStoreConfig.getObjectStoreAccessKey(), this.tieredMessageStoreConfig.getObjectStoreSecretKey());
+ this.client = S3AsyncClient.builder().credentialsProvider(() -> basicCredentials).region(Region.of(config.getObjectStoreRegion())).build();
+ }
+ this.asyncRequestBodyExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryImpl("S3AsyncRequestBodyExecutor_"));
+ }
+
+ public CompletableFuture writeChunk(String key, InputStream inputStream, long length) {
+ PutObjectRequest putObjectRequest = PutObjectRequest.builder().bucket(this.bucket).key(key).build();
+ AsyncRequestBody requestBody = AsyncRequestBody.fromInputStream(inputStream, length, this.asyncRequestBodyExecutor);
+ AttributesBuilder attributesBuilder = TieredStoreMetricsManager.newAttributesBuilder().put(LABEL_OPERATION, OPERATION_UPLOAD_OBJECT);
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ CompletableFuture putObjectResponseCompletableFuture = this.client.putObject(putObjectRequest, requestBody);
+ CompletableFuture completableFuture = new CompletableFuture<>();
+ putObjectResponseCompletableFuture.whenComplete((putObjectResponse, throwable) -> {
+ if (throwable != null) {
+ LOGGER.error("Upload file to S3 failed, key: {}, region: {}, bucket: {}", key, this.region, this.bucket, throwable);
+ attributesBuilder.put(LABEL_SUCCESS, false);
+ completableFuture.complete(false);
+ } else {
+ attributesBuilder.put(LABEL_SUCCESS, true);
+ completableFuture.complete(true);
+ }
+ TieredStoreMetricsManager.providerRpcLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS), attributesBuilder.build());
+ });
+ return completableFuture;
+ }
+
+ public CompletableFuture> listChunks(String prefix) {
+ CompletableFuture> completableFuture = new CompletableFuture<>();
+ AttributesBuilder attributesBuilder = TieredStoreMetricsManager.newAttributesBuilder().put(LABEL_OPERATION, OPERATION_LIST_OBJECTS);
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ CompletableFuture listFuture = this.listObjects(prefix);
+ listFuture.whenComplete((listObjectsV2Response, throwable) -> {
+ if (throwable != null) {
+ attributesBuilder.put(LABEL_SUCCESS, false);
+ LOGGER.error("List objects from S3 failed, prefix: {}, region: {}, bucket: {}", prefix, this.region, this.bucket, throwable);
+ completableFuture.complete(Collections.emptyList());
+ } else {
+ attributesBuilder.put(LABEL_SUCCESS, true);
+ listObjectsV2Response.contents().forEach(s3Object -> LOGGER.info("List objects from S3, key: {}, region: {}, bucket: {}", s3Object.key(), this.region, this.bucket));
+ completableFuture.complete(listObjectsV2Response.contents().stream().map(obj -> {
+ ChunkMetadata chunkMetadata = new ChunkMetadata();
+ String key = obj.key();
+ chunkMetadata.setChunkName(key);
+ chunkMetadata.setChunkSize(obj.size().intValue());
+ String[] paths = key.split("/");
+ String chunkSubName = paths[paths.length - 1];
+ Integer startPosition = Integer.valueOf(chunkSubName.split("-")[1]);
+ chunkMetadata.setStartPosition(startPosition);
+ return chunkMetadata;
+ }).sorted((o1, o2) -> (int) (o1.getStartPosition() - o2.getStartPosition())).collect(Collectors.toList()));
+ }
+ TieredStoreMetricsManager.providerRpcLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS), attributesBuilder.build());
+ });
+ return completableFuture;
+ }
+
+ public CompletableFuture listObjects(String prefix) {
+ AttributesBuilder attributesBuilder = TieredStoreMetricsManager.newAttributesBuilder().put(LABEL_OPERATION, OPERATION_LIST_OBJECTS);
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ CompletableFuture listFuture = this.client.listObjectsV2(builder -> builder.bucket(this.bucket).prefix(prefix));
+ return listFuture.thenApply(resp -> {
+ attributesBuilder.put(LABEL_SUCCESS, true);
+ TieredStoreMetricsManager.providerRpcLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS), attributesBuilder.build());
+ return resp;
+ }).exceptionally(throwable -> {
+ attributesBuilder.put(LABEL_SUCCESS, false);
+ LOGGER.error("List objects from S3 failed, prefix: {}, region: {}, bucket: {}", prefix, this.region, this.bucket, throwable);
+ TieredStoreMetricsManager.providerRpcLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS), attributesBuilder.build());
+ return null;
+ });
+ }
+
+ public CompletableFuture exist(String prefix) {
+ AttributesBuilder attributesBuilder = TieredStoreMetricsManager.newAttributesBuilder().put(LABEL_OPERATION, OPERATION_LIST_OBJECTS);
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ CompletableFuture listFuture = this.listObjects(prefix);
+ return listFuture.thenApply(resp -> {
+ attributesBuilder.put(LABEL_SUCCESS, true);
+ TieredStoreMetricsManager.providerRpcLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS), attributesBuilder.build());
+ return resp.contents().size() > 0;
+ }).exceptionally(throwable -> {
+ attributesBuilder.put(LABEL_SUCCESS, false);
+ LOGGER.error("Exist prefix failed, list objects from S3 failed, prefix: {}, region: {}, bucket: {}", prefix, this.region, this.bucket, throwable);
+ TieredStoreMetricsManager.providerRpcLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS), attributesBuilder.build());
+ return false;
+ });
+ }
+
+ public CompletableFuture
> deleteObjets(final List keys) {
+ if (keys == null || keys.isEmpty()) {
+ return CompletableFuture.completedFuture(Collections.emptyList());
+ }
+ List objects = keys.stream().map(key -> ObjectIdentifier.builder().key(key).build()).collect(Collectors.toList());
+ Delete delete = Delete.builder().objects(objects).build();
+ DeleteObjectsRequest deleteObjectsRequest = DeleteObjectsRequest.builder().bucket(this.bucket).delete(delete).build();
+ AttributesBuilder attributesBuilder = TieredStoreMetricsManager.newAttributesBuilder().put(LABEL_OPERATION, OPERATION_DELETE_OBJECTS);
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ return this.client.deleteObjects(deleteObjectsRequest).thenApply(resp -> {
+ attributesBuilder.put(LABEL_SUCCESS, true);
+ TieredStoreMetricsManager.providerRpcLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS), attributesBuilder.build());
+ List undeletedKeys;
+ if (resp.deleted().size() != keys.size()) {
+ List deleted = resp.deleted().stream().map(DeletedObject::key).collect(Collectors.toList());
+ undeletedKeys = keys.stream().filter(key -> !deleted.contains(key)).collect(Collectors.toList());
+ } else {
+ undeletedKeys = Collections.emptyList();
+ }
+ return undeletedKeys;
+ }).exceptionally(throwable -> {
+ LOGGER.error("Delete objects from S3 failed, keys: {}, region: {}, bucket: {}", keys, this.region, this.bucket, throwable);
+ attributesBuilder.put(LABEL_SUCCESS, false);
+ TieredStoreMetricsManager.providerRpcLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS), attributesBuilder.build());
+ return keys;
+ });
+ }
+
+ public CompletableFuture> deleteObjects(String prefix) {
+ CompletableFuture> readObjectsByPrefix = this.listObjects(prefix).
+ thenApply(resp -> resp.contents().stream().map(S3Object::key).collect(Collectors.toList()));
+ return readObjectsByPrefix.thenCompose(this::deleteObjets);
+ }
+
+ public CompletableFuture readChunk(String key, long startPosition, long endPosition) {
+ GetObjectRequest request = GetObjectRequest.builder().bucket(this.bucket).key(key).range("bytes=" + startPosition + "-" + endPosition).build();
+ CompletableFuture future = new CompletableFuture<>();
+ AttributesBuilder attributesBuilder = TieredStoreMetricsManager.newAttributesBuilder().put(LABEL_OPERATION, OPERATION_DOWNLOAD_OBJECT);
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ this.client.getObject(request, AsyncResponseTransformer.toBytes()).whenComplete((response, throwable) -> {
+ if (throwable != null) {
+ LOGGER.error("Read chunk from S3 failed, key: {}, region: {}, bucket: {}", key, this.region, this.bucket, throwable);
+ attributesBuilder.put(LABEL_SUCCESS, false);
+ future.completeExceptionally(throwable);
+ } else {
+ attributesBuilder.put(LABEL_SUCCESS, true);
+ future.complete(response.asByteArray());
+ }
+ TieredStoreMetricsManager.providerRpcLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS), attributesBuilder.build());
+ });
+ return future;
+ }
+
+ public CompletableFuture mergeAllChunksIntoSegment(List chunks, String segmentName) {
+ AsyncS3ChunksMerger merger = new AsyncS3ChunksMerger(segmentName, chunks);
+ return merger.run();
+ }
+
+ class AsyncS3ChunksMerger {
+ private final String segmentKey;
+ private String uploadId;
+ private final List completedParts;
+
+ private final List chunks;
+
+ public AsyncS3ChunksMerger(String segmentKey, List chunks) {
+ this.segmentKey = segmentKey;
+ this.uploadId = null;
+ this.completedParts = new ArrayList<>();
+ this.chunks = chunks;
+ }
+
+ public CompletableFuture run() {
+ return initiateUpload().thenCompose(uploadId -> {
+ List> uploadPartFutures = new ArrayList<>(chunks.size());
+ for (int i = 0; i < chunks.size(); i++) {
+ String chunkKey = chunks.get(i).getChunkName();
+ int partNumber = i + 1;
+ uploadPartFutures.add(uploadPart(partNumber, chunkKey));
+ }
+ return CompletableFuture.allOf(uploadPartFutures.toArray(new CompletableFuture[0]));
+ }).thenCompose(v -> completeUpload()).handle((resp, err) -> {
+ if (err != null) {
+ LOGGER.error("Merge all chunks into segment failed, chunks: {}, segmentName: {}, region: {}, bucket: {}", chunks, segmentKey, region, bucket, err);
+ abortUpload().join();
+ return false;
+ }
+ return resp;
+ });
+ }
+
+ private CompletableFuture initiateUpload() {
+ CreateMultipartUploadRequest request = CreateMultipartUploadRequest.builder()
+ .bucket(bucket)
+ .key(segmentKey)
+ .build();
+
+ return client.createMultipartUpload(request)
+ .thenApply(CreateMultipartUploadResponse::uploadId)
+ .whenComplete((result, error) -> {
+ if (error != null) {
+ LOGGER.error("Error initiating multi part upload: " + error);
+ } else {
+ uploadId = result;
+ }
+ });
+ }
+
+ private CompletableFuture uploadPart(int partNumber, String chunkKey) {
+ UploadPartCopyRequest request = UploadPartCopyRequest.builder()
+ .sourceBucket(bucket).sourceKey(chunkKey).uploadId(uploadId).partNumber(partNumber)
+ .destinationBucket(bucket).destinationKey(segmentKey)
+ .build();
+
+ return client.uploadPartCopy(request)
+ .thenApply(resp -> resp.copyPartResult().eTag())
+ .thenApply(eTag -> CompletedPart.builder().partNumber(partNumber).eTag(eTag).build())
+ .whenComplete((result, error) -> {
+ if (error != null) {
+ LOGGER.error("Error uploading part, chunkKey: {}, partNumber: {}, uploadId: {}, error: {}", chunkKey, partNumber, uploadId, error);
+ } else {
+ completedParts.add(result);
+ }
+ });
+ }
+
+ private CompletableFuture completeUpload() {
+ Collections.sort(completedParts, Comparator.comparingInt(CompletedPart::partNumber));
+
+ CompletedMultipartUpload multipartUpload = CompletedMultipartUpload.builder()
+ .parts(completedParts)
+ .build();
+
+ CompleteMultipartUploadRequest request = CompleteMultipartUploadRequest.builder()
+ .bucket(bucket)
+ .key(segmentKey)
+ .uploadId(uploadId)
+ .multipartUpload(multipartUpload)
+ .build();
+
+ return client.completeMultipartUpload(request)
+ .thenApply(resp -> true)
+ .whenComplete((result, error) -> {
+ if (error != null) {
+ LOGGER.error("Error completing multi part upload, uploadId: {}, error: {}", uploadId, error);
+ }
+ });
+ }
+
+ private CompletableFuture abortUpload() {
+ AbortMultipartUploadRequest request = AbortMultipartUploadRequest.builder()
+ .bucket(bucket)
+ .key(segmentKey)
+ .uploadId(uploadId)
+ .build();
+ return client.abortMultipartUpload(request).thenApply(v -> true).exceptionally(e -> false);
+ }
+ }
+}
\ No newline at end of file
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherBaseTest.java
similarity index 90%
rename from tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java
rename to tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherBaseTest.java
index b5c4e9d06c9..ba9e2d550a6 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherBaseTest.java
@@ -33,7 +33,6 @@
import org.apache.rocketmq.tieredstore.container.TieredContainerManager;
import org.apache.rocketmq.tieredstore.container.TieredMessageQueueContainer;
import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
-import org.apache.rocketmq.tieredstore.mock.MemoryFileSegment;
import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest;
@@ -41,22 +40,25 @@
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;
-public class TieredDispatcherTest {
+@Ignore
+public abstract class TieredDispatcherBaseTest {
private TieredMessageStoreConfig storeConfig;
private MessageQueue mq;
private TieredMetadataStore metadataStore;
- private final String storePath = FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID();
+ protected final String storePath = FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID();
+
+ public abstract TieredMessageStoreConfig createTieredMessageStoreConfig();
+
+ public abstract TieredFileSegment createTieredFileSegment(TieredFileSegment.FileSegmentType type, MessageQueue mq, long baseOffset, TieredMessageStoreConfig storeConfig);
@Before
public void setUp() {
- storeConfig = new TieredMessageStoreConfig();
- storeConfig.setStorePathRootDir(storePath);
- storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.mock.MemoryFileSegmentWithoutCheck");
- storeConfig.setBrokerName(storeConfig.getBrokerName());
+ storeConfig = createTieredMessageStoreConfig();
mq = new MessageQueue("TieredMessageQueueContainerTest", storeConfig.getBrokerName(), 0);
metadataStore = TieredStoreUtil.getMetadataStore(storeConfig);
TieredStoreExecutor.init();
@@ -73,11 +75,11 @@ public void tearDown() throws IOException {
@Test
public void testDispatch() {
metadataStore.addQueue(mq, 6);
- MemoryFileSegment segment = new MemoryFileSegment(TieredFileSegment.FileSegmentType.COMMIT_LOG, mq, 1000, storeConfig);
+ TieredFileSegment segment = createTieredFileSegment(TieredFileSegment.FileSegmentType.COMMIT_LOG, mq, 1000, storeConfig);
segment.initPosition(segment.getSize());
metadataStore.updateFileSegment(segment);
metadataStore.updateFileSegment(segment);
- segment = new MemoryFileSegment(TieredFileSegment.FileSegmentType.CONSUME_QUEUE, mq, 6 * TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE, storeConfig);
+ segment = createTieredFileSegment(TieredFileSegment.FileSegmentType.CONSUME_QUEUE, mq, 6 * TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE, storeConfig);
metadataStore.updateFileSegment(segment);
TieredContainerManager containerManager = TieredContainerManager.getInstance(storeConfig);
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherBaseTest.java
similarity index 97%
rename from tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
rename to tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherBaseTest.java
index ddcc9fa6c1f..2158d6d4452 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherBaseTest.java
@@ -47,21 +47,29 @@
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
-public class TieredMessageFetcherTest {
- private TieredMessageStoreConfig storeConfig;
+@Ignore
+public abstract class TieredMessageFetcherBaseTest {
+ protected TieredMessageStoreConfig storeConfig;
private MessageQueue mq;
private final String storePath = FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID();
+ public abstract void setTieredBackendProvider();
+
@Before
public void setUp() {
storeConfig = new TieredMessageStoreConfig();
+ setTieredBackendProvider();
storeConfig.setStorePathRootDir(storePath);
storeConfig.setBrokerName(storeConfig.getBrokerName());
storeConfig.setReadAheadCacheExpireDuration(Long.MAX_VALUE);
- storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.mock.MemoryFileSegmentWithoutCheck");
+ storeConfig.setObjectStoreRegion("ap-northeast-1");
+ storeConfig.setObjectStoreBucket("rocketmq-lcy");
+ storeConfig.setBrokerName(storeConfig.getBrokerName());
+ storeConfig.setBrokerClusterName("test-cluster");
storeConfig.setTieredStoreIndexFileMaxHashSlotNum(2);
storeConfig.setTieredStoreIndexFileMaxIndexNum(3);
mq = new MessageQueue("TieredMessageFetcherTest", storeConfig.getBrokerName(), 0);
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
index c37ce2c85d8..a6d1d151262 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
@@ -85,7 +85,7 @@ public void setUp() {
brokerConfig.setBrokerName("broker");
configuration = new Configuration(LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME), "/tmp/rmqut/config", storeConfig, brokerConfig);
Properties properties = new Properties();
- properties.setProperty("tieredBackendServiceProvider", "org.apache.rocketmq.tieredstore.mock.MemoryFileSegment");
+ properties.setProperty("tieredBackendServiceProvider", "org.apache.rocketmq.tieredstore.provider.memory.MemoryFileSegment");
configuration.registerConfig(properties);
MessageStorePluginContext context = new MessageStorePluginContext(new MessageStoreConfig(), null, null, brokerConfig, configuration);
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredContainerManagerTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredContainerManagerTest.java
index ec074b176d9..c690929d553 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredContainerManagerTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredContainerManagerTest.java
@@ -44,7 +44,7 @@ public class TieredContainerManagerTest {
public void setUp() {
storeConfig = new TieredMessageStoreConfig();
storeConfig.setStorePathRootDir(storePath);
- storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.mock.MemoryFileSegment");
+ storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.memory.MemoryFileSegment");
storeConfig.setBrokerName(storeConfig.getBrokerName());
mq = new MessageQueue("TieredContainerManagerTest", storeConfig.getBrokerName(), 0);
metadataStore = TieredStoreUtil.getMetadataStore(storeConfig);
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredFileQueueTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredFileQueueTest.java
index 60f751a623d..8f2375167bd 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredFileQueueTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredFileQueueTest.java
@@ -25,7 +25,7 @@
import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
-import org.apache.rocketmq.tieredstore.mock.MemoryFileSegment;
+import org.apache.rocketmq.tieredstore.provider.memory.MemoryFileSegment;
import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
import org.junit.After;
@@ -43,7 +43,7 @@ public class TieredFileQueueTest {
public void setUp() {
storeConfig = new TieredMessageStoreConfig();
storeConfig.setStorePathRootDir(storePath);
- storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.mock.MemoryFileSegment");
+ storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.memory.MemoryFileSegment");
queue = new MessageQueue("TieredFileQueueTest", storeConfig.getBrokerName(), 0);
}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredIndexFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredIndexFileTest.java
index 6a114e7ca89..5fb6251f68a 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredIndexFileTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredIndexFileTest.java
@@ -47,7 +47,7 @@ public class TieredIndexFileTest {
public void setUp() {
storeConfig = new TieredMessageStoreConfig();
storeConfig.setStorePathRootDir(storePath);
- storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.mock.MemoryFileSegmentWithoutCheck");
+ storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.memory.MemoryFileSegment");
storeConfig.setTieredStoreIndexFileMaxHashSlotNum(2);
storeConfig.setTieredStoreIndexFileMaxIndexNum(3);
mq = new MessageQueue("TieredIndexFileTest", storeConfig.getBrokerName(), 1);
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredMessageQueueContainerTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredMessageQueueContainerTest.java
index ccfe18bd3f1..4ed5b00690a 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredMessageQueueContainerTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredMessageQueueContainerTest.java
@@ -30,7 +30,7 @@
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
import org.apache.rocketmq.tieredstore.metadata.QueueMetadata;
import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
-import org.apache.rocketmq.tieredstore.mock.MemoryFileSegment;
+import org.apache.rocketmq.tieredstore.provider.memory.MemoryFileSegment;
import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest;
@@ -51,7 +51,7 @@ public class TieredMessageQueueContainerTest {
public void setUp() {
storeConfig = new TieredMessageStoreConfig();
storeConfig.setStorePathRootDir(storePath);
- storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.mock.MemoryFileSegment");
+ storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.memory.MemoryFileSegment");
storeConfig.setCommitLogRollingInterval(0);
storeConfig.setCommitLogRollingMinimumSize(999);
mq = new MessageQueue("TieredMessageQueueContainerTest", storeConfig.getBrokerName(), 0);
@@ -138,7 +138,7 @@ public void testAppendConsumeQueue() throws ClassNotFoundException, NoSuchMethod
@Test
public void testBinarySearchInQueueByTime() throws ClassNotFoundException, NoSuchMethodException {
- storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.mock.MemoryFileSegmentWithoutCheck");
+ storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.memory.MemoryFileSegmentWithoutCheck");
TieredMessageQueueContainer container = new TieredMessageQueueContainer(mq, storeConfig);
container.initOffset(50);
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metadata/MetadataStoreTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metadata/MetadataStoreTest.java
index 96539d1c449..db1e9f0f984 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metadata/MetadataStoreTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metadata/MetadataStoreTest.java
@@ -29,7 +29,7 @@
import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
import org.apache.rocketmq.tieredstore.container.TieredCommitLog;
-import org.apache.rocketmq.tieredstore.mock.MemoryFileSegment;
+import org.apache.rocketmq.tieredstore.provider.memory.MemoryFileSegment;
import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
import org.junit.After;
import org.junit.Assert;
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManagerTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManagerTest.java
index 170728d4b83..b41c2e8798c 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManagerTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManagerTest.java
@@ -41,7 +41,7 @@ public void getMetricsView() {
@Test
public void init() {
TieredMessageStoreConfig storeConfig = new TieredMessageStoreConfig();
- storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.mock.MemoryFileSegment");
+ storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.memory.MemoryFileSegment");
TieredStoreMetricsManager.init(OpenTelemetrySdk.builder().build().getMeter(""),
null, storeConfig, new TieredMessageFetcher(storeConfig), null);
}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockTieredFileSegmentInputStream.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockTieredFileSegmentInputStream.java
new file mode 100644
index 00000000000..a6566b7de5a
--- /dev/null
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockTieredFileSegmentInputStream.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.provider;
+
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
+
+public class MockTieredFileSegmentInputStream extends TieredFileSegmentInputStream {
+
+ private final InputStream inputStream;
+
+ public MockTieredFileSegmentInputStream(InputStream inputStream) {
+ super(null, null, Integer.MAX_VALUE);
+ this.inputStream = inputStream;
+ }
+
+ @Override
+ public int read() {
+ int res = -1;
+ try {
+ res = inputStream.read();
+ } catch (Exception e) {
+ return -1;
+ }
+ return res;
+ }
+
+ @Override
+ public List getUploadBufferList() {
+ return null;
+ }
+
+ @Override
+ public ByteBuffer getCodaBuffer() {
+ return null;
+ }
+}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentBaseTest.java
similarity index 82%
rename from tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java
rename to tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentBaseTest.java
index 79b1883ad8c..a81976a827b 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentBaseTest.java
@@ -18,23 +18,27 @@
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
+
import org.apache.rocketmq.tieredstore.container.TieredCommitLog;
import org.apache.rocketmq.tieredstore.container.TieredConsumeQueue;
-import org.apache.rocketmq.tieredstore.mock.MemoryFileSegment;
+import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest;
import org.junit.Assert;
+import org.junit.Ignore;
import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
-public class TieredFileSegmentTest {
+@Ignore
+public abstract class TieredFileSegmentBaseTest {
public int baseOffset = 1000;
- public TieredFileSegment createFileSegment(TieredFileSegment.FileSegmentType fileType) {
- return new MemoryFileSegment(fileType, new MessageQueue("TieredFileSegmentTest", new TieredMessageStoreConfig().getBrokerName(), 0),
- baseOffset, new TieredMessageStoreConfig());
- }
+ public abstract TieredFileSegment createFileSegment(TieredFileSegment.FileSegmentType fileType);
@Test
public void testCommitLog() {
@@ -115,29 +119,38 @@ public void testConsumeQueue() {
@Test
public void testCommitFailed() {
long startTime = System.currentTimeMillis();
- MemoryFileSegment segment = (MemoryFileSegment) createFileSegment(TieredFileSegment.FileSegmentType.COMMIT_LOG);
+ TieredFileSegment segment = Mockito.spy(createFileSegment(TieredFileSegment.FileSegmentType.COMMIT_LOG));
long lastSize = segment.getSize();
segment.append(MessageBufferUtilTest.buildMockedMessageBuffer(), 0);
segment.append(MessageBufferUtilTest.buildMockedMessageBuffer(), 0);
- segment.blocker = new CompletableFuture<>();
+ CompletableFuture blocker = new CompletableFuture<>();
+ Mockito.doAnswer(invocation -> {
+ blocker.join();
+ CompletableFuture completableFuture = new CompletableFuture<>();
+ completableFuture.completeExceptionally(new RuntimeException("commit failed"));
+ return completableFuture;
+ }).when(segment).commit0(any(TieredFileSegmentInputStream.class), anyLong(), anyInt(), anyBoolean());
+
new Thread(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Assert.fail(e.getMessage());
}
+ // append msg3
ByteBuffer buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION, startTime);
segment.append(buffer, 0);
- segment.blocker.complete(false);
+ // blocker complete, commit failed
+ blocker.complete(null);
}).start();
+ // first time try to commit these 2 messages but stuck for while until msg3 is appended, and then this commit failed
segment.commit();
- segment.blocker.join();
- segment.blocker = new CompletableFuture<>();
- segment.blocker.complete(true);
+ // second time commit, expect success
+ Mockito.doCallRealMethod().when(segment).commit0(any(TieredFileSegmentInputStream.class), anyLong(), anyInt(), anyBoolean());
segment.commit();
Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getMaxOffset());
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegment.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegment.java
similarity index 97%
rename from tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegment.java
rename to tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegment.java
index 3c47d1cb8d4..2d0eba6e359 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegment.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegment.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.tieredstore.mock;
+package org.apache.rocketmq.tieredstore.provider.memory;
import java.io.File;
import java.nio.ByteBuffer;
@@ -71,6 +71,11 @@ public void createFile() {
}
+ @Override
+ public void sealFile() {
+
+ }
+
@Override
public CompletableFuture read0(long position, int length) {
ByteBuffer buffer = memStore.duplicate();
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegmentWithoutCheck.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegmentWithoutCheck.java
similarity index 97%
rename from tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegmentWithoutCheck.java
rename to tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegmentWithoutCheck.java
index 741a38c81c4..2c49c884799 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegmentWithoutCheck.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegmentWithoutCheck.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.tieredstore.mock;
+package org.apache.rocketmq.tieredstore.provider.memory;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/TieredDispatcherForMemoryTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/TieredDispatcherForMemoryTest.java
new file mode 100644
index 00000000000..b938e2689b9
--- /dev/null
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/TieredDispatcherForMemoryTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.provider.memory;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.tieredstore.TieredDispatcherBaseTest;
+import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
+
+public class TieredDispatcherForMemoryTest extends TieredDispatcherBaseTest {
+ @Override
+ public TieredMessageStoreConfig createTieredMessageStoreConfig() {
+ TieredMessageStoreConfig storeConfig = new TieredMessageStoreConfig();
+ storeConfig.setStorePathRootDir(storePath);
+ storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.memory.MemoryFileSegmentWithoutCheck");
+ storeConfig.setBrokerName(storeConfig.getBrokerName());
+ storeConfig.setBrokerClusterName("test-cluster");
+ return storeConfig;
+ }
+
+ @Override
+ public TieredFileSegment createTieredFileSegment(TieredFileSegment.FileSegmentType type, MessageQueue mq, long baseOffset, TieredMessageStoreConfig storeConfig) {
+ return new MemoryFileSegmentWithoutCheck(type, mq, baseOffset, storeConfig);
+ }
+}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/TieredFileSegmentForMemoryTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/TieredFileSegmentForMemoryTest.java
new file mode 100644
index 00000000000..f1e33d44bd9
--- /dev/null
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/TieredFileSegmentForMemoryTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.provider.memory;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
+import org.apache.rocketmq.tieredstore.provider.TieredFileSegmentBaseTest;
+
+public class TieredFileSegmentForMemoryTest extends TieredFileSegmentBaseTest {
+
+ @Override
+ public TieredFileSegment createFileSegment(TieredFileSegment.FileSegmentType fileType) {
+ return new MemoryFileSegment(fileType, new MessageQueue("TieredFileSegmentTest", new TieredMessageStoreConfig().getBrokerName(), 0),
+ baseOffset, new TieredMessageStoreConfig());
+ }
+
+}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/TieredMessageFetcherForMemoryTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/TieredMessageFetcherForMemoryTest.java
new file mode 100644
index 00000000000..04112dac28f
--- /dev/null
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/TieredMessageFetcherForMemoryTest.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.provider.memory;
+
+import org.apache.rocketmq.tieredstore.TieredMessageFetcherBaseTest;
+
+public class TieredMessageFetcherForMemoryTest extends TieredMessageFetcherBaseTest {
+ @Override
+ public void setTieredBackendProvider() {
+ storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.memory.MemoryFileSegmentWithoutCheck");
+ }
+}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/s3/MockS3AsyncClient.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/s3/MockS3AsyncClient.java
new file mode 100644
index 00000000000..1ddd6576dc8
--- /dev/null
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/s3/MockS3AsyncClient.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.provider.s3;
+
+import com.adobe.testing.s3mock.junit4.S3MockRule;
+import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
+import org.junit.ClassRule;
+import software.amazon.awssdk.core.ResponseBytes;
+import software.amazon.awssdk.core.async.AsyncRequestBody;
+import software.amazon.awssdk.core.async.AsyncResponseTransformer;
+import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.core.sync.ResponseTransformer;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.AbortMultipartUploadResponse;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+import software.amazon.awssdk.services.s3.model.CreateBucketResponse;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
+import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.PutObjectResponse;
+import software.amazon.awssdk.services.s3.model.UploadPartCopyRequest;
+import software.amazon.awssdk.services.s3.model.UploadPartCopyResponse;
+
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+public class MockS3AsyncClient implements S3AsyncClient {
+
+ @ClassRule
+ public static final S3MockRule S3_MOCK_RULE = S3MockRule.builder().silent().build();
+
+ public static TieredStorageS3Client getMockTieredStorageS3Client(TieredMessageStoreConfig config,
+ S3MockStarterTestImpl s3MockApplication) {
+ TieredStorageS3Client tieredStorageS3Client = null;
+ try {
+ tieredStorageS3Client = new TieredStorageS3Client(config);
+ S3Client s3Client = s3MockApplication.createS3ClientV2();
+ S3AsyncClient asyncClient = new MockS3AsyncClient(s3Client);
+ Field clientField = tieredStorageS3Client.getClass().getDeclaredField("client");
+ clientField.setAccessible(true);
+ clientField.set(tieredStorageS3Client, asyncClient);
+ s3Client.createBucket(CreateBucketRequest.builder().bucket(config.getObjectStoreBucket()).build());
+ } catch (Exception ignore) {
+
+ }
+ return tieredStorageS3Client;
+ }
+
+ private final S3Client s3Client;
+
+ public MockS3AsyncClient(S3Client s3Client) {
+ this.s3Client = s3Client;
+ }
+
+ @Override
+ public String serviceName() {
+ return null;
+ }
+
+ @Override
+ public void close() {
+ this.s3Client.close();
+ }
+
+ @Override
+ public CompletableFuture createBucket(CreateBucketRequest createBucketRequest) {
+ return CompletableFuture.completedFuture(this.s3Client.createBucket(createBucketRequest));
+ }
+
+ @Override
+ public CompletableFuture putObject(PutObjectRequest putObjectRequest,
+ AsyncRequestBody requestBody) {
+ List list = new LinkedList<>();
+ CompletableFuture future = requestBody.subscribe(bytebuffer -> {
+ list.add(bytebuffer);
+ });
+ future.join();
+ Integer len = list.stream().map(a -> a.limit()).reduce((a, b) -> a + b).get();
+ ByteBuffer realByteBuffer = ByteBuffer.allocate(len);
+ for (int i = 0; i < list.size(); i++) {
+ ByteBuffer byteBuffer = list.get(i);
+ byteBuffer.rewind();
+ realByteBuffer.put(byteBuffer);
+ }
+ realByteBuffer.flip();
+ RequestBody body = RequestBody.fromByteBuffer(realByteBuffer);
+ return CompletableFuture.completedFuture(this.s3Client.putObject(putObjectRequest, body));
+ }
+
+ @Override
+ public CompletableFuture listObjectsV2(
+ Consumer listObjectsV2Request) {
+ ListObjectsV2Request request = ListObjectsV2Request.builder().applyMutation(listObjectsV2Request).build();
+ return this.listObjectsV2(request);
+ }
+
+ @Override
+ public CompletableFuture listObjectsV2(ListObjectsV2Request listObjectsV2Request) {
+ return CompletableFuture.completedFuture(this.s3Client.listObjectsV2(listObjectsV2Request));
+ }
+
+ @Override
+ public CompletableFuture deleteObject(
+ Consumer deleteObjectRequest) {
+ DeleteObjectRequest request = DeleteObjectRequest.builder().applyMutation(deleteObjectRequest).build();
+ return this.deleteObject(request);
+ }
+
+ @Override
+ public CompletableFuture deleteObject(DeleteObjectRequest deleteObjectRequest) {
+ return CompletableFuture.completedFuture(this.s3Client.deleteObject(deleteObjectRequest));
+ }
+
+ @Override
+ public CompletableFuture deleteObjects(
+ Consumer deleteObjectsRequest) {
+ DeleteObjectsRequest request = DeleteObjectsRequest.builder().applyMutation(deleteObjectsRequest).build();
+ return this.deleteObjects(request);
+ }
+
+ @Override
+ public CompletableFuture deleteObjects(DeleteObjectsRequest deleteObjectsRequest) {
+ return CompletableFuture.completedFuture(this.s3Client.deleteObjects(deleteObjectsRequest));
+ }
+
+ @Override
+ public CompletableFuture getObject(Consumer getObjectRequest,
+ AsyncResponseTransformer asyncResponseTransformer) {
+ GetObjectRequest request = GetObjectRequest.builder().applyMutation(getObjectRequest).build();
+ return this.getObject(request, asyncResponseTransformer);
+ }
+
+ @Override
+ public CompletableFuture getObject(GetObjectRequest getObjectRequest,
+ AsyncResponseTransformer asyncResponseTransformer) {
+ ResponseBytes resp = this.s3Client.getObject(getObjectRequest, ResponseTransformer.toBytes());
+ return CompletableFuture.completedFuture((T) resp);
+ }
+
+ @Override
+ public CompletableFuture createMultipartUpload(
+ Consumer createMultipartUploadRequest) {
+ CreateMultipartUploadRequest request = CreateMultipartUploadRequest.builder().applyMutation(createMultipartUploadRequest).build();
+ return this.createMultipartUpload(request);
+ }
+
+ @Override
+ public CompletableFuture createMultipartUpload(
+ CreateMultipartUploadRequest createMultipartUploadRequest) {
+ return CompletableFuture.completedFuture(this.s3Client.createMultipartUpload(createMultipartUploadRequest));
+ }
+
+ @Override
+ public CompletableFuture uploadPartCopy(
+ Consumer uploadPartCopyRequest) {
+ UploadPartCopyRequest request = UploadPartCopyRequest.builder().applyMutation(uploadPartCopyRequest).build();
+ return this.uploadPartCopy(request);
+ }
+
+ @Override
+ public CompletableFuture uploadPartCopy(UploadPartCopyRequest uploadPartCopyRequest) {
+ return CompletableFuture.completedFuture(this.s3Client.uploadPartCopy(uploadPartCopyRequest));
+ }
+
+ @Override
+ public CompletableFuture completeMultipartUpload(
+ Consumer completeMultipartUploadRequest) {
+ CompleteMultipartUploadRequest request = CompleteMultipartUploadRequest.builder().applyMutation(completeMultipartUploadRequest).build();
+ return this.completeMultipartUpload(request);
+ }
+
+ @Override
+ public CompletableFuture completeMultipartUpload(
+ CompleteMultipartUploadRequest completeMultipartUploadRequest) {
+ return CompletableFuture.completedFuture(this.s3Client.completeMultipartUpload(completeMultipartUploadRequest));
+ }
+
+ @Override
+ public CompletableFuture abortMultipartUpload(
+ Consumer abortMultipartUploadRequest) {
+ AbortMultipartUploadRequest request = AbortMultipartUploadRequest.builder().applyMutation(abortMultipartUploadRequest).build();
+ return S3AsyncClient.super.abortMultipartUpload(request);
+ }
+
+ @Override
+ public CompletableFuture abortMultipartUpload(
+ AbortMultipartUploadRequest abortMultipartUploadRequest) {
+ return CompletableFuture.completedFuture(this.s3Client.abortMultipartUpload(abortMultipartUploadRequest));
+ }
+}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/s3/MockS3TestBase.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/s3/MockS3TestBase.java
new file mode 100644
index 00000000000..fabd0bebb4f
--- /dev/null
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/s3/MockS3TestBase.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.provider.s3;
+
+import com.adobe.testing.s3mock.S3MockApplication;
+import org.apache.commons.io.FileUtils;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
+import org.junit.Assert;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+public class MockS3TestBase {
+
+ public static final String STORE_BASE_PATH = FileUtils.getTempDirectory() + File.separator + "MockS3TestBase-";
+
+ protected S3MockStarterTestImpl s3MockStater;
+
+ private String rootPath;
+
+ protected void startMockedS3() {
+ Map properties = new HashMap();
+ properties.put(S3MockApplication.PROP_HTTP_PORT, S3MockApplication.RANDOM_PORT);
+ properties.put(S3MockApplication.PROP_HTTPS_PORT, S3MockApplication.RANDOM_PORT);
+ rootPath = STORE_BASE_PATH + UUID.randomUUID();
+ properties.put(S3MockApplication.PROP_ROOT_DIRECTORY, rootPath);
+ properties.put(S3MockApplication.PROP_INITIAL_BUCKETS, "rocketmq_lcy");
+
+ TieredMessageStoreConfig config = new TieredMessageStoreConfig();
+ config.setObjectStoreRegion("ap-northeast-1");
+ config.setObjectStoreBucket("rocketmq-lcy");
+ config.setObjectStoreAccessKey("");
+ config.setObjectStoreSecretKey("");
+ s3MockStater = new S3MockStarterTestImpl(properties);
+ s3MockStater.start();
+ TieredStorageS3Client client = MockS3AsyncClient.getMockTieredStorageS3Client(config, s3MockStater);
+ try {
+ Field instanceField = TieredStorageS3Client.class.getDeclaredField("instance");
+ instanceField.setAccessible(true);
+ instanceField.set(null, client);
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ protected void clearMockS3Data() {
+ this.s3MockStater.stop();
+ UtilAll.deleteFile(new File(rootPath));
+ }
+}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/s3/S3FileSegmentMetadataTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/s3/S3FileSegmentMetadataTest.java
new file mode 100644
index 00000000000..78910be1c2b
--- /dev/null
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/s3/S3FileSegmentMetadataTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.provider.s3;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class S3FileSegmentMetadataTest {
+
+ @Test
+ public void testBasicOperation() {
+ S3FileSegmentMetadata metadata = new S3FileSegmentMetadata();
+ // valid chunk adding
+ Assert.assertTrue(metadata.addChunk(new ChunkMetadata("test", 0, 10)));
+ Assert.assertTrue(metadata.addChunk(new ChunkMetadata("test", 10, 10)));
+ Assert.assertEquals(0, metadata.getStartPosition());
+ Assert.assertEquals(19, metadata.getEndPosition());
+ Assert.assertEquals(20, metadata.getSize());
+ Assert.assertEquals(2, metadata.getChunkCount());
+ Assert.assertFalse(metadata.isSealed());
+
+ // invalid chunk adding
+ Assert.assertFalse(metadata.addChunk(new ChunkMetadata("test", 0, 10)));
+
+ // seal
+ metadata.setSegment(new ChunkMetadata("test", 0, 10));
+ Assert.assertTrue(metadata.isSealed());
+ Assert.assertEquals(0, metadata.getStartPosition());
+ Assert.assertEquals(9, metadata.getEndPosition());
+ Assert.assertEquals(10, metadata.getSize());
+ Assert.assertEquals(2, metadata.getChunkCount());
+
+ // remove all chunks
+ metadata.removeAllChunks();
+ Assert.assertEquals(0, metadata.getChunkCount());
+
+ }
+
+}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/s3/S3FileSegmentTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/s3/S3FileSegmentTest.java
new file mode 100644
index 00000000000..d1f2095dade
--- /dev/null
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/s3/S3FileSegmentTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.provider.s3;
+
+import com.adobe.testing.s3mock.S3MockApplication;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.tieredstore.exception.TieredStoreErrorCode;
+import org.apache.rocketmq.tieredstore.exception.TieredStoreException;
+import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
+import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
+import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStreamFactory;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletionException;
+
+import static org.apache.rocketmq.tieredstore.util.TieredStoreUtil.MB;
+
+@Ignore
+public class S3FileSegmentTest extends MockS3TestBase {
+
+ private static final TieredMessageStoreConfig CONFIG = new TieredMessageStoreConfig();
+
+ static {
+ CONFIG.setBrokerClusterName("test-cluster");
+ CONFIG.setBrokerName("test-broker");
+ CONFIG.setObjectStoreRegion("ap-northeast-1");
+ CONFIG.setObjectStoreBucket("rocketmq-lcy");
+ CONFIG.setObjectStoreAccessKey("");
+ CONFIG.setObjectStoreSecretKey("");
+ }
+
+ private static final Map PROPERTIES = new HashMap();
+
+ static {
+ PROPERTIES.put(S3MockApplication.PROP_HTTP_PORT, S3MockApplication.RANDOM_PORT);
+ PROPERTIES.put(S3MockApplication.PROP_HTTPS_PORT, S3MockApplication.RANDOM_PORT);
+ PROPERTIES.put(S3MockApplication.PROP_INITIAL_BUCKETS, CONFIG.getObjectStoreBucket());
+ }
+
+ private static final MessageQueue MQ = new MessageQueue();
+
+ static {
+ MQ.setBrokerName("test-broker");
+ MQ.setQueueId(0);
+ MQ.setTopic("test-topic");
+ }
+
+ private static final long BASE_OFFSET = 1024;
+
+ private static final TieredFileSegment.FileSegmentType TYPE = TieredFileSegment.FileSegmentType.CONSUME_QUEUE;
+
+ private S3FileSegment segment;
+
+ @Before
+ public void setUp() {
+ startMockedS3();
+ segment = new S3FileSegment(TYPE, MQ, BASE_OFFSET, CONFIG);
+ }
+
+ @After
+ public void tearDown() {
+ clearMockS3Data();
+ }
+
+ @Test
+ public void testNewInstance() {
+ S3FileSegmentMetadata metadata = segment.getMetadata();
+ Assert.assertEquals(0, metadata.getSize());
+ }
+
+ @Test
+ public void testCommit() {
+ TieredFileSegmentInputStream inputStream = buildMockedInputStream("hello".getBytes());
+ segment.commit0(inputStream, 0, 5, false).join();
+ ByteBuffer read = segment.read0(0, 5).join();
+ Assert.assertEquals("hello", new String(read.array()));
+ Assert.assertEquals(5, segment.getSize());
+ Assert.assertEquals(0, segment.getMetadata().getStartPosition());
+ Assert.assertEquals(4, segment.getMetadata().getEndPosition());
+ }
+
+ @Test
+ public void testCommitAndRestart() {
+ TieredFileSegmentInputStream inputStream = buildMockedInputStream("hello".getBytes());
+ segment.commit0(inputStream, 0, 5, false).join();
+ ByteBuffer read = segment.read0(0, 5).join();
+ Assert.assertEquals("hello", new String(read.array()));
+ Assert.assertEquals(5, segment.getSize());
+ Assert.assertEquals(0, segment.getMetadata().getStartPosition());
+ Assert.assertEquals(4, segment.getMetadata().getEndPosition());
+
+ segment = new S3FileSegment(TYPE, MQ, BASE_OFFSET, CONFIG);
+ Assert.assertEquals(5, segment.getSize());
+ Assert.assertEquals(0, segment.getMetadata().getStartPosition());
+ Assert.assertEquals(4, segment.getMetadata().getEndPosition());
+ read = segment.read0(0, 5).join();
+ Assert.assertEquals("hello", new String(read.array()));
+ }
+
+ @Test
+ public void testRestartWithInvalidChunks() {
+ // write invalid chunks
+ TieredStorageS3Client client = TieredStorageS3Client.getInstance(CONFIG);
+ client.writeChunk(segment.getChunkPath() + File.separator + "chunk-" + 0, new ByteArrayInputStream("hello".getBytes()), 5).join();
+ client.writeChunk(segment.getChunkPath() + File.separator + "chunk-" + 1, new ByteArrayInputStream("world".getBytes()), 5).join();
+
+ // initialize invalid chunks
+ Assert.assertThrows(RuntimeException.class, () -> segment = new S3FileSegment(TYPE, MQ, BASE_OFFSET, CONFIG));
+ }
+
+ @Test
+ public void testRestartWithInvalidSegments() {
+ // write two segments
+ TieredStorageS3Client client = TieredStorageS3Client.getInstance(CONFIG);
+ client.writeChunk(segment.getSegmentPath() + File.separator + "segment-" + 0, new ByteArrayInputStream("hello".getBytes()), 5).join();
+ client.writeChunk(segment.getSegmentPath() + File.separator + "segment-" + 1, new ByteArrayInputStream("world".getBytes()), 5).join();
+
+ // initialize invalid segments
+ Assert.assertThrows(RuntimeException.class, () -> segment = new S3FileSegment(TYPE, MQ, BASE_OFFSET, CONFIG));
+ }
+
+ @Test
+ public void testCommitAndDelete() {
+ TieredFileSegmentInputStream inputStream = buildMockedInputStream("hello".getBytes());
+ segment.commit0(inputStream, 0, 5, false).join();
+ ByteBuffer read = segment.read0(0, 5).join();
+ Assert.assertEquals("hello", new String(read.array()));
+ segment.destroyFile();
+ segment = new S3FileSegment(TYPE, MQ, BASE_OFFSET, CONFIG);
+ Assert.assertEquals(0, segment.getSize());
+ Assert.assertEquals(-1, segment.getMetadata().getStartPosition());
+ Assert.assertEquals(-1, segment.getMetadata().getEndPosition());
+ Assert.assertTrue(segment.read0(0, 5).isCompletedExceptionally());
+ }
+
+ @Test
+ public void testBackwardCommitPosition() {
+ // write first chunk: "hello", size = 5, position: [0, 4]
+ TieredFileSegmentInputStream inputStream = buildMockedInputStream("hello".getBytes());
+ Assert.assertTrue(segment.commit0(inputStream, 0, 5, false).join());
+ ByteBuffer read = segment.read0(0, 5).join();
+ Assert.assertEquals("hello", new String(read.array()));
+ // write second chunk: ",world", size = 6, position: [5, 10]
+ inputStream = buildMockedInputStream(",world".getBytes());
+ Assert.assertTrue(segment.commit0(inputStream, 5, 6, false).join());
+ read = segment.read0(0, 11).join();
+ Assert.assertEquals("hello,world", new String(read.array()));
+ // write third chunk: " and lcy", size = 8, position: [11, 18]
+ inputStream = buildMockedInputStream(" and lcy".getBytes());
+ Assert.assertTrue(segment.commit0(inputStream, 11, 8, false).join());
+ read = segment.read0(0, 19).join();
+ Assert.assertEquals("hello,world and lcy", new String(read.array()));
+ // write a chunk from position 2, size = 2, data: "he", position: [2, 3]
+ inputStream = buildMockedInputStream("he".getBytes());
+ TieredStoreException exception = null;
+ try {
+ segment.commit0(inputStream, 2, 2, false).join();
+ } catch (CompletionException e) {
+ Throwable cause = e.getCause();
+ Assert.assertTrue(cause instanceof TieredStoreException);
+ exception = (TieredStoreException) cause;
+ Assert.assertEquals(TieredStoreErrorCode.ILLEGAL_OFFSET, exception.getErrorCode());
+ Assert.assertEquals(19, exception.getPosition());
+ }
+ Assert.assertNotNull(exception);
+ }
+
+ @Test
+ public void testSeal() throws Exception {
+ CONFIG.setEnableMerge(true);
+ int unit = (int) (5 * MB);
+ ByteBuffer byteBuffer = ByteBuffer.allocate(unit);
+ for (int i = 0; i < unit; i++) {
+ byteBuffer.put((byte) i);
+ }
+ byte[] array = byteBuffer.array();
+ for (int i = 0; i < 2; i++) {
+ TieredFileSegmentInputStream inputStream = buildMockedInputStream(array);
+ segment.commit0(inputStream, i * unit, unit, false).join();
+ }
+ // seal
+ segment.sealFile();
+ Thread.sleep(3000);
+
+ Assert.assertTrue(segment.isSealed());
+ S3FileSegmentMetadata metadata = segment.getMetadata();
+ Assert.assertEquals(0, metadata.getChunkCount());
+ Assert.assertEquals(0, metadata.getStartPosition());
+ Assert.assertEquals(2 * unit - 1, metadata.getEndPosition());
+ Assert.assertEquals(2 * unit, metadata.getSize());
+ TieredStoreException exception = null;
+ try {
+ segment.commit0(buildMockedInputStream("lcy".getBytes()), 2 * unit, 3, false).join();
+ } catch (CompletionException e) {
+ Throwable cause = e.getCause();
+ Assert.assertTrue(cause instanceof TieredStoreException);
+ exception = (TieredStoreException) cause;
+ Assert.assertEquals(TieredStoreErrorCode.SEGMENT_SEALED, exception.getErrorCode());
+ }
+ Assert.assertNotNull(exception);
+ CONFIG.setEnableMerge(false);
+ }
+
+ private TieredFileSegmentInputStream buildMockedInputStream(byte[] bytes) {
+ List uploadBuffers = Arrays.asList(ByteBuffer.wrap(bytes));
+ return TieredFileSegmentInputStreamFactory.build(TieredFileSegment.FileSegmentType.CONSUME_QUEUE, 0, uploadBuffers, null, bytes.length);
+ }
+
+}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/s3/S3MockStarterTestImpl.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/s3/S3MockStarterTestImpl.java
new file mode 100644
index 00000000000..f8a624d36b4
--- /dev/null
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/s3/S3MockStarterTestImpl.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.provider.s3;
+
+import com.adobe.testing.s3mock.testsupport.common.S3MockStarter;
+
+import java.util.Map;
+
+public class S3MockStarterTestImpl extends S3MockStarter {
+ protected S3MockStarterTestImpl(Map properties) {
+ super(properties);
+ }
+
+ @Override
+ protected void start() {
+ super.start();
+ }
+
+ @Override
+ protected void stop() {
+ super.stop();
+ }
+}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/s3/TieredDispatcherForS3Test.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/s3/TieredDispatcherForS3Test.java
new file mode 100644
index 00000000000..89bf383775d
--- /dev/null
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/s3/TieredDispatcherForS3Test.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.provider.s3;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.tieredstore.TieredDispatcherBaseTest;
+import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
+
+import java.io.IOException;
+import org.junit.Ignore;
+
+@Ignore
+public class TieredDispatcherForS3Test extends TieredDispatcherBaseTest {
+
+ private MockS3TestBase mockS3TestBase = new MockS3TestBase();
+
+ @Override
+ public TieredMessageStoreConfig createTieredMessageStoreConfig() {
+ TieredMessageStoreConfig storeConfig = new TieredMessageStoreConfig();
+ storeConfig.setStorePathRootDir(storePath);
+ storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.s3.S3FileSegment");
+ storeConfig.setBrokerName(storeConfig.getBrokerName());
+ storeConfig.setBrokerClusterName("test-cluster");
+ storeConfig.setObjectStoreRegion("ap-northeast-1");
+ storeConfig.setObjectStoreBucket("rocketmq-lcy");
+ return storeConfig;
+ }
+
+ @Override
+ public TieredFileSegment createTieredFileSegment(TieredFileSegment.FileSegmentType type, MessageQueue mq,
+ long baseOffset, TieredMessageStoreConfig storeConfig) {
+ return new S3FileSegment(type, mq, baseOffset, storeConfig);
+ }
+
+ @Override
+ public void setUp() {
+ mockS3TestBase.startMockedS3();
+ super.setUp();
+ }
+
+ @Override
+ public void tearDown() throws IOException {
+ super.tearDown();
+ mockS3TestBase.clearMockS3Data();
+ }
+}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/s3/TieredFileSegmentForS3Test.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/s3/TieredFileSegmentForS3Test.java
new file mode 100644
index 00000000000..94381189964
--- /dev/null
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/s3/TieredFileSegmentForS3Test.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.provider.s3;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
+import org.apache.rocketmq.tieredstore.provider.TieredFileSegmentBaseTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+
+@Ignore
+public class TieredFileSegmentForS3Test extends TieredFileSegmentBaseTest {
+
+ private MockS3TestBase mockS3TestBase = new MockS3TestBase();
+
+ private static final TieredMessageStoreConfig CONFIG = new TieredMessageStoreConfig();
+
+ static {
+ CONFIG.setBrokerClusterName("test-cluster");
+ CONFIG.setBrokerName("test-broker");
+ CONFIG.setObjectStoreRegion("ap-northeast-1");
+ CONFIG.setObjectStoreBucket("rocketmq-lcy");
+ CONFIG.setObjectStoreAccessKey("");
+ CONFIG.setObjectStoreSecretKey("");
+ }
+
+ public TieredFileSegment createFileSegment(TieredFileSegment.FileSegmentType fileType) {
+ return new S3FileSegment(fileType, new MessageQueue("TieredFileSegmentTest", CONFIG.getBrokerName(), 0),
+ baseOffset, CONFIG);
+ }
+
+ @Before
+ public void setUp() {
+ mockS3TestBase.startMockedS3();
+ }
+
+ @After
+ public void tearDown() {
+ mockS3TestBase.clearMockS3Data();
+ }
+}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/s3/TieredMessageFetcherForS3Test.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/s3/TieredMessageFetcherForS3Test.java
new file mode 100644
index 00000000000..809d73d06b7
--- /dev/null
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/s3/TieredMessageFetcherForS3Test.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.provider.s3;
+
+import org.apache.rocketmq.tieredstore.TieredMessageFetcherBaseTest;
+
+import java.io.IOException;
+import org.junit.Ignore;
+
+@Ignore
+public class TieredMessageFetcherForS3Test extends TieredMessageFetcherBaseTest {
+
+ private MockS3TestBase mockS3TestBase = new MockS3TestBase();
+
+ @Override
+ public void setTieredBackendProvider() {
+ storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.s3.S3FileSegment");
+ }
+
+ @Override
+ public void setUp() {
+ mockS3TestBase.startMockedS3();
+ super.setUp();
+ }
+
+ @Override
+ public void tearDown() throws IOException {
+ super.tearDown();
+ mockS3TestBase.clearMockS3Data();
+ }
+}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/s3/TieredStorageS3ClientTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/s3/TieredStorageS3ClientTest.java
new file mode 100644
index 00000000000..35750d056b1
--- /dev/null
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/s3/TieredStorageS3ClientTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.provider.s3;
+
+import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.rocketmq.tieredstore.util.TieredStoreUtil.MB;
+
+@Ignore
+public class TieredStorageS3ClientTest extends MockS3TestBase {
+
+ private static final TieredMessageStoreConfig CONFIG = new TieredMessageStoreConfig();
+
+ private static final String BASE_DIR = "123/c/b/t/0/CommitLog/seg-0";
+
+ static {
+ CONFIG.setBrokerClusterName("test-cluster");
+ CONFIG.setBrokerName("test-broker");
+ CONFIG.setObjectStoreRegion("ap-northeast-1");
+ CONFIG.setObjectStoreBucket("rocketmq-lcy");
+ CONFIG.setObjectStoreAccessKey("");
+ CONFIG.setObjectStoreSecretKey("");
+ }
+
+ private TieredStorageS3Client client;
+
+ @Before
+ public void setUp() {
+ startMockedS3();
+ client = MockS3AsyncClient.getMockTieredStorageS3Client(CONFIG, s3MockStater);
+ }
+
+ @After
+ public void tearDown() {
+ clearMockS3Data();
+ }
+
+ @Test
+ public void testWriteChunk() {
+ InputStream inputStream = new ByteArrayInputStream("test".getBytes());
+ String chunkName = BASE_DIR + File.separator + "chunk-0";
+ CompletableFuture completableFuture = client.writeChunk(chunkName, inputStream, 4);
+ Assert.assertTrue(completableFuture.join());
+ }
+
+ @Test
+ public void testReadChunk() {
+ InputStream inputStream = new ByteArrayInputStream("test".getBytes());
+ String chunkName = BASE_DIR + File.separator + "chunk-0";
+ CompletableFuture completableFuture = client.writeChunk(chunkName, inputStream, 4);
+ Assert.assertTrue(completableFuture.join());
+ byte[] bytes = client.readChunk(chunkName, 0, 4).join();
+ Assert.assertEquals("test", new String(bytes));
+ }
+
+ @Test
+ public void testListChunks() {
+ for (int i = 0; i < 10; i++) {
+ String chunkName = BASE_DIR + File.separator + "chunk-" + (i * 5);
+ InputStream inputStream = new ByteArrayInputStream(("test" + i).getBytes());
+ CompletableFuture completableFuture = client.writeChunk(chunkName, inputStream, 5);
+ Assert.assertTrue(completableFuture.join());
+ }
+ List chunks = client.listChunks(BASE_DIR).join();
+ Assert.assertEquals(10, chunks.size());
+ for (int i = 0; i < 10; i++) {
+ ChunkMetadata chunkMetadata = chunks.get(i);
+ String chunkName = BASE_DIR + File.separator + "chunk-" + (i * 5);
+ Assert.assertEquals(chunkName, chunkMetadata.getChunkName());
+ Assert.assertEquals(i * 5, chunkMetadata.getStartPosition());
+ Assert.assertEquals(5, chunkMetadata.getChunkSize());
+ }
+ }
+
+ @Test
+ public void testExist() {
+ String chunkName = BASE_DIR + File.separator + "chunk-0";
+ Assert.assertFalse(client.exist(chunkName).join());
+
+ InputStream inputStream = new ByteArrayInputStream("test".getBytes());
+ CompletableFuture completableFuture = client.writeChunk(chunkName, inputStream, 4);
+ Assert.assertTrue(completableFuture.join());
+
+ Assert.assertTrue(client.exist(chunkName).join());
+ }
+
+ @Test
+ public void testDeleteObjects() {
+ for (int i = 0; i < 10; i++) {
+ String chunkName = BASE_DIR + File.separator + "chunk-" + (i * 5);
+ InputStream inputStream = new ByteArrayInputStream(("test" + i).getBytes());
+ CompletableFuture completableFuture = client.writeChunk(chunkName, inputStream, 5);
+ Assert.assertTrue(completableFuture.join());
+ }
+ List chunks = client.listChunks(BASE_DIR).join();
+ Assert.assertEquals(10, chunks.size());
+ for (int i = 0; i < 10; i++) {
+ ChunkMetadata chunkMetadata = chunks.get(i);
+ String chunkName = BASE_DIR + File.separator + "chunk-" + (i * 5);
+ Assert.assertEquals(chunkName, chunkMetadata.getChunkName());
+ Assert.assertEquals(i * 5, chunkMetadata.getStartPosition());
+ Assert.assertEquals(5, chunkMetadata.getChunkSize());
+ }
+
+ List undeleted = client.deleteObjects(BASE_DIR).join();
+ Assert.assertTrue(undeleted.isEmpty());
+
+ chunks = client.listChunks(BASE_DIR).join();
+ Assert.assertEquals(0, chunks.size());
+ }
+
+ @Test
+ public void testMergeAllChunksIntoSegment() {
+ int unit = (int) (5 * MB);
+ List chunks = new ArrayList<>(2);
+ ByteBuffer byteBuffer = ByteBuffer.allocate(unit);
+ for (int i = 0; i < unit; i++) {
+ byteBuffer.put((byte) i);
+ }
+ byte[] bytes = byteBuffer.array();
+ for (int i = 0; i < 2; i++) {
+ String chunkName = BASE_DIR + File.separator + "chunk-" + (i * unit);
+ chunks.add(new ChunkMetadata(chunkName, i * unit, unit));
+ InputStream inputStream = new ByteArrayInputStream(bytes);
+ CompletableFuture completableFuture = client.writeChunk(chunkName, inputStream, unit);
+ Assert.assertTrue(completableFuture.join());
+ }
+ String segName = BASE_DIR + File.separator + "segment-0";
+ Boolean merged = this.client.mergeAllChunksIntoSegment(chunks, segName).join();
+ Assert.assertTrue(merged);
+ byte[] segBytes = this.client.readChunk(segName, 0, 2 * unit).join();
+ Assert.assertEquals(2 * unit, segBytes.length);
+ for (int i = 0; i < 2; i++) {
+ int offset = i * unit;
+ for (int j = 0; j < unit; j++) {
+ Assert.assertEquals(bytes[j], segBytes[j + offset]);
+ }
+ }
+ }
+
+}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
index befd401ffe7..c16ffa4cb8f 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
@@ -112,6 +112,31 @@ public static ByteBuffer buildMockedConsumeQueueBuffer() {
}
+ public static void verifyMockedMessageBuffer(ByteBuffer buffer, int phyOffset) {
+ Assert.assertEquals(MSG_LEN, buffer.remaining());
+ Assert.assertEquals(MSG_LEN, buffer.getInt());
+ Assert.assertEquals(MessageDecoder.MESSAGE_MAGIC_CODE_V2, buffer.getInt());
+ Assert.assertEquals(3, buffer.getInt());
+ Assert.assertEquals(4, buffer.getInt());
+ Assert.assertEquals(5, buffer.getInt());
+ Assert.assertEquals(6, buffer.getLong());
+ Assert.assertEquals(phyOffset, buffer.getLong());
+ Assert.assertEquals(8, buffer.getInt());
+ Assert.assertEquals(9, buffer.getLong());
+ Assert.assertEquals(10, buffer.getLong());
+ Assert.assertEquals(11, buffer.getLong());
+ Assert.assertEquals(10, buffer.getLong());
+ Assert.assertEquals(13, buffer.getInt());
+ Assert.assertEquals(14, buffer.getLong());
+ Assert.assertEquals(0, buffer.getInt());
+ Assert.assertEquals(0, buffer.getShort());
+ buffer.rewind();
+ Map properties = MessageBufferUtil.getProperties(buffer);
+ Assert.assertEquals("uk", properties.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
+ Assert.assertEquals("uservalue0", properties.get("userkey"));
+ }
+
+
@Test
public void testGetTotalSize() {
ByteBuffer buffer = buildMockedMessageBuffer();