From 5fe04b8fa43db40608ed1a594ab9b2421c8349c1 Mon Sep 17 00:00:00 2001
From: David Li
Date: Tue, 26 Jul 2022 10:06:13 -0400
Subject: [PATCH] ARROW-17199: [Java][FlightRPC] Clean up example Flight RPC
server
---
.../arrow/flight/sql/FlightSqlClient.java | 11 +-
.../flight/sql/example/FlightSqlExample.java | 585 ++----------------
2 files changed, 52 insertions(+), 544 deletions(-)
diff --git a/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlClient.java b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlClient.java
index 221b9d0c76047..dd9480f40041b 100644
--- a/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlClient.java
+++ b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlClient.java
@@ -38,7 +38,6 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.channels.Channels;
-import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
@@ -442,12 +441,8 @@ public PreparedStatement prepare(final String query, final CallOption... options
}
@Override
- public void close() throws SQLException {
- try {
- AutoCloseables.close(client);
- } catch (final Exception e) {
- throw new SQLException(e);
- }
+ public void close() throws Exception {
+ AutoCloseables.close(client);
}
/**
@@ -557,7 +552,7 @@ private Schema deserializeSchema(final ByteString bytes) {
* @param options RPC-layer hints for this call.
* @return a FlightInfo object representing the stream(s) to fetch.
*/
- public FlightInfo execute(final CallOption... options) throws SQLException {
+ public FlightInfo execute(final CallOption... options) {
checkOpen();
final FlightDescriptor descriptor = FlightDescriptor
diff --git a/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/example/FlightSqlExample.java b/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/example/FlightSqlExample.java
index 553357a4ffbfa..baf162cb919fc 100644
--- a/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/example/FlightSqlExample.java
+++ b/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/example/FlightSqlExample.java
@@ -41,7 +41,6 @@
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
-import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.file.Files;
@@ -50,16 +49,13 @@
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
-import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
+import java.sql.SQLSyntaxErrorException;
import java.sql.Statement;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
@@ -71,7 +67,6 @@
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
-import java.util.TimeZone;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -82,12 +77,14 @@
import org.apache.arrow.adapter.jdbc.ArrowVectorIterator;
import org.apache.arrow.adapter.jdbc.JdbcFieldInfo;
+import org.apache.arrow.adapter.jdbc.JdbcParameterBinder;
import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils;
import org.apache.arrow.flight.CallStatus;
import org.apache.arrow.flight.Criteria;
import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.FlightEndpoint;
import org.apache.arrow.flight.FlightInfo;
+import org.apache.arrow.flight.FlightServer;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.PutResult;
@@ -115,32 +112,10 @@
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.util.Preconditions;
-import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
-import org.apache.arrow.vector.DateDayVector;
-import org.apache.arrow.vector.DateMilliVector;
-import org.apache.arrow.vector.Decimal256Vector;
-import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.FieldVector;
-import org.apache.arrow.vector.Float4Vector;
-import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
-import org.apache.arrow.vector.LargeVarCharVector;
-import org.apache.arrow.vector.SmallIntVector;
-import org.apache.arrow.vector.TimeMicroVector;
-import org.apache.arrow.vector.TimeMilliVector;
-import org.apache.arrow.vector.TimeNanoVector;
-import org.apache.arrow.vector.TimeSecVector;
-import org.apache.arrow.vector.TimeStampMicroTZVector;
-import org.apache.arrow.vector.TimeStampMilliTZVector;
-import org.apache.arrow.vector.TimeStampNanoTZVector;
-import org.apache.arrow.vector.TimeStampSecTZVector;
-import org.apache.arrow.vector.TimeStampVector;
-import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.UInt1Vector;
-import org.apache.arrow.vector.UInt2Vector;
-import org.apache.arrow.vector.UInt4Vector;
-import org.apache.arrow.vector.UInt8Vector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorLoader;
@@ -176,15 +151,8 @@
import com.google.protobuf.ProtocolStringList;
/**
- * Proof of concept {@link FlightSqlProducer} implementation showing an Apache Derby backed Flight SQL server capable
- * of the following workflows:
- *
- * - returning a list of tables from the action `GetTables`.
- * - creation of a prepared statement from the action `CreatePreparedStatement`.
- * - execution of a prepared statement by using a {@link CommandPreparedStatementQuery}
- * with {@link #getFlightInfo} and {@link #getStream}.
+ * Example {@link FlightSqlProducer} implementation showing an Apache Derby backed Flight SQL server that generally
+ * supports all current features of Flight SQL.
*/
public class FlightSqlExample implements FlightSqlProducer, AutoCloseable {
private static final String DATABASE_URI = "jdbc:derby:target/derbyDB";
@@ -199,6 +167,17 @@ public class FlightSqlExample implements FlightSqlProducer, AutoCloseable {
private final Cache> statementLoadingCache;
private final SqlInfoBuilder sqlInfoBuilder;
+ public static void main(String[] args) throws Exception {
+ Location location = Location.forGrpcInsecure("localhost", 55555);
+ final FlightSqlExample example = new FlightSqlExample(location);
+ Location listenLocation = Location.forGrpcInsecure("0.0.0.0", 55555);
+ try (final BufferAllocator allocator = new RootAllocator();
+ final FlightServer server = FlightServer.builder(allocator, listenLocation, example).build()) {
+ server.start();
+ server.awaitTermination();
+ }
+ }
+
public FlightSqlExample(final Location location) {
// TODO Constructor should not be doing work.
checkState(
@@ -685,7 +664,7 @@ public void getStreamPreparedStatement(final CommandPreparedStatementQuery comma
}
} catch (final SQLException | IOException e) {
LOGGER.error(format("Failed to getStreamPreparedStatement: <%s>.", e.getMessage()), e);
- listener.error(e);
+ listener.error(CallStatus.INTERNAL.withDescription("Failed to prepare statement: " + e).toRuntimeException());
} finally {
listener.completed();
}
@@ -810,11 +789,16 @@ public void createPreparedStatement(final ActionCreatePreparedStatementRequest r
.setPreparedStatementHandle(preparedStatementHandle)
.build();
listener.onNext(new Result(pack(result).toByteArray()));
+ } catch (final SQLException e) {
+ listener.onError(CallStatus.INTERNAL
+ .withDescription("Failed to create prepared statement: " + e)
+ .toRuntimeException());
+ return;
} catch (final Throwable t) {
- listener.onError(t);
- } finally {
- listener.onCompleted();
+ listener.onError(CallStatus.INTERNAL.withDescription("Unknown error: " + t).toRuntimeException());
+ return;
}
+ listener.onCompleted();
});
}
@@ -843,8 +827,14 @@ public Runnable acceptPutStatement(CommandStatementUpdate command,
ackStream.onNext(PutResult.metadata(buffer));
ackStream.onCompleted();
}
+ } catch (SQLSyntaxErrorException e) {
+ ackStream.onError(CallStatus.INVALID_ARGUMENT
+ .withDescription("Failed to execute statement (invalid syntax): " + e)
+ .toRuntimeException());
} catch (SQLException e) {
- ackStream.onError(e);
+ ackStream.onError(CallStatus.INTERNAL
+ .withDescription("Failed to execute statement: " + e)
+ .toRuntimeException());
}
};
}
@@ -856,7 +846,12 @@ public Runnable acceptPutPreparedStatementUpdate(CommandPreparedStatementUpdate
preparedStatementLoadingCache.getIfPresent(command.getPreparedStatementHandle());
return () -> {
- assert statement != null;
+ if (statement == null) {
+ ackStream.onError(CallStatus.NOT_FOUND
+ .withDescription("Prepared statement does not exist")
+ .toRuntimeException());
+ return;
+ }
try {
final PreparedStatement preparedStatement = statement.getStatement();
@@ -870,9 +865,12 @@ public Runnable acceptPutPreparedStatementUpdate(CommandPreparedStatementUpdate
preparedStatement.execute();
recordCount = preparedStatement.getUpdateCount();
} else {
- setDataPreparedStatement(preparedStatement, root, true);
- int[] recordCount1 = preparedStatement.executeBatch();
- recordCount = Arrays.stream(recordCount1).sum();
+ final JdbcParameterBinder binder = JdbcParameterBinder.builder(preparedStatement, root).bindAll().build();
+ while (binder.next()) {
+ preparedStatement.addBatch();
+ }
+ int[] recordCounts = preparedStatement.executeBatch();
+ recordCount = Arrays.stream(recordCounts).sum();
}
final DoPutUpdateResult build =
@@ -884,501 +882,13 @@ public Runnable acceptPutPreparedStatementUpdate(CommandPreparedStatementUpdate
}
}
} catch (SQLException e) {
- ackStream.onError(e);
+ ackStream.onError(CallStatus.INTERNAL.withDescription("Failed to execute update: " + e).toRuntimeException());
return;
}
ackStream.onCompleted();
};
}
- /**
- * Method responsible to set the parameters, to the preparedStatement object, sent via doPut request.
- *
- * @param preparedStatement the preparedStatement object for the operation.
- * @param root a {@link VectorSchemaRoot} object contain the values to be used in the
- * PreparedStatement setters.
- * @param isUpdate a flag to indicate if is an update or query operation.
- * @throws SQLException in case of error.
- */
- private void setDataPreparedStatement(PreparedStatement preparedStatement, VectorSchemaRoot root,
- boolean isUpdate)
- throws SQLException {
- for (int i = 0; i < root.getRowCount(); i++) {
- for (FieldVector vector : root.getFieldVectors()) {
- final int vectorPosition = root.getFieldVectors().indexOf(vector);
- final int position = vectorPosition + 1;
-
- if (vector instanceof UInt1Vector) {
- setOnPreparedStatement(preparedStatement, position, vectorPosition, (UInt1Vector) vector);
- } else if (vector instanceof TimeStampNanoTZVector) {
- setOnPreparedStatement(preparedStatement, position, vectorPosition, (TimeStampNanoTZVector) vector);
- } else if (vector instanceof TimeStampMicroTZVector) {
- setOnPreparedStatement(preparedStatement, position, vectorPosition, (TimeStampMicroTZVector) vector);
- } else if (vector instanceof TimeStampMilliTZVector) {
- setOnPreparedStatement(preparedStatement, position, vectorPosition, (TimeStampMilliTZVector) vector);
- } else if (vector instanceof TimeStampSecTZVector) {
- setOnPreparedStatement(preparedStatement, position, vectorPosition, (TimeStampSecTZVector) vector);
- } else if (vector instanceof UInt2Vector) {
- setOnPreparedStatement(preparedStatement, position, vectorPosition, (UInt2Vector) vector);
- } else if (vector instanceof UInt4Vector) {
- setOnPreparedStatement(preparedStatement, position, vectorPosition, (UInt4Vector) vector);
- } else if (vector instanceof UInt8Vector) {
- setOnPreparedStatement(preparedStatement, position, vectorPosition, (UInt8Vector) vector);
- } else if (vector instanceof TinyIntVector) {
- setOnPreparedStatement(preparedStatement, position, vectorPosition, (TinyIntVector) vector);
- } else if (vector instanceof SmallIntVector) {
- setOnPreparedStatement(preparedStatement, position, vectorPosition, (SmallIntVector) vector);
- } else if (vector instanceof IntVector) {
- setOnPreparedStatement(preparedStatement, position, vectorPosition, (IntVector) vector);
- } else if (vector instanceof BigIntVector) {
- setOnPreparedStatement(preparedStatement, position, vectorPosition, (BigIntVector) vector);
- } else if (vector instanceof Float4Vector) {
- setOnPreparedStatement(preparedStatement, position, vectorPosition, (Float4Vector) vector);
- } else if (vector instanceof Float8Vector) {
- setOnPreparedStatement(preparedStatement, position, vectorPosition, (Float8Vector) vector);
- } else if (vector instanceof BitVector) {
- setOnPreparedStatement(preparedStatement, position, vectorPosition, (BitVector) vector);
- } else if (vector instanceof DecimalVector) {
- setOnPreparedStatement(preparedStatement, position, vectorPosition, (DecimalVector) vector);
- } else if (vector instanceof Decimal256Vector) {
- setOnPreparedStatement(preparedStatement, position, vectorPosition, (Decimal256Vector) vector);
- } else if (vector instanceof TimeStampVector) {
- setOnPreparedStatement(preparedStatement, position, vectorPosition, (TimeStampVector) vector);
- } else if (vector instanceof TimeNanoVector) {
- setOnPreparedStatement(preparedStatement, position, vectorPosition, (TimeNanoVector) vector);
- } else if (vector instanceof TimeMicroVector) {
- setOnPreparedStatement(preparedStatement, position, vectorPosition, (TimeMicroVector) vector);
- } else if (vector instanceof TimeMilliVector) {
- setOnPreparedStatement(preparedStatement, position, vectorPosition, (TimeMilliVector) vector);
- } else if (vector instanceof TimeSecVector) {
- setOnPreparedStatement(preparedStatement, position, vectorPosition, (TimeSecVector) vector);
- } else if (vector instanceof DateDayVector) {
- setOnPreparedStatement(preparedStatement, position, vectorPosition, (DateDayVector) vector);
- } else if (vector instanceof DateMilliVector) {
- setOnPreparedStatement(preparedStatement, position, vectorPosition, (DateMilliVector) vector);
- } else if (vector instanceof VarCharVector) {
- setOnPreparedStatement(preparedStatement, position, vectorPosition, (VarCharVector) vector);
- } else if (vector instanceof LargeVarCharVector) {
- setOnPreparedStatement(preparedStatement, position, vectorPosition, (LargeVarCharVector) vector);
- }
- }
- if (isUpdate) {
- preparedStatement.addBatch();
- }
- }
- }
-
- protected TimeZone getTimeZoneForVector(TimeStampVector vector) {
- ArrowType.Timestamp arrowType = (ArrowType.Timestamp) vector.getField().getFieldType().getType();
-
- String timezoneName = arrowType.getTimezone();
- if (timezoneName == null) {
- return TimeZone.getDefault();
- }
-
- return TimeZone.getTimeZone(timezoneName);
- }
-
- /**
- * Set a string parameter to the preparedStatement object.
- *
- * @param statement an instance of the {@link PreparedStatement} class.
- * @param column the index of the column in the {@link PreparedStatement}.
- * @param vectorIndex the index from the vector which contain the value.
- * @param vector an instance of the vector the will be accessed.
- * @throws SQLException in case of error.
- */
- public void setOnPreparedStatement(PreparedStatement statement, int column, int vectorIndex, VarCharVector vector)
- throws SQLException {
- final Text object = vector.getObject(vectorIndex);
- statement.setObject(column, object.toString());
- }
-
- /**
- * Set a string parameter to the preparedStatement object.
- *
- * @param statement an instance of the {@link PreparedStatement} class.
- * @param column the index of the column in the {@link PreparedStatement}.
- * @param vectorIndex the index from the vector which contain the value.
- * @param vector an instance of the vector the will be accessed.
- * @throws SQLException in case of error.
- */
- public void setOnPreparedStatement(PreparedStatement statement, int column, int vectorIndex,
- LargeVarCharVector vector)
- throws SQLException {
- final Text object = vector.getObject(vectorIndex);
- statement.setObject(column, object);
- }
-
- /**
- * Set a byte parameter to the preparedStatement object.
- *
- * @param statement an instance of the {@link PreparedStatement} class.
- * @param column the index of the column in the {@link PreparedStatement}.
- * @param vectorIndex the index from the vector which contain the value.
- * @param vector an instance of the vector the will be accessed.
- * @throws SQLException in case of error.
- */
- public void setOnPreparedStatement(PreparedStatement statement, int column, int vectorIndex, TinyIntVector vector)
- throws SQLException {
- final Byte object = vector.getObject(vectorIndex);
- statement.setObject(column, object);
- }
-
- /**
- * Set a short parameter to the preparedStatement object.
- *
- * @param statement an instance of the {@link PreparedStatement} class.
- * @param column the index of the column in the {@link PreparedStatement}.
- * @param vectorIndex the index from the vector which contain the value.
- * @param vector an instance of the vector the will be accessed.
- * @throws SQLException in case of error.
- */
- public void setOnPreparedStatement(PreparedStatement statement, int column, int vectorIndex, SmallIntVector vector)
- throws SQLException {
- final Short object = vector.getObject(vectorIndex);
- statement.setObject(column, object);
- }
-
- /**
- * Set an integer parameter to the preparedStatement object.
- *
- * @param statement an instance of the {@link PreparedStatement} class.
- * @param column the index of the column in the {@link PreparedStatement}.
- * @param vectorIndex the index from the vector which contain the value.
- * @param vector an instance of the vector the will be accessed.
- * @throws SQLException in case of error.
- */
- public void setOnPreparedStatement(PreparedStatement statement, int column, int vectorIndex, IntVector vector)
- throws SQLException {
- final Integer object = vector.getObject(vectorIndex);
- statement.setObject(column, object);
- }
-
- /**
- * Set a long parameter to the preparedStatement object.
- *
- * @param statement an instance of the {@link PreparedStatement} class.
- * @param column the index of the column in the {@link PreparedStatement}.
- * @param vectorIndex the index from the vector which contain the value.
- * @param vector an instance of the vector the will be accessed.
- * @throws SQLException in case of error.
- */
- public void setOnPreparedStatement(PreparedStatement statement, int column, int vectorIndex, BigIntVector vector)
- throws SQLException {
- final Long object = vector.getObject(vectorIndex);
- statement.setObject(column, object);
- }
-
- /**
- * Set a float parameter to the preparedStatement object.
- *
- * @param statement an instance of the {@link PreparedStatement} class.
- * @param column the index of the column in the {@link PreparedStatement}.
- * @param vectorIndex the index from the vector which contain the value.
- * @param vector an instance of the vector the will be accessed.
- * @throws SQLException in case of error.
- */
- public void setOnPreparedStatement(PreparedStatement statement, int column, int vectorIndex, Float4Vector vector)
- throws SQLException {
- final Float object = vector.getObject(vectorIndex);
- statement.setObject(column, object);
- }
-
- /**
- * Set a double parameter to the preparedStatement object.
- *
- * @param statement an instance of the {@link PreparedStatement} class.
- * @param column the index of the column in the {@link PreparedStatement}.
- * @param vectorIndex the index from the vector which contain the value.
- * @param vector an instance of the vector the will be accessed.
- * @throws SQLException in case of error.
- */
- public void setOnPreparedStatement(PreparedStatement statement, int column, int vectorIndex, Float8Vector vector)
- throws SQLException {
- final Double object = vector.getObject(vectorIndex);
- statement.setObject(column, object);
- }
-
- /**
- * Set a BigDecimal parameter to the preparedStatement object.
- *
- * @param statement an instance of the {@link PreparedStatement} class.
- * @param column the index of the column in the {@link PreparedStatement}.
- * @param vectorIndex the index from the vector which contain the value.
- * @param vector an instance of the vector the will be accessed.
- * @throws SQLException in case of error.
- */
- public void setOnPreparedStatement(PreparedStatement statement, int column, int vectorIndex, DecimalVector vector)
- throws SQLException {
- final BigDecimal object = vector.getObject(vectorIndex);
- statement.setObject(column, object);
- }
-
- /**
- * Set a BigDecimal parameter to the preparedStatement object.
- *
- * @param statement an instance of the {@link PreparedStatement} class.
- * @param column the index of the column in the {@link PreparedStatement}.
- * @param vectorIndex the index from the vector which contain the value.
- * @param vector an instance of the vector the will be accessed.
- * @throws SQLException in case of error.
- */
- public void setOnPreparedStatement(PreparedStatement statement, int column, int vectorIndex, Decimal256Vector vector)
- throws SQLException {
- final BigDecimal object = vector.getObject(vectorIndex);
- statement.setObject(column, object);
- }
-
- /**
- * Set a timestamp parameter to the preparedStatement object.
- *
- * @param statement an instance of the {@link PreparedStatement} class.
- * @param column the index of the column in the {@link PreparedStatement}.
- * @param vectorIndex the index from the vector which contain the value.
- * @param vector an instance of the vector the will be accessed.
- * @throws SQLException in case of error.
- */
- public void setOnPreparedStatement(PreparedStatement statement, int column, int vectorIndex, TimeStampVector vector)
- throws SQLException {
- final Object object = vector.getObject(vectorIndex);
- statement.setObject(column, object);
- }
-
- /**
- * Set a time parameter to the preparedStatement object.
- *
- * @param statement an instance of the {@link PreparedStatement} class.
- * @param column the index of the column in the {@link PreparedStatement}.
- * @param vectorIndex the index from the vector which contain the value.
- * @param vector an instance of the vector the will be accessed.
- * @throws SQLException in case of error.
- */
- public void setOnPreparedStatement(PreparedStatement statement, int column, int vectorIndex, TimeNanoVector vector)
- throws SQLException {
- final Long object = vector.getObject(vectorIndex);
- statement.setTime(column, new Time(object * 1000L));
- }
-
- /**
- * Set a time parameter to the preparedStatement object.
- *
- * @param statement an instance of the {@link PreparedStatement} class.
- * @param column the index of the column in the {@link PreparedStatement}.
- * @param vectorIndex the index from the vector which contain the value.
- * @param vector an instance of the vector the will be accessed.
- * @throws SQLException in case of error.
- */
- public void setOnPreparedStatement(PreparedStatement statement, int column, int vectorIndex, TimeMicroVector vector)
- throws SQLException {
- final Long object = vector.getObject(vectorIndex);
- statement.setTime(column, new Time(object / 1000L));
- }
-
- /**
- * Set a time parameter to the preparedStatement object.
- *
- * @param statement an instance of the {@link PreparedStatement} class.
- * @param column the index of the column in the {@link PreparedStatement}.
- * @param vectorIndex the index from the vector which contain the value.
- * @param vector an instance of the vector the will be accessed.
- * @throws SQLException in case of error.
- */
- public void setOnPreparedStatement(PreparedStatement statement, int column, int vectorIndex, TimeMilliVector vector)
- throws SQLException {
- final LocalDateTime object = vector.getObject(vectorIndex);
- statement.setTime(column, Time.valueOf(object.toLocalTime()));
- }
-
- /**
- * Set a time parameter to the preparedStatement object.
- *
- * @param statement an instance of the {@link PreparedStatement} class.
- * @param column the index of the column in the {@link PreparedStatement}.
- * @param vectorIndex the index from the vector which contain the value.
- * @param vector an instance of the vector the will be accessed.
- * @throws SQLException in case of error.
- */
- public void setOnPreparedStatement(PreparedStatement statement, int column, int vectorIndex, TimeSecVector vector)
- throws SQLException {
- final Integer object = vector.getObject(vectorIndex);
- statement.setTime(column, new Time(object));
- }
-
- /**
- * Set a date parameter to the preparedStatement object.
- *
- * @param statement an instance of the {@link PreparedStatement} class.
- * @param column the index of the column in the {@link PreparedStatement}.
- * @param vectorIndex the index from the vector which contain the value.
- * @param vector an instance of the vector the will be accessed.
- * @throws SQLException in case of error.
- */
- public void setOnPreparedStatement(PreparedStatement statement, int column, int vectorIndex, DateDayVector vector)
- throws SQLException {
- final Integer object = vector.getObject(vectorIndex);
- statement.setDate(column, new Date(TimeUnit.DAYS.toMillis(object)));
- }
-
- /**
- * Set a date parameter to the preparedStatement object.
- *
- * @param statement an instance of the {@link PreparedStatement} class.
- * @param column the index of the column in the {@link PreparedStatement}.
- * @param vectorIndex the index from the vector which contain the value.
- * @param vector an instance of the vector the will be accessed.
- * @throws SQLException in case of error.
- */
- public void setOnPreparedStatement(PreparedStatement statement, int column, int vectorIndex, DateMilliVector vector)
- throws SQLException {
- final LocalDateTime object = vector.getObject(vectorIndex);
- statement.setDate(column, Date.valueOf(object.toLocalDate()));
-
- }
-
- /**
- * Set an unsigned 1 byte number parameter to the preparedStatement object.
- *
- * @param statement an instance of the {@link PreparedStatement} class.
- * @param column the index of the column in the {@link PreparedStatement}.
- * @param vectorIndex the index from the vector which contain the value.
- * @param vector an instance of the vector the will be accessed.
- * @throws SQLException in case of error.
- */
- public void setOnPreparedStatement(PreparedStatement statement, int column, int vectorIndex, UInt1Vector vector)
- throws SQLException {
- final Byte object = vector.getObject(vectorIndex);
- statement.setObject(column, object);
- }
-
- /**
- * Set an unsigned 2 bytes number parameter to the preparedStatement object.
- *
- * @param statement an instance of the {@link PreparedStatement} class.
- * @param column the index of the column in the {@link PreparedStatement}.
- * @param vectorIndex the index from the vector which contain the value.
- * @param vector an instance of the vector the will be accessed.
- * @throws SQLException in case of error.
- */
- public void setOnPreparedStatement(PreparedStatement statement, int column, int vectorIndex, UInt2Vector vector)
- throws SQLException {
- final Character object = vector.getObject(vectorIndex);
- statement.setObject(column, object);
- }
-
- /**
- * Set an unsigned 4 bytes number parameter to the preparedStatement object.
- *
- * @param statement an instance of the {@link PreparedStatement} class.
- * @param column the index of the column in the {@link PreparedStatement}.
- * @param vectorIndex the index from the vector which contain the value.
- * @param vector an instance of the vector the will be accessed.
- * @throws SQLException in case of error.
- */
- public void setOnPreparedStatement(PreparedStatement statement, int column, int vectorIndex, UInt4Vector vector)
- throws SQLException {
- final Integer object = vector.getObject(vectorIndex);
- statement.setObject(column, object);
- }
-
- /**
- * Set an unsigned 8 bytes number parameter to the preparedStatement object.
- *
- * @param statement an instance of the {@link PreparedStatement} class.
- * @param column the index of the column in the {@link PreparedStatement}.
- * @param vectorIndex the index from the vector which contain the value.
- * @param vector an instance of the vector the will be accessed.
- * @throws SQLException in case of error.
- */
- public void setOnPreparedStatement(PreparedStatement statement, int column, int vectorIndex, UInt8Vector vector)
- throws SQLException {
- final Long object = vector.getObject(vectorIndex);
- statement.setObject(column, object);
- }
-
- /**
- * Set a boolean parameter to the preparedStatement object.
- *
- * @param statement an instance of the {@link PreparedStatement} class.
- * @param column the index of the column in the {@link PreparedStatement}.
- * @param vectorIndex the index from the vector which contain the value.
- * @param vector an instance of the vector the will be accessed.
- * @throws SQLException in case of error.
- */
- public void setOnPreparedStatement(PreparedStatement statement, int column, int vectorIndex, BitVector vector)
- throws SQLException {
- final Boolean object = vector.getObject(vectorIndex);
- statement.setObject(column, object);
- }
-
- /**
- * Set a timestamp parameter to the preparedStatement object.
- *
- * @param statement an instance of the {@link PreparedStatement} class.
- * @param column the index of the column in the {@link PreparedStatement}.
- * @param vectorIndex the index from the vector which contain the value.
- * @param vector an instance of the vector the will be accessed.
- * @throws SQLException in case of error.
- */
- public void setOnPreparedStatement(PreparedStatement statement, int column, int vectorIndex,
- TimeStampNanoTZVector vector)
- throws SQLException {
- final Long object = vector.getObject(vectorIndex);
- statement.setTimestamp(column, new Timestamp(object / 1000000L),
- Calendar.getInstance(getTimeZoneForVector(vector)));
- }
-
- /**
- * Set a timestamp parameter to the preparedStatement object.
- *
- * @param statement an instance of the {@link PreparedStatement} class.
- * @param column the index of the column in the {@link PreparedStatement}.
- * @param vectorIndex the index from the vector which contain the value.
- * @param vector an instance of the vector the will be accessed.
- * @throws SQLException in case of error.
- */
- public void setOnPreparedStatement(PreparedStatement statement, int column, int vectorIndex,
- TimeStampMicroTZVector vector)
- throws SQLException {
- final Long object = vector.getObject(vectorIndex);
- statement.setTimestamp(column, new Timestamp(object / 1000L),
- Calendar.getInstance(getTimeZoneForVector(vector)));
- }
-
- /**
- * Set a timestamp parameter to the preparedStatement object.
- *
- * @param statement an instance of the {@link PreparedStatement} class.
- * @param column the index of the column in the {@link PreparedStatement}.
- * @param vectorIndex the index from the vector which contain the value.
- * @param vector an instance of the vector the will be accessed.
- * @throws SQLException in case of error.
- */
- public void setOnPreparedStatement(PreparedStatement statement, int column, int vectorIndex,
- TimeStampMilliTZVector vector)
- throws SQLException {
- final Long object = vector.getObject(vectorIndex);
- statement.setTimestamp(column, new Timestamp(object),
- Calendar.getInstance(getTimeZoneForVector(vector)));
- }
-
- /**
- * Set a timestamp parameter to the preparedStatement object.
- *
- * @param statement an instance of the {@link PreparedStatement} class.
- * @param column the index of the column in the {@link PreparedStatement}.
- * @param vectorIndex the index from the vector which contain the value.
- * @param vector an instance of the vector the will be accessed.
- * @throws SQLException in case of error.
- */
- public void setOnPreparedStatement(PreparedStatement statement, int column, int vectorIndex,
- TimeStampSecTZVector vector)
- throws SQLException {
- final Long object = vector.getObject(vectorIndex);
- statement.setTimestamp(column, new Timestamp(object * 1000L),
- Calendar.getInstance(getTimeZoneForVector(vector)));
- }
-
@Override
public Runnable acceptPutPreparedStatementQuery(CommandPreparedStatementQuery command, CallContext context,
FlightStream flightStream, StreamListener ackStream) {
@@ -1392,7 +902,10 @@ public Runnable acceptPutPreparedStatementQuery(CommandPreparedStatementQuery co
try {
while (flightStream.next()) {
final VectorSchemaRoot root = flightStream.getRoot();
- setDataPreparedStatement(preparedStatement, root, false);
+ final JdbcParameterBinder binder = JdbcParameterBinder.builder(preparedStatement, root).bindAll().build();
+ while (binder.next()) {
+ // Do not execute() - will be done in a getStream call
+ }
}
} catch (SQLException e) {