Skip to content

Commit

Permalink
[fix][ml] There are two same-named managed ledgers in the one broker (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode authored Jun 7, 2023
1 parent f53cdda commit d7186a6
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -505,9 +505,19 @@ public void openReadOnlyManagedLedgerFailed(ManagedLedgerException exception, Ob
}

void close(ManagedLedger ledger) {
// Remove the ledger from the internal factory cache
ledgers.remove(ledger.getName());
entryCacheManager.removeEntryCache(ledger.getName());
// If the future in map is not done or has exceptionally complete, it means that @param-ledger is not in the
// map.
CompletableFuture<ManagedLedgerImpl> ledgerFuture = ledgers.get(ledger.getName());
if (ledgerFuture == null || !ledgerFuture.isDone() || ledgerFuture.isCompletedExceptionally()){
return;
}
if (ledgerFuture.join() != ledger){
return;
}
// Remove the ledger from the internal factory cache.
if (ledgers.remove(ledger.getName(), ledgerFuture)) {
entryCacheManager.removeEntryCache(ledger.getName());
}
}

public CompletableFuture<Void> shutdownAsync() throws ManagedLedgerException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1528,6 +1528,15 @@ private void closeAllCursors(CloseCallback callback, final Object ctx) {

@Override
public synchronized void createComplete(int rc, final LedgerHandle lh, Object ctx) {
if (STATE_UPDATER.get(this) == State.Closed) {
if (lh != null) {
log.warn("[{}] ledger create completed after the managed ledger is closed rc={} ledger={}, so just"
+ " close this ledger handle.", name, rc, lh != null ? lh.getId() : -1);
lh.closeAsync();
}
return;
}

if (log.isDebugEnabled()) {
log.debug("[{}] createComplete rc={} ledger={}", name, rc, lh != null ? lh.getId() : -1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,16 @@

import static org.testng.Assert.assertEquals;

import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo.CursorInfo;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo.MessageRangeInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.Test;

public class ManagedLedgerFactoryTest extends MockedBookKeeperTestCase {
Expand Down Expand Up @@ -71,4 +75,43 @@ public void testGetManagedLedgerInfoWithClose() throws Exception {
assertEquals(mri.to.entryId, 0);
}

/**
* see: https://github.com/apache/pulsar/pull/18688
*/
@Test
public void testConcurrentCloseLedgerAndSwitchLedgerForReproduceIssue() throws Exception {
String managedLedgerName = "lg_" + UUID.randomUUID().toString().replaceAll("-", "_");

ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setThrottleMarkDelete(1);
config.setMaximumRolloverTime(Integer.MAX_VALUE, TimeUnit.SECONDS);
config.setMaxEntriesPerLedger(5);

// create managedLedger once and close it.
ManagedLedgerImpl managedLedger1 = (ManagedLedgerImpl) factory.open(managedLedgerName, config);
waitManagedLedgerStateEquals(managedLedger1, ManagedLedgerImpl.State.LedgerOpened);
managedLedger1.close();

// create managedLedger the second time.
ManagedLedgerImpl managedLedger2 = (ManagedLedgerImpl) factory.open(managedLedgerName, config);
waitManagedLedgerStateEquals(managedLedger2, ManagedLedgerImpl.State.LedgerOpened);

// Mock the task create ledger complete now, it will change the state to another value which not is Closed.
// Close managedLedger1 the second time.
managedLedger1.createComplete(1, null, null);
managedLedger1.close();

// Verify managedLedger2 is still there.
Assert.assertFalse(factory.ledgers.isEmpty());
Assert.assertEquals(factory.ledgers.get(managedLedger2.getName()).join(), managedLedger2);

// cleanup.
managedLedger2.close();
}

private void waitManagedLedgerStateEquals(ManagedLedgerImpl managedLedger, ManagedLedgerImpl.State expectedStat){
Awaitility.await().untilAsserted(() ->
Assert.assertTrue(managedLedger.getState() == expectedStat));
}

}

0 comments on commit d7186a6

Please sign in to comment.