Skip to content

Commit

Permalink
Made PendingAddOp thread safe (apache#3784)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored and Anup Ghatage committed Jul 12, 2024
1 parent 76ac940 commit ee9eca8
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1337,7 +1337,7 @@ public String toString() {
});
} catch (RejectedExecutionException e) {
op.cb.addCompleteWithLatency(BookKeeper.getReturnRc(clientCtx.getBookieClient(),
BKException.Code.InterruptedException),
BKException.Code.InterruptedException),
LedgerHandle.this, INVALID_ENTRY_ID, 0, op.ctx);
op.recyclePendAddOpObject();
}
Expand All @@ -1355,13 +1355,8 @@ public String toString() {
}
}

try {
executeOrdered(op);
} catch (RejectedExecutionException e) {
op.cb.addCompleteWithLatency(
BookKeeper.getReturnRc(clientCtx.getBookieClient(), BKException.Code.InterruptedException),
LedgerHandle.this, INVALID_ENTRY_ID, 0, op.ctx);
}
op.initiate();

}

synchronized void updateLastConfirmed(long lac, long len) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,13 +275,7 @@ public String toString() {
}
}

try {
clientCtx.getMainWorkerPool().executeOrdered(ledgerId, op);
} catch (RejectedExecutionException e) {
op.cb.addCompleteWithLatency(BookKeeper.getReturnRc(clientCtx.getBookieClient(),
BKException.Code.InterruptedException),
LedgerHandleAdv.this, op.getEntryId(), 0, op.ctx);
}
op.initiate();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.AsyncCallback.AddCallbackWithLatency;
import org.apache.bookkeeper.client.api.WriteFlag;
Expand All @@ -52,7 +51,7 @@
*
*
*/
class PendingAddOp implements Runnable, WriteCallback {
class PendingAddOp implements WriteCallback {
private static final Logger LOG = LoggerFactory.getLogger(PendingAddOp.class);

ByteBuf payload;
Expand All @@ -68,7 +67,7 @@ class PendingAddOp implements Runnable, WriteCallback {
LedgerHandle lh;
ClientContext clientCtx;
boolean isRecoveryAdd = false;
long requestTimeNanos;
volatile long requestTimeNanos;
long qwcLatency; // Quorum Write Completion Latency after response from quorum bookies.
Set<BookieId> addEntrySuccessBookies;
long writeDelayedStartTime; // min fault domains completion latency after response from ack quorum bookies
Expand Down Expand Up @@ -143,7 +142,7 @@ long getEntryId() {
return this.entryId;
}

void sendWriteRequest(List<BookieId> ensemble, int bookieIndex) {
private void sendWriteRequest(List<BookieId> ensemble, int bookieIndex) {
int flags = isRecoveryAdd ? FLAG_RECOVERY_ADD | FLAG_HIGH_PRIORITY : FLAG_NONE;

clientCtx.getBookieClient().addEntry(ensemble.get(bookieIndex),
Expand All @@ -160,32 +159,22 @@ boolean maybeTimeout() {
return false;
}

void timeoutQuorumWait() {
try {
clientCtx.getMainWorkerPool().executeOrdered(lh.ledgerId, new Runnable() {
@Override
public void run() {
if (completed) {
return;
} else if (addEntrySuccessBookies.size() >= lh.getLedgerMetadata().getAckQuorumSize()) {
// If ackQuorum number of bookies have acknowledged the write but still not complete, indicates
// failures due to not having been written to enough fault domains. Increment corresponding
// counter.
clientCtx.getClientStats().getWriteTimedOutDueToNotEnoughFaultDomains().inc();
}
lh.handleUnrecoverableErrorDuringAdd(BKException.Code.AddEntryQuorumTimeoutException);
}
@Override
public String toString() {
return String.format("AddEntryQuorumTimeout(lid=%d, eid=%d)", lh.ledgerId, entryId);
}
});
} catch (RejectedExecutionException e) {
LOG.warn("Timeout add entry quorum wait failed {} entry: {}", lh.ledgerId, entryId);
synchronized void timeoutQuorumWait() {
if (completed) {
return;
}

if (addEntrySuccessBookies.size() >= lh.getLedgerMetadata().getAckQuorumSize()) {
// If ackQuorum number of bookies have acknowledged the write but still not complete, indicates
// failures due to not having been written to enough fault domains. Increment corresponding
// counter.
clientCtx.getClientStats().getWriteTimedOutDueToNotEnoughFaultDomains().inc();
}

lh.handleUnrecoverableErrorDuringAdd(BKException.Code.AddEntryQuorumTimeoutException);
}

void unsetSuccessAndSendWriteRequest(List<BookieId> ensemble, int bookieIndex) {
synchronized void unsetSuccessAndSendWriteRequest(List<BookieId> ensemble, int bookieIndex) {
// update the ensemble
this.ensemble = ensemble;

Expand Down Expand Up @@ -242,8 +231,7 @@ void unsetSuccessAndSendWriteRequest(List<BookieId> ensemble, int bookieIndex) {
/**
* Initiate the add operation.
*/
@Override
public void run() {
public void initiate() {
hasRun = true;
if (callbackTriggered) {
// this should only be true if the request was failed due
Expand Down Expand Up @@ -280,7 +268,7 @@ public void run() {
}

@Override
public void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Object ctx) {
public synchronized void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Object ctx) {
int bookieIndex = (Integer) ctx;
--pendingWriteRequests;

Expand Down Expand Up @@ -410,7 +398,7 @@ void sendAddSuccessCallbacks() {
lh.sendAddSuccessCallbacks();
}

void submitCallback(final int rc) {
synchronized void submitCallback(final int rc) {
if (LOG.isDebugEnabled()) {
LOG.debug("Submit callback (lid:{}, eid: {}). rc:{}", lh.getId(), entryId, rc);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,20 +280,7 @@ private void completeAdd(final int rc,
final BookieId addr,
final WriteCallback cb,
final Object ctx) {
try {
executor.executeOrdered(ledgerId, new Runnable() {
@Override
public void run() {
cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
}
@Override
public String toString() {
return String.format("CompleteWrite(ledgerId=%d, entryId=%d, addr=%s)", ledgerId, entryId, addr);
}
});
} catch (RejectedExecutionException ree) {
cb.writeComplete(getRc(BKException.Code.InterruptedException), ledgerId, entryId, addr, ctx);
}
cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -778,9 +778,7 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBuf
if (useV2WireProtocol) {
if (writeFlags.contains(WriteFlag.DEFERRED_SYNC)) {
LOG.error("invalid writeflags {} for v2 protocol", writeFlags);
executor.executeOrdered(ledgerId, () -> {
cb.writeComplete(BKException.Code.IllegalOpException, ledgerId, entryId, bookieId, ctx);
});
cb.writeComplete(BKException.Code.IllegalOpException, ledgerId, entryId, bookieId, ctx);
return;
}
completionKey = acquireV2Key(ledgerId, entryId, OperationType.ADD_ENTRY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void testExecuteAfterCancelled() {
assertSame(lh, op.lh);
assertEquals(Code.NotEnoughBookiesException, rcHolder.get());

op.run();
op.initiate();
// after the op is run, the object is recycled.
assertNull(op.lh);
}
Expand Down

0 comments on commit ee9eca8

Please sign in to comment.