
KAFKA-9618: Directory deletion failure leading to error task RocksDB open #8186

Merged: 2 commits merged into apache:trunk from the broken_file branch on Mar 4, 2020

Conversation

@abbccdda (Contributor) commented Feb 27, 2020

This PR tries to reorder the closing behavior by doing the state store cleanup first before releasing the lock.
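For context, the gist of the reordering as a simplified sketch (not the exact diff; signatures and error handling are omitted, and the helper names follow the StateManagerUtil snippets shown further down in this conversation):

// Sketch only: wipe the task state while this thread still holds the task
// directory lock, and release the lock afterwards, so that no other thread
// can grab the lock and open a half-deleted RocksDB directory in between.
try {
    stateMgr.close();                                    // flush and close all stores
    if (wipeStateStore) {                                // EOS + dirty close only
        StateManagerUtil.wipeStateStores(log, stateMgr);
    }
} finally {
    stateDirectory.unlock(stateMgr.taskId());            // release the lock last
}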

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@@ -83,8 +84,8 @@ static void wipeStateStores(final Logger log, final ProcessorStateManager stateM
try {
Utils.delete(stateMgr.baseDir());
} catch (final IOException fatalException) {
// since it is only called under dirty close, we always swallow the exception
Member

Why do we now always wipe the state stores (even on clean close)? cc/ @guozhangwang

Contributor

We do not wipe state stores on clean close, only on dirty close.

Contributor

wipeStateStores is only triggered on a dirty close, under EOS.

Member

Ah, I wasn't looking at the latest trunk; I see you fixed that in a recent PR. In that case, should we keep the comment?

// since it is only called under dirty close, we always swallow the exception
log.warn("Failed to wiping state stores for task {}", stateMgr.taskId());
log.error("Failed to wiping state stores for task {} due to {}", stateMgr.taskId(), fatalException);
throw new StreamsException(fatalException);
Member

Seems like we should retry, at least if this is a clean close... otherwise this thread still dies, and also kills any other threads that then get assigned this task.
Or better yet, can we fix the state store to not blindly open non-existent files? If we successfully delete the checkpoint file but fail to delete the stores, Streams shouldn't attempt to open any store files.
If we fail at deleting the checkpoint, we're more at risk, as the data may be corrupted (otherwise we wouldn't be trying to delete the task state to begin with). In that case it seems like the only safe thing to do would be to retry until we at least delete the checkpoint file.

Contributor Author

My current feeling is that we should retry infinitely to make sure we clean up the directory completely, since in principle, if this is not an atomic operation, the handling complexity during normal processing would be 2X IMHO. In this case, I would rather we get stuck here than proceed, as we could potentially have already corrupted the state, and what's even worse is that we wouldn't know it.

@ableegoldman (Member) commented Feb 28, 2020

Well, isn't the whole point of the checkpoint file to act as the source of truth? I.e. we should feel secure that deleting the checkpoint file will always invalidate any leftover state, even if we fail to actually wipe it out -- whether the store or even the directory itself is left behind should not matter. We should absolutely retry indefinitely to delete the checkpoint file, though. Once that's gone, either the cleanup thread will delete the directory, or some thread will get the task re-assigned and delete the leftover store to recreate it from scratch.
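A hypothetical sketch of that "checkpoint file as source of truth" idea (the file name, retry policy, and surrounding variables are assumptions for illustration, not what this PR implements):

// Hypothetical: delete the checkpoint first and retry until it is gone, since it
// is what invalidates leftover state; the rest of the wipe is best effort.
final Path checkpoint = taskDir.resolve(".checkpoint");    // assumed file name
while (Files.exists(checkpoint)) {
    try {
        Files.delete(checkpoint);
    } catch (final IOException e) {
        log.warn("Retrying checkpoint deletion for {}", taskDir, e);   // backoff omitted
    }
}
try {
    Utils.delete(taskDir.toFile());    // best effort: the cleanup thread or the next
} catch (final IOException e) {        // owner removes whatever is left behind
    log.warn("Could not fully wipe {}; leaving it to the cleanup thread", taskDir);
}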

@guozhangwang (Contributor)

@abbccdda Do we know what exception is thrown here? I think if it is a transient error we should retry closing until it succeeds, OR if it is fatal we should throw it all the way up to the thread as fatal and stop the thread (or even stop the instance, depending on how we want to proceed with KIP-572).

@abbccdda (Contributor Author)

Exception was due to non-empty directory:

[stream-soak-test-776e3009-0514-454c-a473-8964806818c0-StreamThread-1] task [1_1] Failed to wiping state stores for task 1_1 due to {} (org.apache.kafka.streams.processor.internals.StreamTask)
[2020-02-28T05:18:02-08:00] (streams-soak-trunk-eos_soak_i-0f3c459ae5907ce97_streamslog) java.nio.file.DirectoryNotEmptyException: /mnt/run/streams/state/stream-soak-test/1_1/KSTREAM-AGGREGATE-STATE-STORE-0000000019/KSTREAM-AGGREGATE-STATE-STORE-0000000019.1582804800000
        at sun.nio.fs.UnixFileSystemProvider.implDelete(UnixFileSystemProvider.java:242)
        at sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103)
        at java.nio.file.Files.delete(Files.java:1126)
        at org.apache.kafka.common.utils.Utils$2.postVisitDirectory(Utils.java:761)
        at org.apache.kafka.common.utils.Utils$2.postVisitDirectory(Utils.java:744)
        at java.nio.file.Files.walkFileTree(Files.java:2688)
        at java.nio.file.Files.walkFileTree(Files.java:2742)
        at org.apache.kafka.common.utils.Utils.delete(Utils.java:744)
        at org.apache.kafka.streams.processor.internals.StateManagerUtil.wipeStateStores(StateManagerUtil.java:84)
        at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:450)
        at org.apache.kafka.streams.processor.internals.StreamTask.closeDirty(StreamTask.java:392)
        at org.apache.kafka.streams.processor.internals.TaskManager.handleLostAll(TaskManager.java:337)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:766)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:725)

@abbccdda (Contributor Author)

I'm looking into it, and I feel this is a bug in the directory deletion utility itself, as we are not failing every time.

@abbccdda (Contributor Author)

Quoting the java.nio.file.Files#delete Javadoc: "On some operating systems it may not be possible to remove a file when it is open and in use by this Java virtual machine or other programs."

Do we have a guarantee that the file is not being accessed by the JVM when we delete it?

@guozhangwang (Contributor)

Thinking about that a bit more, closeDirty is only called in these places:

  1. onAssignment: this is within a rebalance, and we are certain these going-to-be-closed tasks are not owned by anyone else yet.

  2. shutdown: at this point all the still-owned tasks should not yet be owned by anyone else.

  3. handleCorruption: same as 2), it was not in a rebalance, and hence the task should not be owned by someone else yet.

  4. handleLostAll: this is the case where that may not be true, since the task may already have been migrated to someone else, and only later does the previous owner realize it and call handleLostAll. In this case, though, since the lock is still in the previous thread's hands, the other threads should be blocked on dir.lock until this thread releases the lock.

I think we should take a deeper look at the call trace of 4) and see whether that's not the case and the file locking is actually not protecting us.

@ableegoldman (Member)

@guozhangwang I think there actually is a race condition there, as we first unlock the directory and then try to delete it -- another thread may be spinning trying to obtain this lock and grab it before the directory deletion completes.
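That interleaving would explain the DirectoryNotEmptyException above: the recursive delete walks the tree, and by the time it removes a directory itself, another thread has already re-created files inside it. The exception itself is easy to reproduce with plain JDK code (illustrative only, not Streams code):

import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.Files;
import java.nio.file.Path;

public class DirectoryNotEmptyDemo {
    public static void main(final String[] args) throws Exception {
        final Path taskDir = Files.createTempDirectory("task-1_1");
        // stand-in for another thread re-creating a store file after the recursive
        // walk visited the directory but before the directory itself is deleted
        Files.createFile(taskDir.resolve("re-created-store-file"));
        try {
            Files.delete(taskDir);                        // directory is no longer empty
        } catch (final DirectoryNotEmptyException e) {
            System.out.println("delete failed: " + e);    // same failure as in the soak log
        }
    }
}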

@ableegoldman (Member)

Can we just delete the checkpoint file (and state) and then unlock the directory? We can attempt to also delete the directory after that, but if we fail we can just move on, as it's likely another thread came in and will reinitialize/restore the stores from scratch. If we fail for some other reason then that's also fine; the cleanup thread will remove the empty directory.

@abbccdda (Contributor Author) commented Mar 1, 2020

I updated the JIRA with the race-condition reasoning. My current thinking is that we should be more careful with the deletion of state by making sure we hold the lock during the deletion and release it afterwards. This means we may potentially end up with a bunch of empty state directories, but we avoid stepping on each other's toes. Note that some operating systems don't allow the deletion of a file that is locked by the current JVM thread, so this is slightly difficult.

The other option is to have a directory like state_locks which maintains all the locks of the state stores, so that we don't need this special deletion logic which clears up all the state files except the lock.
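A rough JDK-only illustration of the "hold the lock while wiping, skip the lock file itself" approach (the .lock file name and directory layout here are assumptions for illustration, not the Streams implementation):

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Comparator;
import java.util.stream.Stream;

public class WipeWhileLockedSketch {
    public static void main(final String[] args) throws IOException {
        final Path taskDir = Files.createTempDirectory("task-1_1");
        final Path lockFile = taskDir.resolve(".lock");            // assumed lock file
        Files.createFile(taskDir.resolve("some-store-file"));      // pretend store data

        try (FileChannel channel = FileChannel.open(lockFile, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
             FileLock lock = channel.lock()) {
            // wipe everything under the task directory except the lock file itself,
            // while still holding the lock so no other thread can claim the directory;
            // deleting the locked file itself may fail on some operating systems
            try (Stream<Path> contents = Files.walk(taskDir)) {
                contents.sorted(Comparator.reverseOrder())
                        .filter(p -> !p.equals(taskDir) && !p.equals(lockFile))
                        .forEach(p -> {
                            try {
                                Files.delete(p);
                            } catch (final IOException e) {
                                throw new UncheckedIOException(e);
                            }
                        });
            }
        } // lock released here; only the directory and the lock file are left behind
    }
}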

@guozhangwang (Contributor)

@ableegoldman Yeah, when I discussed this with @abbccdda offline on Friday we also found that, but at that time we still needed to confirm from the logs that it was indeed the issue.

@abbccdda From the trace you added to the JIRA ticket it seems our theory is indeed confirmed, thanks!

@abbccdda force-pushed the broken_file branch 4 times, most recently from 8f97d35 to ab48698 on March 2, 2020 18:08
@guozhangwang (Contributor)

test this please

// if EOS is enabled, we wipe out the whole state store for unclean close
// since they are invalid to use anymore
if (!clean && !eosDisabled) {
StateManagerUtil.wipeStateStores(log, stateMgr);
}

// first close state manager (which is idempotent) then close the record collector (which could throw),
// if the latter throws and we re-close dirty which would close the state manager again.
StateManagerUtil.closeStateManager(log, logPrefix, clean, stateMgr, stateDirectory);
Contributor

Is it okay that we wipe out the directory first and then close the state manager, which would flush the stores and then close them? Would any exception be thrown?

@abbccdda force-pushed the broken_file branch 2 times, most recently from 15b7258 to a6ec2ba on March 3, 2020 07:17
@guozhangwang (Contributor)

test this please

@guozhangwang (Contributor) left a review comment

Made a pass over the updated PR; overall it looks good.

Could you also add an integration test that actually involves the filesystem, to make sure this code path is okay? With mock-based unit tests only, I'm concerned we don't have good coverage here.

Also, could you trigger a system test as well?

// first close state manager (which is idempotent) then close the record collector (which could throw),
// if the latter throws and we re-close dirty which would close the state manager again.
StateManagerUtil.closeStateManager(log, logPrefix, clean,
wipeStateStore, stateMgr, stateDirectory, "active");
Contributor

nit: use TaskType.ACTIVE.

@@ -179,7 +179,7 @@ private void close(final boolean clean) {
}

if (state() == State.CLOSING) {
StateManagerUtil.closeStateManager(log, logPrefix, clean, stateMgr, stateDirectory);
StateManagerUtil.closeStateManager(log, logPrefix, clean, false, stateMgr, stateDirectory, "standby");
Contributor

Use TaskType.STANDBY.

@guozhangwang (Contributor) left a review comment

Thanks for the added integration test! Left some more comments.

@abbccdda (Contributor Author) commented Mar 4, 2020

@guozhangwang (Contributor)

test this please

@guozhangwang (Contributor) left a review comment

LGTM. Waiting for green build.

@guozhangwang (Contributor)

test this please

@abbccdda (Contributor Author) commented Mar 4, 2020

The only real test failure,
org.apache.kafka.streams.processor.internals.StateManagerUtilTest.testCloseStateManagerDirtyShallSwallowException,
is fixed by the last commit.
The others are flaky tests.

@guozhangwang guozhangwang merged commit c2ec974 into apache:trunk Mar 4, 2020