metadataListener, CallOption... options) {
Preconditions.checkNotNull(descriptor);
Preconditions.checkNotNull(root);
@@ -173,8 +199,7 @@ public ClientStreamListener startPut(FlightDescriptor descriptor, VectorSchemaRo
ClientCalls.asyncBidiStreamingCall(
authInterceptor.interceptCall(doPutDescriptor, callOptions, channel), resultObserver);
// send the schema to start.
- ArrowMessage message = new ArrowMessage(descriptor.toProtocol(), root.getSchema());
- observer.onNext(message);
+ DictionaryUtils.generateSchemaMessages(root.getSchema(), descriptor, provider, observer::onNext);
return new PutObserver(new VectorUnloader(
root, true /* include # of nulls in vectors */, true /* must align buffers to be C++-compatible */),
observer, resultObserver.getFuture());
diff --git a/java/flight/src/main/java/org/apache/arrow/flight/FlightProducer.java b/java/flight/src/main/java/org/apache/arrow/flight/FlightProducer.java
index 8b9df4b9fa8e9..316fbf97fb9cb 100644
--- a/java/flight/src/main/java/org/apache/arrow/flight/FlightProducer.java
+++ b/java/flight/src/main/java/org/apache/arrow/flight/FlightProducer.java
@@ -18,6 +18,7 @@
package org.apache.arrow.flight;
import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.dictionary.DictionaryProvider;
/**
* API to Implement an Arrow Flight producer.
@@ -100,6 +101,13 @@ public interface ServerStreamListener {
*/
void start(VectorSchemaRoot root);
+ /**
+ * Start sending data, using the schema of the given {@link VectorSchemaRoot}.
+ *
+ * This method must be called before all others.
+ */
+ void start(VectorSchemaRoot root, DictionaryProvider dictionaries);
+
/**
* Send the current contents of the associated {@link VectorSchemaRoot}.
*/
diff --git a/java/flight/src/main/java/org/apache/arrow/flight/FlightService.java b/java/flight/src/main/java/org/apache/arrow/flight/FlightService.java
index dc055c1f0d758..8f8c2050d5af8 100644
--- a/java/flight/src/main/java/org/apache/arrow/flight/FlightService.java
+++ b/java/flight/src/main/java/org/apache/arrow/flight/FlightService.java
@@ -17,6 +17,11 @@
package org.apache.arrow.flight;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BooleanSupplier;
@@ -32,8 +37,17 @@
import org.apache.arrow.flight.impl.Flight.HandshakeResponse;
import org.apache.arrow.flight.impl.FlightServiceGrpc.FlightServiceImplBase;
import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.dictionary.Dictionary;
+import org.apache.arrow.vector.dictionary.DictionaryProvider;
+import org.apache.arrow.vector.dictionary.DictionaryProvider.MapDictionaryProvider;
+import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.util.DictionaryUtility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -137,9 +151,14 @@ public boolean isCancelled() {
@Override
public void start(VectorSchemaRoot root) {
- responseObserver.onNext(new ArrowMessage(null, root.getSchema()));
- // [ARROW-4213] We must align buffers to be compatible with other languages.
+ start(root, new MapDictionaryProvider());
+ }
+
+ @Override
+ public void start(VectorSchemaRoot root, DictionaryProvider provider) {
unloader = new VectorUnloader(root, true, true);
+
+ DictionaryUtils.generateSchemaMessages(root.getSchema(), null, provider, responseObserver::onNext);
}
@Override
@@ -171,7 +190,7 @@ public StreamObserver doPutCustom(final StreamObserver responseObserver.request(count));
+ FlightStream fs = new FlightStream(allocator, PENDING_REQUESTS, null, responseObserver::request);
executors.submit(() -> {
try {
producer.acceptPut(makeContext(responseObserver), fs,
@@ -179,6 +198,9 @@ public StreamObserver doPutCustom(final StreamObserver fields = new ArrayList<>();
+ final Map dictionaryMap = new HashMap<>();
+ for (final Field originalField : schema.getFields()) {
+ final Field updatedField = DictionaryUtility.toMemoryFormat(originalField, allocator, dictionaryMap);
+ fields.add(updatedField);
+ }
+ for (final Map.Entry entry : dictionaryMap.entrySet()) {
+ dictionaries.put(entry.getValue());
+ }
+ schema = new Schema(fields, schema.getCustomMetadata());
fulfilledRoot = VectorSchemaRoot.create(schema, allocator);
loader = new VectorLoader(fulfilledRoot);
descriptor = msg.getDescriptor() != null ? new FlightDescriptor(msg.getDescriptor()) : null;
root.set(fulfilledRoot);
break;
+ }
case RECORD_BATCH:
queue.add(msg);
break;
- case NONE:
case DICTIONARY_BATCH:
+ queue.add(msg);
+ break;
+ case NONE:
case TENSOR:
default:
queue.add(DONE_EX);
- ex = new UnsupportedOperationException("Unable to handle message of type: " + msg);
+ ex = new UnsupportedOperationException("Unable to handle message of type: " + msg.getMessageType());
}
diff --git a/java/flight/src/main/java/org/apache/arrow/flight/GenericOperation.java b/java/flight/src/main/java/org/apache/arrow/flight/GenericOperation.java
deleted file mode 100644
index 03a1e92af12c0..0000000000000
--- a/java/flight/src/main/java/org/apache/arrow/flight/GenericOperation.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.arrow.flight;
-
-/**
- * Unused?.
- */
-class GenericOperation {
-
- private final String type;
- private final byte[] body;
-
- public GenericOperation(String type, byte[] body) {
- super();
- this.type = type;
- this.body = body == null ? new byte[0] : body;
- }
-
- public String getType() {
- return type;
- }
-
- public byte[] getBody() {
- return body;
- }
-
-}
diff --git a/java/flight/src/main/java/org/apache/arrow/flight/PutResult.java b/java/flight/src/main/java/org/apache/arrow/flight/PutResult.java
index dffafbdc1e8f0..7cf615ebf69bd 100644
--- a/java/flight/src/main/java/org/apache/arrow/flight/PutResult.java
+++ b/java/flight/src/main/java/org/apache/arrow/flight/PutResult.java
@@ -36,6 +36,9 @@ private PutResult(ByteBuffer metadata) {
/** Create a PutResult with application-specific metadata. */
public static PutResult metadata(byte[] metadata) {
+ if (metadata == null) {
+ return empty();
+ }
return new PutResult(ByteBuffer.wrap(metadata));
}
diff --git a/java/flight/src/main/java/org/apache/arrow/flight/example/FlightHolder.java b/java/flight/src/main/java/org/apache/arrow/flight/example/FlightHolder.java
index 91ed04e7ffaba..cf3eb154ed7e1 100644
--- a/java/flight/src/main/java/org/apache/arrow/flight/example/FlightHolder.java
+++ b/java/flight/src/main/java/org/apache/arrow/flight/example/FlightHolder.java
@@ -28,6 +28,7 @@
import org.apache.arrow.flight.Location;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.types.pojo.Schema;
import com.google.common.base.Preconditions;
@@ -43,19 +44,22 @@ public class FlightHolder implements AutoCloseable {
private final FlightDescriptor descriptor;
private final Schema schema;
private final List streams = new CopyOnWriteArrayList<>();
+ private final DictionaryProvider dictionaryProvider;
/**
* Creates a new instance.
- *
- * @param allocator The allocator to use for allocating buffers to store data.
+ * @param allocator The allocator to use for allocating buffers to store data.
* @param descriptor The descriptor for the streams.
* @param schema The schema for the stream.
+ * @param dictionaryProvider The dictionary provider for the stream.
*/
- public FlightHolder(BufferAllocator allocator, FlightDescriptor descriptor, Schema schema) {
+ public FlightHolder(BufferAllocator allocator, FlightDescriptor descriptor, Schema schema,
+ DictionaryProvider dictionaryProvider) {
Preconditions.checkArgument(!descriptor.isCommand());
this.allocator = allocator.newChildAllocator(descriptor.toString(), 0, Long.MAX_VALUE);
this.descriptor = descriptor;
this.schema = schema;
+ this.dictionaryProvider = dictionaryProvider;
}
/**
@@ -72,8 +76,8 @@ public Stream getStream(ExampleTicket ticket) {
* Adds a new streams which clients can populate via the returned object.
*/
public Stream.StreamCreator addStream(Schema schema) {
- Preconditions.checkArgument(schema.equals(schema), "Stream schema inconsistent with existing schema.");
- return new Stream.StreamCreator(schema, allocator, t -> {
+ Preconditions.checkArgument(this.schema.equals(schema), "Stream schema inconsistent with existing schema.");
+ return new Stream.StreamCreator(schema, dictionaryProvider, allocator, t -> {
synchronized (streams) {
streams.add(t);
}
diff --git a/java/flight/src/main/java/org/apache/arrow/flight/example/InMemoryStore.java b/java/flight/src/main/java/org/apache/arrow/flight/example/InMemoryStore.java
index 1d9eb889d743d..73d448e3ab1e1 100644
--- a/java/flight/src/main/java/org/apache/arrow/flight/example/InMemoryStore.java
+++ b/java/flight/src/main/java/org/apache/arrow/flight/example/InMemoryStore.java
@@ -79,17 +79,6 @@ public Stream getStream(Ticket t) {
return h.getStream(example);
}
- /**
- * Create a new {@link Stream} with the given schema and descriptor.
- */
- public StreamCreator putStream(final FlightDescriptor descriptor, final Schema schema) {
- final FlightHolder h = holders.computeIfAbsent(
- descriptor,
- t -> new FlightHolder(allocator, t, schema));
-
- return h.addStream(schema);
- }
-
@Override
public void listFlights(CallContext context, Criteria criteria,
StreamListener listener) {
@@ -123,7 +112,7 @@ public Runnable acceptPut(CallContext context,
try (VectorSchemaRoot root = flightStream.getRoot()) {
final FlightHolder h = holders.computeIfAbsent(
flightStream.getDescriptor(),
- t -> new FlightHolder(allocator, t, flightStream.getSchema()));
+ t -> new FlightHolder(allocator, t, flightStream.getSchema(), flightStream.getDictionaryProvider()));
creator = h.addStream(flightStream.getSchema());
diff --git a/java/flight/src/main/java/org/apache/arrow/flight/example/Stream.java b/java/flight/src/main/java/org/apache/arrow/flight/example/Stream.java
index b79525caab1fb..1139e8c592f7d 100644
--- a/java/flight/src/main/java/org/apache/arrow/flight/example/Stream.java
+++ b/java/flight/src/main/java/org/apache/arrow/flight/example/Stream.java
@@ -29,6 +29,7 @@
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.types.pojo.Schema;
@@ -41,6 +42,7 @@
public class Stream implements AutoCloseable, Iterable {
private final String uuid = UUID.randomUUID().toString();
+ private final DictionaryProvider dictionaryProvider;
private final List batches;
private final Schema schema;
private final long recordCount;
@@ -54,9 +56,11 @@ public class Stream implements AutoCloseable, Iterable {
*/
public Stream(
final Schema schema,
+ final DictionaryProvider dictionaryProvider,
List batches,
long recordCount) {
this.schema = schema;
+ this.dictionaryProvider = dictionaryProvider;
this.batches = ImmutableList.copyOf(batches);
this.recordCount = recordCount;
}
@@ -83,7 +87,7 @@ public String getUuid() {
*/
public void sendTo(BufferAllocator allocator, ServerStreamListener listener) {
try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
- listener.start(root);
+ listener.start(root, dictionaryProvider);
final VectorLoader loader = new VectorLoader(root);
int counter = 0;
for (ArrowRecordBatch batch : batches) {
@@ -121,18 +125,22 @@ public static class StreamCreator {
private final List batches = new ArrayList<>();
private final Consumer committer;
private long recordCount = 0;
+ private DictionaryProvider dictionaryProvider;
/**
* Creates a new instance.
*
* @param schema The schema for batches in the stream.
+ * @param dictionaryProvider The dictionary provider for the stream.
* @param allocator The allocator used to copy data permanently into the stream.
* @param committer A callback for when the the stream is ready to be finalized (no more batches).
*/
- public StreamCreator(Schema schema, BufferAllocator allocator, Consumer committer) {
+ public StreamCreator(Schema schema, DictionaryProvider dictionaryProvider,
+ BufferAllocator allocator, Consumer committer) {
this.allocator = allocator;
this.committer = committer;
this.schema = schema;
+ this.dictionaryProvider = dictionaryProvider;
}
/**
@@ -155,7 +163,7 @@ public void add(ArrowRecordBatch batch) {
* Complete building the stream (no more batches can be added).
*/
public void complete() {
- Stream stream = new Stream(schema, batches, recordCount);
+ Stream stream = new Stream(schema, dictionaryProvider, batches, recordCount);
committer.accept(stream);
}
diff --git a/java/flight/src/main/java/org/apache/arrow/flight/example/integration/IntegrationTestClient.java b/java/flight/src/main/java/org/apache/arrow/flight/example/integration/IntegrationTestClient.java
index 47acbe9f6a46e..75a01f8633cc3 100644
--- a/java/flight/src/main/java/org/apache/arrow/flight/example/integration/IntegrationTestClient.java
+++ b/java/flight/src/main/java/org/apache/arrow/flight/example/integration/IntegrationTestClient.java
@@ -96,27 +96,28 @@ private void run(String[] args) throws ParseException, IOException {
jsonRoot = VectorSchemaRoot.create(root.getSchema(), allocator);
VectorUnloader unloader = new VectorUnloader(root);
VectorLoader jsonLoader = new VectorLoader(jsonRoot);
- FlightClient.ClientStreamListener stream = client.startPut(descriptor, root, new StreamListener() {
- int counter = 0;
-
- @Override
- public void onNext(PutResult val) {
- final String metadata = StandardCharsets.UTF_8.decode(val.getApplicationMetadata()).toString();
- if (!Integer.toString(counter).equals(metadata)) {
- throw new RuntimeException(
- String.format("Invalid ACK from server. Expected '%d' but got '%s'.", counter, metadata));
- }
- counter++;
- }
-
- @Override
- public void onError(Throwable t) {
- }
-
- @Override
- public void onCompleted() {
- }
- });
+ FlightClient.ClientStreamListener stream = client.startPut(descriptor, root, reader,
+ new StreamListener() {
+ int counter = 0;
+
+ @Override
+ public void onNext(PutResult val) {
+ final String metadata = StandardCharsets.UTF_8.decode(val.getApplicationMetadata()).toString();
+ if (!Integer.toString(counter).equals(metadata)) {
+ throw new RuntimeException(
+ String.format("Invalid ACK from server. Expected '%d' but got '%s'.", counter, metadata));
+ }
+ counter++;
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ }
+
+ @Override
+ public void onCompleted() {
+ }
+ });
int counter = 0;
while (reader.read(root)) {
stream.putNext(Integer.toString(counter).getBytes(StandardCharsets.UTF_8));