From e51c3d9706a530a22adec79735dbd2eb45f7fb82 Mon Sep 17 00:00:00 2001 From: Scott Sandre Date: Fri, 21 Apr 2023 15:48:05 -0700 Subject: [PATCH 01/12] Create and use RetryableCloseableIterator class --- build.sbt | 8 +- .../storage/RetryableCloseableIterator.java | 147 ++++++++++++++++++ .../io/delta/storage/S3DynamoDBLogStore.java | 13 ++ 3 files changed, 167 insertions(+), 1 deletion(-) create mode 100644 storage-s3-dynamodb/src/main/java/io/delta/storage/RetryableCloseableIterator.java diff --git a/build.sbt b/build.sbt index 757b2c20c19..560928dac35 100644 --- a/build.sbt +++ b/build.sbt @@ -174,6 +174,9 @@ lazy val storage = (project in file("storage")) // User can provide any 2.x or 3.x version. We don't use any new fancy APIs. Watch out for // versions with known vulnerabilities. "org.apache.hadoop" % "hadoop-common" % "3.3.1" % "provided", + + // Note that the org.apache.hadoop.fs.s3a.Listing::createFileStatusListingIterator 3.3.1 API + // is not compatible with 3.3.2. "org.apache.hadoop" % "hadoop-aws" % "3.3.1" % "provided", // Test Deps @@ -194,7 +197,10 @@ lazy val storageS3DynamoDB = (project in file("storage-s3-dynamodb")) // Test / publishArtifact := true, libraryDependencies ++= Seq( - "com.amazonaws" % "aws-java-sdk" % "1.7.4" % "provided" + "com.amazonaws" % "aws-java-sdk" % "1.7.4" % "provided", + "org.apache.hadoop" % "hadoop-aws" % "3.3.1" % "provided", // RemoteFileChangedException + + "org.apache.hadoop" % "hadoop-aws" % "3.3.1" % "test", // RemoteFileChangedException ) ) diff --git a/storage-s3-dynamodb/src/main/java/io/delta/storage/RetryableCloseableIterator.java b/storage-s3-dynamodb/src/main/java/io/delta/storage/RetryableCloseableIterator.java new file mode 100644 index 00000000000..41d9df1536f --- /dev/null +++ b/storage-s3-dynamodb/src/main/java/io/delta/storage/RetryableCloseableIterator.java @@ -0,0 +1,147 @@ +package io.delta.storage; + +import java.io.IOException; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.function.Supplier; + +import org.apache.hadoop.fs.s3a.RemoteFileChangedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Iterates over the given iterator supplied in the constructor. + * + * If there are any RemoteFileChangedException during `next` and `hasNext` calls, will retry + * at most `MAX_RETRIES` times. + * + * Internally, keeps track of the last-successfully-returned index. Upon retry, will iterate back + * to that same position. If another RemoteFileChangedException occurs during that retry, will fail. + * We can solve that exception inception later (iterators within iterators). + */ +public class RetryableCloseableIterator implements CloseableIterator { + private static final Logger LOG = LoggerFactory.getLogger(RetryableCloseableIterator.class); + + /** Visible for testing. */ + public static final int MAX_RETRIES = 3; + + private final Supplier> implSupplier; + + /** + * Index of the last element successfully returned without an exception. A value of -1 means + * that no element has ever been returned yet. + */ + private int lastSuccessfullIndex; + + private int numRetries = 0; + + private CloseableIterator currentImpl; + + public RetryableCloseableIterator(Supplier> implSupplier) { + this.implSupplier = Objects.requireNonNull(implSupplier); + this.lastSuccessfullIndex = -1; + this.currentImpl = this.implSupplier.get(); + } + + /** Visible for testing. */ + public int getLastSuccessfullIndex() { + return lastSuccessfullIndex; + } + + /** Visible for testing. */ + public int getNumRetries() { + return numRetries; + } + + @Override + public void close() throws IOException { + if (currentImpl != null) { + currentImpl.close(); + } + } + + /** + * `hasNext` must be idempotent. It does not change the `lastSuccessfulIndex` variable. + */ + @Override + public boolean hasNext() { + try { + return hasNextInternal(); + } catch (RemoteFileChangedException ex) { + LOG.warn( + "Caught a RemoteFileChangedException in `hastNext`. NumRetries is {} / {}.\n{}", + numRetries + 1, MAX_RETRIES, ex.toString() + ); + if (numRetries < MAX_RETRIES) { + numRetries++; + replayIterToLastSuccessfulIndex(); + return hasNext(); + } else { + throw new RuntimeException(ex); + } + } + } + + /** Throw a checked exception so we can catch this in the caller. */ + private boolean hasNextInternal() throws RemoteFileChangedException { + return currentImpl.hasNext(); + } + + @Override + public String next() { + if (!hasNext()) throw new NoSuchElementException(); + + try { + final String ret = nextInternal(); + lastSuccessfullIndex++; + return ret; + } catch (RemoteFileChangedException ex) { + LOG.warn( + "Caught a RemoteFileChangedException in `next`. NumRetries is {} / {}.\n{}", + numRetries + 1, MAX_RETRIES, ex.toString() + ); + if (numRetries < MAX_RETRIES) { + numRetries++; + replayIterToLastSuccessfulIndex(); + return next(); + } else { + throw new RuntimeException(ex); + } + } + } + + /** Throw a checked exception so we can catch this in the caller. */ + private String nextInternal() throws RemoteFileChangedException { + return currentImpl.next(); + } + + /** + * Called after a RemoteFileChangedException was thrown. Tries to replay the underlying + * iter implementation (supplied by the `implSupplier`) to the last successful index, so that + * the previous error open (hasNext, or next) can be retried. + * + * NOTE: This iter replay **itself** can throw a RemoteFileChangedException. Let's not deal + * with that for now - that would require handling exception inception. + */ + private void replayIterToLastSuccessfulIndex() { + LOG.info("Replaying until (inclusive) index {}", lastSuccessfullIndex); + currentImpl = implSupplier.get(); // this last impl threw an exception and is useless! + + // Note: we iterate until `i` == `lastSuccessfullIndex`, so that index `i` is the last + // successfully returned index. + // + // e.g. `i` starts at -1. after the 1st currentImpl.next() call, i will be incremented to 0. + // This makes sense as per the `lastSuccessfullIndex` semantics, since 0 is the last + // index to be successfully returned. + // e.g. suppose `lastSuccessfulIndex` is 25. Then we have read 26 items, with indices 0 to + // 25 inclusive. Then we want to iterate while i < lastSuccessfullIndex. After that, + // i will increment to 25 and we will exit the for loop. + for (int i = -1; i < lastSuccessfullIndex; i++) { + // Note: this does NOT touch RetryableCloseableIterator::next and so does not change + // the index + currentImpl.next(); + } + + LOG.info("Successfully replayed until (inclusive) index {}", lastSuccessfullIndex); + } +} diff --git a/storage-s3-dynamodb/src/main/java/io/delta/storage/S3DynamoDBLogStore.java b/storage-s3-dynamodb/src/main/java/io/delta/storage/S3DynamoDBLogStore.java index ae1232e0290..f6e66ad2dac 100644 --- a/storage-s3-dynamodb/src/main/java/io/delta/storage/S3DynamoDBLogStore.java +++ b/storage-s3-dynamodb/src/main/java/io/delta/storage/S3DynamoDBLogStore.java @@ -137,6 +137,19 @@ public S3DynamoDBLogStore(Configuration hadoopConf) throws IOException { tryEnsureTableExists(hadoopConf); } + @Override + public CloseableIterator read(Path path, Configuration hadoopConf) throws IOException { + return new RetryableCloseableIterator(() -> { + try { + return super.read(path, hadoopConf); + } catch (IOException e) { + // There was an issue resolving the file system. Nothing to do with a + // RemoteFileChangedException. + throw new RuntimeException(e); + } + }); + } + @Override protected long getExpirationDelaySeconds() { return expirationDelaySeconds; From 89ad977f7d7ae6011583a2b892e01f414f8b3987 Mon Sep 17 00:00:00 2001 From: Scott Sandre Date: Fri, 21 Apr 2023 15:48:27 -0700 Subject: [PATCH 02/12] Create RetryableCloseableIteratorSuite.scala --- .../RetryableCloseableIteratorSuite.scala | 145 ++++++++++++++++++ 1 file changed, 145 insertions(+) create mode 100644 storage-s3-dynamodb/src/test/scala/io/delta/storage/RetryableCloseableIteratorSuite.scala diff --git a/storage-s3-dynamodb/src/test/scala/io/delta/storage/RetryableCloseableIteratorSuite.scala b/storage-s3-dynamodb/src/test/scala/io/delta/storage/RetryableCloseableIteratorSuite.scala new file mode 100644 index 00000000000..f2c36026324 --- /dev/null +++ b/storage-s3-dynamodb/src/test/scala/io/delta/storage/RetryableCloseableIteratorSuite.scala @@ -0,0 +1,145 @@ +package io.delta.storage + +import java.util.function.Supplier + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.fs.s3a.RemoteFileChangedException +import org.scalatest.funsuite.AnyFunSuite + +// scalastyle:off println +class RetryableCloseableIteratorSuite extends AnyFunSuite { + + private def getIter( + range: Range, + throwAtIndices: Seq[Int] = Seq.empty): CloseableIterator[String] = + new CloseableIterator[String] { + var index = 0 + val impl = range.iterator.asJava + + override def close(): Unit = { } + + override def hasNext: Boolean = { + if (throwAtIndices.contains(index)) { + println(s"`hasNext` throwing for index $index") + throw new RemoteFileChangedException(s"path -> index $index", "operation", "msg"); + } + + impl.hasNext + } + + override def next(): String = { + if (throwAtIndices.contains(index)) { + println(s"`next` throwing for index $index") + throw new RemoteFileChangedException(s"path -> index $index", "operation", "msg"); + } + + index = index + 1 + + impl.next().toString + } + } + + /** + * Fails at indices 25, 50, 75, 110. + * + * Provide a suitable input range to get the # of failures you want. e.g. range 0 to 100 will fail + * 3 times. + */ + def getFailingIterSupplier(range: Range): Supplier[CloseableIterator[String]] = + new Supplier[CloseableIterator[String]] { + var numGetCalls = 0 + + override def get(): CloseableIterator[String] = numGetCalls match { + case 0 => + // The 1st time this has been called! This will fail at index 25 + numGetCalls = numGetCalls + 1 + getIter(range, Seq(25, 50, 75, 110)) + case 1 => + // The 2nd time this has been called! The underlying RetryableCloseableIterator should + // replay until index 25, and then continue. + numGetCalls = numGetCalls + 1 + getIter(range, Seq(50, 75, 110)) + case 2 => + // The 3rd time this has been called! The underlying RetryableCloseableIterator should + // replay until index 50, and then continue. + numGetCalls = numGetCalls + 1 + getIter(range, Seq(75, 110)) + case 3 => + // The 4th time this has been called! The underlying RetryableCloseableIterator should + // replay until index 75, and then continue. + numGetCalls = numGetCalls + 1 + getIter(range, Seq(110)) + case _ => throw new RuntimeException("Should never all a 5th time - there's only 4 (1st " + + "call + 3 retries) allowed!") + } + } + + test("simple case - internally keeps track of the correct index") { + val testIter = new RetryableCloseableIterator(() => getIter(0 to 100)) + assert(testIter.getLastSuccessfullIndex == -1) + + for (i <- 0 to 100) { + val elem = testIter.next() + assert(elem.toInt == i) + assert(testIter.getLastSuccessfullIndex == i) + } + + assert(!testIter.hasNext) // this would be index 101 + } + + test("complex case - replays underlying iter back to correct index after error") { + // Here, we just do the simplest verification + val testIter1 = new RetryableCloseableIterator(getFailingIterSupplier(0 to 100)) + val output1 = testIter1.asScala.toList + + // this asserts the size, order, and elements of the testIter1 + assert(output1.map(_.toInt) == (0 to 100).toList) + + // Here, we do more complex verification + val testIter2 = new RetryableCloseableIterator(getFailingIterSupplier(0 to 100)) + for (_ <- 0 to 24) { testIter2.next() } + assert(testIter2.getLastSuccessfullIndex == 24) + assert(testIter2.getNumRetries == 0) + + assert(testIter2.next().toInt == 25) // this will fail once, and then re-scan + assert(testIter2.getLastSuccessfullIndex == 25) + assert(testIter2.getNumRetries == 1) + + for (_ <- 26 to 49) { testIter2.next() } + assert(testIter2.getLastSuccessfullIndex == 49) + assert(testIter2.getNumRetries == 1) + + assert(testIter2.next().toInt == 50) // this will fail once, and then re-scan + assert(testIter2.getLastSuccessfullIndex == 50) + assert(testIter2.getNumRetries == 2) + + for (_ <- 51 to 74) { testIter2.next() } + assert(testIter2.getLastSuccessfullIndex == 74) + assert(testIter2.getNumRetries == 2) + + assert(testIter2.next().toInt == 75) // this will fail once, and then re-scan + assert(testIter2.getLastSuccessfullIndex == 75) + assert(testIter2.getNumRetries == 3) + + for (_ <- 76 to 100) { testIter2.next() } + assert(testIter2.getLastSuccessfullIndex == 100) + assert(!testIter2.hasNext) + } + + test("throws after MAX_RETRIES exceptions") { + // Here, we will try to iterate from [0, 200] but getFailingIterSupplier fails at indices + // 25, 50, 75, 110. + val testIter = new RetryableCloseableIterator(getFailingIterSupplier(0 to 200)) + + for (i <- 0 to 109) { + assert(testIter.next().toInt == i) + } + assert(testIter.getNumRetries == 3) + val ex = intercept[RuntimeException] { + testIter.next() + } + assert(ex.getCause.isInstanceOf[RemoteFileChangedException]) + } + +} From 533bdb5638ac486bbe45045bb9871a4a4332a0ad Mon Sep 17 00:00:00 2001 From: Scott Sandre Date: Mon, 24 Apr 2023 09:57:30 -0700 Subject: [PATCH 03/12] close the current iter after failure --- .../io/delta/storage/RetryableCloseableIterator.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/storage-s3-dynamodb/src/main/java/io/delta/storage/RetryableCloseableIterator.java b/storage-s3-dynamodb/src/main/java/io/delta/storage/RetryableCloseableIterator.java index 41d9df1536f..d0de3c805ca 100644 --- a/storage-s3-dynamodb/src/main/java/io/delta/storage/RetryableCloseableIterator.java +++ b/storage-s3-dynamodb/src/main/java/io/delta/storage/RetryableCloseableIterator.java @@ -75,6 +75,7 @@ public boolean hasNext() { if (numRetries < MAX_RETRIES) { numRetries++; replayIterToLastSuccessfulIndex(); + // Now, the currentImpl has been recreated and iterated to the same index return hasNext(); } else { throw new RuntimeException(ex); @@ -103,6 +104,7 @@ public String next() { if (numRetries < MAX_RETRIES) { numRetries++; replayIterToLastSuccessfulIndex(); + // Now, the currentImpl has been recreated and iterated to the same index return next(); } else { throw new RuntimeException(ex); @@ -124,6 +126,13 @@ private String nextInternal() throws RemoteFileChangedException { * with that for now - that would require handling exception inception. */ private void replayIterToLastSuccessfulIndex() { + // We still need to close the currentImpl, even though it threw + try { + currentImpl.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + LOG.info("Replaying until (inclusive) index {}", lastSuccessfullIndex); currentImpl = implSupplier.get(); // this last impl threw an exception and is useless! From 7ebc93f9c72c0bc91f073a7c61617a96180e2a69 Mon Sep 17 00:00:00 2001 From: Scott Sandre Date: Mon, 24 Apr 2023 10:03:42 -0700 Subject: [PATCH 04/12] Update RetryableCloseableIterator.java - update variable names - update class docs --- .../storage/RetryableCloseableIterator.java | 29 ++++++++++--------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/storage-s3-dynamodb/src/main/java/io/delta/storage/RetryableCloseableIterator.java b/storage-s3-dynamodb/src/main/java/io/delta/storage/RetryableCloseableIterator.java index d0de3c805ca..25471172559 100644 --- a/storage-s3-dynamodb/src/main/java/io/delta/storage/RetryableCloseableIterator.java +++ b/storage-s3-dynamodb/src/main/java/io/delta/storage/RetryableCloseableIterator.java @@ -10,7 +10,10 @@ import org.slf4j.LoggerFactory; /** - * Iterates over the given iterator supplied in the constructor. + * This class presents an iterator view over the iterator supplier in the constructor. + * + * This class assumes that the iterator supplied by the supplier can throw, and that subsequent + * supplier.get() calls will return an iterator over the same data. * * If there are any RemoteFileChangedException during `next` and `hasNext` calls, will retry * at most `MAX_RETRIES` times. @@ -25,7 +28,7 @@ public class RetryableCloseableIterator implements CloseableIterator { /** Visible for testing. */ public static final int MAX_RETRIES = 3; - private final Supplier> implSupplier; + private final Supplier> iterSupplier; /** * Index of the last element successfully returned without an exception. A value of -1 means @@ -35,12 +38,12 @@ public class RetryableCloseableIterator implements CloseableIterator { private int numRetries = 0; - private CloseableIterator currentImpl; + private CloseableIterator currentIter; - public RetryableCloseableIterator(Supplier> implSupplier) { - this.implSupplier = Objects.requireNonNull(implSupplier); + public RetryableCloseableIterator(Supplier> iterSupplier) { + this.iterSupplier = Objects.requireNonNull(iterSupplier); this.lastSuccessfullIndex = -1; - this.currentImpl = this.implSupplier.get(); + this.currentIter = this.iterSupplier.get(); } /** Visible for testing. */ @@ -55,8 +58,8 @@ public int getNumRetries() { @Override public void close() throws IOException { - if (currentImpl != null) { - currentImpl.close(); + if (currentIter != null) { + currentIter.close(); } } @@ -85,7 +88,7 @@ public boolean hasNext() { /** Throw a checked exception so we can catch this in the caller. */ private boolean hasNextInternal() throws RemoteFileChangedException { - return currentImpl.hasNext(); + return currentIter.hasNext(); } @Override @@ -114,7 +117,7 @@ public String next() { /** Throw a checked exception so we can catch this in the caller. */ private String nextInternal() throws RemoteFileChangedException { - return currentImpl.next(); + return currentIter.next(); } /** @@ -128,13 +131,13 @@ private String nextInternal() throws RemoteFileChangedException { private void replayIterToLastSuccessfulIndex() { // We still need to close the currentImpl, even though it threw try { - currentImpl.close(); + currentIter.close(); } catch (IOException e) { throw new RuntimeException(e); } LOG.info("Replaying until (inclusive) index {}", lastSuccessfullIndex); - currentImpl = implSupplier.get(); // this last impl threw an exception and is useless! + currentIter = iterSupplier.get(); // this last impl threw an exception and is useless! // Note: we iterate until `i` == `lastSuccessfullIndex`, so that index `i` is the last // successfully returned index. @@ -148,7 +151,7 @@ private void replayIterToLastSuccessfulIndex() { for (int i = -1; i < lastSuccessfullIndex; i++) { // Note: this does NOT touch RetryableCloseableIterator::next and so does not change // the index - currentImpl.next(); + currentIter.next(); } LOG.info("Successfully replayed until (inclusive) index {}", lastSuccessfullIndex); From e9450dcdf424e6afb199e20cf7ec3b9ba5cb46c5 Mon Sep 17 00:00:00 2001 From: Scott Sandre Date: Mon, 24 Apr 2023 17:09:42 -0700 Subject: [PATCH 05/12] Handle exceptions even while retrying; TODO unit test --- build.sbt | 3 - .../storage/RetryableCloseableIterator.java | 129 +++++++++++------- .../RetryableCloseableIteratorSuite.scala | 4 + 3 files changed, 80 insertions(+), 56 deletions(-) diff --git a/build.sbt b/build.sbt index 560928dac35..1ee6ebe6e64 100644 --- a/build.sbt +++ b/build.sbt @@ -178,9 +178,6 @@ lazy val storage = (project in file("storage")) // Note that the org.apache.hadoop.fs.s3a.Listing::createFileStatusListingIterator 3.3.1 API // is not compatible with 3.3.2. "org.apache.hadoop" % "hadoop-aws" % "3.3.1" % "provided", - - // Test Deps - "org.scalatest" %% "scalatest" % "3.2.11" % "test", ) ) diff --git a/storage-s3-dynamodb/src/main/java/io/delta/storage/RetryableCloseableIterator.java b/storage-s3-dynamodb/src/main/java/io/delta/storage/RetryableCloseableIterator.java index 25471172559..2a2f64f2cb0 100644 --- a/storage-s3-dynamodb/src/main/java/io/delta/storage/RetryableCloseableIterator.java +++ b/storage-s3-dynamodb/src/main/java/io/delta/storage/RetryableCloseableIterator.java @@ -1,6 +1,7 @@ package io.delta.storage; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.NoSuchElementException; import java.util.Objects; import java.util.function.Supplier; @@ -16,11 +17,11 @@ * supplier.get() calls will return an iterator over the same data. * * If there are any RemoteFileChangedException during `next` and `hasNext` calls, will retry - * at most `MAX_RETRIES` times. + * at most `MAX_RETRIES` times. If there are similar exceptions during the retry, those are handled + * and count towards the MAX_RETRIES. * * Internally, keeps track of the last-successfully-returned index. Upon retry, will iterate back - * to that same position. If another RemoteFileChangedException occurs during that retry, will fail. - * We can solve that exception inception later (iterators within iterators). + * to that same position. */ public class RetryableCloseableIterator implements CloseableIterator { private static final Logger LOG = LoggerFactory.getLogger(RetryableCloseableIterator.class); @@ -70,24 +71,25 @@ public void close() throws IOException { public boolean hasNext() { try { return hasNextInternal(); - } catch (RemoteFileChangedException ex) { - LOG.warn( - "Caught a RemoteFileChangedException in `hastNext`. NumRetries is {} / {}.\n{}", - numRetries + 1, MAX_RETRIES, ex.toString() - ); - if (numRetries < MAX_RETRIES) { - numRetries++; - replayIterToLastSuccessfulIndex(); - // Now, the currentImpl has been recreated and iterated to the same index + } catch (IOException ex) { + // `endsWith` should still work if the class is shaded. + final String exClassName = ex.getClass().getName(); + if (exClassName.endsWith("org.apache.hadoop.fs.s3a.RemoteFileChangedException")) { + try { + replayIterToLastSuccessfulIndex(ex); + } catch (IOException ex2) { + throw new RuntimeException(ex2); + } return hasNext(); } else { - throw new RuntimeException(ex); + throw new UncheckedIOException(ex); } + } } /** Throw a checked exception so we can catch this in the caller. */ - private boolean hasNextInternal() throws RemoteFileChangedException { + private boolean hasNextInternal() throws IOException { return currentIter.hasNext(); } @@ -99,61 +101,82 @@ public String next() { final String ret = nextInternal(); lastSuccessfullIndex++; return ret; - } catch (RemoteFileChangedException ex) { - LOG.warn( - "Caught a RemoteFileChangedException in `next`. NumRetries is {} / {}.\n{}", - numRetries + 1, MAX_RETRIES, ex.toString() - ); - if (numRetries < MAX_RETRIES) { - numRetries++; - replayIterToLastSuccessfulIndex(); - // Now, the currentImpl has been recreated and iterated to the same index + } catch (IOException ex) { + // `endsWith` should still work if the class is shaded. + final String exClassName = ex.getClass().getName(); + if (exClassName.endsWith("org.apache.hadoop.fs.s3a.RemoteFileChangedException")) { + try { + replayIterToLastSuccessfulIndex(ex); + } catch (IOException ex2) { + throw new RuntimeException(ex2); + } return next(); } else { - throw new RuntimeException(ex); + throw new UncheckedIOException(ex); } } } /** Throw a checked exception so we can catch this in the caller. */ - private String nextInternal() throws RemoteFileChangedException { + private String nextInternal() throws IOException { return currentIter.next(); } /** * Called after a RemoteFileChangedException was thrown. Tries to replay the underlying * iter implementation (supplied by the `implSupplier`) to the last successful index, so that - * the previous error open (hasNext, or next) can be retried. - * - * NOTE: This iter replay **itself** can throw a RemoteFileChangedException. Let's not deal - * with that for now - that would require handling exception inception. + * the previous error open (hasNext, or next) can be retried. If a RemoteFileChangedException + * is thrown while replaying the iter, we just increment the `numRetries` counter and try again. */ - private void replayIterToLastSuccessfulIndex() { - // We still need to close the currentImpl, even though it threw - try { - currentIter.close(); - } catch (IOException e) { - throw new RuntimeException(e); + private void replayIterToLastSuccessfulIndex(IOException topLevelEx) throws IOException { + LOG.warn( + "Caught a RemoteFileChangedException in `next`. NumRetries is {} / {}.\n{}", + numRetries + 1, MAX_RETRIES, topLevelEx + ); + currentIter.close(); + + while (numRetries < MAX_RETRIES) { + numRetries++; + LOG.info("Replaying until (inclusive) index {}", lastSuccessfullIndex); + currentIter = iterSupplier.get(); + int replayIndex = 0; + try { + while (replayIndex < lastSuccessfullIndex) { + if (currentIter.hasNext()) { + currentIter.next(); // Disregard data that has been read + replayIndex++; + } else { + throw new IllegalStateException( + "a retried iterator doesn't have enough data (currentIndex=" + + replayIndex + ", lastSuccessfullIndex=" + lastSuccessfullIndex + ")"); + } + } + + // Just like how in RetryableCloseableIterator::next we have to handle + // RemoteFileChangedException, we must also hadnle that here during the replay. + // `currentIter.next()` isn't declared to throw a RemoteFileChangedException, so we + // trick the compiler into thinking this block can throw RemoteFileChangedException + // via `fakeIOException`. That way, we can catch it, and retry replaying the iter. + fakeIOException(); + + return; + } catch (IOException ex) { + final String exClassName = ex.getClass().getName(); + if (exClassName.endsWith("org.apache.hadoop.fs.s3a.RemoteFileChangedException")) { + // Ignore and try replaying the iter again at the top of the while loop + } else { + throw ex; + } + } + LOG.info("Successfully replayed until (inclusive) index {}", lastSuccessfullIndex); } - LOG.info("Replaying until (inclusive) index {}", lastSuccessfullIndex); - currentIter = iterSupplier.get(); // this last impl threw an exception and is useless! - - // Note: we iterate until `i` == `lastSuccessfullIndex`, so that index `i` is the last - // successfully returned index. - // - // e.g. `i` starts at -1. after the 1st currentImpl.next() call, i will be incremented to 0. - // This makes sense as per the `lastSuccessfullIndex` semantics, since 0 is the last - // index to be successfully returned. - // e.g. suppose `lastSuccessfulIndex` is 25. Then we have read 26 items, with indices 0 to - // 25 inclusive. Then we want to iterate while i < lastSuccessfullIndex. After that, - // i will increment to 25 and we will exit the for loop. - for (int i = -1; i < lastSuccessfullIndex; i++) { - // Note: this does NOT touch RetryableCloseableIterator::next and so does not change - // the index - currentIter.next(); - } + throw new IOException(topLevelEx); + } - LOG.info("Successfully replayed until (inclusive) index {}", lastSuccessfullIndex); + private void fakeIOException() throws IOException { + if (false) { + throw new IOException(); + } } } diff --git a/storage-s3-dynamodb/src/test/scala/io/delta/storage/RetryableCloseableIteratorSuite.scala b/storage-s3-dynamodb/src/test/scala/io/delta/storage/RetryableCloseableIteratorSuite.scala index f2c36026324..59426e434e1 100644 --- a/storage-s3-dynamodb/src/test/scala/io/delta/storage/RetryableCloseableIteratorSuite.scala +++ b/storage-s3-dynamodb/src/test/scala/io/delta/storage/RetryableCloseableIteratorSuite.scala @@ -127,6 +127,10 @@ class RetryableCloseableIteratorSuite extends AnyFunSuite { assert(!testIter2.hasNext) } + test("handles exceptions while retrying") { + + } + test("throws after MAX_RETRIES exceptions") { // Here, we will try to iterate from [0, 200] but getFailingIterSupplier fails at indices // 25, 50, 75, 110. From 12e1daec47726ce2fd31b9353077c11e69a01add Mon Sep 17 00:00:00 2001 From: Scott Sandre Date: Tue, 25 Apr 2023 09:31:35 -0700 Subject: [PATCH 06/12] Refactor and clean up tests --- build.sbt | 5 +- .../storage/RetryableCloseableIterator.java | 25 +++++-- .../RetryableCloseableIteratorSuite.scala | 73 ++++++++++--------- 3 files changed, 61 insertions(+), 42 deletions(-) diff --git a/build.sbt b/build.sbt index 1ee6ebe6e64..64377ed99cb 100644 --- a/build.sbt +++ b/build.sbt @@ -178,6 +178,9 @@ lazy val storage = (project in file("storage")) // Note that the org.apache.hadoop.fs.s3a.Listing::createFileStatusListingIterator 3.3.1 API // is not compatible with 3.3.2. "org.apache.hadoop" % "hadoop-aws" % "3.3.1" % "provided", + + // Test Deps + "org.scalatest" %% "scalatest" % "3.2.11" % "test", ) ) @@ -195,8 +198,8 @@ lazy val storageS3DynamoDB = (project in file("storage-s3-dynamodb")) libraryDependencies ++= Seq( "com.amazonaws" % "aws-java-sdk" % "1.7.4" % "provided", - "org.apache.hadoop" % "hadoop-aws" % "3.3.1" % "provided", // RemoteFileChangedException + // Test Deps "org.apache.hadoop" % "hadoop-aws" % "3.3.1" % "test", // RemoteFileChangedException ) ) diff --git a/storage-s3-dynamodb/src/main/java/io/delta/storage/RetryableCloseableIterator.java b/storage-s3-dynamodb/src/main/java/io/delta/storage/RetryableCloseableIterator.java index 2a2f64f2cb0..7f285354022 100644 --- a/storage-s3-dynamodb/src/main/java/io/delta/storage/RetryableCloseableIterator.java +++ b/storage-s3-dynamodb/src/main/java/io/delta/storage/RetryableCloseableIterator.java @@ -78,7 +78,7 @@ public boolean hasNext() { try { replayIterToLastSuccessfulIndex(ex); } catch (IOException ex2) { - throw new RuntimeException(ex2); + throw new UncheckedIOException(ex2); } return hasNext(); } else { @@ -108,7 +108,7 @@ public String next() { try { replayIterToLastSuccessfulIndex(ex); } catch (IOException ex2) { - throw new RuntimeException(ex2); + throw new UncheckedIOException(ex2); } return next(); } else { @@ -137,9 +137,15 @@ private void replayIterToLastSuccessfulIndex(IOException topLevelEx) throws IOEx while (numRetries < MAX_RETRIES) { numRetries++; - LOG.info("Replaying until (inclusive) index {}", lastSuccessfullIndex); + LOG.info( + "Replaying until (inclusive) index {}. NumRetries is {} / {}.", + lastSuccessfullIndex, numRetries + 1, MAX_RETRIES + ); currentIter = iterSupplier.get(); - int replayIndex = 0; + + // Last successful index replayed. Starts at -1, and not 0, because 0 means we've + // already replayed the 1st element! + int replayIndex = -1; try { while (replayIndex < lastSuccessfullIndex) { if (currentIter.hasNext()) { @@ -147,8 +153,13 @@ private void replayIterToLastSuccessfulIndex(IOException topLevelEx) throws IOEx replayIndex++; } else { throw new IllegalStateException( - "a retried iterator doesn't have enough data (currentIndex=" + - replayIndex + ", lastSuccessfullIndex=" + lastSuccessfullIndex + ")"); + String.format( + "A retried iterator doesn't have enough data " + + "(replayIndex=%s, lastSuccessfullIndex=%s)", + replayIndex, + lastSuccessfullIndex + ) + ); } } @@ -171,7 +182,7 @@ private void replayIterToLastSuccessfulIndex(IOException topLevelEx) throws IOEx LOG.info("Successfully replayed until (inclusive) index {}", lastSuccessfullIndex); } - throw new IOException(topLevelEx); + throw topLevelEx; } private void fakeIOException() throws IOException { diff --git a/storage-s3-dynamodb/src/test/scala/io/delta/storage/RetryableCloseableIteratorSuite.scala b/storage-s3-dynamodb/src/test/scala/io/delta/storage/RetryableCloseableIteratorSuite.scala index 59426e434e1..ff810401d7c 100644 --- a/storage-s3-dynamodb/src/test/scala/io/delta/storage/RetryableCloseableIteratorSuite.scala +++ b/storage-s3-dynamodb/src/test/scala/io/delta/storage/RetryableCloseableIteratorSuite.scala @@ -1,5 +1,6 @@ package io.delta.storage +import java.io.UncheckedIOException import java.util.function.Supplier import scala.collection.JavaConverters._ @@ -12,7 +13,7 @@ class RetryableCloseableIteratorSuite extends AnyFunSuite { private def getIter( range: Range, - throwAtIndices: Seq[Int] = Seq.empty): CloseableIterator[String] = + throwAtIndex: Option[Int] = None): CloseableIterator[String] = new CloseableIterator[String] { var index = 0 val impl = range.iterator.asJava @@ -20,7 +21,7 @@ class RetryableCloseableIteratorSuite extends AnyFunSuite { override def close(): Unit = { } override def hasNext: Boolean = { - if (throwAtIndices.contains(index)) { + if (throwAtIndex.contains(index)) { println(s"`hasNext` throwing for index $index") throw new RemoteFileChangedException(s"path -> index $index", "operation", "msg"); } @@ -29,7 +30,7 @@ class RetryableCloseableIteratorSuite extends AnyFunSuite { } override def next(): String = { - if (throwAtIndices.contains(index)) { + if (throwAtIndex.contains(index)) { println(s"`next` throwing for index $index") throw new RemoteFileChangedException(s"path -> index $index", "operation", "msg"); } @@ -46,32 +47,20 @@ class RetryableCloseableIteratorSuite extends AnyFunSuite { * Provide a suitable input range to get the # of failures you want. e.g. range 0 to 100 will fail * 3 times. */ - def getFailingIterSupplier(range: Range): Supplier[CloseableIterator[String]] = + def getFailingIterSupplier( + range: Range, + failIndices: Seq[Int] = Seq.empty): Supplier[CloseableIterator[String]] = new Supplier[CloseableIterator[String]] { var numGetCalls = 0 - override def get(): CloseableIterator[String] = numGetCalls match { - case 0 => - // The 1st time this has been called! This will fail at index 25 - numGetCalls = numGetCalls + 1 - getIter(range, Seq(25, 50, 75, 110)) - case 1 => - // The 2nd time this has been called! The underlying RetryableCloseableIterator should - // replay until index 25, and then continue. - numGetCalls = numGetCalls + 1 - getIter(range, Seq(50, 75, 110)) - case 2 => - // The 3rd time this has been called! The underlying RetryableCloseableIterator should - // replay until index 50, and then continue. - numGetCalls = numGetCalls + 1 - getIter(range, Seq(75, 110)) - case 3 => - // The 4th time this has been called! The underlying RetryableCloseableIterator should - // replay until index 75, and then continue. + override def get(): CloseableIterator[String] = { + if (numGetCalls < failIndices.length) { + val result = getIter(range, Some(failIndices(numGetCalls))) numGetCalls = numGetCalls + 1 - getIter(range, Seq(110)) - case _ => throw new RuntimeException("Should never all a 5th time - there's only 4 (1st " + - "call + 3 retries) allowed!") + result + } else { + getIter(range) + } } } @@ -90,14 +79,16 @@ class RetryableCloseableIteratorSuite extends AnyFunSuite { test("complex case - replays underlying iter back to correct index after error") { // Here, we just do the simplest verification - val testIter1 = new RetryableCloseableIterator(getFailingIterSupplier(0 to 100)) - val output1 = testIter1.asScala.toList + val testIter1 = new RetryableCloseableIterator( + getFailingIterSupplier(0 to 100, Seq(25, 50, 75))) // this asserts the size, order, and elements of the testIter1 - assert(output1.map(_.toInt) == (0 to 100).toList) + assert(testIter1.asScala.toList.map(_.toInt) == (0 to 100).toList) // Here, we do more complex verification - val testIter2 = new RetryableCloseableIterator(getFailingIterSupplier(0 to 100)) + val testIter2 = new RetryableCloseableIterator( + getFailingIterSupplier(0 to 100, Seq(25, 50, 75))) + for (_ <- 0 to 24) { testIter2.next() } assert(testIter2.getLastSuccessfullIndex == 24) assert(testIter2.getNumRetries == 0) @@ -128,15 +119,29 @@ class RetryableCloseableIteratorSuite extends AnyFunSuite { } test("handles exceptions while retrying") { - + // Iterates normally until index 50 (return [0, 49] successfully). Then fails. + // Tries to replay, but fails at 30 + // Tries to replay again, but fails at 20 + // Successfully replays to 49, starts returning results from index 50 (inclusive) again + val testIter1 = + new RetryableCloseableIterator(getFailingIterSupplier(0 to 100, Seq(50, 30, 20))) + + assert(testIter1.asScala.toList.map(_.toInt) == (0 to 100).toList) + + // Iterates normally until index 50 (return [0, 49] successfully). Then fails. + // Successfully replayed to 49, starts returning results from index 50 (inclusive) + // Fails at index 50 (returned [50, 69]). Tries to replay, but fails at 5 + // Successfully replayes until 69, then normally returns results from 70 + val testIter2 = + new RetryableCloseableIterator(getFailingIterSupplier(0 to 100, Seq(50, 70, 5))) + assert(testIter2.asScala.toList.map(_.toInt) == (0 to 100).toList) } test("throws after MAX_RETRIES exceptions") { - // Here, we will try to iterate from [0, 200] but getFailingIterSupplier fails at indices - // 25, 50, 75, 110. - val testIter = new RetryableCloseableIterator(getFailingIterSupplier(0 to 200)) + val testIter = + new RetryableCloseableIterator(getFailingIterSupplier(0 to 100, Seq(20, 49, 60, 80))) - for (i <- 0 to 109) { + for (i <- 0 to 79) { assert(testIter.next().toInt == i) } assert(testIter.getNumRetries == 3) From 1e30ed8eceff15b508988b03231937d42e9d01f1 Mon Sep 17 00:00:00 2001 From: Scott Sandre Date: Tue, 25 Apr 2023 09:47:26 -0700 Subject: [PATCH 07/12] Add a conf to set the max number of retries --- .../storage/RetryableCloseableIterator.java | 22 ++++++++---- .../io/delta/storage/S3DynamoDBLogStore.java | 35 ++++++++++++++----- .../RetryableCloseableIteratorSuite.scala | 30 ++++++++++++++-- 3 files changed, 69 insertions(+), 18 deletions(-) diff --git a/storage-s3-dynamodb/src/main/java/io/delta/storage/RetryableCloseableIterator.java b/storage-s3-dynamodb/src/main/java/io/delta/storage/RetryableCloseableIterator.java index 7f285354022..42328ccf5f5 100644 --- a/storage-s3-dynamodb/src/main/java/io/delta/storage/RetryableCloseableIterator.java +++ b/storage-s3-dynamodb/src/main/java/io/delta/storage/RetryableCloseableIterator.java @@ -26,11 +26,12 @@ public class RetryableCloseableIterator implements CloseableIterator { private static final Logger LOG = LoggerFactory.getLogger(RetryableCloseableIterator.class); - /** Visible for testing. */ - public static final int MAX_RETRIES = 3; + public static final int DEFAULT_MAX_RETRIES = 3; private final Supplier> iterSupplier; + private final int maxRetries; + /** * Index of the last element successfully returned without an exception. A value of -1 means * that no element has ever been returned yet. @@ -41,12 +42,21 @@ public class RetryableCloseableIterator implements CloseableIterator { private CloseableIterator currentIter; - public RetryableCloseableIterator(Supplier> iterSupplier) { + public RetryableCloseableIterator( + Supplier> iterSupplier, + int maxRetries) { + if (maxRetries < 0) throw new IllegalArgumentException("maxRetries can't be negative"); + this.iterSupplier = Objects.requireNonNull(iterSupplier); + this.maxRetries = maxRetries; this.lastSuccessfullIndex = -1; this.currentIter = this.iterSupplier.get(); } + public RetryableCloseableIterator(Supplier> iterSupplier) { + this(iterSupplier, DEFAULT_MAX_RETRIES); + } + /** Visible for testing. */ public int getLastSuccessfullIndex() { return lastSuccessfullIndex; @@ -131,15 +141,15 @@ private String nextInternal() throws IOException { private void replayIterToLastSuccessfulIndex(IOException topLevelEx) throws IOException { LOG.warn( "Caught a RemoteFileChangedException in `next`. NumRetries is {} / {}.\n{}", - numRetries + 1, MAX_RETRIES, topLevelEx + numRetries + 1, maxRetries, topLevelEx ); currentIter.close(); - while (numRetries < MAX_RETRIES) { + while (numRetries < maxRetries) { numRetries++; LOG.info( "Replaying until (inclusive) index {}. NumRetries is {} / {}.", - lastSuccessfullIndex, numRetries + 1, MAX_RETRIES + lastSuccessfullIndex, numRetries + 1, maxRetries ); currentIter = iterSupplier.get(); diff --git a/storage-s3-dynamodb/src/main/java/io/delta/storage/S3DynamoDBLogStore.java b/storage-s3-dynamodb/src/main/java/io/delta/storage/S3DynamoDBLogStore.java index f6e66ad2dac..d8610f63512 100644 --- a/storage-s3-dynamodb/src/main/java/io/delta/storage/S3DynamoDBLogStore.java +++ b/storage-s3-dynamodb/src/main/java/io/delta/storage/S3DynamoDBLogStore.java @@ -80,6 +80,7 @@ public class S3DynamoDBLogStore extends BaseExternalLogStore { */ public static final String SPARK_CONF_PREFIX = "spark.io.delta.storage.S3DynamoDBLogStore"; public static final String BASE_CONF_PREFIX = "io.delta.storage.S3DynamoDBLogStore"; + public static final String READ_RETRIES = "read.retries"; public static final String DDB_CLIENT_TABLE = "ddb.tableName"; public static final String DDB_CLIENT_REGION = "ddb.region"; public static final String DDB_CLIENT_CREDENTIALS_PROVIDER = "credentials.provider"; @@ -139,15 +140,31 @@ public S3DynamoDBLogStore(Configuration hadoopConf) throws IOException { @Override public CloseableIterator read(Path path, Configuration hadoopConf) throws IOException { - return new RetryableCloseableIterator(() -> { - try { - return super.read(path, hadoopConf); - } catch (IOException e) { - // There was an issue resolving the file system. Nothing to do with a - // RemoteFileChangedException. - throw new RuntimeException(e); - } - }); + // With many concurrent readers/writers, there's a chance that concurrent 'recovery' + // operations occur on the same file, i.e. the same temp file T(N) is copied into the target + // N.json file more than once. Though data loss will *NOT* occur, readers of N.json may + // receive a RemoteFileChangedException from S3 as the ETag of N.json was changed. This is + // safe to retry, so we do so here. + final int maxRetries = Integer.parseInt( + getParam( + hadoopConf, + READ_RETRIES, + Integer.toString(RetryableCloseableIterator.DEFAULT_MAX_RETRIES) + ) + ); + + return new RetryableCloseableIterator( + () -> { + try { + return super.read(path, hadoopConf); + } catch (IOException e) { + // There was an issue resolving the file system. Nothing to do with a + // RemoteFileChangedException. + throw new RuntimeException(e); + } + }, + maxRetries + ); } @Override diff --git a/storage-s3-dynamodb/src/test/scala/io/delta/storage/RetryableCloseableIteratorSuite.scala b/storage-s3-dynamodb/src/test/scala/io/delta/storage/RetryableCloseableIteratorSuite.scala index ff810401d7c..6c43aa47986 100644 --- a/storage-s3-dynamodb/src/test/scala/io/delta/storage/RetryableCloseableIteratorSuite.scala +++ b/storage-s3-dynamodb/src/test/scala/io/delta/storage/RetryableCloseableIteratorSuite.scala @@ -1,6 +1,5 @@ package io.delta.storage -import java.io.UncheckedIOException import java.util.function.Supplier import scala.collection.JavaConverters._ @@ -131,13 +130,13 @@ class RetryableCloseableIteratorSuite extends AnyFunSuite { // Iterates normally until index 50 (return [0, 49] successfully). Then fails. // Successfully replayed to 49, starts returning results from index 50 (inclusive) // Fails at index 50 (returned [50, 69]). Tries to replay, but fails at 5 - // Successfully replayes until 69, then normally returns results from 70 + // Successfully replays until 69, then normally returns results from 70 val testIter2 = new RetryableCloseableIterator(getFailingIterSupplier(0 to 100, Seq(50, 70, 5))) assert(testIter2.asScala.toList.map(_.toInt) == (0 to 100).toList) } - test("throws after MAX_RETRIES exceptions") { + test("throws after maxRetries exceptions") { val testIter = new RetryableCloseableIterator(getFailingIterSupplier(0 to 100, Seq(20, 49, 60, 80))) @@ -151,4 +150,29 @@ class RetryableCloseableIteratorSuite extends AnyFunSuite { assert(ex.getCause.isInstanceOf[RemoteFileChangedException]) } + test("can specify maxRetries") { + val testIter1 = + new RetryableCloseableIterator( + getFailingIterSupplier(0 to 100, Seq(5, 10, 15, 20, 25, 30, 35, 40, 45, 50)), + 10 // maxRetries + ) + + assert(testIter1.asScala.toList.map(_.toInt) == (0 to 100).toList) + + val testIter2 = + new RetryableCloseableIterator( + getFailingIterSupplier(0 to 100, Seq(5, 10, 15, 20, 25, 30)), + 5 // maxRetries + ) + + for (i <- 0 to 29) { + assert(testIter2.next().toInt == i) + } + assert(testIter2.getNumRetries == 5) + val ex = intercept[RuntimeException] { + testIter2.next() + } + assert(ex.getCause.isInstanceOf[RemoteFileChangedException]) + } + } From de97330afdb2d718ef2930e43dc43a28acee3e07 Mon Sep 17 00:00:00 2001 From: Scott Sandre Date: Tue, 25 Apr 2023 10:00:01 -0700 Subject: [PATCH 08/12] Add test for retried iter doesn't have enough data --- .../RetryableCloseableIteratorSuite.scala | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/storage-s3-dynamodb/src/test/scala/io/delta/storage/RetryableCloseableIteratorSuite.scala b/storage-s3-dynamodb/src/test/scala/io/delta/storage/RetryableCloseableIteratorSuite.scala index 6c43aa47986..cfba7405146 100644 --- a/storage-s3-dynamodb/src/test/scala/io/delta/storage/RetryableCloseableIteratorSuite.scala +++ b/storage-s3-dynamodb/src/test/scala/io/delta/storage/RetryableCloseableIteratorSuite.scala @@ -175,4 +175,28 @@ class RetryableCloseableIteratorSuite extends AnyFunSuite { assert(ex.getCause.isInstanceOf[RemoteFileChangedException]) } + test("retried iterator doesn't have enough data (underlying data changed!)") { + val testIter = new RetryableCloseableIterator( + new Supplier[CloseableIterator[String]] { + var getCount = 0 + + override def get(): CloseableIterator[String] = getCount match { + case 0 => + getCount = getCount + 1 + getIter(0 to 100, Some(50)) // try to iterate 0->100, fail at 50 + + case 1 => + getCount = getCount + 1 + getIter(0 to 30) // try to replay 0 to 50, but no elements after 30! + } + } + ) + + for (_ <- 0 to 49) { testIter.next() } + val e = intercept[IllegalStateException] { + testIter.next() + } + assert(e.getMessage.contains("A retried iterator doesn't have enough data")) + } + } From c05d0c57118b4444c0550d2f1dc13ebd4c1c810b Mon Sep 17 00:00:00 2001 From: Scott Sandre Date: Tue, 25 Apr 2023 10:02:19 -0700 Subject: [PATCH 09/12] update exception msg --- .../main/java/io/delta/storage/RetryableCloseableIterator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage-s3-dynamodb/src/main/java/io/delta/storage/RetryableCloseableIterator.java b/storage-s3-dynamodb/src/main/java/io/delta/storage/RetryableCloseableIterator.java index 42328ccf5f5..475637efcdd 100644 --- a/storage-s3-dynamodb/src/main/java/io/delta/storage/RetryableCloseableIterator.java +++ b/storage-s3-dynamodb/src/main/java/io/delta/storage/RetryableCloseableIterator.java @@ -140,7 +140,7 @@ private String nextInternal() throws IOException { */ private void replayIterToLastSuccessfulIndex(IOException topLevelEx) throws IOException { LOG.warn( - "Caught a RemoteFileChangedException in `next`. NumRetries is {} / {}.\n{}", + "Caught a RemoteFileChangedException. NumRetries is {} / {}.\n{}", numRetries + 1, maxRetries, topLevelEx ); currentIter.close(); From 4c70285399f5dcb85f5a50611e9c3fc8ee996340 Mon Sep 17 00:00:00 2001 From: Scott Sandre Date: Thu, 27 Apr 2023 09:27:02 -0700 Subject: [PATCH 10/12] Handle FileAlreadyExistsException in fixDeltaLog --- .../io/delta/storage/BaseExternalLogStore.java | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/storage-s3-dynamodb/src/main/java/io/delta/storage/BaseExternalLogStore.java b/storage-s3-dynamodb/src/main/java/io/delta/storage/BaseExternalLogStore.java index 9b31a294718..7f158b067ec 100644 --- a/storage-s3-dynamodb/src/main/java/io/delta/storage/BaseExternalLogStore.java +++ b/storage-s3-dynamodb/src/main/java/io/delta/storage/BaseExternalLogStore.java @@ -167,6 +167,8 @@ public void write( prevFileName ); if (prevEntry.isPresent() && !prevEntry.get().complete) { + // This will never throw a java.nio.file.FileAlreadyExistsException for N-1.json + // (which would be confusing for the writer trying to write N.json). fixDeltaLog(fs, prevEntry.get()); } else { if (!fs.exists(prevPath)) { @@ -334,6 +336,12 @@ protected void fixDeltaLogPutCompleteDbEntry(ExternalCommitEntry entry) throws I /** * Method for assuring consistency on filesystem according to the external cache. * Method tries to rewrite TransactionLog entry from temporary path if it does not exist. + * + * Should never throw a java.nio.file.FileAlreadyExistsException: + * - if N.json already exists, we either don't copy T(N) -> N.json, or we swallow the + * FileAlreadyExistsException + * - we won't receive a FileAlreadyExistsException when writing to the external cache, as + * overwrite will be true */ private void fixDeltaLog(FileSystem fs, ExternalCommitEntry entry) throws IOException { if (entry.complete) { @@ -351,7 +359,12 @@ private void fixDeltaLog(FileSystem fs, ExternalCommitEntry entry) throws IOExce fixDeltaLogPutCompleteDbEntry(entry); LOG.info("fixed {}", entry.fileName); return; - } catch(Throwable e) { + } catch (java.nio.file.FileAlreadyExistsException e) { + LOG.info("{}:", e.getClass().getSimpleName(), e); + copied = true; + + // Never re-throw a FileAlreadyExistsException + } catch (IOException e) { LOG.info("{}:", e.getClass().getSimpleName(), e); if (retry >= 3) { throw e; From 0cf30e27cd4e38a75fee57154494d57eb3bc2a17 Mon Sep 17 00:00:00 2001 From: Scott Sandre Date: Thu, 27 Apr 2023 10:26:32 -0700 Subject: [PATCH 11/12] Clean up PR & respond to PR comments --- .../storage/RetryableCloseableIterator.java | 77 ++++++++++++------- .../io/delta/storage/S3DynamoDBLogStore.java | 3 +- .../RetryableCloseableIteratorSuite.scala | 36 +++++++-- 3 files changed, 81 insertions(+), 35 deletions(-) diff --git a/storage-s3-dynamodb/src/main/java/io/delta/storage/RetryableCloseableIterator.java b/storage-s3-dynamodb/src/main/java/io/delta/storage/RetryableCloseableIterator.java index 475637efcdd..300942e9978 100644 --- a/storage-s3-dynamodb/src/main/java/io/delta/storage/RetryableCloseableIterator.java +++ b/storage-s3-dynamodb/src/main/java/io/delta/storage/RetryableCloseableIterator.java @@ -6,7 +6,6 @@ import java.util.Objects; import java.util.function.Supplier; -import org.apache.hadoop.fs.s3a.RemoteFileChangedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,21 +56,13 @@ public RetryableCloseableIterator(Supplier> iterSuppli this(iterSupplier, DEFAULT_MAX_RETRIES); } - /** Visible for testing. */ - public int getLastSuccessfullIndex() { - return lastSuccessfullIndex; - } - - /** Visible for testing. */ - public int getNumRetries() { - return numRetries; - } + ///////////////// + // Public APIs // + ///////////////// @Override public void close() throws IOException { - if (currentIter != null) { - currentIter.close(); - } + currentIter.close(); } /** @@ -82,9 +73,7 @@ public boolean hasNext() { try { return hasNextInternal(); } catch (IOException ex) { - // `endsWith` should still work if the class is shaded. - final String exClassName = ex.getClass().getName(); - if (exClassName.endsWith("org.apache.hadoop.fs.s3a.RemoteFileChangedException")) { + if (isRemoteFileChangedException(ex)) { try { replayIterToLastSuccessfulIndex(ex); } catch (IOException ex2) { @@ -98,11 +87,6 @@ public boolean hasNext() { } } - /** Throw a checked exception so we can catch this in the caller. */ - private boolean hasNextInternal() throws IOException { - return currentIter.hasNext(); - } - @Override public String next() { if (!hasNext()) throw new NoSuchElementException(); @@ -112,14 +96,23 @@ public String next() { lastSuccessfullIndex++; return ret; } catch (IOException ex) { - // `endsWith` should still work if the class is shaded. - final String exClassName = ex.getClass().getName(); - if (exClassName.endsWith("org.apache.hadoop.fs.s3a.RemoteFileChangedException")) { + if (isRemoteFileChangedException(ex)) { try { replayIterToLastSuccessfulIndex(ex); } catch (IOException ex2) { throw new UncheckedIOException(ex2); } + + if (!hasNext()) { + throw new IllegalStateException( + String.format( + "A retried iterator doesn't have enough data " + + "(hasNext=false, lastSuccessfullIndex=%s)", + lastSuccessfullIndex + ) + ); + } + return next(); } else { throw new UncheckedIOException(ex); @@ -127,6 +120,29 @@ public String next() { } } + ////////////////////////////////////// + // Package-private APIs for testing // + ////////////////////////////////////// + + /** Visible for testing. */ + int getLastSuccessfullIndex() { + return lastSuccessfullIndex; + } + + /** Visible for testing. */ + int getNumRetries() { + return numRetries; + } + + //////////////////// + // Helper Methods // + //////////////////// + + /** Throw a checked exception so we can catch this in the caller. */ + private boolean hasNextInternal() throws IOException { + return currentIter.hasNext(); + } + /** Throw a checked exception so we can catch this in the caller. */ private String nextInternal() throws IOException { return currentIter.next(); @@ -180,21 +196,28 @@ private void replayIterToLastSuccessfulIndex(IOException topLevelEx) throws IOEx // via `fakeIOException`. That way, we can catch it, and retry replaying the iter. fakeIOException(); + LOG.info("Successfully replayed until (inclusive) index {}", lastSuccessfullIndex); + return; } catch (IOException ex) { - final String exClassName = ex.getClass().getName(); - if (exClassName.endsWith("org.apache.hadoop.fs.s3a.RemoteFileChangedException")) { + if (isRemoteFileChangedException(ex)) { // Ignore and try replaying the iter again at the top of the while loop + LOG.warn("Caught a RemoteFileChangedException while replaying the iterator"); } else { throw ex; } } - LOG.info("Successfully replayed until (inclusive) index {}", lastSuccessfullIndex); } throw topLevelEx; } + private boolean isRemoteFileChangedException(IOException ex) { + // `endsWith` should still work if the class is shaded. + final String exClassName = ex.getClass().getName(); + return exClassName.endsWith("org.apache.hadoop.fs.s3a.RemoteFileChangedException"); + } + private void fakeIOException() throws IOException { if (false) { throw new IOException(); diff --git a/storage-s3-dynamodb/src/main/java/io/delta/storage/S3DynamoDBLogStore.java b/storage-s3-dynamodb/src/main/java/io/delta/storage/S3DynamoDBLogStore.java index d8610f63512..2351ec2e6c0 100644 --- a/storage-s3-dynamodb/src/main/java/io/delta/storage/S3DynamoDBLogStore.java +++ b/storage-s3-dynamodb/src/main/java/io/delta/storage/S3DynamoDBLogStore.java @@ -20,6 +20,7 @@ import org.apache.hadoop.fs.Path; import java.io.InterruptedIOException; +import java.io.UncheckedIOException; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -160,7 +161,7 @@ public CloseableIterator read(Path path, Configuration hadoopConf) throw } catch (IOException e) { // There was an issue resolving the file system. Nothing to do with a // RemoteFileChangedException. - throw new RuntimeException(e); + throw new UncheckedIOException(e); } }, maxRetries diff --git a/storage-s3-dynamodb/src/test/scala/io/delta/storage/RetryableCloseableIteratorSuite.scala b/storage-s3-dynamodb/src/test/scala/io/delta/storage/RetryableCloseableIteratorSuite.scala index cfba7405146..f9d3f4a1dff 100644 --- a/storage-s3-dynamodb/src/test/scala/io/delta/storage/RetryableCloseableIteratorSuite.scala +++ b/storage-s3-dynamodb/src/test/scala/io/delta/storage/RetryableCloseableIteratorSuite.scala @@ -7,7 +7,6 @@ import scala.collection.JavaConverters._ import org.apache.hadoop.fs.s3a.RemoteFileChangedException import org.scalatest.funsuite.AnyFunSuite -// scalastyle:off println class RetryableCloseableIteratorSuite extends AnyFunSuite { private def getIter( @@ -20,17 +19,11 @@ class RetryableCloseableIteratorSuite extends AnyFunSuite { override def close(): Unit = { } override def hasNext: Boolean = { - if (throwAtIndex.contains(index)) { - println(s"`hasNext` throwing for index $index") - throw new RemoteFileChangedException(s"path -> index $index", "operation", "msg"); - } - impl.hasNext } override def next(): String = { if (throwAtIndex.contains(index)) { - println(s"`next` throwing for index $index") throw new RemoteFileChangedException(s"path -> index $index", "operation", "msg"); } @@ -199,4 +192,33 @@ class RetryableCloseableIteratorSuite extends AnyFunSuite { assert(e.getMessage.contains("A retried iterator doesn't have enough data")) } + test("after replaying the iter, hasNext is false") { + val testIter = new RetryableCloseableIterator( + new Supplier[CloseableIterator[String]] { + var getCount = 0 + + override def get(): CloseableIterator[String] = getCount match { + case 0 => + getCount = getCount + 1 + getIter(0 to 100, Some(50)) // try to iterate 0->100, fail at 50 + + case 1 => + getCount = getCount + 1 + // when we failed at index 50 above, the lastSuccessfulIndex was 49. here, we can + // replay back to index 49, but the `hasNext` call will be false! + getIter(0 to 49) + } + } + ) + + for (_ <- 0 to 49) { testIter.next() } + assert(testIter.getLastSuccessfullIndex == 49) + + val e = intercept[IllegalStateException] { + testIter.next() + } + assert(e.getMessage.contains("A retried iterator doesn't have enough data (hasNext=false, " + + "lastSuccessfullIndex=49)")) + } + } From 038c0b830f5b1a505115e9d0ca57f1be133f1e18 Mon Sep 17 00:00:00 2001 From: Scott Sandre Date: Fri, 28 Apr 2023 10:08:30 -0700 Subject: [PATCH 12/12] Revert "Handle FileAlreadyExistsException in fixDeltaLog" This reverts commit 4c70285399f5dcb85f5a50611e9c3fc8ee996340. --- .../io/delta/storage/BaseExternalLogStore.java | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/storage-s3-dynamodb/src/main/java/io/delta/storage/BaseExternalLogStore.java b/storage-s3-dynamodb/src/main/java/io/delta/storage/BaseExternalLogStore.java index 7f158b067ec..9b31a294718 100644 --- a/storage-s3-dynamodb/src/main/java/io/delta/storage/BaseExternalLogStore.java +++ b/storage-s3-dynamodb/src/main/java/io/delta/storage/BaseExternalLogStore.java @@ -167,8 +167,6 @@ public void write( prevFileName ); if (prevEntry.isPresent() && !prevEntry.get().complete) { - // This will never throw a java.nio.file.FileAlreadyExistsException for N-1.json - // (which would be confusing for the writer trying to write N.json). fixDeltaLog(fs, prevEntry.get()); } else { if (!fs.exists(prevPath)) { @@ -336,12 +334,6 @@ protected void fixDeltaLogPutCompleteDbEntry(ExternalCommitEntry entry) throws I /** * Method for assuring consistency on filesystem according to the external cache. * Method tries to rewrite TransactionLog entry from temporary path if it does not exist. - * - * Should never throw a java.nio.file.FileAlreadyExistsException: - * - if N.json already exists, we either don't copy T(N) -> N.json, or we swallow the - * FileAlreadyExistsException - * - we won't receive a FileAlreadyExistsException when writing to the external cache, as - * overwrite will be true */ private void fixDeltaLog(FileSystem fs, ExternalCommitEntry entry) throws IOException { if (entry.complete) { @@ -359,12 +351,7 @@ private void fixDeltaLog(FileSystem fs, ExternalCommitEntry entry) throws IOExce fixDeltaLogPutCompleteDbEntry(entry); LOG.info("fixed {}", entry.fileName); return; - } catch (java.nio.file.FileAlreadyExistsException e) { - LOG.info("{}:", e.getClass().getSimpleName(), e); - copied = true; - - // Never re-throw a FileAlreadyExistsException - } catch (IOException e) { + } catch(Throwable e) { LOG.info("{}:", e.getClass().getSimpleName(), e); if (retry >= 3) { throw e;