Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Catch RemoteFileChangedException inside of S3DynamoDBLogStore::read #1712

Closed
8 changes: 7 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ lazy val storage = (project in file("storage"))
// User can provide any 2.x or 3.x version. We don't use any new fancy APIs. Watch out for
// versions with known vulnerabilities.
"org.apache.hadoop" % "hadoop-common" % "3.3.1" % "provided",

// Note that the org.apache.hadoop.fs.s3a.Listing::createFileStatusListingIterator 3.3.1 API
// is not compatible with 3.3.2.
"org.apache.hadoop" % "hadoop-aws" % "3.3.1" % "provided",

// Test Deps
Expand All @@ -194,7 +197,10 @@ lazy val storageS3DynamoDB = (project in file("storage-s3-dynamodb"))
// Test / publishArtifact := true,

libraryDependencies ++= Seq(
"com.amazonaws" % "aws-java-sdk" % "1.7.4" % "provided"
"com.amazonaws" % "aws-java-sdk" % "1.7.4" % "provided",
"org.apache.hadoop" % "hadoop-aws" % "3.3.1" % "provided", // RemoteFileChangedException

"org.apache.hadoop" % "hadoop-aws" % "3.3.1" % "test", // RemoteFileChangedException
scottsand-db marked this conversation as resolved.
Show resolved Hide resolved
)
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package io.delta.storage;

import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.function.Supplier;

import org.apache.hadoop.fs.s3a.RemoteFileChangedException;
scottsand-db marked this conversation as resolved.
Show resolved Hide resolved
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class presents an iterator view over the iterator supplier in the constructor.
*
* This class assumes that the iterator supplied by the supplier can throw, and that subsequent
* supplier.get() calls will return an iterator over the same data.
*
* If there are any RemoteFileChangedException during `next` and `hasNext` calls, will retry
* at most `MAX_RETRIES` times.
*
* Internally, keeps track of the last-successfully-returned index. Upon retry, will iterate back
* to that same position. If another RemoteFileChangedException occurs during that retry, will fail.
* We can solve that exception inception later (iterators within iterators).
*/
public class RetryableCloseableIterator implements CloseableIterator<String> {
private static final Logger LOG = LoggerFactory.getLogger(RetryableCloseableIterator.class);

/** Visible for testing. */
public static final int MAX_RETRIES = 3;
scottsand-db marked this conversation as resolved.
Show resolved Hide resolved

private final Supplier<CloseableIterator<String>> iterSupplier;

/**
* Index of the last element successfully returned without an exception. A value of -1 means
* that no element has ever been returned yet.
*/
private int lastSuccessfullIndex;

private int numRetries = 0;

private CloseableIterator<String> currentIter;

public RetryableCloseableIterator(Supplier<CloseableIterator<String>> iterSupplier) {
this.iterSupplier = Objects.requireNonNull(iterSupplier);
this.lastSuccessfullIndex = -1;
this.currentIter = this.iterSupplier.get();
}

/** Visible for testing. */
public int getLastSuccessfullIndex() {
scottsand-db marked this conversation as resolved.
Show resolved Hide resolved
return lastSuccessfullIndex;
}

/** Visible for testing. */
public int getNumRetries() {
scottsand-db marked this conversation as resolved.
Show resolved Hide resolved
return numRetries;
}

@Override
public void close() throws IOException {
if (currentIter != null) {
scottsand-db marked this conversation as resolved.
Show resolved Hide resolved
currentIter.close();
}
}

/**
* `hasNext` must be idempotent. It does not change the `lastSuccessfulIndex` variable.
*/
@Override
public boolean hasNext() {
try {
return hasNextInternal();
} catch (RemoteFileChangedException ex) {
scottsand-db marked this conversation as resolved.
Show resolved Hide resolved
LOG.warn(
"Caught a RemoteFileChangedException in `hastNext`. NumRetries is {} / {}.\n{}",
numRetries + 1, MAX_RETRIES, ex.toString()
);
if (numRetries < MAX_RETRIES) {
numRetries++;
replayIterToLastSuccessfulIndex();
// Now, the currentImpl has been recreated and iterated to the same index
return hasNext();
} else {
throw new RuntimeException(ex);
scottsand-db marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

/** Throw a checked exception so we can catch this in the caller. */
private boolean hasNextInternal() throws RemoteFileChangedException {
return currentIter.hasNext();
}

@Override
public String next() {
if (!hasNext()) throw new NoSuchElementException();

try {
final String ret = nextInternal();
lastSuccessfullIndex++;
return ret;
} catch (RemoteFileChangedException ex) {
LOG.warn(
"Caught a RemoteFileChangedException in `next`. NumRetries is {} / {}.\n{}",
numRetries + 1, MAX_RETRIES, ex.toString()
scottsand-db marked this conversation as resolved.
Show resolved Hide resolved
);
if (numRetries < MAX_RETRIES) {
numRetries++;
replayIterToLastSuccessfulIndex();
// Now, the currentImpl has been recreated and iterated to the same index
return next();
scottsand-db marked this conversation as resolved.
Show resolved Hide resolved
} else {
throw new RuntimeException(ex);
scottsand-db marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

/** Throw a checked exception so we can catch this in the caller. */
private String nextInternal() throws RemoteFileChangedException {
return currentIter.next();
}

/**
* Called after a RemoteFileChangedException was thrown. Tries to replay the underlying
* iter implementation (supplied by the `implSupplier`) to the last successful index, so that
* the previous error open (hasNext, or next) can be retried.
*
* NOTE: This iter replay **itself** can throw a RemoteFileChangedException. Let's not deal
scottsand-db marked this conversation as resolved.
Show resolved Hide resolved
* with that for now - that would require handling exception inception.
*/
private void replayIterToLastSuccessfulIndex() {
scottsand-db marked this conversation as resolved.
Show resolved Hide resolved
// We still need to close the currentImpl, even though it threw
try {
currentIter.close();
} catch (IOException e) {
throw new RuntimeException(e);
}

LOG.info("Replaying until (inclusive) index {}", lastSuccessfullIndex);
currentIter = iterSupplier.get(); // this last impl threw an exception and is useless!

// Note: we iterate until `i` == `lastSuccessfullIndex`, so that index `i` is the last
// successfully returned index.
//
// e.g. `i` starts at -1. after the 1st currentImpl.next() call, i will be incremented to 0.
// This makes sense as per the `lastSuccessfullIndex` semantics, since 0 is the last
// index to be successfully returned.
// e.g. suppose `lastSuccessfulIndex` is 25. Then we have read 26 items, with indices 0 to
// 25 inclusive. Then we want to iterate while i < lastSuccessfullIndex. After that,
// i will increment to 25 and we will exit the for loop.
for (int i = -1; i < lastSuccessfullIndex; i++) {
// Note: this does NOT touch RetryableCloseableIterator::next and so does not change
// the index
currentIter.next();
}

LOG.info("Successfully replayed until (inclusive) index {}", lastSuccessfullIndex);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,19 @@ public S3DynamoDBLogStore(Configuration hadoopConf) throws IOException {
tryEnsureTableExists(hadoopConf);
}

@Override
public CloseableIterator<String> read(Path path, Configuration hadoopConf) throws IOException {
return new RetryableCloseableIterator(() -> {
try {
return super.read(path, hadoopConf);
} catch (IOException e) {
// There was an issue resolving the file system. Nothing to do with a
// RemoteFileChangedException.
throw new RuntimeException(e);
scottsand-db marked this conversation as resolved.
Show resolved Hide resolved
}
});
}

@Override
protected long getExpirationDelaySeconds() {
return expirationDelaySeconds;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package io.delta.storage

import java.util.function.Supplier

import scala.collection.JavaConverters._

import org.apache.hadoop.fs.s3a.RemoteFileChangedException
import org.scalatest.funsuite.AnyFunSuite

// scalastyle:off println
scottsand-db marked this conversation as resolved.
Show resolved Hide resolved
class RetryableCloseableIteratorSuite extends AnyFunSuite {

  /**
   * Builds an iterator over `range` (elements rendered as strings) whose `hasNext` and `next`
   * throw a RemoteFileChangedException whenever the current position is in `throwAtIndices`.
   * The position only advances on a successful `next`.
   */
  private def getIter(
      range: Range,
      throwAtIndices: Seq[Int] = Seq.empty): CloseableIterator[String] =
    new CloseableIterator[String] {
      var index = 0
      val impl = range.iterator.asJava

      private def shouldThrow: Boolean = throwAtIndices.contains(index)

      override def close(): Unit = { }

      override def hasNext: Boolean = {
        if (shouldThrow) {
          println(s"`hasNext` throwing for index $index")
          throw new RemoteFileChangedException(s"path -> index $index", "operation", "msg");
        }

        impl.hasNext
      }

      override def next(): String = {
        if (shouldThrow) {
          println(s"`next` throwing for index $index")
          throw new RemoteFileChangedException(s"path -> index $index", "operation", "msg");
        }

        index += 1

        impl.next().toString
      }
    }

  /**
   * Supplier whose iterators fail at indices 25, 50, 75, 110. Each successive `get()` call
   * returns an iterator that no longer fails at the earliest remaining index, so the k-th
   * iterator (0-based) only fails at the indices after the first k.
   *
   * Provide a suitable input range to get the # of failures you want. e.g. range 0 to 100 will
   * fail 3 times. A 5th `get()` call throws, since only the 1st call + 3 retries are allowed.
   */
  def getFailingIterSupplier(range: Range): Supplier[CloseableIterator[String]] =
    new Supplier[CloseableIterator[String]] {
      private val failureIndices = Seq(25, 50, 75, 110)
      var numGetCalls = 0

      override def get(): CloseableIterator[String] = {
        if (numGetCalls >= failureIndices.size) {
          throw new RuntimeException("Should never all a 5th time - there's only 4 (1st " +
            "call + 3 retries) allowed!")
        }

        // Call k drops the first k failure points: the wrapper under test is expected to have
        // replayed past them already.
        val remainingFailures = failureIndices.drop(numGetCalls)
        numGetCalls += 1
        getIter(range, remainingFailures)
      }
    }

  test("simple case - internally keeps track of the correct index") {
    val testIter = new RetryableCloseableIterator(() => getIter(0 to 100))
    assert(testIter.getLastSuccessfullIndex == -1)

    (0 to 100).foreach { expected =>
      val elem = testIter.next()
      assert(elem.toInt == expected)
      assert(testIter.getLastSuccessfullIndex == expected)
    }

    assert(!testIter.hasNext) // this would be index 101
  }

  test("complex case - replays underlying iter back to correct index after error") {
    // Here, we just do the simplest verification
    val testIter1 = new RetryableCloseableIterator(getFailingIterSupplier(0 to 100))
    val output1 = testIter1.asScala.toList

    // this asserts the size, order, and elements of the testIter1
    assert(output1.map(_.toInt) == (0 to 100).toList)

    // Here, we do more complex verification: walk up to each failure point, check the state
    // just before it, step over it (one failure followed by a re-scan), and check again.
    val testIter2 = new RetryableCloseableIterator(getFailingIterSupplier(0 to 100))
    var nextIndex = 0
    var expectedRetries = 0

    Seq(25, 50, 75).foreach { failurePoint =>
      while (nextIndex < failurePoint) {
        testIter2.next()
        nextIndex += 1
      }
      assert(testIter2.getLastSuccessfullIndex == failurePoint - 1)
      assert(testIter2.getNumRetries == expectedRetries)

      assert(testIter2.next().toInt == failurePoint) // this will fail once, and then re-scan
      nextIndex += 1
      expectedRetries += 1
      assert(testIter2.getLastSuccessfullIndex == failurePoint)
      assert(testIter2.getNumRetries == expectedRetries)
    }

    while (nextIndex <= 100) {
      testIter2.next()
      nextIndex += 1
    }
    assert(testIter2.getLastSuccessfullIndex == 100)
    assert(!testIter2.hasNext)
  }

  test("throws after MAX_RETRIES exceptions") {
    // Here, we will try to iterate from [0, 200] but getFailingIterSupplier fails at indices
    // 25, 50, 75, 110.
    val testIter = new RetryableCloseableIterator(getFailingIterSupplier(0 to 200))

    (0 to 109).foreach { expected =>
      assert(testIter.next().toInt == expected)
    }
    assert(testIter.getNumRetries == 3)

    // The 4th failure (index 110) exceeds the retry budget and must surface to the caller.
    val ex = intercept[RuntimeException] {
      testIter.next()
    }
    assert(ex.getCause.isInstanceOf[RemoteFileChangedException])
  }

}