Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Made PendingAddOp thread safe #3784

Merged
merged 1 commit into from
Feb 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1337,7 +1337,7 @@ public String toString() {
});
} catch (RejectedExecutionException e) {
op.cb.addCompleteWithLatency(BookKeeper.getReturnRc(clientCtx.getBookieClient(),
BKException.Code.InterruptedException),
BKException.Code.InterruptedException),
LedgerHandle.this, INVALID_ENTRY_ID, 0, op.ctx);
op.recyclePendAddOpObject();
}
Expand All @@ -1355,13 +1355,8 @@ public String toString() {
}
}

try {
executeOrdered(op);
} catch (RejectedExecutionException e) {
op.cb.addCompleteWithLatency(
BookKeeper.getReturnRc(clientCtx.getBookieClient(), BKException.Code.InterruptedException),
LedgerHandle.this, INVALID_ENTRY_ID, 0, op.ctx);
}
op.initiate();

}

synchronized void updateLastConfirmed(long lac, long len) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,13 +275,7 @@ public String toString() {
}
}

try {
clientCtx.getMainWorkerPool().executeOrdered(ledgerId, op);
} catch (RejectedExecutionException e) {
op.cb.addCompleteWithLatency(BookKeeper.getReturnRc(clientCtx.getBookieClient(),
BKException.Code.InterruptedException),
LedgerHandleAdv.this, op.getEntryId(), 0, op.ctx);
}
op.initiate();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.AsyncCallback.AddCallbackWithLatency;
import org.apache.bookkeeper.client.api.WriteFlag;
Expand All @@ -52,7 +51,7 @@
*
*
*/
class PendingAddOp implements Runnable, WriteCallback {
class PendingAddOp implements WriteCallback {
private static final Logger LOG = LoggerFactory.getLogger(PendingAddOp.class);

ByteBuf payload;
Expand All @@ -68,7 +67,7 @@ class PendingAddOp implements Runnable, WriteCallback {
LedgerHandle lh;
ClientContext clientCtx;
boolean isRecoveryAdd = false;
long requestTimeNanos;
volatile long requestTimeNanos;
long qwcLatency; // Quorum Write Completion Latency after response from quorum bookies.
Set<BookieId> addEntrySuccessBookies;
long writeDelayedStartTime; // min fault domains completion latency after response from ack quorum bookies
Expand Down Expand Up @@ -143,7 +142,7 @@ long getEntryId() {
return this.entryId;
}

void sendWriteRequest(List<BookieId> ensemble, int bookieIndex) {
private void sendWriteRequest(List<BookieId> ensemble, int bookieIndex) {
int flags = isRecoveryAdd ? FLAG_RECOVERY_ADD | FLAG_HIGH_PRIORITY : FLAG_NONE;

clientCtx.getBookieClient().addEntry(ensemble.get(bookieIndex),
Expand All @@ -160,32 +159,22 @@ boolean maybeTimeout() {
return false;
}

void timeoutQuorumWait() {
try {
clientCtx.getMainWorkerPool().executeOrdered(lh.ledgerId, new Runnable() {
@Override
public void run() {
if (completed) {
return;
} else if (addEntrySuccessBookies.size() >= lh.getLedgerMetadata().getAckQuorumSize()) {
// If ackQuorum number of bookies have acknowledged the write but still not complete, indicates
// failures due to not having been written to enough fault domains. Increment corresponding
// counter.
clientCtx.getClientStats().getWriteTimedOutDueToNotEnoughFaultDomains().inc();
}
lh.handleUnrecoverableErrorDuringAdd(BKException.Code.AddEntryQuorumTimeoutException);
}
@Override
public String toString() {
return String.format("AddEntryQuorumTimeout(lid=%d, eid=%d)", lh.ledgerId, entryId);
}
});
} catch (RejectedExecutionException e) {
LOG.warn("Timeout add entry quorum wait failed {} entry: {}", lh.ledgerId, entryId);
/**
 * Fails this add with {@code AddEntryQuorumTimeoutException} unless it has already completed.
 *
 * <p>Judging by the method name, this is invoked when the wait for the write quorum expires;
 * the caller is not visible from here. The method holds this op's monitor while it runs.
 */
synchronized void timeoutQuorumWait() {
    if (!completed) {
        final int ackedBookies = addEntrySuccessBookies.size();
        final int ackQuorumSize = lh.getLedgerMetadata().getAckQuorumSize();

        // Enough bookies acknowledged the write to satisfy the ack quorum, yet the add is still
        // not complete: the entry did not reach enough fault domains, so record that in the
        // dedicated counter before failing the add.
        if (ackedBookies >= ackQuorumSize) {
            clientCtx.getClientStats().getWriteTimedOutDueToNotEnoughFaultDomains().inc();
        }

        lh.handleUnrecoverableErrorDuringAdd(BKException.Code.AddEntryQuorumTimeoutException);
    }
}

void unsetSuccessAndSendWriteRequest(List<BookieId> ensemble, int bookieIndex) {
synchronized void unsetSuccessAndSendWriteRequest(List<BookieId> ensemble, int bookieIndex) {
// update the ensemble
this.ensemble = ensemble;

Expand Down Expand Up @@ -242,8 +231,7 @@ void unsetSuccessAndSendWriteRequest(List<BookieId> ensemble, int bookieIndex) {
/**
* Initiate the add operation.
*/
@Override
public void run() {
public void initiate() {
hasRun = true;
if (callbackTriggered) {
// this should only be true if the request was failed due
Expand Down Expand Up @@ -280,7 +268,7 @@ public void run() {
}

@Override
public void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Object ctx) {
public synchronized void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Object ctx) {
int bookieIndex = (Integer) ctx;
--pendingWriteRequests;

Expand Down Expand Up @@ -410,7 +398,7 @@ void sendAddSuccessCallbacks() {
lh.sendAddSuccessCallbacks();
}

void submitCallback(final int rc) {
synchronized void submitCallback(final int rc) {
if (LOG.isDebugEnabled()) {
LOG.debug("Submit callback (lid:{}, eid: {}). rc:{}", lh.getId(), entryId, rc);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,20 +280,7 @@ private void completeAdd(final int rc,
final BookieId addr,
final WriteCallback cb,
final Object ctx) {
try {
executor.executeOrdered(ledgerId, new Runnable() {
@Override
public void run() {
cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
}
@Override
public String toString() {
return String.format("CompleteWrite(ledgerId=%d, entryId=%d, addr=%s)", ledgerId, entryId, addr);
}
});
} catch (RejectedExecutionException ree) {
cb.writeComplete(getRc(BKException.Code.InterruptedException), ledgerId, entryId, addr, ctx);
}
cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -778,9 +778,7 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBuf
if (useV2WireProtocol) {
if (writeFlags.contains(WriteFlag.DEFERRED_SYNC)) {
LOG.error("invalid writeflags {} for v2 protocol", writeFlags);
executor.executeOrdered(ledgerId, () -> {
cb.writeComplete(BKException.Code.IllegalOpException, ledgerId, entryId, bookieId, ctx);
});
cb.writeComplete(BKException.Code.IllegalOpException, ledgerId, entryId, bookieId, ctx);
return;
}
completionKey = acquireV2Key(ledgerId, entryId, OperationType.ADD_ENTRY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void testExecuteAfterCancelled() {
assertSame(lh, op.lh);
assertEquals(Code.NotEnoughBookiesException, rcHolder.get());

op.run();
op.initiate();
// after the op is run, the object is recycled.
assertNull(op.lh);
}
Expand Down