Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-18300: [Java] WIP: Implement parameters for JDBC driver #14627

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ static ArrowFlightJdbcFlightStreamResultSet fromFlightInfo(
final TimeZone timeZone = TimeZone.getDefault();
final QueryState state = new QueryState();

final Meta.Signature signature = ArrowFlightMetaImpl.newSignature(null);
final Meta.Signature signature = ArrowFlightMetaImpl.newSignature(null, null);

final AvaticaResultSetMetaData resultSetMetaData =
new AvaticaResultSetMetaData(null, null, signature);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public static ArrowFlightJdbcVectorSchemaRootResultSet fromVectorSchemaRoot(
final TimeZone timeZone = TimeZone.getDefault();
final QueryState state = new QueryState();

final Meta.Signature signature = ArrowFlightMetaImpl.newSignature(null);
final Meta.Signature signature = ArrowFlightMetaImpl.newSignature(null, null);

final AvaticaResultSetMetaData resultSetMetaData =
new AvaticaResultSetMetaData(null, null, signature);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler.PreparedStatement;
import org.apache.arrow.driver.jdbc.utils.TypedValueBinder;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.calcite.avatica.AvaticaConnection;
import org.apache.calcite.avatica.AvaticaParameter;
import org.apache.calcite.avatica.ColumnMetaData;
Expand All @@ -54,17 +57,31 @@ public ArrowFlightMetaImpl(final AvaticaConnection connection) {
setDefaultConnectionProperties();
}

static Signature newSignature(final String sql) {
static Signature newSignature(final String sql, Schema parameterSchema) {
final List<AvaticaParameter> parameters;
if (parameterSchema == null) {
parameters = Collections.emptyList();
} else {
parameters = parameterSchema.getFields()
.stream()
.map(AvaticaParameterFromArrowTypeVisitor::fromArrowField)
.collect(Collectors.toList());
}
Map<String, Object> internalParameters = Collections.emptyMap();
return new Signature(
new ArrayList<ColumnMetaData>(),
sql,
Collections.<AvaticaParameter>emptyList(),
Collections.<String, Object>emptyMap(),
null, // unnecessary, as SQL requests use ArrowFlightJdbcCursor
parameters,
internalParameters,
/*cursorFactory*/null, // unnecessary, as SQL requests use ArrowFlightJdbcCursor
StatementType.SELECT
);
}

private ArrowFlightConnection getConnection() {
return (ArrowFlightConnection) connection;
}

@Override
public void closeStatement(final StatementHandle statementHandle) {
PreparedStatement preparedStatement =
Expand All @@ -86,17 +103,26 @@ public ExecuteResult execute(final StatementHandle statementHandle,
Preconditions.checkArgument(connection.id.equals(statementHandle.connectionId),
"Connection IDs are not consistent");
if (statementHandle.signature == null) {
// TODO: refactor update/select queries out into separate methods
// Update query
final StatementHandleKey key = new StatementHandleKey(statementHandle);
PreparedStatement preparedStatement = statementHandlePreparedStatementMap.get(key);
if (preparedStatement == null) {
throw new IllegalStateException("Prepared statement not found: " + statementHandle);
}
long updatedCount = preparedStatement.executeUpdate();

long updatedCount;
try (final TypedValueBinder binder =
new TypedValueBinder(preparedStatement, getConnection().getBufferAllocator())) {
binder.bind(typedValues);
updatedCount = preparedStatement.executeUpdate();
}
return new ExecuteResult(Collections.singletonList(MetaResultSet.count(statementHandle.connectionId,
statementHandle.id, updatedCount)));
} else {
// TODO Why is maxRowCount ignored?
// TODO: TypedValues
// TODO: should move execution eagerly here instead of deferring it
return new ExecuteResult(
Collections.singletonList(MetaResultSet.create(
statementHandle.connectionId, statementHandle.id,
Expand Down Expand Up @@ -134,9 +160,8 @@ public Frame fetch(final StatementHandle statementHandle, final long offset,
public StatementHandle prepare(final ConnectionHandle connectionHandle,
final String query, final long maxRowCount) {
final StatementHandle handle = super.createStatement(connectionHandle);
handle.signature = newSignature(query);
final PreparedStatement preparedStatement =
((ArrowFlightConnection) connection).getClientHandler().prepare(query);
final PreparedStatement preparedStatement = getConnection().getClientHandler().prepare(query);
handle.signature = newSignature(query, preparedStatement.getParameterSchema());
statementHandlePreparedStatementMap.put(new StatementHandleKey(handle), preparedStatement);
return handle;
}
Expand All @@ -157,11 +182,10 @@ public ExecuteResult prepareAndExecute(final StatementHandle handle,
final PrepareCallback callback)
throws NoSuchStatementException {
try {
final PreparedStatement preparedStatement =
((ArrowFlightConnection) connection).getClientHandler().prepare(query);
final PreparedStatement preparedStatement = getConnection().getClientHandler().prepare(query);
final StatementType statementType = preparedStatement.getType();
statementHandlePreparedStatementMap.put(new StatementHandleKey(handle), preparedStatement);
final Signature signature = newSignature(query);
final Signature signature = newSignature(query, preparedStatement.getParameterSchema());
final long updateCount =
statementType.equals(StatementType.UPDATE) ? preparedStatement.executeUpdate() : -1;
synchronized (callback.getMonitor()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.driver.jdbc;

import java.sql.Types;

import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.calcite.avatica.AvaticaParameter;

/**
* Turn an Arrow Field into an equivalent AvaticaParameter.
*/
class AvaticaParameterFromArrowTypeVisitor implements ArrowType.ArrowTypeVisitor<AvaticaParameter> {
private final Field field;

AvaticaParameterFromArrowTypeVisitor(Field field) {
this.field = field;
}

@Override
public AvaticaParameter visit(ArrowType.Null type) {
throw new UnsupportedOperationException("Creating parameter with Arrow type " + type);
}

@Override
public AvaticaParameter visit(ArrowType.Struct type) {
throw new UnsupportedOperationException("Creating parameter with Arrow type " + type);
}

@Override
public AvaticaParameter visit(ArrowType.List type) {
throw new UnsupportedOperationException("Creating parameter with Arrow type " + type);
}

@Override
public AvaticaParameter visit(ArrowType.LargeList type) {
throw new UnsupportedOperationException("Creating parameter with Arrow type " + type);
}

@Override
public AvaticaParameter visit(ArrowType.FixedSizeList type) {
throw new UnsupportedOperationException("Creating parameter with Arrow type " + type);
}

@Override
public AvaticaParameter visit(ArrowType.Union type) {
throw new UnsupportedOperationException("Creating parameter with Arrow type " + type);
}

@Override
public AvaticaParameter visit(ArrowType.Map type) {
throw new UnsupportedOperationException("Creating parameter with Arrow type " + type);
}

@Override
public AvaticaParameter visit(ArrowType.Int type) {
throw new UnsupportedOperationException("Creating parameter with Arrow type " + type);
}

@Override
public AvaticaParameter visit(ArrowType.FloatingPoint type) {
throw new UnsupportedOperationException("Creating parameter with Arrow type " + type);
}

@Override
public AvaticaParameter visit(ArrowType.Utf8 type) {
return new AvaticaParameter(/*signed*/false, /*precision*/0, /*scale*/0, Types.VARCHAR, "VARCHAR",
String.class.getName(),
field.getName());
}

@Override
public AvaticaParameter visit(ArrowType.LargeUtf8 type) {
throw new UnsupportedOperationException("Creating parameter with Arrow type " + type);
}

@Override
public AvaticaParameter visit(ArrowType.Binary type) {
throw new UnsupportedOperationException("Creating parameter with Arrow type " + type);
}

@Override
public AvaticaParameter visit(ArrowType.LargeBinary type) {
throw new UnsupportedOperationException("Creating parameter with Arrow type " + type);
}

@Override
public AvaticaParameter visit(ArrowType.FixedSizeBinary type) {
throw new UnsupportedOperationException("Creating parameter with Arrow type " + type);
}

@Override
public AvaticaParameter visit(ArrowType.Bool type) {
throw new UnsupportedOperationException("Creating parameter with Arrow type " + type);
}

@Override
public AvaticaParameter visit(ArrowType.Decimal type) {
throw new UnsupportedOperationException("Creating parameter with Arrow type " + type);
}

@Override
public AvaticaParameter visit(ArrowType.Date type) {
throw new UnsupportedOperationException("Creating parameter with Arrow type " + type);
}

@Override
public AvaticaParameter visit(ArrowType.Time type) {
throw new UnsupportedOperationException("Creating parameter with Arrow type " + type);
}

@Override
public AvaticaParameter visit(ArrowType.Timestamp type) {
throw new UnsupportedOperationException("Creating parameter with Arrow type " + type);
}

@Override
public AvaticaParameter visit(ArrowType.Interval type) {
throw new UnsupportedOperationException("Creating parameter with Arrow type " + type);
}

@Override
public AvaticaParameter visit(ArrowType.Duration type) {
throw new UnsupportedOperationException("Creating parameter with Arrow type " + type);
}

static AvaticaParameter fromArrowField(Field field) {
return field.getType().accept(new AvaticaParameterFromArrowTypeVisitor(field));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.calcite.avatica.Meta.StatementType;
import org.slf4j.Logger;
Expand Down Expand Up @@ -155,6 +156,10 @@ public interface PreparedStatement extends AutoCloseable {
*/
Schema getDataSetSchema();

Schema getParameterSchema();

void setParameters(VectorSchemaRoot parameters);

@Override
void close();
}
Expand Down Expand Up @@ -190,6 +195,16 @@ public Schema getDataSetSchema() {
return preparedStatement.getResultSetSchema();
}

@Override
public Schema getParameterSchema() {
return preparedStatement.getParameterSchema();
}

@Override
public void setParameters(VectorSchemaRoot parameters) {
preparedStatement.setParameters(parameters);
}

@Override
public void close() {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.driver.jdbc.utils;

import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.arrow.driver.jdbc.client.ArrowFlightSqlClientHandler.PreparedStatement;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.calcite.avatica.remote.TypedValue;

/**
* Bind {@link TypedValue}s to a {@link VectorSchemaRoot}.
*/
public class TypedValueBinder implements AutoCloseable {
private final PreparedStatement preparedStatement;
private final VectorSchemaRoot parameters;

public TypedValueBinder(PreparedStatement preparedStatement, BufferAllocator bufferAllocator) {
this.parameters = VectorSchemaRoot.create(preparedStatement.getParameterSchema(), bufferAllocator);
this.preparedStatement = preparedStatement;
}

/**
* Bind the given Avatica values to the prepared statement.
* @param typedValues The parameter values.
*/
public void bind(List<TypedValue> typedValues) {
if (preparedStatement.getParameterSchema().getFields().size() != typedValues.size()) {
throw new IllegalStateException(
String.format("Prepared statement has %s parameters, but only received %s",
preparedStatement.getParameterSchema().getFields().size(),
typedValues.size()));
}

for (int i = 0; i < typedValues.size(); i++) {
final TypedValue param = typedValues.get(i);
final FieldVector vector = parameters.getVector(i);
switch (param.type) {
case STRING:
bindValue((String) param.value, vector);
break;
default:
throw new UnsupportedOperationException(
String.format("Binding JDBC type %s to Arrow Flight SQL statement", param.type));
}
}

if (!typedValues.isEmpty()) {
parameters.setRowCount(1);
preparedStatement.setParameters(parameters);
}
}

private void bindValue(String value, FieldVector vector) {
if (vector instanceof VarCharVector) {
((VarCharVector) vector).setSafe(0, value.getBytes(StandardCharsets.UTF_8));
} else {
throw new UnsupportedOperationException(
String.format("Binding String to parameter of Arrow type %s", vector.getField().getType()));
}
}

@Override
public void close() {
parameters.close();
}
}
Loading