Skip to content
This repository has been archived by the owner on Dec 12, 2020. It is now read-only.

Commit

Permalink
Add Decimal
Browse files Browse the repository at this point in the history
  • Loading branch information
Ryan Murray committed Sep 5, 2019
1 parent b496594 commit d7eb4b1
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 19 deletions.
36 changes: 23 additions & 13 deletions src/main/java/com/dremio/flight/Producer.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.dremio.common.exceptions.UserException;
import com.dremio.common.exceptions.UserRemoteException;
import com.dremio.common.utils.protos.ExternalIdHelper;
import com.dremio.common.utils.protos.QueryWritableBatch;
Expand Down Expand Up @@ -100,9 +101,11 @@ class Producer implements FlightProducer, AutoCloseable {

@Override
public void doAction(CallContext context, Action action, StreamListener<Result> resultStreamListener) {
throw Status.UNIMPLEMENTED.asRuntimeException();
}

private FlightInfo getInfo(CallContext callContext, FlightDescriptor descriptor, String sql) {
logger.info("GetFlightInfo called for sql {}", sql);
return getInfoImpl(callContext, descriptor, sql);
}

Expand All @@ -116,8 +119,9 @@ private FlightInfo getInfoImpl(CallContext callContext, FlightDescriptor descrip

UserRequest request = new UserRequest(RpcType.CREATE_PREPARED_STATEMENT, req);
Prepare prepare = new Prepare();
submitWork(callContext, request, prepare);
return prepare.getInfo(descriptor);

UserBitShared.ExternalId externalId = submitWork(callContext, request, prepare);
return prepare.getInfo(descriptor, externalId);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
Expand All @@ -126,13 +130,12 @@ private FlightInfo getInfoImpl(CallContext callContext, FlightDescriptor descrip

@Override
public FlightInfo getFlightInfo(CallContext callContext, FlightDescriptor descriptor) {
logger.info("called get flight info");
return getInfo(callContext, descriptor, new String(descriptor.getCommand()));
}

@Override
public Runnable acceptPut(CallContext callContext, FlightStream flightStream, StreamListener<org.apache.arrow.flight.PutResult> streamListener) {
throw Status.UNAVAILABLE.asRuntimeException();
throw Status.UNIMPLEMENTED.asRuntimeException();
}

private UserBitShared.ExternalId submitWork(CallContext callContext, UserRequest request, UserResponseHandler handler) {
Expand All @@ -143,6 +146,7 @@ private UserBitShared.ExternalId submitWork(CallContext callContext, UserRequest
handler,
request,
TerminationListenerRegistry.NOOP);
logger.debug("Submitted job {} from flight for request with type {}", ExternalIdHelper.toQueryId(externalId), request.getType());
return externalId;
}

Expand All @@ -158,28 +162,34 @@ private class Prepare implements UserResponseHandler {
public Prepare() {
}

public FlightInfo getInfo(FlightDescriptor descriptor) {
public FlightInfo getInfo(FlightDescriptor descriptor, UserBitShared.ExternalId externalId) {
try {
logger.debug("Waiting for prepared statement handle to return for job id {}", ExternalIdHelper.toQueryId(externalId));
CreatePreparedStatementResp handle = future.get();
logger.debug("prepared statement handle for job id {} has returned", ExternalIdHelper.toQueryId(externalId));
if (handle.getStatus() == RequestStatus.FAILED) {
throw Status.UNKNOWN.withDescription(handle.getError().getMessage()).withCause(UserRemoteException.create(handle.getError())).asRuntimeException();
logger.warn("prepared statement handle for job id " + ExternalIdHelper.toQueryId(externalId) + " has failed", UserRemoteException.create(handle.getError()));
throw Status.INTERNAL.withDescription(handle.getError().getMessage()).withCause(UserRemoteException.create(handle.getError())).asRuntimeException();
}
logger.debug("prepared statement handle for job id {} has succeeded", ExternalIdHelper.toQueryId(externalId));
PreparedStatement statement = handle.getPreparedStatement();
Ticket ticket = new Ticket(statement.getServerHandle().toByteArray());
FlightEndpoint endpoint = new FlightEndpoint(ticket, location);
FlightInfo info = new FlightInfo(fromMetadata(statement.getColumnsList()), descriptor, Lists.newArrayList(endpoint), -1L, -1L);
logger.debug("flight endpoint for job id {} has been created with ticket {}", ExternalIdHelper.toQueryId(externalId), new String(ticket.getBytes()));
Schema schema = fromMetadata(statement.getColumnsList());
FlightInfo info = new FlightInfo(schema, descriptor, Lists.newArrayList(endpoint), -1L, -1L);
logger.debug("flight info for job id {} has been created with schema {}", ExternalIdHelper.toQueryId(externalId), schema.toJson());
return info;
} catch (ExecutionException e) {
throw new RuntimeException(e.getCause());
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (Exception e) {
logger.warn("prepared statement handle for job id " + ExternalIdHelper.toQueryId(externalId) + " has failed", UserException.parseError(e).buildSilently());
throw Status.UNKNOWN.withCause(UserException.parseError(e).buildSilently()).asRuntimeException();
}
}

private Schema fromMetadata(List<ResultColumnMetadata> rcmd) {

Schema schema = new Schema(rcmd.stream().map(md -> {
ArrowType arrowType = SqlTypeNameToArrowType.toArrowType(md.getDataType());
ArrowType arrowType = SqlTypeNameToArrowType.toArrowType(md);
FieldType fieldType = new FieldType(md.getIsNullable(), arrowType, null, null);
return new Field(md.getColumnName(), fieldType, null);
}).collect(Collectors.toList()));
Expand All @@ -188,7 +198,7 @@ private Schema fromMetadata(List<ResultColumnMetadata> rcmd) {

@Override
public void sendData(RpcOutcomeListener<Ack> outcomeListener, QueryWritableBatch result) {
throw new IllegalStateException();
throw Status.UNIMPLEMENTED.asRuntimeException();
}

@Override
Expand Down
16 changes: 10 additions & 6 deletions src/main/java/com/dremio/flight/SqlTypeNameToArrowType.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,17 @@
import org.apache.arrow.vector.types.pojo.ArrowType.Binary;
import org.apache.arrow.vector.types.pojo.ArrowType.Bool;
import org.apache.arrow.vector.types.pojo.ArrowType.Date;
import org.apache.arrow.vector.types.pojo.ArrowType.Decimal;
import org.apache.arrow.vector.types.pojo.ArrowType.FloatingPoint;
import org.apache.arrow.vector.types.pojo.ArrowType.Int;
import org.apache.arrow.vector.types.pojo.ArrowType.Interval;
import org.apache.arrow.vector.types.pojo.ArrowType.List;
import org.apache.arrow.vector.types.pojo.ArrowType.Null;
import org.apache.arrow.vector.types.pojo.ArrowType.Struct;
import org.apache.arrow.vector.types.pojo.ArrowType.Time;
import org.apache.arrow.vector.types.pojo.ArrowType.Timestamp;
import org.apache.arrow.vector.types.pojo.ArrowType.Utf8;

import com.dremio.exec.proto.UserProtos;

/***
* Gets Dremio RPC-/protobuf-level data type for given SQL data type name.
* returns the canonical keyword sequence for SQL data type (leading keywords in
Expand All @@ -41,14 +42,17 @@
*/
public class SqlTypeNameToArrowType {

public static ArrowType toArrowType(String typeName) {
public static ArrowType toArrowType(UserProtos.ResultColumnMetadata type) {
String typeName = type.getDataType();
switch (typeName) {
case "NULL":
return new Null();
case "MAP":
return new Struct(); //todo inner type?
throw new UnsupportedOperationException("have not implemented map");
//return new Struct(); //todo inner type?
case "ARRAY":
return new List(); //todo inner type?
throw new UnsupportedOperationException("have not implemented array");
//return new List(); //todo inner type?
case "UNION":
throw new UnsupportedOperationException("have not implemented unions");
//return new Union(); //todo inner type?
Expand All @@ -71,7 +75,7 @@ public static ArrowType toArrowType(String typeName) {
case "BOOLEAN":
return new Bool();
case "DECIMAL":
throw new UnsupportedOperationException("have not implemented decimal");
return new Decimal(type.getPrecision(), type.getScale());
case "DATE":
return new Date(DateUnit.MILLISECOND);
case "TIME":
Expand Down

0 comments on commit d7eb4b1

Please sign in to comment.