Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix deadlock in PendingAckHandleImpl #18989

Merged
merged 5 commits into from
Dec 20, 2022
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,13 @@ public PendingAckHandleImpl(PersistentSubscription persistentSubscription) {
.getBrokerService().getPulsar().getTransactionPendingAckStoreProvider();

pendingAckStoreProvider.checkInitializedBefore(persistentSubscription)
.thenAccept(init -> {
.thenAcceptAsync(init -> {
if (init) {
initPendingAckStore();
} else {
completeHandleFuture();
}
})
}, internalPinnedExecutor)
michaeljmarshall marked this conversation as resolved.
Show resolved Hide resolved
.exceptionally(e -> {
Throwable t = FutureUtil.unwrapCompletionException(e);
changeToErrorState();
Expand Down Expand Up @@ -937,18 +937,16 @@ public TransactionPendingAckStats getStats(boolean lowWaterMarks) {
return transactionPendingAckStats;
}

public synchronized void completeHandleFuture() {
if (!this.pendingAckHandleCompletableFuture.isDone()) {
this.pendingAckHandleCompletableFuture.complete(PendingAckHandleImpl.this);
}
if (recoverTime.getRecoverStartTime() != 0L) {
public void completeHandleFuture() {
this.pendingAckHandleCompletableFuture.complete(PendingAckHandleImpl.this);
if (recoverTime.getRecoverStartTime() != 0L && recoverTime.getRecoverEndTime() == 0L) {
recoverTime.setRecoverEndTime(System.currentTimeMillis());
Comment on lines +942 to 943
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without the synchronized keyword in the method declaration, this conditional update to recoveredEndTime is subject to data races, and the variables will not be safely published to other threads. However, at this time, the recoverTime object is only used for stats, and we have a tendency to have relaxed requirements for stats. Perhaps it is fine. We could alternatively add methods to the class that ensure proper synchronization of updates. I don't feel strongly, except to mention that it is somewhat brittle to assume the variable will only ever be used by stats.

Copy link
Contributor Author

@nicoloboschi nicoloboschi Dec 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're totally right. This recoveryTime registers the time of the replay. It should be handled directly in the callback after the replay is completed. For the current usage, being non thread-safe is fine and I'd prefer leave it as is.
Those methods don't need synchronization at all since, at the moment, there's no data race for them.

}
}

public synchronized void exceptionHandleFuture(Throwable t) {
if (!this.pendingAckHandleCompletableFuture.isDone()) {
this.pendingAckHandleCompletableFuture.completeExceptionally(t);
public void exceptionHandleFuture(Throwable t) {
final boolean completedNow = this.pendingAckHandleCompletableFuture.completeExceptionally(t);
if (completedNow) {
recoverTime.setRecoverEndTime(System.currentTimeMillis());
}
}
Expand Down