KAFKA-15018: Failing offset flush for EOS when secondary offset store write fails for tombstone records #13801
Conversation
I will review the tests once they run. Still tagging the 2 reviewers.
@vamossagar12 looks like there are checkstyle failures
log.warn("Failed to write offsets to secondary backing store", secondaryWriteError); | ||
} else { | ||
log.debug("Successfully flushed offsets to secondary backing store"); | ||
} | ||
//primaryWriteError is null at this point, and we don't care about secondaryWriteError | ||
callback.onCompletion(null, ignored); |
Isn't this going to cause us to always block on writes to the secondary store, which is something we want to avoid?
Yeah. Good catch. That doesn’t need to happen in this case. Will remove it
I have removed this. I needed to tweak the callback logic a little bit, because eventually it needs to be invoked only once so that OffsetStorageWriter#handleFinishWrite doesn't ignore the flush responses. Let me know how the changes are looking now.
Oh, didn't check that before tagging. Will check.
@vamossagar12 Thanks for the PR. I'd like to wait until Yash has had a chance to review before taking a look, so I've removed myself as a reviewer. Feel free to add me back once he (or someone else familiar with Connect) has had a chance to review.
```java
if (secondaryStore != null && containsTombstones && secondaryWriteError != null) {
    callback.onCompletion(secondaryWriteError, ignored);
} else {
    callback.onCompletion(primaryWriteError, ignored);
```
There's one case here where the write to the secondary store is successful but the write to the primary store fails. In such a case, there would be a discrepancy between the two offsets topics, but since that would also lead to the failure of the offset commit, that discrepancy is still different from the originally reported issue. Let me know if that makes sense.
The build was aborted, but the failed tests seem unrelated to this PR.
Thanks @vamossagar12, I've left a few more comments on the non-testing parts; I'll go over the tests in a subsequent pass.
Thanks @yashmayya, I addressed your comments and added a response for one of the questions. Let me know if that makes sense. Regarding #13801 (comment), I am still undecided, but I feel it's somewhat ok to have it in …
Thanks @yashmayya. I addressed the rest of the comments that you had.
Thanks @vamossagar12, this is looking much better now!
Thanks Yash. I addressed your comments.
Thanks @vamossagar12, I don't have any comments other than the currently open ones - primarily the timeout case for when exactly-once support is enabled, the …
Thanks Sagar! I had a feeling the implementation for this would be a little tricky, appreciate you taking a stab at it.
I've left some inline suggestions and have some higher-level thoughts/suggestions as well:
- Can we update the description to remove the parts that were copied from the Jira ticket? It's fine to just link to the ticket, and that also makes it easier to distinguish between the information already contained there, and new information contained only in this PR.
- Can we skip writes for tombstone records to the secondary store in our second write (the one that takes place after we've written to the primary store)? It's a bit of an optimization and a bit of a readability improvement: it explicitly calls out the three-step process we want to engage in when offsets contain tombstones, where we first write tombstones to the secondary store, then write everything to the primary store, and finally write non-tombstones to the secondary store.
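For illustration, a minimal, self-contained sketch of that three-step ordering is below; the map-based "stores", the write helper, and the partitioning logic are hypothetical stand-ins, not the actual ConnectorOffsetBackingStore API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the three-step flush ordering discussed above;
// the "stores" are plain maps and the helpers are simplified stand-ins.
public class ThreeStepFlushSketch {

    // Apply offsets to a store: a null value is a tombstone and deletes the key.
    static void write(Map<String, String> store, Map<String, String> offsets) {
        offsets.forEach((k, v) -> {
            if (v == null) {
                store.remove(k);
            } else {
                store.put(k, v);
            }
        });
    }

    static void flush(Map<String, String> primary, Map<String, String> secondary,
                      Map<String, String> offsets) {
        Map<String, String> tombstones = new HashMap<>();
        Map<String, String> regular = new HashMap<>();
        offsets.forEach((k, v) -> (v == null ? tombstones : regular).put(k, v));

        // Step 1: tombstones go to the secondary store first, so a later failure
        // cannot leave the secondary store holding offsets the primary has deleted.
        if (!tombstones.isEmpty()) {
            write(secondary, tombstones);
        }
        // Step 2: the complete set (tombstones + regular offsets) goes to the primary store.
        write(primary, offsets);
        // Step 3: only the non-tombstone offsets are written back to the secondary store.
        write(secondary, regular);
    }

    public static void main(String[] args) {
        Map<String, String> primary = new HashMap<>();
        Map<String, String> secondary = new HashMap<>();
        secondary.put("partition-B", "7"); // stale offset that the tombstone should remove
        Map<String, String> offsets = new HashMap<>();
        offsets.put("partition-A", "42");
        offsets.put("partition-B", null); // tombstone
        flush(primary, secondary, offsets);
        System.out.println("primary=" + primary + " secondary=" + secondary);
    }
}
```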
…re object to be returned to the caller
… directly within thenAccept
…both primary and secondary future
Hey Chris, sorry for the long delay on this. I finally got a chance to verify the code that you provided and it makes sense. I agree that so far I was only thinking about either having 2 separate futures such that one waits for the other, or trying to chain futures like CompletableFutures. However, the version you have provided is pretty straightforward and all the new tests passed OOB for me. Regarding …
I am assuming that for non-exactly-once source tasks, you are referring to scenarios when offset flushes are triggered and when flush operations finish out of order. I reviewed the code and I can see that this is being checked in …
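As a rough illustration of the general pattern being referred to (dropping completions that belong to a superseded flush), a simplified guard might look like the sketch below; the names and the flush-id bookkeeping are assumptions for illustration, not the actual OffsetStorageWriter code.

```java
import java.util.concurrent.atomic.AtomicLong;

// Simplified, hypothetical sketch of guarding against out-of-order flush
// completions; this does not mirror OffsetStorageWriter's actual bookkeeping.
class FlushGuardSketch {
    private final AtomicLong currentFlushId = new AtomicLong(-1);

    // Begin a new flush and return its id; a newer flush supersedes older ones.
    long beginFlush() {
        return currentFlushId.incrementAndGet();
    }

    // Called when an asynchronous offset write completes. Completions that
    // belong to a superseded flush are simply dropped.
    void handleFinishWrite(long flushId, Throwable error) {
        if (flushId != currentFlushId.get()) {
            return; // stale completion from an earlier flush
        }
        if (error != null) {
            System.err.println("Offset flush " + flushId + " failed: " + error);
        } else {
            System.out.println("Offset flush " + flushId + " completed");
        }
    }
}
```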
Thanks Sagar, this is looking pretty good. I've made a brief pass on the testing parts; once this initial stuff is taken care of I can do a deeper dive.
…/ConnectorOffsetBackingStoreTest.java Co-authored-by: Chris Egerton <[email protected]>
…/ConnectorOffsetBackingStoreTest.java Co-authored-by: Chris Egerton <[email protected]>
…/ConnectorOffsetBackingStoreTest.java Co-authored-by: Chris Egerton <[email protected]>
Chris, I started changing the tests in alignment with the comments (i.e. using AtomicBoolean, AtomicReference, and removing the try-catch block). I noticed an interesting issue with …
@C0urante, I think I figured out the reason for the above failure with …. I have fixed this by creating a new …
```java
super.get(timeout, unit);
if (primaryWriteError.get() != null) {
    if (primaryWriteError.get() instanceof TimeoutException) {
        throw (TimeoutException) primaryWriteError.get();
```
I am slightly on the fence about whether we need to handle this case or not, because in ConvertingFutureCallback#result I see that any exception other than CancellationException is wrapped in an ExecutionException.
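To illustrate the distinction in play here, assuming only standard java.util.concurrent semantics (not the actual ConvertingFutureCallback code): a timeout raised on the write path surfaces to callers of get() only as the cause of an ExecutionException unless it is rethrown directly, so a caller that wants to treat slow writes as timeouts has to unwrap it. The names below are illustrative.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustrative only: how a caller distinguishes "get() itself timed out" from
// "the write failed with a TimeoutException that was wrapped by the future".
class FlushAwaitSketch {
    static void awaitFlush(Future<Void> flushFuture, long timeoutMs) throws InterruptedException {
        try {
            flushFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // get() did not complete in time.
            System.err.println("Timed out waiting for offset flush: " + e);
        } catch (ExecutionException e) {
            if (e.getCause() instanceof TimeoutException) {
                // The write itself timed out but was wrapped; unwrap to treat it the same way.
                System.err.println("Offset write timed out: " + e.getCause());
            } else {
                System.err.println("Offset write failed: " + e.getCause());
            }
        }
    }
}
```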
Thanks Sagar, great catch! I suspected this would be a gnarly one to tackle but it's turning out to be even harder than I thought. I think there's still an issue with the current state of the PR. It looks like we aren't blocking on the future returned by …

I've sketched a new kind of future:

```java
private class ChainedOffsetWriteFuture implements Future<Void> {

    private final OffsetBackingStore primaryStore;
    private final OffsetBackingStore secondaryStore;
    private final Map<ByteBuffer, ByteBuffer> completeOffsets;
    private final Map<ByteBuffer, ByteBuffer> regularOffsets;
    private final Callback<Void> callback;
    private final AtomicReference<Throwable> writeError;
    private final CountDownLatch completed;

    public ChainedOffsetWriteFuture(
            OffsetBackingStore primaryStore,
            OffsetBackingStore secondaryStore,
            Map<ByteBuffer, ByteBuffer> completeOffsets,
            Map<ByteBuffer, ByteBuffer> regularOffsets,
            Map<ByteBuffer, ByteBuffer> tombstoneOffsets,
            Callback<Void> callback
    ) {
        this.primaryStore = primaryStore;
        this.secondaryStore = secondaryStore;
        this.completeOffsets = completeOffsets;
        this.regularOffsets = regularOffsets;
        this.callback = callback;
        this.writeError = new AtomicReference<>();
        this.completed = new CountDownLatch(1);
        secondaryStore.set(tombstoneOffsets, this::onFirstWrite);
    }

    private void onFirstWrite(Throwable error, Void ignored) {
        if (error != null) {
            log.trace("Skipping offsets write to primary store because secondary tombstone write has failed", error);
            try (LoggingContext context = loggingContext()) {
                callback.onCompletion(error, ignored);
                writeError.compareAndSet(null, error);
                completed.countDown();
            }
            return;
        }
        setPrimaryThenSecondary(primaryStore, secondaryStore, completeOffsets, regularOffsets, this::onSecondWrite);
    }

    private void onSecondWrite(Throwable error, Void ignored) {
        callback.onCompletion(error, ignored);
        writeError.compareAndSet(null, error);
        completed.countDown();
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    @Override
    public boolean isCancelled() {
        return false;
    }

    @Override
    public boolean isDone() {
        return completed.getCount() == 0;
    }

    @Override
    public Void get() throws InterruptedException, ExecutionException {
        completed.await();
        if (writeError.get() != null) {
            throw new ExecutionException(writeError.get());
        }
        return null;
    }

    @Override
    public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        if (!completed.await(timeout, unit)) {
            throw new TimeoutException("Failed to complete offset write in time");
        }
        if (writeError.get() != null) {
            throw new ExecutionException(writeError.get());
        }
        return null;
    }
}
```

(I've omitted an implementation of ….)

The new class can be used at the end of …:

```java
if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
    return new ChainedOffsetWriteFuture(
            primaryStore,
            secondaryStore,
            values,
            regularOffsets,
            tombstoneOffsets,
            callback
    );
} else {
    return setPrimaryThenSecondary(primaryStore, secondaryStore, values, regularOffsets, callback);
}
```
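As a side note on the design choice: the CompletableFuture-based chaining mentioned earlier in the thread could look roughly like the sketch below; the names are hypothetical stand-ins, and this is only meant to contrast with the hand-rolled Future above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Rough, hypothetical equivalent of the chaining above using CompletableFuture
// composition (the alternative mentioned earlier in the thread).
class CompletableChainingSketch {
    static CompletableFuture<Void> writeTombstonesThenRest(
            CompletableFuture<Void> secondaryTombstoneWrite,
            Supplier<CompletableFuture<Void>> primaryThenSecondaryWrite) {
        // If the tombstone write to the secondary store fails, the chain fails and
        // the primary write is never attempted, mirroring onFirstWrite above.
        return secondaryTombstoneWrite.thenCompose(ignored -> primaryThenSecondaryWrite.get());
    }
}
```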
Thanks Chris! I ran through the scenarios in the test and I can see that it handles the cases correctly. Regarding …
Thanks Sagar, this is looking great.
The one thing left I'd like to see is testing coverage for three-step writes. Right now we have great coverage for two-step writes (secondary tombstones then primary, primary then secondary non-tombstones, etc.), but it looks like we don't have any coverage for a mix of tombstone and non-tombstone offsets. Would it be possible to add cases for these kinds of scenarios?
… tombstone offsets
Thanks @C0urante, good catch, yeah those are missing. I have modified some of the tests to consider all three types of offset records. I added another test for the case when the write to the secondary store times out for regular offsets. Let me know if these look ok coverage-wise.
LGTM, thanks @vamossagar12!
…ore primary store (apache#13801) Reviewers: Chris Egerton <[email protected]>
The problem statement is described in the ticket.
This PR explicitly fails the offset flush when the dual write fails on the secondary store. As prescribed in the ticket, it writes to the secondary store first when tombstone records are present. Also, while the flushes originally didn't care about the outcome of writes to the secondary store, this PR marks the flush as failed when offsets containing tombstone records aren't written successfully to the secondary store. Tombstone events are rare, as per the ticket, and the probability of them failing on the secondary store while succeeding on the primary store is even lower. Considering this, the PR errs on the side of correctness for these rare events, which could otherwise lead to an inconsistent state. Note that with this PR, the offset flush becomes a three-step process when tombstone offsets are present.
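A minimal sketch of the resulting flush-outcome policy (which failures fail the flush and which are only logged); the method and parameter names are illustrative and do not match the actual ConnectorOffsetBackingStore code.

```java
// Hypothetical sketch of the flush-outcome policy described above.
final class FlushOutcomeSketch {
    static Throwable flushOutcome(Throwable secondaryTombstoneError,
                                  Throwable primaryError,
                                  Throwable secondaryRegularError) {
        if (secondaryTombstoneError != null) {
            // Step 1 failed: tombstones could not be written to the secondary store,
            // so the flush is failed and the primary write is skipped.
            return secondaryTombstoneError;
        }
        if (primaryError != null) {
            // Step 2 failed: a primary-store failure always fails the flush.
            return primaryError;
        }
        if (secondaryRegularError != null) {
            // Step 3 failed: regular offsets in the secondary store are best-effort,
            // so this is only logged and the flush still succeeds.
            System.err.println("Failed to write offsets to secondary backing store: " + secondaryRegularError);
        }
        return null; // null signals a successful flush to the caller
    }
}
```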