Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Catch RemoteFileChangedException inside of S3DynamoDBLogStore::read #1712

Closed
8 changes: 7 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ lazy val storage = (project in file("storage"))
// User can provide any 2.x or 3.x version. We don't use any new fancy APIs. Watch out for
// versions with known vulnerabilities.
"org.apache.hadoop" % "hadoop-common" % "3.3.1" % "provided",

// Note that the org.apache.hadoop.fs.s3a.Listing::createFileStatusListingIterator 3.3.1 API
// is not compatible with 3.3.2.
"org.apache.hadoop" % "hadoop-aws" % "3.3.1" % "provided",

// Test Deps
Expand All @@ -194,7 +197,10 @@ lazy val storageS3DynamoDB = (project in file("storage-s3-dynamodb"))
// Test / publishArtifact := true,

libraryDependencies ++= Seq(
"com.amazonaws" % "aws-java-sdk" % "1.7.4" % "provided"
"com.amazonaws" % "aws-java-sdk" % "1.7.4" % "provided",

// Test Deps
"org.apache.hadoop" % "hadoop-aws" % "3.3.1" % "test", // RemoteFileChangedException
scottsand-db marked this conversation as resolved.
Show resolved Hide resolved
)
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
package io.delta.storage;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class presents an iterator view over the iterator supplier in the constructor.
*
* This class assumes that the iterator supplied by the supplier can throw, and that subsequent
* supplier.get() calls will return an iterator over the same data.
*
* If there are any RemoteFileChangedException during `next` and `hasNext` calls, will retry
* at most `MAX_RETRIES` times. If there are similar exceptions during the retry, those are handled
* and count towards the MAX_RETRIES.
*
* Internally, keeps track of the last-successfully-returned index. Upon retry, will iterate back
* to that same position.
*/
public class RetryableCloseableIterator implements CloseableIterator<String> {
private static final Logger LOG = LoggerFactory.getLogger(RetryableCloseableIterator.class);

public static final int DEFAULT_MAX_RETRIES = 3;

private final Supplier<CloseableIterator<String>> iterSupplier;

private final int maxRetries;

/**
* Index of the last element successfully returned without an exception. A value of -1 means
* that no element has ever been returned yet.
*/
private int lastSuccessfullIndex;

private int numRetries = 0;

private CloseableIterator<String> currentIter;

public RetryableCloseableIterator(
Supplier<CloseableIterator<String>> iterSupplier,
int maxRetries) {
if (maxRetries < 0) throw new IllegalArgumentException("maxRetries can't be negative");

this.iterSupplier = Objects.requireNonNull(iterSupplier);
this.maxRetries = maxRetries;
this.lastSuccessfullIndex = -1;
this.currentIter = this.iterSupplier.get();
}

public RetryableCloseableIterator(Supplier<CloseableIterator<String>> iterSupplier) {
this(iterSupplier, DEFAULT_MAX_RETRIES);
}

/////////////////
// Public APIs //
/////////////////

@Override
public void close() throws IOException {
currentIter.close();
}

/**
* `hasNext` must be idempotent. It does not change the `lastSuccessfulIndex` variable.
*/
@Override
public boolean hasNext() {
try {
return hasNextInternal();
} catch (IOException ex) {
if (isRemoteFileChangedException(ex)) {
try {
replayIterToLastSuccessfulIndex(ex);
} catch (IOException ex2) {
throw new UncheckedIOException(ex2);
}
return hasNext();
} else {
throw new UncheckedIOException(ex);
}

}
}

@Override
public String next() {
if (!hasNext()) throw new NoSuchElementException();

try {
final String ret = nextInternal();
lastSuccessfullIndex++;
return ret;
} catch (IOException ex) {
if (isRemoteFileChangedException(ex)) {
try {
replayIterToLastSuccessfulIndex(ex);
} catch (IOException ex2) {
throw new UncheckedIOException(ex2);
}

if (!hasNext()) {
throw new IllegalStateException(
String.format(
"A retried iterator doesn't have enough data " +
"(hasNext=false, lastSuccessfullIndex=%s)",
lastSuccessfullIndex
)
);
}

return next();
scottsand-db marked this conversation as resolved.
Show resolved Hide resolved
} else {
throw new UncheckedIOException(ex);
}
}
}

//////////////////////////////////////
// Package-private APIs for testing //
//////////////////////////////////////

/** Visible for testing. */
int getLastSuccessfullIndex() {
return lastSuccessfullIndex;
}

/** Visible for testing. */
int getNumRetries() {
return numRetries;
}

////////////////////
// Helper Methods //
////////////////////

/** Throw a checked exception so we can catch this in the caller. */
private boolean hasNextInternal() throws IOException {
return currentIter.hasNext();
}

/** Throw a checked exception so we can catch this in the caller. */
private String nextInternal() throws IOException {
return currentIter.next();
}

/**
* Called after a RemoteFileChangedException was thrown. Tries to replay the underlying
* iter implementation (supplied by the `implSupplier`) to the last successful index, so that
* the previous error open (hasNext, or next) can be retried. If a RemoteFileChangedException
* is thrown while replaying the iter, we just increment the `numRetries` counter and try again.
*/
private void replayIterToLastSuccessfulIndex(IOException topLevelEx) throws IOException {
LOG.warn(
"Caught a RemoteFileChangedException. NumRetries is {} / {}.\n{}",
numRetries + 1, maxRetries, topLevelEx
);
currentIter.close();

while (numRetries < maxRetries) {
numRetries++;
LOG.info(
"Replaying until (inclusive) index {}. NumRetries is {} / {}.",
lastSuccessfullIndex, numRetries + 1, maxRetries
);
currentIter = iterSupplier.get();

// Last successful index replayed. Starts at -1, and not 0, because 0 means we've
// already replayed the 1st element!
int replayIndex = -1;
try {
while (replayIndex < lastSuccessfullIndex) {
if (currentIter.hasNext()) {
currentIter.next(); // Disregard data that has been read
replayIndex++;
} else {
throw new IllegalStateException(
String.format(
"A retried iterator doesn't have enough data " +
"(replayIndex=%s, lastSuccessfullIndex=%s)",
replayIndex,
lastSuccessfullIndex
)
);
}
}

// Just like how in RetryableCloseableIterator::next we have to handle
// RemoteFileChangedException, we must also hadnle that here during the replay.
// `currentIter.next()` isn't declared to throw a RemoteFileChangedException, so we
// trick the compiler into thinking this block can throw RemoteFileChangedException
// via `fakeIOException`. That way, we can catch it, and retry replaying the iter.
fakeIOException();

LOG.info("Successfully replayed until (inclusive) index {}", lastSuccessfullIndex);

return;
} catch (IOException ex) {
if (isRemoteFileChangedException(ex)) {
// Ignore and try replaying the iter again at the top of the while loop
scottsand-db marked this conversation as resolved.
Show resolved Hide resolved
LOG.warn("Caught a RemoteFileChangedException while replaying the iterator");
} else {
throw ex;
}
}
}

throw topLevelEx;
}

private boolean isRemoteFileChangedException(IOException ex) {
// `endsWith` should still work if the class is shaded.
final String exClassName = ex.getClass().getName();
return exClassName.endsWith("org.apache.hadoop.fs.s3a.RemoteFileChangedException");
}

private void fakeIOException() throws IOException {
if (false) {
throw new IOException();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.hadoop.fs.Path;

import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -80,6 +81,7 @@ public class S3DynamoDBLogStore extends BaseExternalLogStore {
*/
public static final String SPARK_CONF_PREFIX = "spark.io.delta.storage.S3DynamoDBLogStore";
public static final String BASE_CONF_PREFIX = "io.delta.storage.S3DynamoDBLogStore";
public static final String READ_RETRIES = "read.retries";
public static final String DDB_CLIENT_TABLE = "ddb.tableName";
public static final String DDB_CLIENT_REGION = "ddb.region";
public static final String DDB_CLIENT_CREDENTIALS_PROVIDER = "credentials.provider";
Expand Down Expand Up @@ -137,6 +139,35 @@ public S3DynamoDBLogStore(Configuration hadoopConf) throws IOException {
tryEnsureTableExists(hadoopConf);
}

    /**
     * Reads the file at `path`, wrapping the underlying iterator in a
     * {@link RetryableCloseableIterator} so that transient RemoteFileChangedExceptions
     * raised mid-read are retried transparently.
     *
     * The retry count is read from the `read.retries` param (READ_RETRIES), defaulting to
     * {@link RetryableCloseableIterator#DEFAULT_MAX_RETRIES}.
     * NOTE(review): a non-numeric config value will surface as a NumberFormatException
     * here — confirm this is acceptable for config-validation purposes.
     */
    @Override
    public CloseableIterator<String> read(Path path, Configuration hadoopConf) throws IOException {
        // With many concurrent readers/writers, there's a chance that concurrent 'recovery'
        // operations occur on the same file, i.e. the same temp file T(N) is copied into the target
        // N.json file more than once. Though data loss will *NOT* occur, readers of N.json may
        // receive a RemoteFileChangedException from S3 as the ETag of N.json was changed. This is
        // safe to retry, so we do so here.
        final int maxRetries = Integer.parseInt(
            getParam(
                hadoopConf,
                READ_RETRIES,
                Integer.toString(RetryableCloseableIterator.DEFAULT_MAX_RETRIES)
            )
        );

        return new RetryableCloseableIterator(
            () -> {
                // The supplier is re-invoked on each retry, re-opening the file from scratch.
                try {
                    return super.read(path, hadoopConf);
                } catch (IOException e) {
                    // There was an issue resolving the file system. Nothing to do with a
                    // RemoteFileChangedException.
                    throw new UncheckedIOException(e);
                }
            },
            maxRetries
        );
    }

@Override
protected long getExpirationDelaySeconds() {
return expirationDelaySeconds;
Expand Down
Loading