Skip to content

Commit

Permalink
apacheGH-37795: [Java][FlightSQL] Add mock FlightSqlProducer and tests (
Browse files Browse the repository at this point in the history
apache#37837)

### Rationale for this change
Clarify how to write a FlightSqlProducer with examples and helper classes. This is more inline with what's available
to help developers write a FlightProducer.

### What changes are included in this PR?

Add helper classes for creating a No-op Flight SQL producer and a partially implemented FlightSqlProducer that can process metadata requests.

Add a mock flight producer and tests for it based on the new FlightSqlProducer partial implementations.

Clean-up missed closes of FlightStreams in TestFlightSql.

### Are these changes tested?

Yes.

### Are there any user-facing changes?

No.
* Closes: apache#37795

Authored-by: James Duong <[email protected]>
Signed-off-by: David Li <[email protected]>
  • Loading branch information
jduo authored and loicalleyne committed Nov 13, 2023
1 parent 4e429da commit 35940fc
Show file tree
Hide file tree
Showing 5 changed files with 917 additions and 262 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.flight.sql;

import java.util.List;

import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.FlightEndpoint;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.sql.impl.FlightSql;
import org.apache.arrow.vector.types.pojo.Schema;

import com.google.protobuf.Message;

/**
* A {@link FlightSqlProducer} that implements getting FlightInfo for each metadata request.
*/
public abstract class BasicFlightSqlProducer extends NoOpFlightSqlProducer {

@Override
public FlightInfo getFlightInfoSqlInfo(FlightSql.CommandGetSqlInfo request, CallContext context,
FlightDescriptor descriptor) {
return generateFlightInfo(request, descriptor, Schemas.GET_SQL_INFO_SCHEMA);
}

@Override
public FlightInfo getFlightInfoTypeInfo(FlightSql.CommandGetXdbcTypeInfo request, CallContext context,
FlightDescriptor descriptor) {
return generateFlightInfo(request, descriptor, Schemas.GET_TYPE_INFO_SCHEMA);
}

@Override
public FlightInfo getFlightInfoCatalogs(FlightSql.CommandGetCatalogs request, CallContext context,
FlightDescriptor descriptor) {
return generateFlightInfo(request, descriptor, Schemas.GET_CATALOGS_SCHEMA);
}

@Override
public FlightInfo getFlightInfoSchemas(FlightSql.CommandGetDbSchemas request, CallContext context,
FlightDescriptor descriptor) {
return generateFlightInfo(request, descriptor, Schemas.GET_SCHEMAS_SCHEMA);
}

@Override
public FlightInfo getFlightInfoTables(FlightSql.CommandGetTables request, CallContext context,
FlightDescriptor descriptor) {
if (request.getIncludeSchema()) {
return generateFlightInfo(request, descriptor, Schemas.GET_TABLES_SCHEMA);
}
return generateFlightInfo(request, descriptor, Schemas.GET_TABLES_SCHEMA_NO_SCHEMA);
}

@Override
public FlightInfo getFlightInfoTableTypes(FlightSql.CommandGetTableTypes request, CallContext context,
FlightDescriptor descriptor) {
return generateFlightInfo(request, descriptor, Schemas.GET_TABLE_TYPES_SCHEMA);
}

@Override
public FlightInfo getFlightInfoPrimaryKeys(FlightSql.CommandGetPrimaryKeys request, CallContext context,
FlightDescriptor descriptor) {
return generateFlightInfo(request, descriptor, Schemas.GET_PRIMARY_KEYS_SCHEMA);
}

@Override
public FlightInfo getFlightInfoExportedKeys(FlightSql.CommandGetExportedKeys request, CallContext context,
FlightDescriptor descriptor) {
return generateFlightInfo(request, descriptor, Schemas.GET_EXPORTED_KEYS_SCHEMA);
}

@Override
public FlightInfo getFlightInfoImportedKeys(FlightSql.CommandGetImportedKeys request, CallContext context,
FlightDescriptor descriptor) {
return generateFlightInfo(request, descriptor, Schemas.GET_IMPORTED_KEYS_SCHEMA);
}

@Override
public FlightInfo getFlightInfoCrossReference(FlightSql.CommandGetCrossReference request, CallContext context,
FlightDescriptor descriptor) {
return generateFlightInfo(request, descriptor, Schemas.GET_CROSS_REFERENCE_SCHEMA);
}

/**
* Return a list of FlightEndpoints for the given request and FlightDescriptor. This method should validate that
* the request is supported by this FlightSqlProducer.
*/
protected abstract <T extends Message>
List<FlightEndpoint> determineEndpoints(T request, FlightDescriptor flightDescriptor, Schema schema);

protected <T extends Message> FlightInfo generateFlightInfo(T request, FlightDescriptor descriptor, Schema schema) {
final List<FlightEndpoint> endpoints = determineEndpoints(request, descriptor, schema);
return new FlightInfo(schema, descriptor, endpoints, -1, -1);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.flight.sql;

import org.apache.arrow.flight.CallStatus;
import org.apache.arrow.flight.Criteria;
import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.PutResult;
import org.apache.arrow.flight.Result;
import org.apache.arrow.flight.SchemaResult;
import org.apache.arrow.flight.sql.impl.FlightSql;

/**
* A {@link FlightSqlProducer} that throws on all FlightSql-specific operations.
*/
public class NoOpFlightSqlProducer implements FlightSqlProducer {
@Override
public void createPreparedStatement(FlightSql.ActionCreatePreparedStatementRequest request,
CallContext context, StreamListener<Result> listener) {
listener.onError(CallStatus.UNIMPLEMENTED.withDescription("Not implemented.").toRuntimeException());
}

@Override
public void closePreparedStatement(FlightSql.ActionClosePreparedStatementRequest request,
CallContext context, StreamListener<Result> listener) {
listener.onError(CallStatus.UNIMPLEMENTED.withDescription("Not implemented.").toRuntimeException());
}

@Override
public FlightInfo getFlightInfoStatement(FlightSql.CommandStatementQuery command,
CallContext context, FlightDescriptor descriptor) {
throw CallStatus.UNIMPLEMENTED.withDescription("Not implemented.").toRuntimeException();
}

@Override
public FlightInfo getFlightInfoPreparedStatement(FlightSql.CommandPreparedStatementQuery command,
CallContext context, FlightDescriptor descriptor) {
throw CallStatus.UNIMPLEMENTED.withDescription("Not implemented.").toRuntimeException();
}

@Override
public SchemaResult getSchemaStatement(FlightSql.CommandStatementQuery command,
CallContext context, FlightDescriptor descriptor) {
throw CallStatus.UNIMPLEMENTED.withDescription("Not implemented.").toRuntimeException();
}

@Override
public void getStreamStatement(FlightSql.TicketStatementQuery ticket,
CallContext context, ServerStreamListener listener) {
listener.error(CallStatus.UNIMPLEMENTED.withDescription("Not implemented.").toRuntimeException());
}

@Override
public void getStreamPreparedStatement(FlightSql.CommandPreparedStatementQuery command,
CallContext context, ServerStreamListener listener) {
listener.error(CallStatus.UNIMPLEMENTED.withDescription("Not implemented.").toRuntimeException());
}

@Override
public Runnable acceptPutStatement(FlightSql.CommandStatementUpdate command, CallContext context,
FlightStream flightStream, StreamListener<PutResult> ackStream) {
throw CallStatus.UNIMPLEMENTED.withDescription("Not implemented.").toRuntimeException();
}

@Override
public Runnable acceptPutPreparedStatementUpdate(FlightSql.CommandPreparedStatementUpdate command,
CallContext context, FlightStream flightStream,
StreamListener<PutResult> ackStream) {
throw CallStatus.UNIMPLEMENTED.withDescription("Not implemented.").toRuntimeException();
}

@Override
public Runnable acceptPutPreparedStatementQuery(FlightSql.CommandPreparedStatementQuery command, CallContext context,
FlightStream flightStream, StreamListener<PutResult> ackStream) {
throw CallStatus.UNIMPLEMENTED.withDescription("Not implemented.").toRuntimeException();
}

@Override
public FlightInfo getFlightInfoSqlInfo(FlightSql.CommandGetSqlInfo request, CallContext context,
FlightDescriptor descriptor) {
throw CallStatus.UNIMPLEMENTED.withDescription("Not implemented.").toRuntimeException();
}

@Override
public void getStreamSqlInfo(FlightSql.CommandGetSqlInfo command, CallContext context,
ServerStreamListener listener) {
listener.error(CallStatus.UNIMPLEMENTED.withDescription("Not implemented.").toRuntimeException());
}

@Override
public FlightInfo getFlightInfoTypeInfo(FlightSql.CommandGetXdbcTypeInfo request,
CallContext context, FlightDescriptor descriptor) {
throw CallStatus.UNIMPLEMENTED.withDescription("Not implemented.").toRuntimeException();
}

@Override
public void getStreamTypeInfo(FlightSql.CommandGetXdbcTypeInfo request,
CallContext context, ServerStreamListener listener) {
listener.error(CallStatus.UNIMPLEMENTED.withDescription("Not implemented.").toRuntimeException());
}

@Override
public FlightInfo getFlightInfoCatalogs(FlightSql.CommandGetCatalogs request,
CallContext context, FlightDescriptor descriptor) {
throw CallStatus.UNIMPLEMENTED.withDescription("Not implemented.").toRuntimeException();
}

@Override
public void getStreamCatalogs(CallContext context, ServerStreamListener listener) {
listener.error(CallStatus.UNIMPLEMENTED.withDescription("Not implemented.").toRuntimeException());
}

@Override
public FlightInfo getFlightInfoSchemas(FlightSql.CommandGetDbSchemas request,
CallContext context, FlightDescriptor descriptor) {
throw CallStatus.UNIMPLEMENTED.withDescription("Not implemented.").toRuntimeException();
}

@Override
public void getStreamSchemas(FlightSql.CommandGetDbSchemas command,
CallContext context, ServerStreamListener listener) {
listener.error(CallStatus.UNIMPLEMENTED.withDescription("Not implemented.").toRuntimeException());
}

@Override
public FlightInfo getFlightInfoTables(FlightSql.CommandGetTables request,
CallContext context, FlightDescriptor descriptor) {
throw CallStatus.UNIMPLEMENTED.withDescription("Not implemented.").toRuntimeException();
}

@Override
public void getStreamTables(FlightSql.CommandGetTables command, CallContext context, ServerStreamListener listener) {
listener.error(CallStatus.UNIMPLEMENTED.withDescription("Not implemented.").toRuntimeException());
}

@Override
public FlightInfo getFlightInfoTableTypes(FlightSql.CommandGetTableTypes request, CallContext context,
FlightDescriptor descriptor) {
throw CallStatus.UNIMPLEMENTED.withDescription("Not implemented.").toRuntimeException();
}

@Override
public void getStreamTableTypes(CallContext context, ServerStreamListener listener) {
listener.error(CallStatus.UNIMPLEMENTED.withDescription("Not implemented.").toRuntimeException());
}

@Override
public FlightInfo getFlightInfoPrimaryKeys(FlightSql.CommandGetPrimaryKeys request,
CallContext context, FlightDescriptor descriptor) {
throw CallStatus.UNIMPLEMENTED.withDescription("Not implemented.").toRuntimeException();
}

@Override
public void getStreamPrimaryKeys(FlightSql.CommandGetPrimaryKeys command,
CallContext context, ServerStreamListener listener) {
listener.error(CallStatus.UNIMPLEMENTED.withDescription("Not implemented.").toRuntimeException());
}

@Override
public FlightInfo getFlightInfoExportedKeys(FlightSql.CommandGetExportedKeys request,
CallContext context, FlightDescriptor descriptor) {
throw CallStatus.UNIMPLEMENTED.withDescription("Not implemented.").toRuntimeException();
}

@Override
public FlightInfo getFlightInfoImportedKeys(FlightSql.CommandGetImportedKeys request,
CallContext context, FlightDescriptor descriptor) {
throw CallStatus.UNIMPLEMENTED.withDescription("Not implemented.").toRuntimeException();
}

@Override
public FlightInfo getFlightInfoCrossReference(FlightSql.CommandGetCrossReference request,
CallContext context, FlightDescriptor descriptor) {
throw CallStatus.UNIMPLEMENTED.withDescription("Not implemented.").toRuntimeException();
}

@Override
public void getStreamExportedKeys(FlightSql.CommandGetExportedKeys command,
CallContext context, ServerStreamListener listener) {
listener.error(CallStatus.UNIMPLEMENTED.withDescription("Not implemented.").toRuntimeException());
}

@Override
public void getStreamImportedKeys(FlightSql.CommandGetImportedKeys command, CallContext context,
ServerStreamListener listener) {
listener.error(CallStatus.UNIMPLEMENTED.withDescription("Not implemented.").toRuntimeException());
}

@Override
public void getStreamCrossReference(FlightSql.CommandGetCrossReference command, CallContext context,
ServerStreamListener listener) {
listener.error(CallStatus.UNIMPLEMENTED.withDescription("Not implemented.").toRuntimeException());
}

@Override
public void close() throws Exception {

}

@Override
public void listFlights(CallContext context, Criteria criteria, StreamListener<FlightInfo> listener) {
throw CallStatus.UNIMPLEMENTED.withDescription("Not implemented.").toRuntimeException();
}
}
Loading

0 comments on commit 35940fc

Please sign in to comment.