diff --git a/build.sbt b/build.sbt
index 757b2c20c19..64377ed99cb 100644
--- a/build.sbt
+++ b/build.sbt
@@ -174,6 +174,9 @@ lazy val storage = (project in file("storage"))
     // User can provide any 2.x or 3.x version. We don't use any new fancy APIs. Watch out for
     // versions with known vulnerabilities.
     "org.apache.hadoop" % "hadoop-common" % "3.3.1" % "provided",
+
+    // Note that the org.apache.hadoop.fs.s3a.Listing::createFileStatusListingIterator 3.3.1 API
+    // is not compatible with 3.3.2.
     "org.apache.hadoop" % "hadoop-aws" % "3.3.1" % "provided",

     // Test Deps
@@ -194,7 +197,10 @@ lazy val storageS3DynamoDB = (project in file("storage-s3-dynamodb"))
     // Test / publishArtifact := true,
     libraryDependencies ++= Seq(
-      "com.amazonaws" % "aws-java-sdk" % "1.7.4" % "provided"
+      "com.amazonaws" % "aws-java-sdk" % "1.7.4" % "provided",
+
+      // Test Deps
+      "org.apache.hadoop" % "hadoop-aws" % "3.3.1" % "test", // RemoteFileChangedException
     )
   )
diff --git a/storage-s3-dynamodb/src/main/java/io/delta/storage/RetryableCloseableIterator.java b/storage-s3-dynamodb/src/main/java/io/delta/storage/RetryableCloseableIterator.java
new file mode 100644
index 00000000000..300942e9978
--- /dev/null
+++ b/storage-s3-dynamodb/src/main/java/io/delta/storage/RetryableCloseableIterator.java
@@ -0,0 +1,226 @@
+package io.delta.storage;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class presents an iterator view over the iterators produced by the supplier passed to the
+ * constructor.
+ *
+ * This class assumes that the iterator supplied by the supplier can throw, and that subsequent
+ * supplier.get() calls will return an iterator over the same data.
+ *
+ * If a RemoteFileChangedException is thrown during a `next` or `hasNext` call, this iterator will
+ * retry at most `maxRetries` times. Similar exceptions thrown while retrying are handled as well
+ * and count towards the same `maxRetries` limit.
+ *
+ * Internally, keeps track of the last-successfully-returned index. Upon retry, will iterate back
+ * to that same position.
+ */
+public class RetryableCloseableIterator implements CloseableIterator<String> {
+    private static final Logger LOG = LoggerFactory.getLogger(RetryableCloseableIterator.class);
+
+    public static final int DEFAULT_MAX_RETRIES = 3;
+
+    private final Supplier<CloseableIterator<String>> iterSupplier;
+
+    private final int maxRetries;
+
+    /**
+     * Index of the last element successfully returned without an exception. A value of -1 means
+     * that no element has ever been returned yet.
+     */
+    private int lastSuccessfullIndex;
+
+    private int numRetries = 0;
+
+    private CloseableIterator<String> currentIter;
+
+    public RetryableCloseableIterator(
+            Supplier<CloseableIterator<String>> iterSupplier,
+            int maxRetries) {
+        if (maxRetries < 0) throw new IllegalArgumentException("maxRetries can't be negative");
+
+        this.iterSupplier = Objects.requireNonNull(iterSupplier);
+        this.maxRetries = maxRetries;
+        this.lastSuccessfullIndex = -1;
+        this.currentIter = this.iterSupplier.get();
+    }
+
+    public RetryableCloseableIterator(Supplier<CloseableIterator<String>> iterSupplier) {
+        this(iterSupplier, DEFAULT_MAX_RETRIES);
+    }
+
+    /////////////////
+    // Public APIs //
+    /////////////////
+
+    @Override
+    public void close() throws IOException {
+        currentIter.close();
+    }
+
+    /**
+     * `hasNext` must be idempotent. It does not change the `lastSuccessfullIndex` variable.
+ */ + @Override + public boolean hasNext() { + try { + return hasNextInternal(); + } catch (IOException ex) { + if (isRemoteFileChangedException(ex)) { + try { + replayIterToLastSuccessfulIndex(ex); + } catch (IOException ex2) { + throw new UncheckedIOException(ex2); + } + return hasNext(); + } else { + throw new UncheckedIOException(ex); + } + + } + } + + @Override + public String next() { + if (!hasNext()) throw new NoSuchElementException(); + + try { + final String ret = nextInternal(); + lastSuccessfullIndex++; + return ret; + } catch (IOException ex) { + if (isRemoteFileChangedException(ex)) { + try { + replayIterToLastSuccessfulIndex(ex); + } catch (IOException ex2) { + throw new UncheckedIOException(ex2); + } + + if (!hasNext()) { + throw new IllegalStateException( + String.format( + "A retried iterator doesn't have enough data " + + "(hasNext=false, lastSuccessfullIndex=%s)", + lastSuccessfullIndex + ) + ); + } + + return next(); + } else { + throw new UncheckedIOException(ex); + } + } + } + + ////////////////////////////////////// + // Package-private APIs for testing // + ////////////////////////////////////// + + /** Visible for testing. */ + int getLastSuccessfullIndex() { + return lastSuccessfullIndex; + } + + /** Visible for testing. */ + int getNumRetries() { + return numRetries; + } + + //////////////////// + // Helper Methods // + //////////////////// + + /** Throw a checked exception so we can catch this in the caller. */ + private boolean hasNextInternal() throws IOException { + return currentIter.hasNext(); + } + + /** Throw a checked exception so we can catch this in the caller. */ + private String nextInternal() throws IOException { + return currentIter.next(); + } + + /** + * Called after a RemoteFileChangedException was thrown. Tries to replay the underlying + * iter implementation (supplied by the `implSupplier`) to the last successful index, so that + * the previous error open (hasNext, or next) can be retried. If a RemoteFileChangedException + * is thrown while replaying the iter, we just increment the `numRetries` counter and try again. + */ + private void replayIterToLastSuccessfulIndex(IOException topLevelEx) throws IOException { + LOG.warn( + "Caught a RemoteFileChangedException. NumRetries is {} / {}.\n{}", + numRetries + 1, maxRetries, topLevelEx + ); + currentIter.close(); + + while (numRetries < maxRetries) { + numRetries++; + LOG.info( + "Replaying until (inclusive) index {}. NumRetries is {} / {}.", + lastSuccessfullIndex, numRetries + 1, maxRetries + ); + currentIter = iterSupplier.get(); + + // Last successful index replayed. Starts at -1, and not 0, because 0 means we've + // already replayed the 1st element! + int replayIndex = -1; + try { + while (replayIndex < lastSuccessfullIndex) { + if (currentIter.hasNext()) { + currentIter.next(); // Disregard data that has been read + replayIndex++; + } else { + throw new IllegalStateException( + String.format( + "A retried iterator doesn't have enough data " + + "(replayIndex=%s, lastSuccessfullIndex=%s)", + replayIndex, + lastSuccessfullIndex + ) + ); + } + } + + // Just like how in RetryableCloseableIterator::next we have to handle + // RemoteFileChangedException, we must also hadnle that here during the replay. + // `currentIter.next()` isn't declared to throw a RemoteFileChangedException, so we + // trick the compiler into thinking this block can throw RemoteFileChangedException + // via `fakeIOException`. That way, we can catch it, and retry replaying the iter. 
+                fakeIOException();
+
+                LOG.info("Successfully replayed until (inclusive) index {}", lastSuccessfullIndex);
+
+                return;
+            } catch (IOException ex) {
+                if (isRemoteFileChangedException(ex)) {
+                    // Ignore and try replaying the iter again at the top of the while loop
+                    LOG.warn("Caught a RemoteFileChangedException while replaying the iterator");
+                } else {
+                    throw ex;
+                }
+            }
+        }
+
+        throw topLevelEx;
+    }
+
+    private boolean isRemoteFileChangedException(IOException ex) {
+        // `endsWith` should still work if the class is shaded.
+        final String exClassName = ex.getClass().getName();
+        return exClassName.endsWith("org.apache.hadoop.fs.s3a.RemoteFileChangedException");
+    }
+
+    private void fakeIOException() throws IOException {
+        if (false) {
+            throw new IOException();
+        }
+    }
+}
diff --git a/storage-s3-dynamodb/src/main/java/io/delta/storage/S3DynamoDBLogStore.java b/storage-s3-dynamodb/src/main/java/io/delta/storage/S3DynamoDBLogStore.java
index ae1232e0290..2351ec2e6c0 100644
--- a/storage-s3-dynamodb/src/main/java/io/delta/storage/S3DynamoDBLogStore.java
+++ b/storage-s3-dynamodb/src/main/java/io/delta/storage/S3DynamoDBLogStore.java
@@ -20,6 +20,7 @@ import org.apache.hadoop.fs.Path;
 
 import java.io.InterruptedIOException;
+import java.io.UncheckedIOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -80,6 +81,7 @@ public class S3DynamoDBLogStore extends BaseExternalLogStore {
      */
     public static final String SPARK_CONF_PREFIX = "spark.io.delta.storage.S3DynamoDBLogStore";
     public static final String BASE_CONF_PREFIX = "io.delta.storage.S3DynamoDBLogStore";
+    public static final String READ_RETRIES = "read.retries";
     public static final String DDB_CLIENT_TABLE = "ddb.tableName";
     public static final String DDB_CLIENT_REGION = "ddb.region";
     public static final String DDB_CLIENT_CREDENTIALS_PROVIDER = "credentials.provider";
@@ -137,6 +139,35 @@ public S3DynamoDBLogStore(Configuration hadoopConf) throws IOException {
         tryEnsureTableExists(hadoopConf);
     }
 
+    @Override
+    public CloseableIterator<String> read(Path path, Configuration hadoopConf) throws IOException {
+        // With many concurrent readers/writers, there's a chance that concurrent 'recovery'
+        // operations occur on the same file, i.e. the same temp file T(N) is copied into the target
+        // N.json file more than once. Though data loss will *NOT* occur, readers of N.json may
+        // receive a RemoteFileChangedException from S3 as the ETag of N.json was changed. This is
+        // safe to retry, so we do so here.
+        final int maxRetries = Integer.parseInt(
+            getParam(
+                hadoopConf,
+                READ_RETRIES,
+                Integer.toString(RetryableCloseableIterator.DEFAULT_MAX_RETRIES)
+            )
+        );
+
+        return new RetryableCloseableIterator(
+            () -> {
+                try {
+                    return super.read(path, hadoopConf);
+                } catch (IOException e) {
+                    // There was an issue resolving the file system. Nothing to do with a
+                    // RemoteFileChangedException.
+                    throw new UncheckedIOException(e);
+                }
+            },
+            maxRetries
+        );
+    }
+
     @Override
     protected long getExpirationDelaySeconds() {
         return expirationDelaySeconds;
diff --git a/storage-s3-dynamodb/src/test/scala/io/delta/storage/RetryableCloseableIteratorSuite.scala b/storage-s3-dynamodb/src/test/scala/io/delta/storage/RetryableCloseableIteratorSuite.scala
new file mode 100644
index 00000000000..f9d3f4a1dff
--- /dev/null
+++ b/storage-s3-dynamodb/src/test/scala/io/delta/storage/RetryableCloseableIteratorSuite.scala
@@ -0,0 +1,224 @@
+package io.delta.storage
+
+import java.util.function.Supplier
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.s3a.RemoteFileChangedException
+import org.scalatest.funsuite.AnyFunSuite
+
+class RetryableCloseableIteratorSuite extends AnyFunSuite {
+
+  private def getIter(
+      range: Range,
+      throwAtIndex: Option[Int] = None): CloseableIterator[String] =
+    new CloseableIterator[String] {
+      var index = 0
+      val impl = range.iterator.asJava
+
+      override def close(): Unit = { }
+
+      override def hasNext: Boolean = {
+        impl.hasNext
+      }
+
+      override def next(): String = {
+        if (throwAtIndex.contains(index)) {
+          throw new RemoteFileChangedException(s"path -> index $index", "operation", "msg");
+        }
+
+        index = index + 1
+
+        impl.next().toString
+      }
+    }
+
+  /**
+   * Returns an iterator supplier whose Nth `get()` call produces an iterator over `range` that
+   * throws a RemoteFileChangedException at index `failIndices(N)`. Once all `failIndices` have
+   * been used up, subsequent `get()` calls return iterators that don't fail.
+   *
+   * e.g. with range 0 to 100 and failIndices Seq(25, 50, 75), iteration will fail 3 times.
+   */
+  def getFailingIterSupplier(
+      range: Range,
+      failIndices: Seq[Int] = Seq.empty): Supplier[CloseableIterator[String]] =
+    new Supplier[CloseableIterator[String]] {
+      var numGetCalls = 0
+
+      override def get(): CloseableIterator[String] = {
+        if (numGetCalls < failIndices.length) {
+          val result = getIter(range, Some(failIndices(numGetCalls)))
+          numGetCalls = numGetCalls + 1
+          result
+        } else {
+          getIter(range)
+        }
+      }
+    }
+
+  test("simple case - internally keeps track of the correct index") {
+    val testIter = new RetryableCloseableIterator(() => getIter(0 to 100))
+    assert(testIter.getLastSuccessfullIndex == -1)
+
+    for (i <- 0 to 100) {
+      val elem = testIter.next()
+      assert(elem.toInt == i)
+      assert(testIter.getLastSuccessfullIndex == i)
+    }
+
+    assert(!testIter.hasNext) // this would be index 101
+  }
+
+  test("complex case - replays underlying iter back to correct index after error") {
+    // Here, we just do the simplest verification
+    val testIter1 = new RetryableCloseableIterator(
+      getFailingIterSupplier(0 to 100, Seq(25, 50, 75)))
+
+    // this asserts the size, order, and elements of testIter1
+    assert(testIter1.asScala.toList.map(_.toInt) == (0 to 100).toList)
+
+    // Here, we do more complex verification
+    val testIter2 = new RetryableCloseableIterator(
+      getFailingIterSupplier(0 to 100, Seq(25, 50, 75)))
+
+    for (_ <- 0 to 24) { testIter2.next() }
+    assert(testIter2.getLastSuccessfullIndex == 24)
+    assert(testIter2.getNumRetries == 0)
+
+    assert(testIter2.next().toInt == 25) // this will fail once, and then re-scan
+    assert(testIter2.getLastSuccessfullIndex == 25)
+    assert(testIter2.getNumRetries == 1)
+
+    for (_ <- 26 to 49) { testIter2.next() }
+    assert(testIter2.getLastSuccessfullIndex == 49)
+    assert(testIter2.getNumRetries == 1)
+
+    assert(testIter2.next().toInt == 50) // this will fail once, and then re-scan
+    assert(testIter2.getLastSuccessfullIndex == 50)
+    assert(testIter2.getNumRetries == 2)
+
+    for (_ <- 51 to 74) { testIter2.next() }
+    assert(testIter2.getLastSuccessfullIndex == 74)
+    assert(testIter2.getNumRetries == 2)
+
+    assert(testIter2.next().toInt == 75) // this will fail once, and then re-scan
+    assert(testIter2.getLastSuccessfullIndex == 75)
+    assert(testIter2.getNumRetries == 3)
+
+    for (_ <- 76 to 100) { testIter2.next() }
+    assert(testIter2.getLastSuccessfullIndex == 100)
+    assert(!testIter2.hasNext)
+  }
+
+  test("handles exceptions while retrying") {
+    // Iterates normally until index 50 (returning [0, 49] successfully). Then fails.
+    // Tries to replay, but fails at 30.
+    // Tries to replay again, but fails at 20.
+    // Successfully replays to 49, then returns results from index 50 (inclusive) again.
+    val testIter1 =
+      new RetryableCloseableIterator(getFailingIterSupplier(0 to 100, Seq(50, 30, 20)))
+
+    assert(testIter1.asScala.toList.map(_.toInt) == (0 to 100).toList)
+
+    // Iterates normally until index 50 (returning [0, 49] successfully). Then fails.
+    // Successfully replays to 49, then returns results from index 50 (inclusive).
+    // Fails at index 70 (after returning [50, 69]). Tries to replay, but fails at 5.
+    // Successfully replays until 69, then normally returns results from 70.
+    val testIter2 =
+      new RetryableCloseableIterator(getFailingIterSupplier(0 to 100, Seq(50, 70, 5)))
+    assert(testIter2.asScala.toList.map(_.toInt) == (0 to 100).toList)
+  }
+
+  test("throws after maxRetries exceptions") {
+    val testIter =
+      new RetryableCloseableIterator(getFailingIterSupplier(0 to 100, Seq(20, 49, 60, 80)))
+
+    for (i <- 0 to 79) {
+      assert(testIter.next().toInt == i)
+    }
+    assert(testIter.getNumRetries == 3)
+    val ex = intercept[RuntimeException] {
+      testIter.next()
+    }
+    assert(ex.getCause.isInstanceOf[RemoteFileChangedException])
+  }
+
+  test("can specify maxRetries") {
+    val testIter1 =
+      new RetryableCloseableIterator(
+        getFailingIterSupplier(0 to 100, Seq(5, 10, 15, 20, 25, 30, 35, 40, 45, 50)),
+        10 // maxRetries
+      )
+
+    assert(testIter1.asScala.toList.map(_.toInt) == (0 to 100).toList)
+
+    val testIter2 =
+      new RetryableCloseableIterator(
+        getFailingIterSupplier(0 to 100, Seq(5, 10, 15, 20, 25, 30)),
+        5 // maxRetries
+      )
+
+    for (i <- 0 to 29) {
+      assert(testIter2.next().toInt == i)
+    }
+    assert(testIter2.getNumRetries == 5)
+    val ex = intercept[RuntimeException] {
+      testIter2.next()
+    }
+    assert(ex.getCause.isInstanceOf[RemoteFileChangedException])
+  }
+
+  test("retried iterator doesn't have enough data (underlying data changed!)") {
+    val testIter = new RetryableCloseableIterator(
+      new Supplier[CloseableIterator[String]] {
+        var getCount = 0
+
+        override def get(): CloseableIterator[String] = getCount match {
+          case 0 =>
+            getCount = getCount + 1
+            getIter(0 to 100, Some(50)) // try to iterate 0->100, fail at 50
+
+          case 1 =>
+            getCount = getCount + 1
+            getIter(0 to 30) // try to replay 0 to 50, but no elements after 30!
+        }
+      }
+    )
+
+    for (_ <- 0 to 49) { testIter.next() }
+    val e = intercept[IllegalStateException] {
+      testIter.next()
+    }
+    assert(e.getMessage.contains("A retried iterator doesn't have enough data"))
+  }
+
+  test("after replaying the iter, hasNext is false") {
+    val testIter = new RetryableCloseableIterator(
+      new Supplier[CloseableIterator[String]] {
+        var getCount = 0
+
+        override def get(): CloseableIterator[String] = getCount match {
+          case 0 =>
+            getCount = getCount + 1
+            getIter(0 to 100, Some(50)) // try to iterate 0->100, fail at 50
+
+          case 1 =>
+            getCount = getCount + 1
+            // when we failed at index 50 above, the lastSuccessfullIndex was 49. here, we can
+            // replay back to index 49, but the `hasNext` call will be false!
+            getIter(0 to 49)
+        }
+      }
+    )
+
+    for (_ <- 0 to 49) { testIter.next() }
+    assert(testIter.getLastSuccessfullIndex == 49)
+
+    val e = intercept[IllegalStateException] {
+      testIter.next()
+    }
+    assert(e.getMessage.contains("A retried iterator doesn't have enough data (hasNext=false, " +
+      "lastSuccessfullIndex=49)"))
+  }
+
+}
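
Reviewer note, not part of the patch above: a minimal usage sketch of the new class, based only on the API shown in this patch. The package name, example class, and in-memory iterator are illustrative stand-ins; in the real code path the supplier re-opens N.json via super.read(path, hadoopConf).

// Illustrative sketch (assumes delta-storage on the classpath; not part of this PR).
package io.delta.storage.examples;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Supplier;

import io.delta.storage.CloseableIterator;
import io.delta.storage.RetryableCloseableIterator;

public class RetryableCloseableIteratorExample {
    public static void main(String[] args) throws IOException {
        // Every get() call must return an iterator over the same data, as the class javadoc
        // requires. Here that data is a fixed in-memory list; S3DynamoDBLogStore instead
        // re-opens the file with super.read(path, hadoopConf).
        Supplier<CloseableIterator<String>> supplier = () -> {
            Iterator<String> impl = Arrays.asList("line-0", "line-1", "line-2").iterator();
            return new CloseableIterator<String>() {
                @Override public boolean hasNext() { return impl.hasNext(); }
                @Override public String next() { return impl.next(); }
                @Override public void close() { }
            };
        };

        // A RemoteFileChangedException thrown inside hasNext()/next() is retried up to
        // DEFAULT_MAX_RETRIES times by replaying a fresh iterator to the last good index.
        try (CloseableIterator<String> iter =
                 new RetryableCloseableIterator(supplier, RetryableCloseableIterator.DEFAULT_MAX_RETRIES)) {
            while (iter.hasNext()) {
                System.out.println(iter.next());
            }
        }
    }
}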
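Reviewer note, not part of the patch above: the new read.retries value is resolved through getParam, which for the existing ddb.* settings looks up keys under SPARK_CONF_PREFIX and BASE_CONF_PREFIX. Assuming read.retries follows that same pattern (an assumption; the full key names are not shown in this patch), the retry count could be tuned roughly like this:

// Hypothetical configuration sketch; key names assume the standard getParam prefixing.
// When neither key is set, the code falls back to RetryableCloseableIterator.DEFAULT_MAX_RETRIES (3).
import org.apache.hadoop.conf.Configuration;

public class ReadRetriesConfigSketch {
    public static void main(String[] args) {
        Configuration hadoopConf = new Configuration();
        // Spark-style key (SPARK_CONF_PREFIX + "." + READ_RETRIES) ...
        hadoopConf.set("spark.io.delta.storage.S3DynamoDBLogStore.read.retries", "5");
        // ... or the base key (BASE_CONF_PREFIX + "." + READ_RETRIES).
        hadoopConf.set("io.delta.storage.S3DynamoDBLogStore.read.retries", "5");
        // hadoopConf would then be the Configuration handed to the LogStore.
    }
}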