Support Amazon S3 backend in TieredStorage #6154
Comments
@ShadowySpirits I would like to implement this feature! Plz assign it to me~
Would it be better to implement universal object storage tiered storage rather than just AWS S3? From my understanding, there's no huge difference between the object storage offerings of cloud vendors. The main difference is the client interface that accesses the API. Maybe we can propose a client interface like K8S CNI to adapt to all cloud vendors? Then to support Azure or AlibabaCloud, we just implement the client interface.
Great point! But one thing to consider is that some object storage products support …
- support s3 backend in tiered storage
- refactor(tieredstorage): Unify all object storage configuration properties
- refactor(tieredstorage): replace some lambda function with more simple expression
- style(tieredstorage): perfect comments on ChunkMetadata
- refactor(tieredstorage): perfect lambda expression
- fix(tieredstorage): fix unmatched config attributes in brokerS3.conf
- feat(tieredstorage): More context in logging output
- fix(tieredstorage): fix wrong concurrently put
- test(tieredstorage): add UT to verify TieredFileSegmentInputStream
- refactor(tieredstorage): better code placement
- refactor(tieredstorage): refactor TieredFileSegmentInputStream for better understandability
- feat(tieredstorage): support `reset` of TieredFileSegmentInputStream
- fix(tieredstorage): fix wrong position when failed in `S3FileSegment#commit0`
- fix(tieredstorage): fix still have upload buffer when already seal the segment
- test(tieredstorage): fix wrong assertion
- feat(tieredstorage): support switch to enable merge chunks into segment
- feat(tieredstorage): add more debug log in TieredMessageStore
- style(tieredstorage): use rmq code style
- feat(tieredstorage): add metrics for S3 provider
- fix(tieredstorage): resolve conflicts after rebasing master (Closes #6624)
- style(tieredstorage): change log level (Closes #6154)
- build(controller): build tieredstorage with bazel
- build(controller): build tieredstorage with bazel
- style(tieredstorage): change log level (Closes #6154)
- test(tieredstorage): ignore tests about S3Mock
- test(tieredstorage): ignore tests about S3Mock
Support S3 backend for TieredStorage
Target
Implement an Amazon S3 backend for TieredStorage.
Core Idea
When the upper layer calls `commit0`, the data is uploaded directly. The upload buffer and pre-read mechanisms are already implemented in `TieredFileSegment` and `TieredMessageFetcher`, respectively, so we do not need to add optimization at this layer and can treat `commit0` as the method that actually persists data to S3. When we fill an entire `Segment`, we can use `UploadPartCopy` to consolidate these objects of different sizes into one large object.

Complete Process
We need to maintain metadata `S3SegmentMetadata` for each `Segment`, which holds the mapping from a `position` in the logical `Segment` to the path and size of the corresponding object on S3. For example, the internal small-file metadata of the first `Segment` of the `CommitLog` of `queue1` might look like this (a sketch of this structure follows the list):

- `0` -> `<clusterName1/brokerName1/topic1/queue1/commitLog/seg-0/chunk-0, 1024>`
- `1024` -> `<clusterName1/brokerName1/topic1/queue1/commitLog/seg-0/chunk-1024, 4096>`
- `5120` -> `<clusterName1/brokerName1/topic1/queue1/commitLog/seg-0/chunk-5120, 5120>`
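A minimal sketch of what `S3SegmentMetadata` could look like, assuming a sorted map keyed by the logical position of each chunk (the class shape and method names here are illustrative, not the actual implementation):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical sketch: maps a logical position inside the segment to the
// S3 object (chunk) that starts at that position and that chunk's size.
public class S3SegmentMetadata {

    public static final class Chunk {
        public final String key;   // S3 object key, e.g. baseDir/chunk-0
        public final long size;    // object size in bytes

        public Chunk(String key, long size) {
            this.key = key;
            this.size = size;
        }
    }

    // position -> chunk, kept sorted so floorEntry() finds the covering chunk
    private final ConcurrentSkipListMap<Long, Chunk> chunks = new ConcurrentSkipListMap<>();

    // Register an uploaded chunk, e.g. put(0, "baseDir/chunk-0", 1024).
    public void put(long position, String key, long size) {
        chunks.put(position, new Chunk(key, size));
    }

    // Find the chunk covering the given logical position, or null if none does.
    public Map.Entry<Long, Chunk> locate(long position) {
        Map.Entry<Long, Chunk> e = chunks.floorEntry(position);
        if (e == null || position >= e.getKey() + e.getValue().size) {
            return null;
        }
        return e;
    }

    // Total logical size currently covered (assuming chunks are contiguous).
    public long size() {
        Map.Entry<Long, Chunk> last = chunks.lastEntry();
        return last == null ? 0 : last.getKey() + last.getValue().size;
    }
}
```

With the example mapping above, `locate(512)` would return the `chunk-0` entry and `locate(1024)` the `chunk-1024` entry.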
First, the `S3Segment#S3Segment` constructor is called. The path of the logical `Segment` file is concatenated according to fixed rules in the format `{clusterName}/{brokerName}/{topic}/{queue}/{type}/seg-{baseOffset}`; this path is referred to as `baseDir` in the following. We then list the objects under `baseDir` and construct the metadata from them.

Next, the upper layer calls the `S3Segment#commit0()` method. Suppose we write data with `position=0` and `length=1024`; that is, we write to the object path `baseDir/chunk-0` with a size of `1024`, uploading it directly and asynchronously through the S3 client (a rough sketch of such an upload follows this step). After the upload completes, `S3SegmentMetadata` is updated:

- `0` -> `<baseDir/chunk-0, 1024>`
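For illustration, an asynchronous chunk upload in `commit0` could look roughly like this with the AWS SDK for Java v2; the bucket name, the class name, and the `S3SegmentMetadata` helper are assumptions carried over from the sketch above, not the actual implementation:

```java
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

public class ChunkUploader {
    private final S3AsyncClient s3 = S3AsyncClient.create();
    private final String bucket = "rocketmq-tiered-store"; // assumed bucket name

    // Upload the buffer as baseDir/chunk-{position} and record it in the
    // metadata once the asynchronous upload has completed successfully.
    public CompletableFuture<Void> commitChunk(String baseDir, long position,
                                               ByteBuffer data, S3SegmentMetadata metadata) {
        String key = baseDir + "/chunk-" + position;
        long size = data.remaining();
        PutObjectRequest request = PutObjectRequest.builder()
            .bucket(bucket)
            .key(key)
            .build();
        return s3.putObject(request, AsyncRequestBody.fromByteBuffer(data))
            .thenAccept(resp -> metadata.put(position, key, size));
    }
}
```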
For reads, the `S3Segment#read0()` method is called. Suppose we read data with `position=0` and `length=1024`. Through `S3SegmentMetadata` we can find the object `baseDir/chunk-0` starting at position 0 and download it directly through the S3 client.

Suppose we then write data with `position=1024` and `length=4096`, that is, we submit 4K of data starting from position 1K. It is uploaded directly and asynchronously through the S3 client, and after the upload completes `S3SegmentMetadata` is updated:

- `0` -> `<baseDir/chunk-0, 1024>`
- `1024` -> `<baseDir/chunk-1024, 4096>`
Assuming we read from `position = 512` with `length = 1024`: according to `S3SegmentMetadata`, we need to fetch the range `[512, 1024)` from `baseDir/chunk-0` and the range `[1024, 1536)` from `baseDir/chunk-1024` (a sketch of such a cross-chunk read follows the next step).

Assuming we then write data with `position = 5120` and `length = 5120`, we upload the object `baseDir/chunk-5120`, and `S3SegmentMetadata` becomes:

- `0` -> `<baseDir/chunk-0, 1024>`
- `1024` -> `<baseDir/chunk-1024, 4096>`
- `5120` -> `<baseDir/chunk-5120, 5120>`
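A sketch of how the cross-chunk read described above (`position = 512`, `length = 1024`) could be served with ranged `GetObject` requests; it reuses the hypothetical `S3SegmentMetadata` sketch and an assumed bucket name:

```java
import java.io.ByteArrayOutputStream;
import java.util.Map;

import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;

public class ChunkReader {
    private final S3Client s3 = S3Client.create();
    private final String bucket = "rocketmq-tiered-store"; // assumed bucket name

    // Read [position, position + length) of the logical segment by issuing one
    // ranged GetObject per chunk that the requested interval overlaps.
    public byte[] read(S3SegmentMetadata metadata, long position, int length) {
        ByteArrayOutputStream out = new ByteArrayOutputStream(length);
        long current = position;
        long end = position + length;
        while (current < end) {
            Map.Entry<Long, S3SegmentMetadata.Chunk> entry = metadata.locate(current);
            if (entry == null) {
                throw new IllegalStateException("no chunk covers position " + current);
            }
            long chunkStart = entry.getKey();
            S3SegmentMetadata.Chunk chunk = entry.getValue();
            long offsetInChunk = current - chunkStart;
            long bytesFromChunk = Math.min(end, chunkStart + chunk.size) - current;
            GetObjectRequest request = GetObjectRequest.builder()
                .bucket(bucket)
                .key(chunk.key)
                // HTTP ranges are inclusive on both ends.
                .range("bytes=" + offsetInChunk + "-" + (offsetInChunk + bytesFromChunk - 1))
                .build();
            ResponseBytes<GetObjectResponse> bytes = s3.getObjectAsBytes(request);
            byte[] part = bytes.asByteArray();
            out.write(part, 0, part.length);
            current += bytesFromChunk;
        }
        return out.toByteArray();
    }
}
```

For the example above this issues two requests: `bytes=512-1023` against `chunk-0` and `bytes=0-511` against `chunk-1024`.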
Assume our `Segment` size is `10240` bytes; by now we have filled the above `Segment`. We can asynchronously trigger an `uploadPartCopy` to consolidate all the `chunks` into one large `segment` object at the path `baseDir/segment` (a sketch of this consolidation follows). After the copy succeeds, we can asynchronously delete all `baseDir/chunk-*` objects and update `S3SegmentMetadata` to:

- `0` -> `<baseDir/segment, 10240>`
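A sketch of the consolidation step using an S3 multipart upload with `UploadPartCopy` (a server-side copy, so no data is pulled back to the broker). Note that S3 requires every part except the last to be at least 5 MB, so real chunk sizes or chunk grouping must respect that; the bucket and class names are assumptions:

```java
import java.util.ArrayList;
import java.util.List;

import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.UploadPartCopyRequest;
import software.amazon.awssdk.services.s3.model.UploadPartCopyResponse;

public class SegmentCompactor {
    private final S3Client s3 = S3Client.create();
    private final String bucket = "rocketmq-tiered-store"; // assumed bucket name

    // Server-side copy of all chunk objects (in position order) into one
    // baseDir/segment object.
    public void consolidate(String baseDir, List<String> chunkKeysInOrder) {
        String segmentKey = baseDir + "/segment";
        String uploadId = s3.createMultipartUpload(CreateMultipartUploadRequest.builder()
            .bucket(bucket).key(segmentKey).build()).uploadId();

        List<CompletedPart> parts = new ArrayList<>();
        int partNumber = 1;
        for (String chunkKey : chunkKeysInOrder) {
            UploadPartCopyResponse resp = s3.uploadPartCopy(UploadPartCopyRequest.builder()
                .sourceBucket(bucket).sourceKey(chunkKey)
                .destinationBucket(bucket).destinationKey(segmentKey)
                .uploadId(uploadId).partNumber(partNumber)
                .build());
            parts.add(CompletedPart.builder()
                .partNumber(partNumber)
                .eTag(resp.copyPartResult().eTag())
                .build());
            partNumber++;
        }

        s3.completeMultipartUpload(CompleteMultipartUploadRequest.builder()
            .bucket(bucket).key(segmentKey).uploadId(uploadId)
            .multipartUpload(CompletedMultipartUpload.builder().parts(parts).build())
            .build());
        // The original chunk objects can now be deleted asynchronously.
    }
}
```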
At this point, we first concatenate the path `baseDir/segment` and check whether the object exists. If it exists, it is an already consolidated large `Segment` object, so we record the corresponding metadata locally, and `read0()` can read the object directly based on the offset. We then check whether any `baseDir/chunk-*` objects still exist; if they do, the asynchronous deletion has not yet succeeded, and we can retry it asynchronously.

If the concatenated path `baseDir/segment` does not exist, it may be because consolidation failed or because the current `Segment` has not been fully written. We can list all the `chunk-*` paths under `baseDir` and determine whether the segment is full (an interface can be added for this check during recovery). If it is full, we consolidate and delete asynchronously; if it is not full, we simply restore `S3SegmentMetadata` from the chunks.

Possible Optimizations
Upload Buffer Pooling

A general `UploadBufferPool` can be used as the upload buffer (a rough sketch follows the list):

- Each time `commit0` is called, the data is first read into the corresponding `queue`'s `Buffer` in the pool.
- When the overall `Upload Buffer Pool` reaches the configured threshold, the actual upload to S3 is triggered; all the data in each `queue`'s `Buffer` forms one object.
- `S3SegmentMetadata` and `TieredFileSegment#commitPosition` are then updated accordingly.
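A minimal sketch of the pooled upload buffer idea, assuming one staging buffer per queue and a single pool-wide flush threshold (all names and the flush policy are illustrative):

```java
import java.io.ByteArrayOutputStream;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

// Hypothetical sketch of an UploadBufferPool: commit0 data is staged per queue
// and flushed to S3 only once the pool as a whole crosses a size threshold.
public class UploadBufferPool {
    private final Map<String, ByteArrayOutputStream> buffers = new ConcurrentHashMap<>();
    private final AtomicLong pooledBytes = new AtomicLong();
    private final long flushThreshold;
    private final BiConsumer<String, byte[]> flusher; // queue key -> one S3 object

    public UploadBufferPool(long flushThreshold, BiConsumer<String, byte[]> flusher) {
        this.flushThreshold = flushThreshold;
        this.flusher = flusher;
    }

    // Called from commit0: stage the data in the queue's buffer.
    public synchronized void append(String queueKey, byte[] data) {
        buffers.computeIfAbsent(queueKey, k -> new ByteArrayOutputStream())
               .write(data, 0, data.length);
        if (pooledBytes.addAndGet(data.length) >= flushThreshold) {
            flushAll();
        }
    }

    // Flush every queue's buffer as one object; the caller would then update
    // S3SegmentMetadata and TieredFileSegment#commitPosition.
    private void flushAll() {
        buffers.forEach((queueKey, buf) -> {
            if (buf.size() > 0) {
                flusher.accept(queueKey, buf.toByteArray());
                buf.reset();
            }
        });
        pooledBytes.set(0);
    }
}
```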
Tasks

- Elimination implementation
- New configuration (a rough configuration sketch follows):
  - `s3ChunkSize`: `4 * 1024 * 1024`
  - `chunk` num in one `S3Segment`: `tieredStoreGroupCommitSize / s3ChunkSize`
  - `readaheadChunkNum`: `chunk` num in each `read0` call
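A hypothetical configuration holder reflecting the items above; apart from the listed `4 * 1024 * 1024` chunk size and the `readaheadChunkNum = 2` used in the walkthrough below, the names and defaults are assumptions:

```java
// Hypothetical sketch of the new configuration items listed above.
public class S3TieredStorageConfig {
    // Size of one chunk object on S3 (listed default: 4 * 1024 * 1024 bytes).
    private long s3ChunkSize = 4 * 1024 * 1024;

    // Size of one logical segment; an existing tiered storage property
    // (default value here is an assumption for the sketch).
    private long tieredStoreGroupCommitSize = 32 * 1024 * 1024;

    // Number of chunks to read ahead per read0 call (the walkthrough uses 2).
    private int readaheadChunkNum = 2;

    // Derived: how many chunks make up one S3Segment.
    public long chunkNumPerSegment() {
        return tieredStoreGroupCommitSize / s3ChunkSize;
    }

    public long getS3ChunkSize() { return s3ChunkSize; }
    public int getReadaheadChunkNum() { return readaheadChunkNum; }
}
```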
A segment is treated as a logical file and, from the S3 point of view, is divided into multiple physical files, i.e. multiple physical objects. We assume that each physical object has a default size of 4 MB and call it a `chunk`.

For ease of describing the process, we assume that `readaheadChunkNum` is 2 in the following.
Process
This is done in the `S3Segment#S3Segment` constructor. The path of the logical segment file is constructed by concatenating the following components according to a set of rules: `clusterName/brokerName/topic/queue/type-baseOffset`. The path below this point is referred to as `baseDir`.

Next, the `S3Segment#createFile()` method is called. Since no data has been written yet, we need to create the first `chunk` object and allocate 4MB of memory to cache the data for this chunk. We request the creation of an object in S3 with the key format `baseDir/chunk-startOffset`, which here means creating a `baseDir/chunk-0` object in S3.
The `Segment#commit0()` method is then called. We assume that 2MB of data is written this time. The data is written directly into `chunk-0` and uploaded to S3.
Next, the `S3Segment#read0()` method is called. Suppose we are currently reading 1MB of data with `position = 0` and `length = 1024`. The read hits the local `chunk-0` buffer directly and returns.

Suppose this time we write data with `position = 2048` and `length = 12 * 1024`, that is, we submit 12MB of data starting from the 2MB position. At this point, the first 2MB of `chunk-0` is cached locally, so we can concatenate it with the first 2MB of the incoming stream to form a complete `chunk-0`. Next, we correctly split the remaining data into `chunk-4096`, `chunk-8192`, and the first 2MB of `chunk-12288`, and upload them to S3. When multiple chunks are uploaded at the same time, we upload them asynchronously with a thread pool. If some chunks fail to upload, they stay cached and are retried asynchronously in the background; if they fail repeatedly, appropriate error handling is performed (a sketch of this concurrent upload follows).
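A sketch of the concurrent upload with background retry described above; the executor size, retry limit, and the injected `s3Put` callback are assumptions rather than the actual implementation:

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Hypothetical sketch: upload several chunks concurrently; keep failed chunks
// cached and retry them in the background with a bounded number of attempts.
public class ConcurrentChunkUploader {
    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(4);
    private final Map<String, ByteBuffer> pendingChunks = new ConcurrentHashMap<>();
    private final BiConsumer<String, ByteBuffer> s3Put; // e.g. a blocking S3 putObject
    private static final int MAX_ATTEMPTS = 3;          // assumed retry limit

    public ConcurrentChunkUploader(BiConsumer<String, ByteBuffer> s3Put) {
        this.s3Put = s3Put;
    }

    // Submit all chunks of one commit for concurrent upload.
    public void commitChunks(Map<String, ByteBuffer> chunks) {
        chunks.forEach((key, data) -> {
            pendingChunks.put(key, data);
            submit(key, data, 1);
        });
    }

    private void submit(String key, ByteBuffer data, int attempt) {
        CompletableFuture
            .runAsync(() -> s3Put.accept(key, data.duplicate()), executor)
            .whenComplete((ignored, error) -> {
                if (error == null) {
                    pendingChunks.remove(key);     // upload succeeded
                } else if (attempt < MAX_ATTEMPTS) {
                    // Retry later in the background; the chunk stays cached.
                    executor.schedule(() -> submit(key, data, attempt + 1),
                        5, TimeUnit.SECONDS);
                } else {
                    // Give up after several failures; real code would escalate
                    // this to higher-level error handling.
                }
            });
    }
}
```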
chunk-12288
is cached locally. Now, we read 4096 bytes of data starting fromposition = 2048
, which means reading the second half ofchunk-0
and the first half ofchunk-4096
. Since we have enabled the pre-reading mechanism and the parameter is 2, we need to read two more chunks. Considering that we only read half ofchunk-4096
, we only need to read one more chunk, which ischunk-8192
.Then we read
chunk-0
,chunk-4096
, andchunk-8192
from S3. According to the pre-reading mechanism, we do not savechunk-0
and only savechunk-4096
andchunk-8192
in memory.Now, we read 4096 bytes of data starting from
position = 6144
, which means reading the second half ofchunk-4096
and the first half ofchunk-8192
. Since we have pre-loadedchunk-4096
andchunk-8192
into memory, we can directly return the data without reading from S3.At this point, we can asynchronously trigger an
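A rough sketch of the read-ahead chunk cache behaviour described above, keyed by chunk start offset and pre-fetching `readaheadChunkNum` following chunks on a miss (the loader callback and eviction policy are assumptions):

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongFunction;

// Hypothetical read-ahead cache: chunk start offset -> chunk contents.
public class ChunkReadAheadCache {
    private final Map<Long, ByteBuffer> cache = new ConcurrentHashMap<>();
    private final long chunkSize;
    private final int readaheadChunkNum;
    private final LongFunction<ByteBuffer> chunkLoader; // e.g. a ranged S3 GetObject

    public ChunkReadAheadCache(long chunkSize, int readaheadChunkNum,
                               LongFunction<ByteBuffer> chunkLoader) {
        this.chunkSize = chunkSize;
        this.readaheadChunkNum = readaheadChunkNum;
        this.chunkLoader = chunkLoader;
    }

    // Return the chunk covering `position`; on a miss, load it and
    // asynchronously pre-fetch the following readaheadChunkNum chunks.
    public ByteBuffer getChunk(long position) {
        long chunkStart = position - position % chunkSize;
        ByteBuffer chunk = cache.get(chunkStart);
        if (chunk != null) {
            return chunk.duplicate(); // cache hit, no S3 access
        }
        ByteBuffer loaded = chunkLoader.apply(chunkStart);
        cache.put(chunkStart, loaded);
        for (int i = 1; i <= readaheadChunkNum; i++) {
            long next = chunkStart + i * chunkSize;
            CompletableFuture.runAsync(() ->
                cache.computeIfAbsent(next, chunkLoader::apply));
        }
        return loaded.duplicate();
    }

    // Chunks that have been fully read can be evicted to bound memory usage.
    public void evict(long chunkStart) {
        cache.remove(chunkStart);
    }
}
```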
At this point, we can asynchronously trigger an `uploadPartCopy` operation to consolidate all the `chunks` into a single large `segment` object and record the basic information of the `segment` in the object metadata. The object path is `clusterName/brokerName/topic/queue/type-baseOffset-seg`. After the copy succeeds, we can asynchronously delete the parent path of the chunks.
Now we concatenate the path `clusterName/brokerName/topic/queue/type-baseOffset-seg` and check whether the object exists. If it exists, it is the already consolidated large `Segment` object, so we record the corresponding metadata locally, and `read0()` can read the object directly based on the offset. Next, we check whether there is still an object under `.../type-baseOffset`; if there is, the asynchronous deletion has not yet succeeded, and we can re-attempt it asynchronously.

If the path `.../type-baseOffset-seg` does not exist, it may be because consolidation failed or because the current `segment` has not yet been written to capacity. In this case, we can list all the chunk files under the path and determine whether the segment has been fully written (this can be checked by adding an interface that is called during recovery). If the `segment` has been fully written, we can consolidate the `chunks` asynchronously and then delete them. If it has not been fully written, we simply recover the latest `chunk` into the cache.

Advantages and disadvantages
A disadvantage is that per-queue `chunk` caches can lead to excessive memory usage: with 1000 queues, even if only one 4MB `chunk` is cached per queue, memory usage can reach 4GB.