-
Notifications
You must be signed in to change notification settings - Fork 14.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MINOR: Check store directory empty to decide whether throw task corrupted exception with EOS #8180
Changes from all commits
ef6d111
f7d8c78
29614dc
46b1365
123a28c
2502956
d277f63
8057d8e
24ada91
59ebd0d
d1bea17
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -413,48 +413,50 @@ public void closeDirty() { | |
*/ | ||
private void close(final boolean clean) { | ||
if (state() == State.CREATED) { | ||
// the task is created and not initialized, do nothing | ||
transitionTo(State.CLOSING); | ||
} else { | ||
if (state() == State.RUNNING) { | ||
closeTopology(clean); | ||
// the task is created and not initialized, just re-write the checkpoint file | ||
executeAndMaybeSwallow(clean, () -> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is an improvement I want to add along with the PR: since we delete the checkpoint file after completed loading, and before we initialize to RESTORING if there's an exception we could lose that checkpoint. So here in Restoring / Created state upon closing I also added the checkpoint logic here. |
||
stateMgr.checkpoint(Collections.emptyMap()); | ||
}, "state manager checkpoint"); | ||
|
||
if (clean) { | ||
commitState(); | ||
// whenever we have successfully committed state, it is safe to checkpoint | ||
// the state as well no matter if EOS is enabled or not | ||
stateMgr.checkpoint(checkpointableOffsets()); | ||
} else { | ||
executeAndMaybeSwallow(false, stateMgr::flush, "state manager flush"); | ||
} | ||
transitionTo(State.CLOSING); | ||
} else if (state() == State.RUNNING) { | ||
closeTopology(clean); | ||
|
||
transitionTo(State.CLOSING); | ||
} else if (state() == State.RESTORING) { | ||
executeAndMaybeSwallow(clean, () -> { | ||
stateMgr.flush(); | ||
stateMgr.checkpoint(Collections.emptyMap()); | ||
}, "state manager flush and checkpoint"); | ||
|
||
transitionTo(State.CLOSING); | ||
} else if (state() == State.SUSPENDED) { | ||
// do not need to commit / checkpoint, since when suspending we've already committed the state | ||
transitionTo(State.CLOSING); | ||
if (clean) { | ||
commitState(); | ||
// whenever we have successfully committed state, it is safe to checkpoint | ||
// the state as well no matter if EOS is enabled or not | ||
stateMgr.checkpoint(checkpointableOffsets()); | ||
} else { | ||
executeAndMaybeSwallow(false, stateMgr::flush, "state manager flush"); | ||
} | ||
|
||
if (state() == State.CLOSING) { | ||
// if EOS is enabled, we wipe out the whole state store for unclean close | ||
// since they are invalid to use anymore | ||
final boolean wipeStateStore = !clean && !eosDisabled; | ||
transitionTo(State.CLOSING); | ||
} else if (state() == State.RESTORING) { | ||
executeAndMaybeSwallow(clean, () -> { | ||
stateMgr.flush(); | ||
stateMgr.checkpoint(Collections.emptyMap()); | ||
}, "state manager flush and checkpoint"); | ||
|
||
// first close state manager (which is idempotent) then close the record collector (which could throw), | ||
// if the latter throws and we re-close dirty which would close the state manager again. | ||
StateManagerUtil.closeStateManager(log, logPrefix, clean, | ||
wipeStateStore, stateMgr, stateDirectory, TaskType.ACTIVE); | ||
transitionTo(State.CLOSING); | ||
} else if (state() == State.SUSPENDED) { | ||
// do not need to commit / checkpoint, since when suspending we've already committed the state | ||
transitionTo(State.CLOSING); | ||
} | ||
|
||
executeAndMaybeSwallow(clean, recordCollector::close, "record collector close"); | ||
} else { | ||
throw new IllegalStateException("Illegal state " + state() + " while closing active task " + id); | ||
} | ||
if (state() == State.CLOSING) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We could have added the stores, but then before transiting to RESTORING an exception happens; hence here I always call closeStateManager which would just be an no-op if the lock is not grabbed / stores not added. |
||
// if EOS is enabled, we wipe out the whole state store for unclean close | ||
// since they are invalid to use anymore | ||
final boolean wipeStateStore = !clean && !eosDisabled; | ||
|
||
// first close state manager (which is idempotent) then close the record collector (which could throw), | ||
// if the latter throws and we re-close dirty which would close the state manager again. | ||
StateManagerUtil.closeStateManager(log, logPrefix, clean, | ||
wipeStateStore, stateMgr, stateDirectory, TaskType.ACTIVE); | ||
|
||
executeAndMaybeSwallow(clean, recordCollector::close, "record collector close"); | ||
} else { | ||
throw new IllegalStateException("Illegal state " + state() + " while closing active task " + id); | ||
} | ||
|
||
partitionGroup.close(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here I just re-arranged the parameter orders and below I use the log-prefix from the log-context rather than creating a new string, no critical changes here.