diff --git a/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultDataFrame.java b/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultDataFrame.java index b4d196a..40b5789 100644 --- a/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultDataFrame.java +++ b/datafusion-java/src/main/java/org/apache/arrow/datafusion/DefaultDataFrame.java @@ -2,6 +2,7 @@ import java.nio.file.Path; import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.vector.ipc.ArrowFileReader; import org.apache.arrow.vector.ipc.ArrowReader; @@ -69,13 +70,7 @@ public CompletableFuture show() { DataFrames.showDataframe( runtimePointer, dataframe, - (String errString) -> { - if (ErrorUtil.containsError(errString)) { - future.completeExceptionally(new RuntimeException(errString)); - } else { - future.complete(null); - } - }); + new RuntimeExceptionCallback(future)); return future; } @@ -89,13 +84,7 @@ public CompletableFuture writeParquet(Path path) { runtimePointer, dataframe, path.toAbsolutePath().toString(), - (String errString) -> { - if (ErrorUtil.containsError(errString)) { - future.completeExceptionally(new RuntimeException(errString)); - } else { - future.complete(null); - } - }); + new RuntimeExceptionCallback(future)); return future; } @@ -109,13 +98,7 @@ public CompletableFuture writeCsv(Path path) { runtimePointer, dataframe, path.toAbsolutePath().toString(), - (String errString) -> { - if (ErrorUtil.containsError(errString)) { - future.completeExceptionally(new RuntimeException(errString)); - } else { - future.complete(null); - } - }); + new RuntimeExceptionCallback(future)); return future; } @@ -130,4 +113,21 @@ public TableProvider intoView() { void doClose(long pointer) { DataFrames.destroyDataFrame(pointer); } + + private static class RuntimeExceptionCallback implements Consumer { + private final CompletableFuture future; + + private RuntimeExceptionCallback(CompletableFuture future) { + this.future = future; + } + + @Override + public void accept(String errString) { + if (ErrorUtil.containsError(errString)) { + future.completeExceptionally(new RuntimeException(errString)); + } else { + future.complete(null); + } + } + } }