From fa2b934e8fab399ceaa242fd7bc33e940a2471f0 Mon Sep 17 00:00:00 2001 From: Abner Eduardo Ferreira Date: Tue, 17 Aug 2021 14:23:33 -0300 Subject: [PATCH] [WIP] FlightSQL Ratification based on Community Comments (#73) * Move FlightSql examples to their own subpackage * Fix checkstyle issues * fix: change Status use to CallStatus * Remove unnecessary overhead of wrapping nullable objects into Optionals for the sole purpose of null-checking * Replace Guava's Preconditions with the ones provided by Apache * Fix typo in FlightSql.proto * Fix ordering of schema for FlightSql.proto * Explain why reserved range of IDs for GetSqlInfo is not entirely in use * Add comment to CommandGetTables to explain the encoding of table_schema * Remove redundat information on schemas * Fixed Javadoc on some methods, added Thread interrupt to executeUpdate methods, and updated Signal exceptions to CallStatus with description * Replace int32 with uint32 for GetSqlInfo name representation * Replace AssertionError with StatusRuntimeException for whenever attempting to unpack an invalid protobuf message * add comment to FlightSql.proto to update_rule and delete_rule * Replace inconsistent exception handling with CallStatus predetermined exceptions * correct comment to CreatePreparedStatement on FlightSql.proto * Remove unused dependencies * fix: change Status use to CallStatus on FlightSqlProducer * Changed from if not null check to Objects requireNonNull on Flight SQL Client * Remove Nullable annotation * Changed from checkNotNull to Objects#requireNotNull with description on Flight SQL Example * Add CallOptions to every RPC call by the client * Fix Maven dependency problems and checkstyle violations * Replace generic Collections with Lists when order matters in an RPC call * Fix Javadoc for FlightSqlClient * Add description to StatusRuntimeExceptions * Add descriptions to Exceptions * Correct update_rule and delete_rule description on FlighSql.proto * Verify wheter Root is empty before sending request to server * Add call options to PreparedStatement * Replace constant checking of whether client is open with #checkOpen * Add CallOptions to #close for PreparedStatement * Refactor PreparedStatement usages of CallOptions * Fix broken tests * Fix FlightSql.proto documentation * Update documentation for format/FlightSql.proto Co-authored-by: kylep-dremio <38920967+kylep-dremio@users.noreply.github.com> * Fix checkstyle violations * Require non null tables for GetExportedKeys and GetImportedKeys * Not storing CallOptions in PreparedStatement * Update documentation comments for protobuf * Replace IntVector for UInt1Vector for delete_rule and update_rule * Fix protobuf for FlightSQL * Fix bug with empty metadata * Update update_rule and delete_rule documentation on proto * Remove explicit dependency on JDBC's DatabaseMetaData on UpdateDeleteRules * Use MessageOptions instead of FieldOptions on proto * Add missing JavaDoc about 'options' parameter * Fix CommandGetSqlInfo documentation * Add @throws to FlightSqlClient#checkOpen JavaDoc Co-authored-by: Juscelino Junior Co-authored-by: Vinicius Fraga Co-authored-by: Rafael Telles Co-authored-by: kylep-dremio <38920967+kylep-dremio@users.noreply.github.com> --- format/FlightSql.proto | 69 +++- java/flight/flight-sql/pom.xml | 4 - .../arrow/flight/sql/FlightSqlClient.java | 305 +++++++++++------- .../arrow/flight/sql/FlightSqlProducer.java | 36 ++- .../arrow/flight/sql/FlightSqlUtils.java | 15 +- .../apache/arrow/flight/TestFlightSql.java | 16 +- .../sql/{ => example}/FlightSqlExample.java | 147 +++++---- .../sql/{ => example}/StatementContext.java | 15 +- java/pom.xml | 15 - 9 files changed, 378 insertions(+), 244 deletions(-) rename java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/{ => example}/FlightSqlExample.java (94%) rename java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/{ => example}/StatementContext.java (85%) diff --git a/format/FlightSql.proto b/format/FlightSql.proto index acdb50b22969b..a065972559b10 100644 --- a/format/FlightSql.proto +++ b/format/FlightSql.proto @@ -18,6 +18,7 @@ syntax = "proto3"; import "google/protobuf/wrappers.proto"; +import "google/protobuf/descriptor.proto"; option java_package = "org.apache.arrow.flight.sql.impl"; package arrow.flight.protocol.sql; @@ -30,12 +31,14 @@ package arrow.flight.protocol.sql; * * The returned schema will be: * < - * info_name: int32, + * info_name: uint32, * value: dense_union * > * where there is one row per requested piece of metadata information. */ message CommandGetSqlInfo { + option (experimental) = true; + /* * Values are modelled after ODBC's SQLGetInfo() function. This information is intended to provide * Flight SQL clients with basic, SQL syntax and SQL functions related information. @@ -46,7 +49,7 @@ message CommandGetSqlInfo { * * Initially, Flight SQL will support the following information types: * - Server Information - Range [0-500) - * - Syntax Information - Ragne [500-1000) + * - Syntax Information - Range [500-1000) * Range [0-100000) is reserved for defaults. Custom options should start at 100000. * * 1. Server Information [0-500): Provides basic information about the Flight SQL Server. @@ -89,7 +92,7 @@ message CommandGetSqlInfo { * * If omitted, then all metadata will be retrieved. * Flight SQL Servers may choose to include additional metadata above and beyond the specified set, however they must - * at least return the specified set. IDs ranging from 0 to 10,000 (exclusive) are reserved. + * at least return the specified set. IDs ranging from 0 to 10,000 (exclusive) are reserved for future use. * If additional metadata is included, the metadata IDs should start from 10,000. */ repeated uint32 info = 1; @@ -108,6 +111,7 @@ message CommandGetSqlInfo { * The returned data should be ordered by catalog_name. */ message CommandGetCatalogs { + option (experimental) = true; } /* @@ -124,6 +128,8 @@ message CommandGetCatalogs { * The returned data should be ordered by catalog_name, then schema_name. */ message CommandGetSchemas { + option (experimental) = true; + /* * Specifies the Catalog to search for schemas. * If omitted, then all catalogs are searched. @@ -152,11 +158,13 @@ message CommandGetSchemas { * schema_name: utf8, * table_name: utf8, * table_type: utf8, - * table_schema: bytes + * table_schema: bytes (schema of the table as described in Schema.fbs::Schema, it is serialized as an IPC message.) * > * The returned data should be ordered by catalog_name, schema_name, table_name, then table_type. */ message CommandGetTables { + option (experimental) = true; + /* * Specifies the Catalog to search for the tables. * If omitted, then all catalogs are searched. @@ -201,6 +209,7 @@ message CommandGetTables { * The returned data should be ordered by table_type. */ message CommandGetTableTypes { + option (experimental) = true; } /* @@ -215,12 +224,14 @@ message CommandGetTableTypes { * schema_name: utf8, * table_name: utf8, * column_name: utf8, - * key_sequence: int, * key_name: utf8 + * key_sequence: int, * > * The returned data should be ordered by catalog_name, schema_name, table_name, key_name, then key_sequence. */ message CommandGetPrimaryKeys { + option (experimental) = true; + // Specifies the catalog to search for the table. google.protobuf.StringValue catalog = 1; @@ -251,12 +262,20 @@ message CommandGetPrimaryKeys { * key_sequence: int, * fk_key_name: utf8, * pk_key_name: utf8, - * update_rule: int, - * delete_rule: int + * update_rule: uint1, + * delete_rule: uint1 * > * The returned data should be ordered by catalog_name, schema_name, table_name, key_name, then key_sequence. + * update_rule and delete_rule returns a byte that is equivalent to actions: + * - 0 = CASCADE + * - 1 = RESTRICT + * - 2 = SET NULL + * - 3 = NO ACTION + * - 4 = SET DEFAULT */ message CommandGetExportedKeys { + option (experimental) = true; + // Specifies the catalog to search for the foreign key table. google.protobuf.StringValue catalog = 1; @@ -286,12 +305,20 @@ message CommandGetExportedKeys { * key_sequence: int, * fk_key_name: utf8, * pk_key_name: utf8, - * update_rule: int, - * delete_rule: int + * update_rule: uint1, + * delete_rule: uint1 * > * The returned data should be ordered by catalog_name, schema_name, table_name, key_name, then key_sequence. + * update_rule and delete_rule returns a byte that is equivalent to actions: + * - 0 = CASCADE + * - 1 = RESTRICT + * - 2 = SET NULL + * - 3 = NO ACTION + * - 4 = SET DEFAULT */ message CommandGetImportedKeys { + option (experimental) = true; + // Specifies the catalog to search for the primary key table. google.protobuf.StringValue catalog = 1; @@ -305,9 +332,11 @@ message CommandGetImportedKeys { // SQL Execution Action Messages /* - * Request message for the "GetPreparedStatement" action on a Flight SQL enabled backend. + * Request message for the "CreatePreparedStatement" action on a Flight SQL enabled backend. */ message ActionCreatePreparedStatementRequest { + option (experimental) = true; + // The valid SQL string to create a prepared statement for. string query = 1; } @@ -316,6 +345,8 @@ message ActionCreatePreparedStatementRequest { * Wrap the result of a "GetPreparedStatement" action. */ message ActionCreatePreparedStatementResult { + option (experimental) = true; + // Opaque handle for the prepared statement on the server. bytes prepared_statement_handle = 1; @@ -324,7 +355,7 @@ message ActionCreatePreparedStatementResult { bytes dataset_schema = 2; // If the query provided contained parameters, parameter_schema contains the - // Schema of the expected parameters as described in Schema.fbs::Schema. + // schema of the expected parameters as described in Schema.fbs::Schema, it is serialized as an IPC message. bytes parameter_schema = 3; } @@ -333,6 +364,8 @@ message ActionCreatePreparedStatementResult { * Closes server resources associated with the prepared statement handle. */ message ActionClosePreparedStatementRequest { + option (experimental) = true; + // Opaque handle for the prepared statement on the server. bytes prepared_statement_handle = 1; } @@ -347,6 +380,8 @@ message ActionClosePreparedStatementRequest { * - GetFlightInfo: execute the query. */ message CommandStatementQuery { + option (experimental) = true; + // The SQL syntax. string query = 1; @@ -361,6 +396,8 @@ message CommandStatementQuery { * - GetFlightInfo: execute the prepared statement instance. */ message CommandPreparedStatementQuery { + option (experimental) = true; + // Unique identifier for the instance of the prepared statement to execute. bytes client_execution_handle = 1; @@ -373,6 +410,8 @@ message CommandPreparedStatementQuery { * for the the RPC call DoPut to cause the server to execute the included SQL update. */ message CommandStatementUpdate { + option (experimental) = true; + // The SQL syntax. string query = 1; } @@ -383,6 +422,8 @@ message CommandStatementUpdate { * prepared statement handle as an update. */ message CommandPreparedStatementUpdate { + option (experimental) = true; + // Unique identifier for the instance of the prepared statement to execute. bytes client_execution_handle = 1; @@ -396,7 +437,13 @@ message CommandPreparedStatementUpdate { * results from the update. */ message DoPutUpdateResult { + option (experimental) = true; + // The number of records updated. A return value of -1 represents // an unknown updated record count. int64 record_count = 1; } + +extend google.protobuf.MessageOptions { + bool experimental = 1000; +} diff --git a/java/flight/flight-sql/pom.xml b/java/flight/flight-sql/pom.xml index 7f70650f30b0c..903ce550e7d81 100644 --- a/java/flight/flight-sql/pom.xml +++ b/java/flight/flight-sql/pom.xml @@ -129,10 +129,6 @@ org.hamcrest hamcrest - - com.google.code.findbugs - jsr305 - diff --git a/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlClient.java b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlClient.java index 5364fc611268a..78848dd84e604 100644 --- a/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlClient.java +++ b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlClient.java @@ -31,20 +31,20 @@ import static org.apache.arrow.flight.sql.impl.FlightSql.CommandStatementUpdate; import static org.apache.arrow.flight.sql.impl.FlightSql.DoPutUpdateResult; -import java.io.Closeable; -import java.io.IOException; import java.nio.ByteBuffer; import java.sql.SQLException; +import java.util.Arrays; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicLong; - -import javax.annotation.Nullable; +import java.util.stream.Collectors; import org.apache.arrow.flight.Action; import org.apache.arrow.flight.CallOption; +import org.apache.arrow.flight.CallStatus; import org.apache.arrow.flight.FlightClient; import org.apache.arrow.flight.FlightDescriptor; import org.apache.arrow.flight.FlightInfo; @@ -58,6 +58,7 @@ import org.apache.arrow.flight.sql.impl.FlightSql.ActionCreatePreparedStatementResult; import org.apache.arrow.flight.sql.impl.FlightSql.CommandPreparedStatementQuery; import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.util.Preconditions; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.types.pojo.Schema; @@ -66,44 +67,44 @@ import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.StringValue; -import io.grpc.Status; - /** * Flight client with Flight SQL semantics. */ public class FlightSqlClient { - private FlightClient client; + private final FlightClient client; - public FlightSqlClient(FlightClient client) { - this.client = client; + public FlightSqlClient(final FlightClient client) { + this.client = Objects.requireNonNull(client, "Client cannot be null!"); } /** * Execute a query on the server. * - * @param query The query to execute. + * @param query The query to execute. + * @param options RPC-layer hints for this call. * @return a FlightInfo object representing the stream(s) to fetch. */ - public FlightInfo execute(String query) { + public FlightInfo execute(final String query, final CallOption... options) { final CommandStatementQuery.Builder builder = CommandStatementQuery.newBuilder(); builder.setQuery(query); final FlightDescriptor descriptor = FlightDescriptor.command(Any.pack(builder.build()).toByteArray()); - return client.getInfo(descriptor); + return client.getInfo(descriptor, options); } /** * Execute an update query on the server. * - * @param query The query to execute. + * @param query The query to execute. + * @param options RPC-layer hints for this call. * @return a FlightInfo object representing the stream(s) to fetch. */ - public long executeUpdate(String query) { + public long executeUpdate(final String query, final CallOption... options) { final CommandStatementUpdate.Builder builder = CommandStatementUpdate.newBuilder(); builder.setQuery(query); final FlightDescriptor descriptor = FlightDescriptor.command(Any.pack(builder.build()).toByteArray()); final SyncPutListener putListener = new SyncPutListener(); - client.startPut(descriptor, VectorSchemaRoot.of(), putListener); + client.startPut(descriptor, VectorSchemaRoot.of(), putListener, options); try { final PutResult read = putListener.read(); @@ -111,20 +112,23 @@ public long executeUpdate(String query) { final DoPutUpdateResult doPutUpdateResult = DoPutUpdateResult.parseFrom(metadata.nioBuffer()); return doPutUpdateResult.getRecordCount(); } - } catch (InterruptedException | ExecutionException | InvalidProtocolBufferException e) { - throw new RuntimeException(e); + } catch (final InterruptedException | ExecutionException e) { + throw CallStatus.CANCELLED.withCause(e).toRuntimeException(); + } catch (final InvalidProtocolBufferException e) { + throw CallStatus.INVALID_ARGUMENT.withCause(e).toRuntimeException(); } } /** * Request a list of catalogs. * + * @param options RPC-layer hints for this call. * @return a FlightInfo object representing the stream(s) to fetch. */ - public FlightInfo getCatalogs() { + public FlightInfo getCatalogs(final CallOption... options) { final CommandGetCatalogs.Builder builder = CommandGetCatalogs.newBuilder(); final FlightDescriptor descriptor = FlightDescriptor.command(Any.pack(builder.build()).toByteArray()); - return client.getInfo(descriptor); + return client.getInfo(descriptor, options); } /** @@ -132,9 +136,10 @@ public FlightInfo getCatalogs() { * * @param catalog The catalog. * @param schemaFilterPattern The schema filter pattern. + * @param options RPC-layer hints for this call. * @return a FlightInfo object representing the stream(s) to fetch. */ - public FlightInfo getSchemas(final String catalog, final String schemaFilterPattern) { + public FlightInfo getSchemas(final String catalog, final String schemaFilterPattern, final CallOption... options) { final CommandGetSchemas.Builder builder = CommandGetSchemas.newBuilder(); if (catalog != null) { @@ -146,7 +151,7 @@ public FlightInfo getSchemas(final String catalog, final String schemaFilterPatt } final FlightDescriptor descriptor = FlightDescriptor.command(Any.pack(builder.build()).toByteArray()); - return client.getInfo(descriptor); + return client.getInfo(descriptor, options); } /** @@ -156,7 +161,7 @@ public FlightInfo getSchemas(final String catalog, final String schemaFilterPatt * @param options RPC-layer hints for this call. */ public SchemaResult getSchema(FlightDescriptor descriptor, CallOption... options) { - return this.client.getSchema(descriptor, options); + return client.getSchema(descriptor, options); } /** @@ -166,22 +171,41 @@ public SchemaResult getSchema(FlightDescriptor descriptor, CallOption... options * @param options RPC-layer hints for this call. */ public FlightStream getStream(Ticket ticket, CallOption... options) { - return this.client.getStream(ticket, options); + return client.getStream(ticket, options); + } + + /** + * Request a set of Flight SQL metadata. + * + * @return a FlightInfo object representing the stream(s) to fetch. + */ + public FlightInfo getSqlInfo(final int... info) { + return getSqlInfo(info, new CallOption[0]); + } + + /** + * Request a set of Flight SQL metadata. + * + * @param info The set of metadata to retrieve. None to retrieve all metadata. + * @param options RPC-layer hints for this call. + * @return a FlightInfo object representing the stream(s) to fetch. + */ + public FlightInfo getSqlInfo(final int[] info, final CallOption... options) { + return getSqlInfo(Arrays.stream(info).boxed().collect(Collectors.toList()), options); } /** * Request a set of Flight SQL metadata. * - * @param info The set of metadata to retrieve. None to retrieve all metadata. + * @param info The set of metadata to retrieve. None to retrieve all metadata. + * @param options RPC-layer hints for this call. * @return a FlightInfo object representing the stream(s) to fetch. */ - public FlightInfo getSqlInfo(final @Nullable int... info) { + public FlightInfo getSqlInfo(final List info, final CallOption... options) { final CommandGetSqlInfo.Builder builder = CommandGetSqlInfo.newBuilder(); - for (final int pieceOfInfo : Objects.isNull(info) ? new int[0] : info) { - builder.addInfo(pieceOfInfo); - } + builder.addAllInfo(info); final FlightDescriptor descriptor = FlightDescriptor.command(Any.pack(builder.build()).toByteArray()); - return client.getInfo(descriptor); + return client.getInfo(descriptor, options); } /** @@ -192,11 +216,12 @@ public FlightInfo getSqlInfo(final @Nullable int... info) { * @param tableFilterPattern The table filter pattern. * @param tableTypes The table types to include. * @param includeSchema True to include the schema upon return, false to not include the schema. + * @param options RPC-layer hints for this call. * @return a FlightInfo object representing the stream(s) to fetch. */ - public FlightInfo getTables(final @Nullable String catalog, final @Nullable String schemaFilterPattern, - final @Nullable String tableFilterPattern, final List tableTypes, - final boolean includeSchema) { + public FlightInfo getTables(final String catalog, final String schemaFilterPattern, + final String tableFilterPattern, final List tableTypes, + final boolean includeSchema, final CallOption... options) { final CommandGetTables.Builder builder = CommandGetTables.newBuilder(); if (catalog != null) { @@ -217,7 +242,7 @@ public FlightInfo getTables(final @Nullable String catalog, final @Nullable Stri builder.setIncludeSchema(includeSchema); final FlightDescriptor descriptor = FlightDescriptor.command(Any.pack(builder.build()).toByteArray()); - return client.getInfo(descriptor); + return client.getInfo(descriptor, options); } /** @@ -226,10 +251,11 @@ public FlightInfo getTables(final @Nullable String catalog, final @Nullable Stri * @param catalog The catalog. * @param schema The schema. * @param table The table. + * @param options RPC-layer hints for this call. * @return a FlightInfo object representing the stream(s) to fetch. */ - public FlightInfo getPrimaryKeys(final @Nullable String catalog, final @Nullable String schema, - final @Nullable String table) { + public FlightInfo getPrimaryKeys(final String catalog, final String schema, + final String table, final CallOption... options) { final CommandGetPrimaryKeys.Builder builder = CommandGetPrimaryKeys.newBuilder(); if (catalog != null) { @@ -244,21 +270,20 @@ public FlightInfo getPrimaryKeys(final @Nullable String catalog, final @Nullable builder.setTable(StringValue.newBuilder().setValue(table).build()); } final FlightDescriptor descriptor = FlightDescriptor.command(Any.pack(builder.build()).toByteArray()); - return client.getInfo(descriptor); + return client.getInfo(descriptor, options); } /** - * Request to get info about keys on a table. The table, which exports the foreign keys, parameter must be specified. + * Retrieves a description about the foreign key columns that reference the primary key columns of the given table. * * @param catalog The foreign key table catalog. * @param schema The foreign key table schema. - * @param table The foreign key table. + * @param table The foreign key table. Cannot be null. + * @param options RPC-layer hints for this call. * @return a FlightInfo object representing the stream(s) to fetch. */ - public FlightInfo getExportedKeys(String catalog, String schema, String table) { - if (null == table) { - throw Status.INVALID_ARGUMENT.asRuntimeException(); - } + public FlightInfo getExportedKeys(String catalog, String schema, String table, final CallOption... options) { + Objects.requireNonNull(table, "Table cannot be null."); final CommandGetExportedKeys.Builder builder = CommandGetExportedKeys.newBuilder(); @@ -270,24 +295,25 @@ public FlightInfo getExportedKeys(String catalog, String schema, String table) { builder.setSchema(StringValue.newBuilder().setValue(schema).build()); } + Objects.requireNonNull(table); builder.setTable(table).build(); final FlightDescriptor descriptor = FlightDescriptor.command(Any.pack(builder.build()).toByteArray()); - return client.getInfo(descriptor); + return client.getInfo(descriptor, options); } /** - * Request to get info about keys on a table. The table, which imports the foreign keys, parameter must be specified. + * Retrieves the foreign key columns for the given table. * * @param catalog The primary key table catalog. * @param schema The primary key table schema. - * @param table The primary key table. + * @param table The primary key table. Cannot be null. + * @param options RPC-layer hints for this call. * @return a FlightInfo object representing the stream(s) to fetch. */ - public FlightInfo getImportedKeys(String catalog, String schema, String table) { - if (null == table) { - throw Status.INVALID_ARGUMENT.asRuntimeException(); - } + public FlightInfo getImportedKeys(final String catalog, final String schema, final String table, + final CallOption... options) { + Objects.requireNonNull(table, "Table cannot be null."); final CommandGetImportedKeys.Builder builder = CommandGetImportedKeys.newBuilder(); @@ -299,61 +325,65 @@ public FlightInfo getImportedKeys(String catalog, String schema, String table) { builder.setSchema(StringValue.newBuilder().setValue(schema).build()); } + Objects.requireNonNull(table); builder.setTable(table).build(); final FlightDescriptor descriptor = FlightDescriptor.command(Any.pack(builder.build()).toByteArray()); - return client.getInfo(descriptor); + return client.getInfo(descriptor, options); } /** * Request a list of table types. * + * @param options RPC-layer hints for this call. * @return a FlightInfo object representing the stream(s) to fetch. */ - public FlightInfo getTableTypes() { + public FlightInfo getTableTypes(final CallOption... options) { final CommandGetTableTypes.Builder builder = CommandGetTableTypes.newBuilder(); final FlightDescriptor descriptor = FlightDescriptor.command(Any.pack(builder.build()).toByteArray()); - return client.getInfo(descriptor); + return client.getInfo(descriptor, options); } /** * Create a prepared statement on the server. * - * @param query The query to prepare. + * @param query The query to prepare. + * @param options RPC-layer hints for this call. * @return The representation of the prepared statement which exists on the server. */ - public PreparedStatement prepare(String query) { - return new PreparedStatement(client, query); + public PreparedStatement prepare(final String query, final CallOption... options) { + return new PreparedStatement(client, query, options); } /** * Helper class to encapsulate Flight SQL prepared statement logic. */ - public static class PreparedStatement implements Closeable { + public static class PreparedStatement implements AutoCloseable { private final FlightClient client; private final ActionCreatePreparedStatementResult preparedStatementResult; - private AtomicLong invocationCount; - private boolean isClosed; - private Schema resultSetSchema = null; - private Schema parameterSchema = null; + private final AtomicLong invocationCount; private VectorSchemaRoot parameterBindingRoot; + private boolean isClosed; + private Schema resultSetSchema; + private Schema parameterSchema; /** * Constructor. * - * @param client The client. FlightSqlPreparedStatement does not maintain this resource. - * @param sql The query. + * @param client The client. PreparedStatement does not maintain this resource. + * @param sql The query. + * @param options RPC-layer hints for this call. */ - public PreparedStatement(FlightClient client, String sql) { + public PreparedStatement(final FlightClient client, final String sql, final CallOption... options) { this.client = client; - - final Iterator preparedStatementResults = client.doAction(new Action( + final Action action = new Action( FlightSqlUtils.FLIGHT_SQL_CREATEPREPAREDSTATEMENT.getType(), Any.pack(ActionCreatePreparedStatementRequest - .newBuilder() - .setQuery(sql) - .build()) - .toByteArray())); + .newBuilder() + .setQuery(sql) + .build()) + .toByteArray()); + final Iterator preparedStatementResults = client.doAction(action, options); preparedStatementResult = FlightSqlUtils.unpackAndParseOrThrow( preparedStatementResults.next().getBody(), @@ -364,23 +394,31 @@ public PreparedStatement(FlightClient client, String sql) { } /** - * Set the {@link VectorSchemaRoot} containing the parameter binding from a preparedStatemnt + * Set the {@link #parameterBindingRoot} containing the parameter binding from a {@link PreparedStatement} * operation. * - * @param parameterBindingRoot a {@link VectorSchemaRoot} object contain the values to be used in the - * PreparedStatement setters. + * @param parameterBindingRoot a {@code VectorSchemaRoot} object containing the values to be used in the + * {@code PreparedStatement} setters. */ - public void setParameters(VectorSchemaRoot parameterBindingRoot) { - this.parameterBindingRoot = parameterBindingRoot; + public void setParameters(final VectorSchemaRoot parameterBindingRoot) { + if (this.parameterBindingRoot != null) { + if (this.parameterBindingRoot.equals(parameterBindingRoot)) { + return; + } + this.parameterBindingRoot.close(); + } + this.parameterBindingRoot = + Objects.requireNonNull(parameterBindingRoot, "Parameter binding root cannot be null!"); } /** - * Empty the {@link VectorSchemaRoot} that contains the parameter binding from a preparedStatemnt - * operation. - * + * Empty the {@link #parameterBindingRoot}, which contains the parameter binding from + * a {@link PreparedStatement} operation. */ public void clearParameters() { - this.parameterBindingRoot = null; + if (parameterBindingRoot != null) { + parameterBindingRoot.close(); + } } /** @@ -389,8 +427,12 @@ public void clearParameters() { * @return the Schema of the resultset. */ public Schema getResultSetSchema() { - if (resultSetSchema == null && preparedStatementResult.getDatasetSchema() != null) { - resultSetSchema = Schema.deserialize(preparedStatementResult.getDatasetSchema().asReadOnlyByteBuffer()); + if (resultSetSchema == null) { + final ByteString bytes = preparedStatementResult.getDatasetSchema(); + if (bytes.isEmpty()) { + return new Schema(Collections.emptyList()); + } + resultSetSchema = Schema.deserialize(bytes.asReadOnlyByteBuffer()); } return resultSetSchema; } @@ -401,8 +443,12 @@ public Schema getResultSetSchema() { * @return the Schema of the parameters. */ public Schema getParameterSchema() { - if (parameterSchema == null && preparedStatementResult.getParameterSchema() != null) { - parameterSchema = Schema.deserialize(preparedStatementResult.getParameterSchema().asReadOnlyByteBuffer()); + if (parameterSchema == null) { + final ByteString bytes = preparedStatementResult.getParameterSchema(); + if (bytes.isEmpty()) { + return new Schema(Collections.emptyList()); + } + parameterSchema = Schema.deserialize(bytes.asReadOnlyByteBuffer()); } return parameterSchema; } @@ -410,62 +456,62 @@ public Schema getParameterSchema() { /** * Executes the prepared statement query on the server. * + * @param options RPC-layer hints for this call. * @return a FlightInfo object representing the stream(s) to fetch. - * @throws IOException if the PreparedStatement is closed. */ - public FlightInfo execute() throws IOException { - if (isClosed) { - throw new IllegalStateException("Prepared statement has already been closed on the server."); - } + public FlightInfo execute(final CallOption... options) throws SQLException { + checkOpen(); final FlightDescriptor descriptor = FlightDescriptor .command(Any.pack(CommandPreparedStatementQuery.newBuilder() - .setClientExecutionHandle( - ByteString.copyFrom(ByteBuffer.allocate(Long.BYTES).putLong(invocationCount.getAndIncrement()))) - .setPreparedStatementHandle(preparedStatementResult.getPreparedStatementHandle()) - .build()) + .setClientExecutionHandle( + ByteString.copyFrom(ByteBuffer.allocate(Long.BYTES).putLong(invocationCount.getAndIncrement()))) + .setPreparedStatementHandle(preparedStatementResult.getPreparedStatementHandle()) + .build()) .toByteArray()); - if (parameterBindingRoot != null) { + if (parameterBindingRoot != null && parameterBindingRoot.getRowCount() > 0) { final SyncPutListener putListener = new SyncPutListener(); FlightClient.ClientStreamListener listener = - client.startPut(descriptor, this.parameterBindingRoot, putListener); + client.startPut(descriptor, parameterBindingRoot, putListener, options); listener.putNext(); listener.completed(); } - return client.getInfo(descriptor); + return client.getInfo(descriptor, options); } /** - * Executes the prepared statement update on the server. + * Checks whether this client is open. + * + * @throws IllegalStateException if client is closed. */ - public long executeUpdate() throws SQLException { - if (isClosed) { - throw new IllegalStateException("Prepared statement has already been closed on the server."); - } + protected final void checkOpen() { + Preconditions.checkState(!isClosed, "Statement closed"); + } + /** + * Executes the prepared statement update on the server. + * + * @param options RPC-layer hints for this call. + */ + public long executeUpdate(final CallOption... options) { + checkOpen(); final FlightDescriptor descriptor = FlightDescriptor .command(Any.pack(FlightSql.CommandPreparedStatementUpdate.newBuilder() - .setClientExecutionHandle( - ByteString.copyFrom(ByteBuffer.allocate(Long.BYTES).putLong(invocationCount.getAndIncrement()))) - .setPreparedStatementHandle(preparedStatementResult.getPreparedStatementHandle()) - .build()) + .setClientExecutionHandle( + ByteString.copyFrom(ByteBuffer.allocate(Long.BYTES).putLong(invocationCount.getAndIncrement()))) + .setPreparedStatementHandle(preparedStatementResult.getPreparedStatementHandle()) + .build()) .toByteArray()); - - if (this.parameterBindingRoot == null) { - this.parameterBindingRoot = VectorSchemaRoot.of(); - } - + setParameters(parameterBindingRoot == null ? VectorSchemaRoot.of() : parameterBindingRoot); final SyncPutListener putListener = new SyncPutListener(); final FlightClient.ClientStreamListener listener = - client.startPut(descriptor, this.parameterBindingRoot, putListener); - + client.startPut(descriptor, parameterBindingRoot, putListener, options); listener.putNext(); listener.completed(); - try { final PutResult read = putListener.read(); try (final ArrowBuf metadata = read.getApplicationMetadata()) { @@ -473,23 +519,40 @@ public long executeUpdate() throws SQLException { FlightSql.DoPutUpdateResult.parseFrom(metadata.nioBuffer()); return doPutUpdateResult.getRecordCount(); } - } catch (InterruptedException | InvalidProtocolBufferException | ExecutionException e) { - throw new SQLException(e); + } catch (final InterruptedException | ExecutionException e) { + throw CallStatus.CANCELLED.withCause(e).toRuntimeException(); + } catch (final InvalidProtocolBufferException e) { + throw CallStatus.INVALID_ARGUMENT.withCause(e).toRuntimeException(); } } - @Override - public void close() { + /** + * Closes the client. + * + * @param options RPC-layer hints for this call. + */ + public void close(final CallOption... options) { + if (isClosed) { + return; + } isClosed = true; - final Iterator closePreparedStatementResults = client.doAction(new Action( + final Action action = new Action( FlightSqlUtils.FLIGHT_SQL_CLOSEPREPAREDSTATEMENT.getType(), - Any.pack(ActionClosePreparedStatementRequest - .newBuilder() - .setPreparedStatementHandle(preparedStatementResult.getPreparedStatementHandle()) - .build()) - .toByteArray())); + Any.pack(ActionClosePreparedStatementRequest.newBuilder() + .setPreparedStatementHandle(preparedStatementResult.getPreparedStatementHandle()) + .build()) + .toByteArray()); + final Iterator closePreparedStatementResults = client.doAction(action, options); closePreparedStatementResults.forEachRemaining(result -> { }); + if (parameterBindingRoot != null) { + parameterBindingRoot.close(); + } + } + + @Override + public void close() { + close(new CallOption[0]); } /** diff --git a/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlProducer.java b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlProducer.java index 49886ace6b85d..bba60fdec3b3c 100644 --- a/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlProducer.java +++ b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlProducer.java @@ -27,6 +27,7 @@ import org.apache.arrow.flight.Action; import org.apache.arrow.flight.ActionType; +import org.apache.arrow.flight.CallStatus; import org.apache.arrow.flight.FlightDescriptor; import org.apache.arrow.flight.FlightInfo; import org.apache.arrow.flight.FlightProducer; @@ -50,6 +51,7 @@ import org.apache.arrow.flight.sql.impl.FlightSql.DoPutUpdateResult; import org.apache.arrow.vector.types.Types.MinorType; import org.apache.arrow.vector.types.UnionMode; +import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.ArrowType.Union; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.FieldType; @@ -58,8 +60,6 @@ import com.google.protobuf.Any; import com.google.protobuf.InvalidProtocolBufferException; -import io.grpc.Status; - /** * API to Implement an Arrow Flight SQL producer. */ @@ -109,7 +109,7 @@ default FlightInfo getFlightInfo(CallContext context, FlightDescriptor descripto FlightSqlUtils.unpackOrThrow(command, CommandGetImportedKeys.class), context, descriptor); } - throw Status.INVALID_ARGUMENT.asRuntimeException(); + throw CallStatus.INVALID_ARGUMENT.withDescription("The defined request is invalid.").toRuntimeException(); } /** @@ -144,7 +144,7 @@ default SchemaResult getSchema(CallContext context, FlightDescriptor descriptor) return getSchemaForImportedAndExportedKeys(); } - throw Status.INVALID_ARGUMENT.asRuntimeException(); + throw CallStatus.INVALID_ARGUMENT.withDescription("Invalid command provided.").toRuntimeException(); } /** @@ -193,7 +193,7 @@ default void getStream(CallContext context, Ticket ticket, ServerStreamListener getStreamImportedKeys(FlightSqlUtils.unpackOrThrow(command, CommandGetImportedKeys.class), context, ticket, listener); } else { - throw Status.INVALID_ARGUMENT.asRuntimeException(); + throw CallStatus.INVALID_ARGUMENT.withDescription("The defined request is invalid.").toRuntimeException(); } } @@ -227,7 +227,7 @@ default Runnable acceptPut(CallContext context, FlightStream flightStream, Strea context, flightStream, ackStream); } - throw Status.INVALID_ARGUMENT.asRuntimeException(); + throw CallStatus.INVALID_ARGUMENT.withDescription("The defined request is invalid.").toRuntimeException(); } /** @@ -262,7 +262,7 @@ default void doAction(CallContext context, Action action, StreamListener closePreparedStatement(request, context, listener); } - throw Status.INVALID_ARGUMENT.asRuntimeException(); + throw CallStatus.INVALID_ARGUMENT.withDescription("Invalid action provided.").toRuntimeException(); } /** @@ -667,11 +667,11 @@ final class Schemas { Field.nullable("key_sequence", MinorType.INT.getType()), Field.nullable("fk_key_name", MinorType.VARCHAR.getType()), Field.nullable("pk_key_name", MinorType.VARCHAR.getType()), - Field.nullable("update_rule", MinorType.INT.getType()), - Field.nullable("delete_rule", MinorType.INT.getType()))); + Field.nullable("update_rule", new ArrowType.Int(8, false)), + Field.nullable("delete_rule", new ArrowType.Int(8, false)))); public static final Schema GET_SQL_INFO_SCHEMA = new Schema(Arrays.asList( - Field.nullable("info_name", MinorType.INT.getType()), + Field.nullable("info_name", new ArrowType.Int(32, false)), new Field("value", // dense_union new FieldType(true, new Union(UnionMode.Dense, new int[] {0, 1, 2, 3}), /*dictionary=*/null), @@ -701,4 +701,20 @@ final class SqlInfo { public static final int SQL_IDENTIFIER_QUOTE_CHAR = 504; public static final int SQL_QUOTED_IDENTIFIER_CASE = 505; } + + /** + * Update/delete rules for {@link FlightSqlProducer#getStreamImportedKeys} and + * {@link FlightSqlProducer#getStreamExportedKeys}. + */ + final class UpdateDeleteRules { + public static final byte CASCADE = 0; // Borrowed from DatabaseMetaData.importedKeyCascade + public static final byte RESTRICT = 1; // Borrowed from DatabaseMetaData.importedKeyRestrict + public static final byte SET_NULL = 2; // Borrowed from DatabaseMetaData.importedKeySetNull + public static final byte NO_ACTION = 3; // Borrowed from DatabaseMetaData.importedKeyNoAction + public static final byte SET_DEFAULT = 4; // Borrowed from DatabaseMetaData.importedKeyNoAction + + private UpdateDeleteRules() { + // Prevent instantiation. + } + } } diff --git a/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlUtils.java b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlUtils.java index 9360d4070b8cd..5f73c97e0b641 100644 --- a/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlUtils.java +++ b/java/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlUtils.java @@ -20,6 +20,7 @@ import java.util.List; import org.apache.arrow.flight.ActionType; +import org.apache.arrow.flight.CallStatus; import com.google.common.collect.ImmutableList; import com.google.protobuf.Any; @@ -54,8 +55,11 @@ public final class FlightSqlUtils { public static Any parseOrThrow(byte[] source) { try { return Any.parseFrom(source); - } catch (InvalidProtocolBufferException e) { - throw new AssertionError(e.getMessage()); + } catch (final InvalidProtocolBufferException e) { + throw CallStatus.INVALID_ARGUMENT + .withDescription("Received invalid message from remote.") + .withCause(e) + .toRuntimeException(); } } @@ -70,8 +74,11 @@ public static Any parseOrThrow(byte[] source) { public static T unpackOrThrow(Any source, Class as) { try { return source.unpack(as); - } catch (InvalidProtocolBufferException e) { - throw new AssertionError(e.getMessage()); + } catch (final InvalidProtocolBufferException e) { + throw CallStatus.INVALID_ARGUMENT + .withDescription("Provided message cannot be unpacked as desired type.") + .withCause(e) + .toRuntimeException(); } } diff --git a/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/TestFlightSql.java b/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/TestFlightSql.java index 465f2dbaee0d1..06fc5b37b4149 100644 --- a/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/TestFlightSql.java +++ b/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/TestFlightSql.java @@ -42,12 +42,14 @@ import org.apache.arrow.flight.sql.FlightSqlClient; import org.apache.arrow.flight.sql.FlightSqlClient.PreparedStatement; -import org.apache.arrow.flight.sql.FlightSqlExample; import org.apache.arrow.flight.sql.FlightSqlProducer; +import org.apache.arrow.flight.sql.example.FlightSqlExample; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.UInt1Vector; +import org.apache.arrow.vector.UInt4Vector; import org.apache.arrow.vector.VarBinaryVector; import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; @@ -614,6 +616,18 @@ List> getResults(FlightStream stream) { final Object data = denseUnionVector.getObject(rowIndex); results.get(rowIndex).add(isNull(data) ? null : Objects.toString(data)); } + } else if (fieldVector instanceof UInt4Vector) { + final UInt4Vector uInt4Vector = (UInt4Vector) fieldVector; + for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) { + final Object data = uInt4Vector.getObject(rowIndex); + results.get(rowIndex).add(isNull(data) ? null : Objects.toString(data)); + } + } else if (fieldVector instanceof UInt1Vector) { + final UInt1Vector uInt1Vector = (UInt1Vector) fieldVector; + for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) { + final Object data = uInt1Vector.getObject(rowIndex); + results.get(rowIndex).add(isNull(data) ? null : Objects.toString(data)); + } } else { throw new UnsupportedOperationException("Not yet implemented"); } diff --git a/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/FlightSqlExample.java b/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/example/FlightSqlExample.java similarity index 94% rename from java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/FlightSqlExample.java rename to java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/example/FlightSqlExample.java index fbb60771a3182..3f1771c35b067 100644 --- a/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/FlightSqlExample.java +++ b/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/example/FlightSqlExample.java @@ -15,10 +15,8 @@ * limitations under the License. */ -package org.apache.arrow.flight.sql; +package org.apache.arrow.flight.sql.example; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Strings.emptyToNull; import static com.google.common.base.Strings.isNullOrEmpty; import static com.google.protobuf.Any.pack; @@ -26,11 +24,11 @@ import static java.lang.String.format; import static java.util.Collections.singletonList; import static java.util.Objects.isNull; -import static java.util.Optional.empty; import static java.util.UUID.randomUUID; import static java.util.stream.StreamSupport.stream; import static org.apache.arrow.adapter.jdbc.JdbcToArrow.sqlToArrowVectorIterator; import static org.apache.arrow.adapter.jdbc.JdbcToArrowUtils.jdbcToArrowSchema; +import static org.apache.arrow.util.Preconditions.checkState; import static org.slf4j.LoggerFactory.getLogger; import java.io.File; @@ -61,7 +59,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Optional; +import java.util.Objects; import java.util.Properties; import java.util.Set; import java.util.TimeZone; @@ -71,8 +69,6 @@ import java.util.function.Consumer; import java.util.stream.Stream; -import javax.annotation.Nullable; - import org.apache.arrow.adapter.jdbc.ArrowVectorIterator; import org.apache.arrow.adapter.jdbc.JdbcFieldInfo; import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig; @@ -82,14 +78,13 @@ import org.apache.arrow.flight.FlightDescriptor; import org.apache.arrow.flight.FlightEndpoint; import org.apache.arrow.flight.FlightInfo; -import org.apache.arrow.flight.FlightRuntimeException; -import org.apache.arrow.flight.FlightStatusCode; import org.apache.arrow.flight.FlightStream; import org.apache.arrow.flight.Location; import org.apache.arrow.flight.PutResult; import org.apache.arrow.flight.Result; import org.apache.arrow.flight.SchemaResult; import org.apache.arrow.flight.Ticket; +import org.apache.arrow.flight.sql.FlightSqlProducer; import org.apache.arrow.flight.sql.impl.FlightSql; import org.apache.arrow.flight.sql.impl.FlightSql.ActionClosePreparedStatementRequest; import org.apache.arrow.flight.sql.impl.FlightSql.ActionCreatePreparedStatementRequest; @@ -108,7 +103,6 @@ import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.util.AutoCloseables; -import org.apache.arrow.util.Preconditions; import org.apache.arrow.vector.BigIntVector; import org.apache.arrow.vector.BitVector; import org.apache.arrow.vector.DateDayVector; @@ -170,8 +164,6 @@ import com.google.protobuf.Message; import com.google.protobuf.ProtocolStringList; -import io.grpc.Status; - /** * Proof of concept {@link FlightSqlProducer} implementation showing an Apache Derby backed Flight SQL server capable * of the following workflows: @@ -197,7 +189,7 @@ public class FlightSqlExample implements FlightSqlProducer, AutoCloseable { public FlightSqlExample(final Location location) { // TODO Constructor should not be doing work. - Preconditions.checkState( + checkState( removeDerbyDatabaseIfExists() && populateDerbyDatabase(), "Failed to reset Derby database!"); final ConnectionFactory connectionFactory = @@ -271,7 +263,6 @@ private static boolean removeDerbyDatabaseIfExists() { } private static boolean populateDerbyDatabase() { - Optional exception = empty(); try (final Connection connection = DriverManager.getConnection("jdbc:derby:target/derbyDB;create=true"); Statement statement = connection.createStatement()) { statement.execute("CREATE TABLE foreignTable (" + @@ -289,13 +280,11 @@ private static boolean populateDerbyDatabase() { statement.execute("INSERT INTO intTable (keyName, value, foreignId) VALUES ('one', 1, 1)"); statement.execute("INSERT INTO intTable (keyName, value, foreignId) VALUES ('zero', 0, 1)"); statement.execute("INSERT INTO intTable (keyName, value, foreignId) VALUES ('negative one', -1, 1)"); - } catch (SQLException e) { - LOGGER.error( - format("Failed attempt to populate DerbyDB: <%s>", e.getMessage()), - (exception = Optional.of(e)).get()); + } catch (final SQLException e) { + LOGGER.error(format("Failed attempt to populate DerbyDB: <%s>", e.getMessage()), e); + return false; } - - return !exception.isPresent(); + return true; } private static ArrowType getArrowTypeFromJdbcType(final int jdbcDataType, final int precision, final int scale) { @@ -305,7 +294,15 @@ private static ArrowType getArrowTypeFromJdbcType(final int jdbcDataType, final return isNull(type) ? ArrowType.Utf8.INSTANCE : type; } - private static void saveToVector(final byte typeRegisteredId, final @Nullable String data, + private static void saveToVector(final Byte data, final UInt1Vector vector, final int index) { + vectorConsumer( + data, + vector, + fieldVector -> fieldVector.setNull(index), + (theData, fieldVector) -> fieldVector.setSafe(index, theData)); + } + + private static void saveToVector(final byte typeRegisteredId, final String data, final DenseUnionVector vector, final int index) { vectorConsumer( data, @@ -327,7 +324,7 @@ private static void saveToVector(final byte typeRegisteredId, final @Nullable St }); } - private static void saveToVector(final byte typeRegisteredId, final @Nullable Integer data, + private static void saveToVector(final byte typeRegisteredId, final Integer data, final DenseUnionVector vector, final int index) { vectorConsumer( data, @@ -344,26 +341,32 @@ private static void saveToVector(final byte typeRegisteredId, final @Nullable In }); } - private static void saveToVector(final @Nullable String data, final VarCharVector vector, final int index) { + private static void saveToVector(final Integer data, final UInt4Vector vector, final int index) { + preconditionCheckSaveToVector(vector, index); + vectorConsumer(data, vector, fieldVector -> fieldVector.setNull(index), + (theData, fieldVector) -> fieldVector.setSafe(index, data)); + } + + private static void saveToVector(final String data, final VarCharVector vector, final int index) { preconditionCheckSaveToVector(vector, index); vectorConsumer(data, vector, fieldVector -> fieldVector.setNull(index), (theData, fieldVector) -> fieldVector.setSafe(index, new Text(theData))); } - private static void saveToVector(final @Nullable Integer data, final IntVector vector, final int index) { + private static void saveToVector(final Integer data, final IntVector vector, final int index) { preconditionCheckSaveToVector(vector, index); vectorConsumer(data, vector, fieldVector -> fieldVector.setNull(index), (theData, fieldVector) -> fieldVector.setSafe(index, theData)); } - private static void saveToVector(final @Nullable byte[] data, final VarBinaryVector vector, final int index) { + private static void saveToVector(final byte[] data, final VarBinaryVector vector, final int index) { preconditionCheckSaveToVector(vector, index); vectorConsumer(data, vector, fieldVector -> fieldVector.setNull(index), (theData, fieldVector) -> fieldVector.setSafe(index, theData)); } private static void preconditionCheckSaveToVector(final FieldVector vector, final int index) { - checkNotNull(vector); + Objects.requireNonNull(vector, "vector cannot be null."); checkState(index >= 0, "Index must be a positive number!"); } @@ -395,8 +398,8 @@ private static VectorSchemaRoot getSchemasRoot(final ResultSet data, final Buffe private static int saveToVectors(final Map vectorToColumnName, final ResultSet data, boolean emptyToNull) throws SQLException { - checkNotNull(vectorToColumnName); - checkNotNull(data); + Objects.requireNonNull(vectorToColumnName, "vectorToColumnName cannot be null."); + Objects.requireNonNull(data, "data cannot be null."); final Set> entrySet = vectorToColumnName.entrySet(); int rows = 0; for (; data.next(); rows++) { @@ -411,8 +414,12 @@ private static int saveToVectors(final Map ve final int intValue = data.getInt(columnName); saveToVector(data.wasNull() ? null : intValue, (IntVector) vector, rows); continue; + } else if (vector instanceof UInt1Vector) { + final byte byteValue = data.getByte(columnName); + saveToVector(data.wasNull() ? null : byteValue, (UInt1Vector) vector, rows); + continue; } - throw Status.INVALID_ARGUMENT.asRuntimeException(); + throw CallStatus.INVALID_ARGUMENT.withDescription("Provided vector not supported").toRuntimeException(); } } for (final Entry vectorToColumn : entrySet) { @@ -451,10 +458,10 @@ private static VectorSchemaRoot getRoot(final ResultSet data, final BufferAlloca private static VectorSchemaRoot getTablesRoot(final DatabaseMetaData databaseMetaData, final BufferAllocator allocator, final boolean includeSchema, - final @Nullable String catalog, - final @Nullable String schemaFilterPattern, - final @Nullable String tableFilterPattern, - final @Nullable String... tableTypes) + final String catalog, + final String schemaFilterPattern, + final String tableFilterPattern, + final String... tableTypes) throws SQLException, IOException { /* * TODO Fix DerbyDB inconsistency if possible. @@ -466,7 +473,8 @@ private static VectorSchemaRoot getTablesRoot(final DatabaseMetaData databaseMet * returns an empty String.The temporary workaround for this was making sure we convert the empty Strings * to null using `com.google.common.base.Strings#emptyToNull`. */ - final VarCharVector catalogNameVector = new VarCharVector("catalog_name", checkNotNull(allocator)); + Objects.requireNonNull(allocator, "BufferAllocator cannot be null."); + final VarCharVector catalogNameVector = new VarCharVector("catalog_name", allocator); final VarCharVector schemaNameVector = new VarCharVector("schema_name", allocator); final VarCharVector tableNameVector = new VarCharVector("table_name", allocator); final VarCharVector tableTypeVector = new VarCharVector("table_type", allocator); @@ -484,9 +492,9 @@ private static VectorSchemaRoot getTablesRoot(final DatabaseMetaData databaseMet tableTypeVector, "TABLE_TYPE"); try (final ResultSet data = - checkNotNull( - databaseMetaData, - format("%s cannot be null!", databaseMetaData.getClass().getName())) + Objects.requireNonNull( + databaseMetaData, + format("%s cannot be null.", databaseMetaData.getClass().getName())) .getTables(catalog, schemaFilterPattern, tableFilterPattern, tableTypes)) { saveToVectors(vectorToColumnName, data, true); @@ -543,10 +551,10 @@ private static VectorSchemaRoot getSqlInfoRoot(final DatabaseMetaData metaData, private static VectorSchemaRoot getSqlInfoRoot(final DatabaseMetaData metaData, final BufferAllocator allocator, final Integer... requestedInfo) throws SQLException { - checkNotNull(metaData, "metaData cannot be null!"); - checkNotNull(allocator, "allocator cannot be null!"); - checkNotNull(requestedInfo, "requestedInfo cannot be null!"); - final IntVector infoNameVector = new IntVector("info_name", allocator); + Objects.requireNonNull(metaData, "metaData cannot be null."); + Objects.requireNonNull(allocator, "allocator cannot be null."); + Objects.requireNonNull(requestedInfo, "requestedInfo cannot be null."); + final UInt4Vector infoNameVector = new UInt4Vector("info_name", allocator); final DenseUnionVector valueVector = DenseUnionVector.empty("value", allocator); valueVector.initializeChildrenFromFields( ImmutableList.of( @@ -560,7 +568,8 @@ private static VectorSchemaRoot getSqlInfoRoot(final DatabaseMetaData metaData, vectors.forEach(FieldVector::allocateNew); final int rows = requestedInfo.length; for (int index = 0; index < rows; index++) { - final int currentInfo = checkNotNull(requestedInfo[index], "Required info cannot be nulL!"); + final int currentInfo = Objects.requireNonNull(requestedInfo[index], + String.format("requestedInfo had a null value at index %d", index)); saveToVector(currentInfo, infoNameVector, index); switch (currentInfo) { case SqlInfo.FLIGHT_SQL_SERVER_NAME: @@ -599,7 +608,7 @@ private static VectorSchemaRoot getSqlInfoRoot(final DatabaseMetaData metaData, metaData.storesLowerCaseQuotedIdentifiers() ? "LOWERCASE" : "UNKNOWN", valueVector, index); break; default: - throw Status.INVALID_ARGUMENT.asRuntimeException(); + throw CallStatus.INVALID_ARGUMENT.withDescription("Provided option is unknown.").toRuntimeException(); } } vectors.forEach(vector -> vector.setValueCount(rows)); @@ -609,20 +618,20 @@ private static VectorSchemaRoot getSqlInfoRoot(final DatabaseMetaData metaData, @Override public void getStreamPreparedStatement(final CommandPreparedStatementQuery command, final CallContext context, final Ticket ticket, final ServerStreamListener listener) { - ByteString handle = command.getPreparedStatementHandle(); + final ByteString handle = command.getPreparedStatementHandle(); StatementContext statementContext = preparedStatementLoadingCache.getIfPresent(handle); - assert statementContext != null; - try (PreparedStatement statement = statementContext.getStatement(); - ResultSet resultSet = statement.executeQuery()) { + Objects.requireNonNull(statementContext); + try (final PreparedStatement statement = statementContext.getStatement(); + final ResultSet resultSet = statement.executeQuery()) { final Schema schema = jdbcToArrowSchema(resultSet.getMetaData(), DEFAULT_CALENDAR); - try (VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, rootAllocator)) { - VectorLoader loader = new VectorLoader(vectorSchemaRoot); + try (final VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, rootAllocator)) { + final VectorLoader loader = new VectorLoader(vectorSchemaRoot); listener.start(vectorSchemaRoot); final ArrowVectorIterator iterator = sqlToArrowVectorIterator(resultSet, rootAllocator); while (iterator.hasNext()) { - VectorUnloader unloader = new VectorUnloader(iterator.next()); + final VectorUnloader unloader = new VectorUnloader(iterator.next()); loader.load(unloader.getRecordBatch()); listener.putNext(); vectorSchemaRoot.clear(); @@ -630,7 +639,7 @@ public void getStreamPreparedStatement(final CommandPreparedStatementQuery comma listener.putNext(); } - } catch (SQLException | IOException e) { + } catch (final SQLException | IOException e) { LOGGER.error(format("Failed to getStreamPreparedStatement: <%s>.", e.getMessage()), e); listener.error(e); } finally { @@ -640,11 +649,11 @@ public void getStreamPreparedStatement(final CommandPreparedStatementQuery comma } @Override - public void closePreparedStatement(ActionClosePreparedStatementRequest request, CallContext context, - StreamListener listener) { + public void closePreparedStatement(final ActionClosePreparedStatementRequest request, final CallContext context, + final StreamListener listener) { try { preparedStatementLoadingCache.invalidate(request.getPreparedStatementHandle()); - } catch (Exception e) { + } catch (final Exception e) { listener.onError(e); return; } @@ -665,7 +674,7 @@ public FlightInfo getFlightInfoStatement(final CommandStatementQuery request, fi LOGGER.error( format("There was a problem executing the prepared statement: <%s>.", e.getMessage()), e); - throw new FlightRuntimeException(new CallStatus(FlightStatusCode.INTERNAL, e, e.getMessage(), null)); + throw CallStatus.INTERNAL.withCause(e).toRuntimeException(); } } @@ -687,14 +696,14 @@ public FlightInfo getFlightInfoPreparedStatement(final CommandPreparedStatementQ LOGGER.error( format("There was a problem executing the prepared statement: <%s>.", e.getMessage()), e); - throw new FlightRuntimeException(new CallStatus(FlightStatusCode.INTERNAL, e, e.getMessage(), null)); + throw CallStatus.INTERNAL.withCause(e).toRuntimeException(); } } @Override public SchemaResult getSchemaStatement(final CommandStatementQuery command, final CallContext context, final FlightDescriptor descriptor) { - throw Status.UNIMPLEMENTED.asRuntimeException(); + throw CallStatus.UNIMPLEMENTED.toRuntimeException(); } @Override @@ -717,7 +726,7 @@ public void close() throws Exception { @Override public void listFlights(CallContext context, Criteria criteria, StreamListener listener) { // TODO - build example implementation - throw Status.UNIMPLEMENTED.asRuntimeException(); + throw CallStatus.UNIMPLEMENTED.toRuntimeException(); } private CommandStatementQuery getIdentifiableRequest(final CommandStatementQuery request) { @@ -728,7 +737,7 @@ private CommandStatementQuery getIdentifiableRequest(final CommandStatementQuery } private void createStatementIfNotPresent(final CommandStatementQuery request) { - checkNotNull(request); + Objects.requireNonNull(request, "request cannot be null."); final ByteString handle = request.getClientExecutionHandle(); if (!isNull(statementLoadingCache.getIfPresent(handle))) { return; @@ -783,7 +792,7 @@ public void createPreparedStatement(final ActionCreatePreparedStatementRequest r @Override public void doExchange(CallContext context, FlightStream reader, ServerStreamListener writer) { // TODO - build example implementation - throw Status.UNIMPLEMENTED.asRuntimeException(); + throw CallStatus.UNIMPLEMENTED.toRuntimeException(); } @Override @@ -1602,7 +1611,7 @@ public void getStreamImportedKeys(final FlightSql.CommandGetImportedKeys command VectorSchemaRoot vectorSchemaRoot = createVectors(keys)) { listener.start(vectorSchemaRoot); listener.putNext(); - } catch (SQLException e) { + } catch (final SQLException e) { listener.error(e); } finally { listener.completed(); @@ -1621,8 +1630,8 @@ private VectorSchemaRoot createVectors(ResultSet keys) throws SQLException { final IntVector keySequenceVector = new IntVector("key_sequence", rootAllocator); final VarCharVector fkKeyNameVector = new VarCharVector("fk_key_name", rootAllocator); final VarCharVector pkKeyNameVector = new VarCharVector("pk_key_name", rootAllocator); - final IntVector updateRuleVector = new IntVector("update_rule", rootAllocator); - final IntVector deleteRuleVector = new IntVector("delete_rule", rootAllocator); + final UInt1Vector updateRuleVector = new UInt1Vector("update_rule", rootAllocator); + final UInt1Vector deleteRuleVector = new UInt1Vector("delete_rule", rootAllocator); Map vectorToColumnName = new HashMap<>(); vectorToColumnName.put(pkCatalogNameVector, "PKTABLE_CAT"); @@ -1656,7 +1665,8 @@ private VectorSchemaRoot createVectors(ResultSet keys) throws SQLException { public void getStreamStatement(final CommandStatementQuery command, final CallContext context, final Ticket ticket, final ServerStreamListener listener) { final ByteString handle = command.getClientExecutionHandle(); - try (final ResultSet resultSet = checkNotNull(commandExecuteStatementLoadingCache.getIfPresent(handle))) { + try (final ResultSet resultSet = Objects.requireNonNull(commandExecuteStatementLoadingCache.getIfPresent(handle), + "Got a null ResultSet.")) { final Schema schema = jdbcToArrowSchema(resultSet.getMetaData(), DEFAULT_CALENDAR); try (VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, rootAllocator)) { VectorLoader loader = new VectorLoader(vectorSchemaRoot); @@ -1707,7 +1717,8 @@ private abstract static class CommandExecuteQueryCacheLoader> statementLoadingCache; public CommandExecuteQueryCacheLoader(final Cache> statementLoadingCache) { - this.statementLoadingCache = checkNotNull(statementLoadingCache); + this.statementLoadingCache = + Objects.requireNonNull(statementLoadingCache, "statementLoadingCache cannot be null."); } public final Cache> getStatementLoadingCache() { @@ -1716,7 +1727,7 @@ public final Cache> getStatementLoadingCache() { @Override public final ResultSet load(final ByteString key) throws SQLException { - return generateResultSetExecutingQuery(checkNotNull(key)); + return generateResultSetExecutingQuery(Objects.requireNonNull(key, "key cannot be null.")); } protected abstract ResultSet generateResultSetExecutingQuery(ByteString handle) throws SQLException; @@ -1732,7 +1743,7 @@ public CommandExecuteStatementCacheLoader( @Override protected ResultSet generateResultSetExecutingQuery(final ByteString handle) throws SQLException { final StatementContext statementContext = getStatementLoadingCache().getIfPresent(handle); - checkNotNull(statementContext); + Objects.requireNonNull(statementContext, "statementContext cannot be null."); return statementContext.getStatement() .executeQuery(statementContext.getQuery().orElseThrow(IllegalStateException::new)); } @@ -1749,7 +1760,7 @@ public CommandExecutePreparedStatementCacheLoader( protected ResultSet generateResultSetExecutingQuery(final ByteString handle) throws SQLException { final StatementContext preparedStatementContext = getStatementLoadingCache().getIfPresent(handle); - checkNotNull(preparedStatementContext); + Objects.requireNonNull(preparedStatementContext, "preparedStatementContext cannot be null."); return preparedStatementContext.getStatement().executeQuery(); } } diff --git a/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/StatementContext.java b/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/example/StatementContext.java similarity index 85% rename from java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/StatementContext.java rename to java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/example/StatementContext.java index 6e50103122d0c..a0659ac40fd51 100644 --- a/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/StatementContext.java +++ b/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/example/StatementContext.java @@ -15,33 +15,28 @@ * limitations under the License. */ -package org.apache.arrow.flight.sql; +package org.apache.arrow.flight.sql.example; -import java.io.Serializable; import java.sql.Connection; import java.sql.Statement; import java.util.Objects; import java.util.Optional; -import javax.annotation.Nullable; - +import org.apache.arrow.flight.sql.FlightSqlProducer; import org.apache.arrow.util.AutoCloseables; -import org.apache.arrow.util.Preconditions; /** * Context for {@link T} to be persisted in memory in between {@link FlightSqlProducer} calls. * * @param the {@link Statement} to be persisted. */ -public final class StatementContext implements AutoCloseable, Serializable { - - private static final long serialVersionUID = 1344967087502630673L; +public final class StatementContext implements AutoCloseable { private final T statement; private final String query; - public StatementContext(final T statement, final @Nullable String query) { - this.statement = Preconditions.checkNotNull(statement); + public StatementContext(final T statement, final String query) { + this.statement = Objects.requireNonNull(statement, "statement cannot be null."); this.query = query; } diff --git a/java/pom.xml b/java/pom.xml index 9ff65cc4e4cb2..c61c2cb9d2e3e 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -550,21 +550,6 @@ 2.8.2 provided - - org.apache.calcite.avatica - avatica - 1.18.0 - - - org.bouncycastle - bcpkix-jdk15on - 1.61 - - - com.google.code.findbugs - annotations - 3.0.1 - org.hamcrest hamcrest