Skip to content

Commit

Permalink
Catch RemoteFileChangedException inside of S3DynamoDBLogStore::read
Browse files Browse the repository at this point in the history
## Description
In S3, if `N.json` is overwritten while an input stream is open on it, the ETag changes and a `RemoteFileChangedException` is thrown.

This PR adds logic to retry reading that `N.json` file, at the exact same line that the error occurred at.

This assumes and requires that `N.json` has been overwritten with identical content!

An important implementation detail: if we are at index 25 (so the last successfully read index is 24) and calling `.next()` on the read iterator throws a `RemoteFileChangedException`, we re-generate the read iterator, skip all the way to index 25, and try reading it again.

New UTs.

## Does this PR introduce _any_ user-facing changes?

No.

Closes #1712

Signed-off-by: Scott Sandre <[email protected]>
GitOrigin-RevId: e94217d3207abf9a246be7464b1f9d76d1463597
  • Loading branch information
scottsand-db committed May 1, 2023
1 parent 91ab166 commit d7483ad
Show file tree
Hide file tree
Showing 4 changed files with 488 additions and 1 deletion.
8 changes: 7 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ lazy val storage = (project in file("storage"))
// User can provide any 2.x or 3.x version. We don't use any new fancy APIs. Watch out for
// versions with known vulnerabilities.
"org.apache.hadoop" % "hadoop-common" % "3.3.1" % "provided",

// Note that the org.apache.hadoop.fs.s3a.Listing::createFileStatusListingIterator 3.3.1 API
// is not compatible with 3.3.2.
"org.apache.hadoop" % "hadoop-aws" % "3.3.1" % "provided",

// Test Deps
Expand All @@ -194,7 +197,10 @@ lazy val storageS3DynamoDB = (project in file("storage-s3-dynamodb"))
// Test / publishArtifact := true,

libraryDependencies ++= Seq(
"com.amazonaws" % "aws-java-sdk" % "1.7.4" % "provided"
"com.amazonaws" % "aws-java-sdk" % "1.7.4" % "provided",

// Test Deps
"org.apache.hadoop" % "hadoop-aws" % "3.3.1" % "test", // RemoteFileChangedException
)
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
package io.delta.storage;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class presents an iterator view over the iterator supplier in the constructor.
*
* This class assumes that the iterator supplied by the supplier can throw, and that subsequent
* supplier.get() calls will return an iterator over the same data.
*
* If there are any RemoteFileChangedException during `next` and `hasNext` calls, will retry
* at most `MAX_RETRIES` times. If there are similar exceptions during the retry, those are handled
* and count towards the MAX_RETRIES.
*
* Internally, keeps track of the last-successfully-returned index. Upon retry, will iterate back
* to that same position.
*/
/**
 * Presents an iterator view over the iterators produced by the supplier given to the constructor.
 *
 * This class assumes that the iterator supplied by the supplier can throw, and that subsequent
 * supplier.get() calls will return an iterator over the same data.
 *
 * If a RemoteFileChangedException is raised during a `next` or `hasNext` call, the operation is
 * retried, at most `maxRetries` times over the lifetime of this iterator. Similar exceptions
 * raised while replaying also count towards that limit.
 *
 * Because `Iterator.hasNext()` and `Iterator.next()` cannot declare checked exceptions, the
 * underlying iterator may surface a RemoteFileChangedException either directly or wrapped in an
 * {@link UncheckedIOException}; both forms are recognised and retried.
 *
 * Internally, keeps track of the last-successfully-returned index. Upon retry, a fresh iterator is
 * obtained from the supplier and advanced back to that same position.
 */
public class RetryableCloseableIterator implements CloseableIterator<String> {
    private static final Logger LOG = LoggerFactory.getLogger(RetryableCloseableIterator.class);

    public static final int DEFAULT_MAX_RETRIES = 3;

    private final Supplier<CloseableIterator<String>> iterSupplier;

    private final int maxRetries;

    /**
     * Index of the last element successfully returned without an exception. A value of -1 means
     * that no element has ever been returned yet.
     */
    private int lastSuccessfullIndex;

    /** Total number of replay attempts made so far (never reset). */
    private int numRetries = 0;

    private CloseableIterator<String> currentIter;

    /**
     * @param iterSupplier produces a fresh iterator over the same data on every call
     * @param maxRetries maximum number of replay attempts; must not be negative
     * @throws IllegalArgumentException if `maxRetries` is negative
     * @throws NullPointerException if `iterSupplier` is null
     */
    public RetryableCloseableIterator(
            Supplier<CloseableIterator<String>> iterSupplier,
            int maxRetries) {
        if (maxRetries < 0) throw new IllegalArgumentException("maxRetries can't be negative");

        this.iterSupplier = Objects.requireNonNull(iterSupplier);
        this.maxRetries = maxRetries;
        this.lastSuccessfullIndex = -1;
        this.currentIter = this.iterSupplier.get();
    }

    /** Same as the two-argument constructor, using {@link #DEFAULT_MAX_RETRIES}. */
    public RetryableCloseableIterator(Supplier<CloseableIterator<String>> iterSupplier) {
        this(iterSupplier, DEFAULT_MAX_RETRIES);
    }

    /////////////////
    // Public APIs //
    /////////////////

    /** Closes the iterator that is currently being delegated to. */
    @Override
    public void close() throws IOException {
        currentIter.close();
    }

    /**
     * `hasNext` must be idempotent. It does not change the `lastSuccessfullIndex` variable.
     *
     * @throws UncheckedIOException if a non-retryable IOException occurs, or if the retries are
     *                              exhausted
     */
    @Override
    public boolean hasNext() {
        try {
            return hasNextInternal();
        } catch (IOException ex) {
            if (isRemoteFileChangedException(ex)) {
                try {
                    replayIterToLastSuccessfulIndex(ex);
                } catch (IOException ex2) {
                    throw new UncheckedIOException(ex2);
                }
                // The recursion is bounded: every replay consumes at least one retry, and the
                // replay throws once `maxRetries` has been reached.
                return hasNext();
            } else {
                throw new UncheckedIOException(ex);
            }
        }
    }

    /**
     * Returns the next element, transparently replaying the underlying iterator if a
     * RemoteFileChangedException is raised.
     *
     * @throws NoSuchElementException if there are no more elements
     * @throws IllegalStateException if a re-created iterator has fewer elements than were
     *                               already returned
     * @throws UncheckedIOException if a non-retryable IOException occurs, or if the retries are
     *                              exhausted
     */
    @Override
    public String next() {
        if (!hasNext()) throw new NoSuchElementException();

        try {
            final String ret = nextInternal();
            lastSuccessfullIndex++;
            return ret;
        } catch (IOException ex) {
            if (isRemoteFileChangedException(ex)) {
                try {
                    replayIterToLastSuccessfulIndex(ex);
                } catch (IOException ex2) {
                    throw new UncheckedIOException(ex2);
                }

                if (!hasNext()) {
                    throw new IllegalStateException(
                        String.format(
                            "A retried iterator doesn't have enough data " +
                                "(hasNext=false, lastSuccessfullIndex=%s)",
                            lastSuccessfullIndex
                        )
                    );
                }

                return next();
            } else {
                throw new UncheckedIOException(ex);
            }
        }
    }

    //////////////////////////////////////
    // Package-private APIs for testing //
    //////////////////////////////////////

    /** Visible for testing. */
    int getLastSuccessfullIndex() {
        return lastSuccessfullIndex;
    }

    /** Visible for testing. */
    int getNumRetries() {
        return numRetries;
    }

    ////////////////////
    // Helper Methods //
    ////////////////////

    /**
     * Delegates to `currentIter.hasNext()`, declared to throw a checked exception so the caller
     * can catch it. A RemoteFileChangedException wrapped in an UncheckedIOException is unwrapped.
     */
    private boolean hasNextInternal() throws IOException {
        try {
            return currentIter.hasNext();
        } catch (UncheckedIOException ex) {
            throw unwrapRemoteFileChanged(ex);
        }
    }

    /**
     * Delegates to `currentIter.next()`, declared to throw a checked exception so the caller
     * can catch it. A RemoteFileChangedException wrapped in an UncheckedIOException is unwrapped.
     */
    private String nextInternal() throws IOException {
        try {
            return currentIter.next();
        } catch (UncheckedIOException ex) {
            throw unwrapRemoteFileChanged(ex);
        }
    }

    /**
     * Returns the cause of `ex` if that cause is a RemoteFileChangedException. Any other
     * UncheckedIOException is rethrown untouched so that non-retryable failures reach the caller
     * exactly as the underlying iterator raised them.
     */
    private static IOException unwrapRemoteFileChanged(UncheckedIOException ex) {
        final IOException cause = ex.getCause();
        if (cause != null && isRemoteFileChangedException(cause)) {
            return cause;
        }
        throw ex;
    }

    /**
     * Called after a RemoteFileChangedException was thrown. Tries to replay the underlying
     * iter implementation (supplied by the `iterSupplier`) to the last successful index, so that
     * the operation that failed (hasNext, or next) can be retried. If a
     * RemoteFileChangedException is thrown while replaying the iter, the failed iter is closed
     * and, while retries remain, another replay is attempted.
     *
     * @param topLevelEx the exception that triggered the replay; rethrown if retries run out
     * @throws IOException `topLevelEx` if the retries are exhausted, or any non-retryable
     *                     IOException raised while replaying or while closing the failed iterator
     * @throws IllegalStateException if a re-created iterator has fewer elements than were
     *                               already returned
     */
    private void replayIterToLastSuccessfulIndex(IOException topLevelEx) throws IOException {
        LOG.warn(
            "Caught a RemoteFileChangedException. NumRetries is {} / {}.\n{}",
            numRetries + 1, maxRetries, topLevelEx
        );
        currentIter.close();

        while (numRetries < maxRetries) {
            numRetries++;
            // `numRetries` has already been incremented for this attempt, so log it as-is.
            LOG.info(
                "Replaying until (inclusive) index {}. NumRetries is {} / {}.",
                lastSuccessfullIndex, numRetries, maxRetries
            );
            currentIter = iterSupplier.get();

            // Last successful index replayed. Starts at -1, and not 0, because 0 means we've
            // already replayed the 1st element!
            int replayIndex = -1;
            try {
                // Just like in RetryableCloseableIterator::next, a RemoteFileChangedException may
                // be raised here during the replay. Going through the *Internal helpers surfaces
                // it as a checked IOException, whether or not the underlying iterator wrapped it.
                while (replayIndex < lastSuccessfullIndex) {
                    if (hasNextInternal()) {
                        nextInternal(); // Disregard data that has been read
                        replayIndex++;
                    } else {
                        throw new IllegalStateException(
                            String.format(
                                "A retried iterator doesn't have enough data " +
                                    "(replayIndex=%s, lastSuccessfullIndex=%s)",
                                replayIndex,
                                lastSuccessfullIndex
                            )
                        );
                    }
                }

                LOG.info("Successfully replayed until (inclusive) index {}", lastSuccessfullIndex);

                return;
            } catch (IOException ex) {
                if (isRemoteFileChangedException(ex)) {
                    // Ignore and try replaying the iter again at the top of the while loop
                    LOG.warn("Caught a RemoteFileChangedException while replaying the iterator");
                    // The failed iterator is about to be replaced; release it so it isn't leaked.
                    closeQuietly(currentIter);
                } else {
                    throw ex;
                }
            }
        }

        throw topLevelEx;
    }

    /**
     * Best-effort close of an iterator that is being abandoned during replay. A failure to close
     * is logged rather than propagated so that it does not mask the retry that is in progress.
     */
    private static void closeQuietly(CloseableIterator<String> iter) {
        try {
            iter.close();
        } catch (IOException ex) {
            LOG.warn("Failed to close an iterator that was abandoned during replay", ex);
        }
    }

    /** Matches on the class name only, so no compile-time dependency on hadoop-aws is needed. */
    private static boolean isRemoteFileChangedException(IOException ex) {
        // `endsWith` should still work if the class is shaded.
        final String exClassName = ex.getClass().getName();
        return exClassName.endsWith("org.apache.hadoop.fs.s3a.RemoteFileChangedException");
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.hadoop.fs.Path;

import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -80,6 +81,7 @@ public class S3DynamoDBLogStore extends BaseExternalLogStore {
*/
public static final String SPARK_CONF_PREFIX = "spark.io.delta.storage.S3DynamoDBLogStore";
public static final String BASE_CONF_PREFIX = "io.delta.storage.S3DynamoDBLogStore";
// Key looked up via getParam in `read`; the parsed integer is passed to
// RetryableCloseableIterator as its maxRetries, with
// RetryableCloseableIterator.DEFAULT_MAX_RETRIES supplied as the default value.
public static final String READ_RETRIES = "read.retries";
public static final String DDB_CLIENT_TABLE = "ddb.tableName";
public static final String DDB_CLIENT_REGION = "ddb.region";
public static final String DDB_CLIENT_CREDENTIALS_PROVIDER = "credentials.provider";
Expand Down Expand Up @@ -137,6 +139,35 @@ public S3DynamoDBLogStore(Configuration hadoopConf) throws IOException {
tryEnsureTableExists(hadoopConf);
}

@Override
public CloseableIterator<String> read(Path path, Configuration hadoopConf) throws IOException {
    // When many readers and writers run concurrently, several 'recovery' operations may target
    // the same file, i.e. the same temp file T(N) gets copied into N.json more than once. No
    // data is lost, but the ETag of N.json changes, so a reader that already has N.json open may
    // get a RemoteFileChangedException from S3. Retrying the read is safe, hence the wrapper.
    final String defaultRetries =
        Integer.toString(RetryableCloseableIterator.DEFAULT_MAX_RETRIES);
    final String configuredRetries = getParam(hadoopConf, READ_RETRIES, defaultRetries);
    final int retryLimit = Integer.parseInt(configuredRetries);

    return new RetryableCloseableIterator(
        () -> {
            try {
                return super.read(path, hadoopConf);
            } catch (IOException e) {
                // Resolving the file system failed; this is unrelated to a
                // RemoteFileChangedException, so surface it unchecked.
                throw new UncheckedIOException(e);
            }
        },
        retryLimit
    );
}

@Override
protected long getExpirationDelaySeconds() {
return expirationDelaySeconds;
Expand Down
Loading

0 comments on commit d7483ad

Please sign in to comment.