[SPARK-20664][core] Delete stale application data from SHS. #20138
Conversation
Detect the deletion of event log files from storage, and remove data about the related application attempt in the SHS. Also contains code to fix SPARK-21571 based on code by ericvandenbergfb.
@ericvandenbergfb
There are some previous comments on this code at: vanzin#40
Test build #85611 has finished for PR 20138 at commit
Overall LGTM, since I already reviewed this in vanzin#40.
      .asScala
      .toList
    stale.foreach { log =>
      if (!log.appId.isDefined) {
Nit: use log.appId.isEmpty instead of !log.appId.isDefined.
        try {
          fs.delete(log, true)
        } catch {
          case e: AccessControlException =>
Nit: e is not used.
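Both nits applied together, as a minimal sketch; the logDebug message is illustrative, not the merged code:

    if (log.appId.isEmpty) {                // instead of !log.appId.isDefined
      try {
        fs.delete(log, true)
      } catch {
        case _: AccessControlException =>   // wildcard pattern, since e was unused
          // Illustrative handling: note the failure and keep going.
          logDebug(s"No permission to delete $log, ignoring.")
      }
    }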
Test build #85686 has finished for PR 20138 at commit
Still looking, but in the reviews of @ericvandenbergfb's changes, it seemed like @ajbozarth and @jiangxb1987 were opposed to the more aggressive cleaning by default. I don't see the argument against it, but I want to make sure they are aware of that change here.
      })
    } catch {
      // let the iteration over logInfos break, since an exception on
you've renamed logInfos to updated
And actually you've moved the try/catch, so this is no longer true; you'll continue to submit all tasks even if one throws an exception. (I guess I'm not really sure why the old code did it that way...)
Maybe we should handle RejectedExecutionException explicitly: under this exception, we can log an error message and stop submitting the remaining tasks.
Actually, RejectedExecutionException shouldn't ever be thrown here. The executor doesn't have a bounded queue, and it's very unlikely you'll ever submit Integer.MAX_VALUE tasks here.
The code didn't use to catch any exception here (it was added along with the comment in a531fe1). Catching the exception doesn't do any harm, I just don't think this code will ever trigger.
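For illustration, a minimal sketch of the explicit handling suggested above, assuming the renamed updated collection plus the provider's replayExecutor and mergeApplicationListing from the surrounding code; this is not the merged implementation:

    import java.util.concurrent.RejectedExecutionException

    try {
      updated.foreach { entry =>
        replayExecutor.submit(new Runnable {
          override def run(): Unit = mergeApplicationListing(entry)
        })
      }
    } catch {
      case e: RejectedExecutionException =>
        // Log once and stop submitting the remaining replay tasks.
        logError("Replay executor rejected a task; stopping submission.", e)
    }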
I haven't had a chance to read through your code, but as @squito said, I am against any default feature that deletes files from the eventLog dir. Many users, such as myself, use one log dir for both the event log as well as their Spark logs. I believe it is a great feature for most use cases and should be available as an option defaulted to off.
Well, perhaps I misrepresented this: you still need to turn event log cleaning on explicitly with the old option, "spark.history.fs.cleaner.enabled". This just doesn't include the "aggressive" option that was originally proposed by @ericvandenbergfb.
Ok, no problems here on that front then. If I have time later to do a proper review and this hasn't been merged yet, I'll take a better look at the whole PR.
          (Some(app.info.id), app.attempts.head.info.attemptId)

        case _ =>
          (None, None)
I think a comment here explaining that writing an entry with no appId will mark this log file as eligible for automatic cleanup, if it's still in that state after max_log_age, would help. (If I understood correctly.)
@@ -834,6 +906,9 @@ private[history] case class FsHistoryProviderMetadata(

private[history] case class LogInfo(
    @KVIndexParam logPath: String,
    @KVIndexParam("lastProcessed") lastProcessed: Long,
    appId: Option[String],
Also a comment here explaining why appId is an Option, as that is unexpected.
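A sketch of the kind of comment being asked for, assuming the remaining fields of LogInfo in this PR (attemptId, fileSize):

    private[history] case class LogInfo(
        @KVIndexParam logPath: String,
        @KVIndexParam("lastProcessed") lastProcessed: Long,
        // None until the log has been parsed far enough to find an
        // application start event; entries that still have no appId
        // after the max log age are treated as invalid and cleaned up.
        appId: Option[String],
        attemptId: Option[String],
        fileSize: Long)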
I was actually suggesting having the "aggressive" option turned on by default, and I'm also fine with not having that config at all. I will take a closer look at this later; thank you for pinging me @squito!
  }

  test("SPARK-21571: clean up removes invalid history files") {
    val clock = new ManualClock(TimeUnit.DAYS.toMillis(120))
just curious, why start at 120 days? (not that it matters ...)
This line:

    val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000

Without that, maxTime would be negative, and that seems to be triggering a bug somewhere else. I need to take a look at exactly what's happening there, but it seems unrelated to this change.
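To make the arithmetic concrete, a small sketch assuming the default spark.history.fs.cleaner.maxAge of 7 days:

    import java.util.concurrent.TimeUnit

    val nowMs    = TimeUnit.DAYS.toMillis(120)  // test clock start
    val maxAgeMs = TimeUnit.DAYS.toMillis(7)    // assumed cleaner max age
    val maxTime  = nowMs - maxAgeMs             // positive, as required
    // With a clock starting at 0, maxTime would be -604,800,000 ms.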
FYI #20284 fixes the underlying bug.
one small suggestion for an additional test, otherwise lgtm
    clock.advance(TimeUnit.DAYS.toMillis(2))
    provider.checkForLogs()
    provider.cleanLogs()
    assert(new File(testDir.toURI).listFiles().size === 0)
I think you should add a case where one file starts out empty, say even for one full day, but then becomes valid before the expiration time, and make sure it does not get cleaned up.
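A hedged sketch of that test, borrowing helper names (newLogFile, writeFile, createTestConf, testDir) assumed from the surrounding FsHistoryProviderSuite; treat the signatures as approximate:

    test("empty log that becomes valid before max age is not cleaned up") {
      val clock = new ManualClock(TimeUnit.DAYS.toMillis(120))
      val provider = new FsHistoryProvider(createTestConf(), clock)

      // Start with an empty, in-progress (and thus invalid) log file.
      val log = newLogFile("emptyThenValid", None, inProgress = true)
      log.createNewFile()
      clock.advance(TimeUnit.DAYS.toMillis(1))
      provider.checkForLogs()
      provider.cleanLogs()

      // The log becomes valid well before the max age elapses.
      writeFile(log, true, None,
        SparkListenerApplicationStart("app", Some("emptyThenValid"),
          clock.getTimeMillis(), "test", None))
      clock.advance(TimeUnit.DAYS.toMillis(1))
      provider.checkForLogs()
      provider.cleanLogs()

      // The now-valid log must survive cleanup.
      assert(new File(testDir.toURI).listFiles().size === 1)
    }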
lgtm
LGTM |
Test build #86199 has finished for PR 20138 at commit
This PR introduces a more thoughtful event log cleanup method, if users have
LGTM and it looks safe.
As RC1 failed and RC2 is going to be cut soon, I'm going to merge this to master & 2.3.
Detect the deletion of event log files from storage, and remove data
about the related application attempt in the SHS. Also contains code
to fix SPARK-21571 based on code by ericvandenbergfb.

Author: Marcelo Vanzin <[email protected]>

Closes #20138 from vanzin/SPARK-20664.

(cherry picked from commit fed2139)
Signed-off-by: Imran Rashid <[email protected]>