Skip to content

Commit

Permalink
[fix] [ml] Fix orphan scheduled task for ledger create timeout check (#…
Browse files Browse the repository at this point in the history
…21542)

When an ML tries to create a new ledger, it schedules a delayed task to check whether the ledger-create request has timed out<sup>[1]</sup>.

However, we should cancel this delayed task once the request to create the new ledger has finished. Otherwise, these orphaned tasks consume unnecessary CPU resources.

Cancel the scheduled task after the create ledger request is finished

(cherry picked from commit e7536a2)
  • Loading branch information
poorbarcode committed Dec 13, 2023
1 parent 992eb44 commit bbc8dfb
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -3900,7 +3899,7 @@ public static ManagedLedgerException createManagedLedgerException(Throwable t) {
*/
protected void asyncCreateLedger(BookKeeper bookKeeper, ManagedLedgerConfig config, DigestType digestType,
CreateCallback cb, Map<String, byte[]> metadata) {
AtomicBoolean ledgerCreated = new AtomicBoolean(false);
CompletableFuture<LedgerHandle> ledgerFutureHook = new CompletableFuture<>();
Map<String, byte[]> finalMetadata = new HashMap<>();
finalMetadata.putAll(ledgerMetadata);
finalMetadata.putAll(metadata);
Expand All @@ -3913,33 +3912,39 @@ protected void asyncCreateLedger(BookKeeper bookKeeper, ManagedLedgerConfig conf
));
} catch (EnsemblePlacementPolicyConfig.ParseEnsemblePlacementPolicyConfigException e) {
log.error("[{}] Serialize the placement configuration failed", name, e);
cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated);
cb.createComplete(Code.UnexpectedConditionException, null, ledgerFutureHook);
return;
}
}
createdLedgerCustomMetadata = finalMetadata;

try {
bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(),
config.getAckQuorumSize(), digestType, config.getPassword(), cb, ledgerCreated, finalMetadata);
config.getAckQuorumSize(), digestType, config.getPassword(), cb, ledgerFutureHook, finalMetadata);
} catch (Throwable cause) {
log.error("[{}] Encountered unexpected error when creating ledger",
name, cause);
cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated);
ledgerFutureHook.completeExceptionally(cause);
cb.createComplete(Code.UnexpectedConditionException, null, ledgerFutureHook);
return;
}
scheduledExecutor.schedule(() -> {
if (!ledgerCreated.get()) {

ScheduledFuture timeoutChecker = scheduledExecutor.schedule(() -> {
if (!ledgerFutureHook.isDone()
&& ledgerFutureHook.completeExceptionally(new TimeoutException(name + " Create ledger timeout"))) {
if (log.isDebugEnabled()) {
log.debug("[{}] Timeout creating ledger", name);
}
cb.createComplete(BKException.Code.TimeoutException, null, ledgerCreated);
cb.createComplete(BKException.Code.TimeoutException, null, ledgerFutureHook);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Ledger already created when timeout task is triggered", name);
}
}
}, config.getMetadataOperationsTimeoutSeconds(), TimeUnit.SECONDS);

ledgerFutureHook.whenComplete((ignore, ex) -> {
timeoutChecker.cancel(false);
});
}

public Clock getClock() {
Expand All @@ -3948,16 +3953,12 @@ public Clock getClock() {

/**
* check if ledger-op task is already completed by timeout-task. If completed then delete the created ledger
*
* @param rc
* @param lh
* @param ctx
* @return
*/
protected boolean checkAndCompleteLedgerOpTask(int rc, LedgerHandle lh, Object ctx) {
if (ctx instanceof AtomicBoolean) {
if (ctx instanceof CompletableFuture) {
// ledger-creation is already timed out and callback is already completed so, delete this ledger and return.
if (((AtomicBoolean) (ctx)).compareAndSet(false, true)) {
if (((CompletableFuture) ctx).complete(lh)) {
return false;
} else {
if (rc == BKException.Code.OK) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,13 @@
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
Expand All @@ -90,6 +92,8 @@
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.util.BoundedScheduledExecutorService;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
Expand Down Expand Up @@ -135,6 +139,7 @@
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.mockito.Mockito;
import org.powermock.reflect.Whitebox;
import org.testng.Assert;
Expand Down Expand Up @@ -3086,9 +3091,9 @@ public void testManagedLedgerWithCreateLedgerTimeOut() throws Exception {

latch.await(config.getMetadataOperationsTimeoutSeconds() + 2, TimeUnit.SECONDS);
assertEquals(response.get(), BKException.Code.TimeoutException);
assertTrue(ctxHolder.get() instanceof AtomicBoolean);
AtomicBoolean ledgerCreated = (AtomicBoolean) ctxHolder.get();
assertFalse(ledgerCreated.get());
assertTrue(ctxHolder.get() instanceof CompletableFuture);
CompletableFuture ledgerCreateHook = (CompletableFuture) ctxHolder.get();
assertTrue(ledgerCreateHook.isCompletedExceptionally());

ledger.close();
}
Expand Down Expand Up @@ -4017,4 +4022,52 @@ public void operationFailed(MetaStoreException e) {
});
future.join();
}

/**
 * Verifies that repeatedly opening and closing a managed ledger leaves no orphan scheduled tasks behind.
 *
 * <p>When an ML tries to create a ledger, it schedules a delayed task that checks whether the ledger-create
 * request has timed out. That delayed task must be cancelled once the ledger-create request has been answered.
 *
 * @throws Exception if opening/closing the managed ledger or shutting down the factory fails
 */
@Test
public void testNoOrphanScheduledTasksAfterCloseML() throws Exception {
    String mlName = UUID.randomUUID().toString();
    ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
    try {
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        // Long timeout: a timeout checker that was not cancelled would still be queued when we count below.
        config.setMetadataOperationsTimeoutSeconds(3600);

        // Calculate pending task count.
        long pendingTaskCountBefore = calculatePendingTaskCount(factory.getScheduledExecutor());
        // Trigger create & close ML 1000 times.
        for (int i = 0; i < 1000; i++) {
            ManagedLedger ml = factory.open(mlName, config);
            ml.close();
        }
        // Verify there is no orphan scheduled task.
        long pendingTaskCountAfter = calculatePendingTaskCount(factory.getScheduledExecutor());
        // Other components may also have appended scheduled tasks, so tolerate up to 100 tasks to avoid flakiness.
        assertTrue(pendingTaskCountAfter - pendingTaskCountBefore < 100,
                "Too many pending scheduled tasks: before=" + pendingTaskCountBefore
                        + ", after=" + pendingTaskCountAfter);
    } finally {
        // The factory is created only for this test; release it so it does not leak into other tests.
        factory.shutdown();
    }
}

/**
 * Counts the tasks that are still pending in the queues of an {@link OrderedScheduler}.
 *
 * <p>The scheduler's internal {@code threads} array, each entry's {@code delegate} and that delegate's
 * {@code queue} are read reflectively. A queued {@link FutureTask} is counted only while it is neither
 * cancelled nor done; any other queued {@link Runnable} is always counted.
 *
 * @param orderedScheduler the scheduler whose queues are inspected
 * @return the number of queued tasks that are still pending
 */
private long calculatePendingTaskCount(OrderedScheduler orderedScheduler) {
    long pending = 0;
    ExecutorService[] workers = WhiteboxImpl.getInternalState(orderedScheduler, "threads");
    for (ExecutorService worker : workers) {
        BoundedScheduledExecutorService delegate = WhiteboxImpl.getInternalState(worker, "delegate");
        BlockingQueue<Runnable> queued = WhiteboxImpl.getInternalState(delegate, "queue");
        for (Runnable task : queued) {
            if (!(task instanceof FutureTask)) {
                // Not a future: its state cannot be inspected, so treat it as pending.
                pending++;
                continue;
            }
            FutureTask<?> future = (FutureTask<?>) task;
            // Skip futures that were cancelled or have already finished.
            if (future.isCancelled() || future.isDone()) {
                continue;
            }
            pending++;
        }
    }
    return pending;
}
}

0 comments on commit bbc8dfb

Please sign in to comment.