Skip to content

Commit

Permalink
fix code
Browse files Browse the repository at this point in the history
  • Loading branch information
coderzc committed Mar 30, 2023
1 parent 4dd3acc commit 64967e8
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ private synchronized List<ImmutableBucket> selectMergedBuckets(final List<Immuta
}

if (minIndex >= 0) {
return values.subList(minIndex, minIndex + MAX_MERGE_NUM);
return values.subList(minIndex, minIndex + mergeNum);
} else if (mergeNum > 2){
return selectMergedBuckets(values, mergeNum - 1);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
long length = 0;
List<ByteBuf> bufList = this.bucketSnapshots.get(bucketId);
for (ByteBuf byteBuf : bufList) {
length += byteBuf.array().length;
length += byteBuf.readableBytes();
}
return length;
}, executorService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,12 @@ public void testRecoverSnapshot(BucketDelayedDeliveryTracker tracker) {

clockTime.set(1 * 10);

Awaitility.await().untilAsserted(() -> {
Assert.assertTrue(
tracker.getImmutableBuckets().asMapOfRanges().values().stream().noneMatch(x -> x.merging ||
!x.getSnapshotCreateFuture().get().isDone()));
});

assertTrue(tracker.hasMessageAvailable());
Set<PositionImpl> scheduledMessages = tracker.getScheduledMessages(100);

Expand All @@ -201,16 +207,16 @@ public void testRecoverSnapshot(BucketDelayedDeliveryTracker tracker) {

clockTime.set(30 * 10);

tracker = new BucketDelayedDeliveryTracker(dispatcher, timer, 1000, clock,
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), -1,50);
BucketDelayedDeliveryTracker tracker2 = new BucketDelayedDeliveryTracker(dispatcher, timer, 1000, clock,
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), -1, 50);

assertFalse(tracker.containsMessage(101, 101));
assertEquals(tracker.getNumberOfDelayedMessages(), 70);
assertFalse(tracker2.containsMessage(101, 101));
assertEquals(tracker2.getNumberOfDelayedMessages(), 70);

clockTime.set(100 * 10);

assertTrue(tracker.hasMessageAvailable());
scheduledMessages = tracker.getScheduledMessages(70);
assertTrue(tracker2.hasMessageAvailable());
scheduledMessages = tracker2.getScheduledMessages(70);

assertEquals(scheduledMessages.size(), 70);

Expand All @@ -220,7 +226,7 @@ public void testRecoverSnapshot(BucketDelayedDeliveryTracker tracker) {
i++;
}

tracker.close();
tracker2.close();
}

@Test
Expand Down

0 comments on commit 64967e8

Please sign in to comment.