From 846df73050d93abafe8a8849475473e696243faf Mon Sep 17 00:00:00 2001 From: David Li Date: Fri, 25 Jan 2019 10:00:27 -0500 Subject: [PATCH] Implement put in Java Flight integration server --- .../integration/IntegrationTestServer.java | 33 +++++++++++++++++-- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/java/flight/src/main/java/org/apache/arrow/flight/example/integration/IntegrationTestServer.java b/java/flight/src/main/java/org/apache/arrow/flight/example/integration/IntegrationTestServer.java index 7b45e53a149be..2b78cca93aaef 100644 --- a/java/flight/src/main/java/org/apache/arrow/flight/example/integration/IntegrationTestServer.java +++ b/java/flight/src/main/java/org/apache/arrow/flight/example/integration/IntegrationTestServer.java @@ -18,6 +18,7 @@ package org.apache.arrow.flight.example.integration; import java.io.File; +import java.io.FileOutputStream; import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.concurrent.Callable; @@ -39,6 +40,9 @@ import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.VectorUnloader; +import org.apache.arrow.vector.dictionary.DictionaryProvider; +import org.apache.arrow.vector.ipc.ArrowFileWriter; import org.apache.arrow.vector.ipc.JsonFileReader; import org.apache.arrow.vector.types.pojo.Schema; import org.apache.commons.cli.CommandLine; @@ -131,7 +135,7 @@ public FlightInfo getFlightInfo(FlightDescriptor descriptor) { Schema schema = reader.start(); return new FlightInfo(schema, descriptor, Collections.singletonList(new FlightEndpoint(new Ticket(path.getBytes()), - new Location("localhost", 31338))), + new Location("localhost", 31338))), 0, 0); } catch (Exception e) { throw new RuntimeException(e); @@ -140,12 +144,35 @@ public FlightInfo getFlightInfo(FlightDescriptor descriptor) { @Override public Callable acceptPut(FlightStream flightStream) { - return null; + return () -> { + if (flightStream.getDescriptor().isCommand()) { + throw new UnsupportedOperationException("Commands not supported."); + } + if (flightStream.getDescriptor().getPath().size() < 1) { + throw new IllegalArgumentException("Must provide a path."); + } + String path = flightStream.getDescriptor().getPath().get(0); + File outputFile = new File(path); + if (!outputFile.createNewFile()) { + throw new IllegalStateException("File already exists."); + } + try (VectorSchemaRoot root = flightStream.getRoot(); + FileOutputStream fileOutputStream = new FileOutputStream(outputFile); + ArrowFileWriter writer = new ArrowFileWriter(root, new DictionaryProvider.MapDictionaryProvider(), + fileOutputStream.getChannel())) { + writer.start(); + while (flightStream.next()) { + writer.writeBatch(); + } + writer.end(); + } + return Flight.PutResult.getDefaultInstance(); + }; } @Override public Result doAction(Action action) { - return null; + throw new UnsupportedOperationException("No actions implemented."); } @Override