Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-33475: [Java][FlightRPC] Send prepared statement parameters #33961

Closed
wants to merge 26 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions java/flight/flight-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@
</properties>

<dependencies>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>4.2.16</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-format</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import org.apache.arrow.flight.ArrowMessage.HeaderType;
import org.apache.arrow.flight.grpc.StatusUtils;
import org.apache.arrow.memory.ArrowBuf;
Expand All @@ -51,6 +53,8 @@

import io.grpc.stub.StreamObserver;

import static com.codahale.metrics.MetricRegistry.name;

/**
* An adaptor between protobuf streams and flight data streams.
*/
Expand Down Expand Up @@ -85,6 +89,13 @@ public class FlightStream implements AutoCloseable {
@VisibleForTesting
volatile MetadataVersion metadataVersion = null;

public static final MetricRegistry metrics = new MetricRegistry();

private static final Timer fsNext = metrics.timer(name(FlightStream.class, "fsNext"));
private static final Timer fsTake = metrics.timer(name(FlightStream.class, "fsTake"));
private static final Timer fsRb = metrics.timer(name(FlightStream.class, "fsRb"));
private static final Timer fsDict = metrics.timer(name(FlightStream.class, "fsDict"));

/**
* Constructs a new instance.
*
Expand Down Expand Up @@ -220,76 +231,87 @@ public void close() throws Exception {
* @return Whether or not more data was found.
*/
public boolean next() {
try {
if (completed.isDone() && queue.isEmpty()) {
return false;
}
try (final Timer.Context context = fsNext.time()) {
try {
if (completed.isDone() && queue.isEmpty()) {
return false;
}

pending--;
requestOutstanding();
pending--;
requestOutstanding();

Object data = queue.take();
if (DONE == data) {
queue.put(DONE);
// Other code ignores the value of this CompletableFuture, only whether it's completed (or has an exception)
completed.complete(null);
return false;
} else if (DONE_EX == data) {
queue.put(DONE_EX);
if (ex instanceof Exception) {
throw (Exception) ex;
} else {
throw new Exception(ex);
Object data;
try (final Timer.Context takeCtx = fsTake.time()) {
// System.out.format("Trying to take @ %dms\n", System.currentTimeMillis());
data = queue.take();
// System.out.format("Took @ %dms\n", System.currentTimeMillis());
}
} else {
try (ArrowMessage msg = ((ArrowMessage) data)) {
if (msg.getMessageType() == HeaderType.NONE) {
updateMetadata(msg);
// We received a message without data, so erase any leftover data
if (fulfilledRoot != null) {
fulfilledRoot.clear();
}
} else if (msg.getMessageType() == HeaderType.RECORD_BATCH) {
checkMetadataVersion(msg);
// Ensure we have the root
root.get().clear();
try (ArrowRecordBatch arb = msg.asRecordBatch()) {
loader.load(arb);
}
updateMetadata(msg);
} else if (msg.getMessageType() == HeaderType.DICTIONARY_BATCH) {
checkMetadataVersion(msg);
// Ensure we have the root
root.get().clear();
try (ArrowDictionaryBatch arb = msg.asDictionaryBatch()) {
final long id = arb.getDictionaryId();
if (dictionaries == null) {
throw new IllegalStateException("Dictionary ownership was claimed by the application.");
if (DONE == data) {
queue.put(DONE);
// Other code ignores the value of this CompletableFuture, only whether it's completed (or has an exception)
completed.complete(null);
return false;
} else if (DONE_EX == data) {
queue.put(DONE_EX);
if (ex instanceof Exception) {
throw (Exception) ex;
} else {
throw new Exception(ex);
}
} else {
try (ArrowMessage msg = ((ArrowMessage) data)) {
if (msg.getMessageType() == HeaderType.NONE) {
updateMetadata(msg);
// We received a message without data, so erase any leftover data
if (fulfilledRoot != null) {
fulfilledRoot.clear();
}
final Dictionary dictionary = dictionaries.lookup(id);
if (dictionary == null) {
throw new IllegalArgumentException("Dictionary not defined in schema: ID " + id);
} else if (msg.getMessageType() == HeaderType.RECORD_BATCH) {
try (final Timer.Context rbCtx = fsRb.time()) {
checkMetadataVersion(msg);
// Ensure we have the root
root.get().clear();
try (ArrowRecordBatch arb = msg.asRecordBatch()) {
loader.load(arb);
}
updateMetadata(msg);
}

final FieldVector vector = dictionary.getVector();
final VectorSchemaRoot dictionaryRoot = new VectorSchemaRoot(Collections.singletonList(vector.getField()),
Collections.singletonList(vector), 0);
final VectorLoader dictionaryLoader = new VectorLoader(dictionaryRoot);
dictionaryLoader.load(arb.getDictionary());
} else if (msg.getMessageType() == HeaderType.DICTIONARY_BATCH) {
try (final Timer.Context dictCtx = fsRb.time()) {
checkMetadataVersion(msg);
// Ensure we have the root
root.get().clear();
try (ArrowDictionaryBatch arb = msg.asDictionaryBatch()) {
final long id = arb.getDictionaryId();
if (dictionaries == null) {
throw new IllegalStateException("Dictionary ownership was claimed by the application.");
}
final Dictionary dictionary = dictionaries.lookup(id);
if (dictionary == null) {
throw new IllegalArgumentException("Dictionary not defined in schema: ID " + id);
}

final FieldVector vector = dictionary.getVector();
final VectorSchemaRoot dictionaryRoot = new VectorSchemaRoot(Collections.singletonList(vector.getField()),
Collections.singletonList(vector), 0);
final VectorLoader dictionaryLoader = new VectorLoader(dictionaryRoot);
dictionaryLoader.load(arb.getDictionary());
}
return next();
}
} else {
throw new UnsupportedOperationException("Message type is unsupported: " + msg.getMessageType());
}
return next();
} else {
throw new UnsupportedOperationException("Message type is unsupported: " + msg.getMessageType());
return true;
}
return true;
}
} catch (RuntimeException e) {
throw e;
} catch (ExecutionException e) {
throw StatusUtils.fromThrowable(e.getCause());
} catch (Exception e) {
throw new RuntimeException(e);
}
} catch (RuntimeException e) {
throw e;
} catch (ExecutionException e) {
throw StatusUtils.fromThrowable(e.getCause());
} catch (Exception e) {
throw new RuntimeException(e);
}
}

Expand Down Expand Up @@ -383,6 +405,7 @@ private void enqueue(AutoCloseable message) {

@Override
public void onNext(ArrowMessage msg) {
// System.out.format("FlightStream.onNext() @ %dms\n", System.currentTimeMillis());
// Operations here have to be under a lock so that we don't add a message to the queue while in the middle of
// close().
requestOutstanding();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package org.apache.arrow.driver.jdbc;

import static com.codahale.metrics.MetricRegistry.name;
import static org.apache.arrow.driver.jdbc.utils.FlightStreamQueue.createNewQueue;
import static org.apache.arrow.flight.FlightStream.metrics;

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
Expand All @@ -26,10 +28,12 @@
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;

import com.codahale.metrics.Timer;
import org.apache.arrow.driver.jdbc.utils.FlightStreamQueue;
import org.apache.arrow.driver.jdbc.utils.VectorSchemaRootTransformer;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.sql.FlightSqlClient;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.Schema;
Expand All @@ -46,6 +50,8 @@
public final class ArrowFlightJdbcFlightStreamResultSet
extends ArrowFlightJdbcVectorSchemaRootResultSet {

private static final Timer nextStream = metrics.timer(name(FlightSqlClient.class, "nextStream"));

private final ArrowFlightConnection connection;
private FlightStream currentFlightStream;
private FlightStreamQueue flightStreamQueue;
Expand All @@ -55,6 +61,10 @@ public final class ArrowFlightJdbcFlightStreamResultSet

private Schema schema;

private static final Timer rsExec = metrics.timer(name(FlightSqlClient.class, "rsExec"));
private static final Timer getStreams = metrics.timer(name(FlightSqlClient.class, "getStreams"));
private static final Timer getNextFlightStream = metrics.timer(name(FlightStreamQueue.class, "getNextFlightStream"));

ArrowFlightJdbcFlightStreamResultSet(final AvaticaStatement statement,
final QueryState state,
final Meta.Signature signature,
Expand Down Expand Up @@ -112,31 +122,40 @@ private void loadNewQueue() {
}

private void loadNewFlightStream() throws SQLException {
if (currentFlightStream != null) {
AutoCloseables.closeNoChecked(currentFlightStream);
try(final Timer.Context context = nextStream.time()) {
if (currentFlightStream != null) {
AutoCloseables.closeNoChecked(currentFlightStream);
}
this.currentFlightStream = getNextFlightStream(true);
}
this.currentFlightStream = getNextFlightStream(true);
}

@Override
protected AvaticaResultSet execute() throws SQLException {
final FlightInfo flightInfo = ((ArrowFlightInfoStatement) statement).executeFlightInfoQuery();

if (flightInfo != null) {
schema = flightInfo.getSchema();
execute(flightInfo);
try(final Timer.Context context = rsExec.time()) {
// System.out.format("Getting FlightInfo @ %dms\n", System.currentTimeMillis());
final FlightInfo flightInfo = ((ArrowFlightInfoStatement) statement).executeFlightInfoQuery();

if (flightInfo != null) {
schema = flightInfo.getSchema();
// System.out.format("Getting FlightDatas @ %dms\n", System.currentTimeMillis());
execute(flightInfo);
}
// System.out.format("Got FlightDatas @ %dms\n", System.currentTimeMillis());
return this;
}
return this;
}

private void execute(final FlightInfo flightInfo) throws SQLException {
loadNewQueue();
flightStreamQueue.enqueue(connection.getClientHandler().getStreams(flightInfo));
loadNewFlightStream();
try(final Timer.Context context = getStreams.time()) {
loadNewQueue();
flightStreamQueue.enqueue(connection.getClientHandler().getStreams(flightInfo));
loadNewFlightStream();

// Ownership of the root will be passed onto the cursor.
if (currentFlightStream != null) {
executeForCurrentFlightStream();
// Ownership of the root will be passed onto the cursor.
if (currentFlightStream != null) {
executeForCurrentFlightStream();
}
}
}

Expand Down Expand Up @@ -239,12 +258,14 @@ public synchronized void close() {
}

private FlightStream getNextFlightStream(final boolean isExecution) throws SQLException {
if (isExecution) {
final int statementTimeout = statement != null ? statement.getQueryTimeout() : 0;
return statementTimeout != 0 ?
flightStreamQueue.next(statementTimeout, TimeUnit.SECONDS) : flightStreamQueue.next();
} else {
return flightStreamQueue.next();
try (final Timer.Context context = getNextFlightStream.time()) {
if (isExecution) {
final int statementTimeout = statement != null ? statement.getQueryTimeout() : 0;
return statementTimeout != 0 ?
flightStreamQueue.next(statementTimeout, TimeUnit.SECONDS) : flightStreamQueue.next();
} else {
return flightStreamQueue.next();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ static Signature newSignature(final String sql) {
return new Signature(
new ArrayList<ColumnMetaData>(),
sql,
Collections.<AvaticaParameter>emptyList(),
new ArrayList<AvaticaParameter>(),
Collections.<String, Object>emptyMap(),
null, // unnecessary, as SQL requests use ArrowFlightJdbcCursor
StatementType.SELECT
Expand Down
Loading