Skip to content

Commit

Permalink
Add comment
Browse files Browse the repository at this point in the history
  • Loading branch information
coderzc committed Mar 23, 2023
1 parent f854c59 commit 133887f
Showing 1 changed file with 6 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -339,20 +339,22 @@ public synchronized boolean addMessage(long ledgerId, long entryId, long deliver
private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot() {
List<ImmutableBucket> values = immutableBuckets.asMapOfRanges().values().stream().toList();
long minNumberMessages = Long.MAX_VALUE;
long minScheduleTimestampSum = Long.MAX_VALUE;
long minScheduleTimestamp = Long.MAX_VALUE;
int minIndex = -1;
for (int i = 0; i + 1 < values.size(); i++) {
ImmutableBucket bucketL = values.get(i);
ImmutableBucket bucketR = values.get(i + 1);
// We should skip the bucket which last segment already been load to memory, avoid record replicated index.
if (bucketL.lastSegmentEntryId > bucketL.getCurrentSegmentEntryId()
&& bucketR.lastSegmentEntryId > bucketR.getCurrentSegmentEntryId()
// Skip the bucket that is merging
&& !bucketL.merging && !bucketR.merging){
long scheduleTimestampSum =
long scheduleTimestamp =
Math.min(bucketL.firstScheduleTimestamps.get(bucketL.currentSegmentEntryId + 1),
bucketR.firstScheduleTimestamps.get(bucketR.currentSegmentEntryId + 1));
long numberMessages = bucketL.numberBucketDelayedMessages + bucketR.numberBucketDelayedMessages;
if (scheduleTimestampSum <= minScheduleTimestampSum) {
minScheduleTimestampSum = scheduleTimestampSum;
if (scheduleTimestamp <= minScheduleTimestamp) {
minScheduleTimestamp = scheduleTimestamp;
if (numberMessages < minNumberMessages) {
minNumberMessages = numberMessages;
minIndex = i;
Expand Down

0 comments on commit 133887f

Please sign in to comment.