diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java index 99936705bed..04270c72f53 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java @@ -1337,7 +1337,7 @@ public String toString() { }); } catch (RejectedExecutionException e) { op.cb.addCompleteWithLatency(BookKeeper.getReturnRc(clientCtx.getBookieClient(), - BKException.Code.InterruptedException), + BKException.Code.InterruptedException), LedgerHandle.this, INVALID_ENTRY_ID, 0, op.ctx); op.recyclePendAddOpObject(); } @@ -1355,13 +1355,8 @@ public String toString() { } } - try { - executeOrdered(op); - } catch (RejectedExecutionException e) { - op.cb.addCompleteWithLatency( - BookKeeper.getReturnRc(clientCtx.getBookieClient(), BKException.Code.InterruptedException), - LedgerHandle.this, INVALID_ENTRY_ID, 0, op.ctx); - } + op.initiate(); + } synchronized void updateLastConfirmed(long lac, long len) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java index 92bddc9cc4f..c94a9154f51 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java @@ -275,13 +275,7 @@ public String toString() { } } - try { - clientCtx.getMainWorkerPool().executeOrdered(ledgerId, op); - } catch (RejectedExecutionException e) { - op.cb.addCompleteWithLatency(BookKeeper.getReturnRc(clientCtx.getBookieClient(), - BKException.Code.InterruptedException), - LedgerHandleAdv.this, op.getEntryId(), 0, op.ctx); - } + op.initiate(); } @Override diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java index d0ff59e45c6..8dc50370194 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java @@ -32,7 +32,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.client.AsyncCallback.AddCallbackWithLatency; import org.apache.bookkeeper.client.api.WriteFlag; @@ -52,7 +51,7 @@ * * */ -class PendingAddOp implements Runnable, WriteCallback { +class PendingAddOp implements WriteCallback { private static final Logger LOG = LoggerFactory.getLogger(PendingAddOp.class); ByteBuf payload; @@ -68,7 +67,7 @@ class PendingAddOp implements Runnable, WriteCallback { LedgerHandle lh; ClientContext clientCtx; boolean isRecoveryAdd = false; - long requestTimeNanos; + volatile long requestTimeNanos; long qwcLatency; // Quorum Write Completion Latency after response from quorum bookies. Set addEntrySuccessBookies; long writeDelayedStartTime; // min fault domains completion latency after response from ack quorum bookies @@ -143,7 +142,7 @@ long getEntryId() { return this.entryId; } - void sendWriteRequest(List ensemble, int bookieIndex) { + private void sendWriteRequest(List ensemble, int bookieIndex) { int flags = isRecoveryAdd ? FLAG_RECOVERY_ADD | FLAG_HIGH_PRIORITY : FLAG_NONE; clientCtx.getBookieClient().addEntry(ensemble.get(bookieIndex), @@ -160,32 +159,22 @@ boolean maybeTimeout() { return false; } - void timeoutQuorumWait() { - try { - clientCtx.getMainWorkerPool().executeOrdered(lh.ledgerId, new Runnable() { - @Override - public void run() { - if (completed) { - return; - } else if (addEntrySuccessBookies.size() >= lh.getLedgerMetadata().getAckQuorumSize()) { - // If ackQuorum number of bookies have acknowledged the write but still not complete, indicates - // failures due to not having been written to enough fault domains. Increment corresponding - // counter. - clientCtx.getClientStats().getWriteTimedOutDueToNotEnoughFaultDomains().inc(); - } - lh.handleUnrecoverableErrorDuringAdd(BKException.Code.AddEntryQuorumTimeoutException); - } - @Override - public String toString() { - return String.format("AddEntryQuorumTimeout(lid=%d, eid=%d)", lh.ledgerId, entryId); - } - }); - } catch (RejectedExecutionException e) { - LOG.warn("Timeout add entry quorum wait failed {} entry: {}", lh.ledgerId, entryId); + synchronized void timeoutQuorumWait() { + if (completed) { + return; } + + if (addEntrySuccessBookies.size() >= lh.getLedgerMetadata().getAckQuorumSize()) { + // If ackQuorum number of bookies have acknowledged the write but still not complete, indicates + // failures due to not having been written to enough fault domains. Increment corresponding + // counter. + clientCtx.getClientStats().getWriteTimedOutDueToNotEnoughFaultDomains().inc(); + } + + lh.handleUnrecoverableErrorDuringAdd(BKException.Code.AddEntryQuorumTimeoutException); } - void unsetSuccessAndSendWriteRequest(List ensemble, int bookieIndex) { + synchronized void unsetSuccessAndSendWriteRequest(List ensemble, int bookieIndex) { // update the ensemble this.ensemble = ensemble; @@ -242,8 +231,7 @@ void unsetSuccessAndSendWriteRequest(List ensemble, int bookieIndex) { /** * Initiate the add operation. */ - @Override - public void run() { + public void initiate() { hasRun = true; if (callbackTriggered) { // this should only be true if the request was failed due @@ -280,7 +268,7 @@ public void run() { } @Override - public void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Object ctx) { + public synchronized void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Object ctx) { int bookieIndex = (Integer) ctx; --pendingWriteRequests; @@ -410,7 +398,7 @@ void sendAddSuccessCallbacks() { lh.sendAddSuccessCallbacks(); } - void submitCallback(final int rc) { + synchronized void submitCallback(final int rc) { if (LOG.isDebugEnabled()) { LOG.debug("Submit callback (lid:{}, eid: {}). rc:{}", lh.getId(), entryId, rc); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java index eaecb452820..43374776d4f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java @@ -280,20 +280,7 @@ private void completeAdd(final int rc, final BookieId addr, final WriteCallback cb, final Object ctx) { - try { - executor.executeOrdered(ledgerId, new Runnable() { - @Override - public void run() { - cb.writeComplete(rc, ledgerId, entryId, addr, ctx); - } - @Override - public String toString() { - return String.format("CompleteWrite(ledgerId=%d, entryId=%d, addr=%s)", ledgerId, entryId, addr); - } - }); - } catch (RejectedExecutionException ree) { - cb.writeComplete(getRc(BKException.Code.InterruptedException), ledgerId, entryId, addr, ctx); - } + cb.writeComplete(rc, ledgerId, entryId, addr, ctx); } @Override diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index 888077fe80b..aea77fba29e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -778,9 +778,7 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBuf if (useV2WireProtocol) { if (writeFlags.contains(WriteFlag.DEFERRED_SYNC)) { LOG.error("invalid writeflags {} for v2 protocol", writeFlags); - executor.executeOrdered(ledgerId, () -> { - cb.writeComplete(BKException.Code.IllegalOpException, ledgerId, entryId, bookieId, ctx); - }); + cb.writeComplete(BKException.Code.IllegalOpException, ledgerId, entryId, bookieId, ctx); return; } completionKey = acquireV2Key(ledgerId, entryId, OperationType.ADD_ENTRY); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingAddOpTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingAddOpTest.java index 51d296c9005..5fb318c51fe 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingAddOpTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/PendingAddOpTest.java @@ -82,7 +82,7 @@ public void testExecuteAfterCancelled() { assertSame(lh, op.lh); assertEquals(Code.NotEnoughBookiesException, rcHolder.get()); - op.run(); + op.initiate(); // after the op is run, the object is recycled. assertNull(op.lh); }