diff --git a/connection/TypeDBTransactionImpl.java b/connection/TypeDBTransactionImpl.java index 645c04676a..453465b8e7 100644 --- a/connection/TypeDBTransactionImpl.java +++ b/connection/TypeDBTransactionImpl.java @@ -39,6 +39,7 @@ import io.grpc.StatusRuntimeException; import java.util.List; +import java.util.Optional; import java.util.stream.Stream; import static com.vaticle.typedb.client.common.exception.ErrorMessage.Client.TRANSACTION_CLOSED; @@ -124,9 +125,9 @@ public Stream stream(Req.Builder request) { } private void throwTransactionClosed() { - List errors = bidirectionalStream.getErrors(); - if (errors.isEmpty()) throw new TypeDBClientException(TRANSACTION_CLOSED); - else throw new TypeDBClientException(TRANSACTION_CLOSED_WITH_ERRORS, errors); + Optional error = bidirectionalStream.getError(); + if (error.isPresent()) throw new TypeDBClientException(TRANSACTION_CLOSED_WITH_ERRORS, error.get()); + else throw new TypeDBClientException(TRANSACTION_CLOSED); } @Override diff --git a/stream/BidirectionalStream.java b/stream/BidirectionalStream.java index d0190b4f07..611aa4dc83 100644 --- a/stream/BidirectionalStream.java +++ b/stream/BidirectionalStream.java @@ -34,6 +34,7 @@ import javax.annotation.Nullable; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Stream; @@ -53,6 +54,7 @@ public class BidirectionalStream implements AutoCloseable { private final ResponseCollector resPartCollector; private final RequestTransmitter.Dispatcher dispatcher; private final AtomicBoolean isOpen; + private StatusRuntimeException error; public BidirectionalStream(TypeDBStub stub, RequestTransmitter transmitter) { resPartCollector = new ResponseCollector<>(); @@ -60,6 +62,7 @@ public BidirectionalStream(TypeDBStub stub, RequestTransmitter transmitter) { isOpen = new AtomicBoolean(false); dispatcher = transmitter.dispatcher(stub.transaction(new ResponseObserver())); isOpen.set(true); + error = null; } public Single single(Req.Builder request, boolean batch) { @@ -108,6 +111,7 @@ public void close() { private void close(@Nullable StatusRuntimeException error) { if (isOpen.compareAndSet(true, false)) { + this.error = error; resCollector.close(error); resPartCollector.close(error); try { @@ -126,10 +130,8 @@ void iteratorDone(UUID requestID) { resPartCollector.remove(requestID); } - public List getErrors() { - List errors = new ArrayList<>(resCollector.getErrors()); - errors.addAll(resPartCollector.getErrors()); - return errors; + public Optional getError() { + return Optional.ofNullable(error); } RequestTransmitter.Dispatcher dispatcher() { diff --git a/stream/ResponseCollector.java b/stream/ResponseCollector.java index 32bfa8b352..440aec3ee1 100644 --- a/stream/ResponseCollector.java +++ b/stream/ResponseCollector.java @@ -65,12 +65,6 @@ synchronized void close(@Nullable StatusRuntimeException error) { collectors.values().forEach(collector -> collector.close(error)); } - List getErrors() { - List errors = new ArrayList<>(); - collectors.values().forEach(collector -> collector.getError().ifPresent(errors::add)); - return errors; - } - public static class Queue { private final BlockingQueue, Done>> responseQueue; @@ -93,10 +87,6 @@ public R take() { } } - public Optional getError() { - return Optional.ofNullable(error); - } - public void put(R response) { responseQueue.add(Either.first(new Response<>(response))); } diff --git a/stream/ResponsePartIterator.java b/stream/ResponsePartIterator.java index 1df78360da..aeb860bb46 100644 --- a/stream/ResponsePartIterator.java +++ b/stream/ResponsePartIterator.java @@ -92,8 +92,11 @@ public boolean hasNext() { @Override public TransactionProto.Transaction.ResPart next() { - if (!hasNext()) throw new NoSuchElementException(); - state = State.EMPTY; - return next; + if (stream.getError().isPresent()) throw TypeDBClientException.of(stream.getError().get()); + else if (!hasNext()) throw new NoSuchElementException(); + else { + state = State.EMPTY; + return next; + } } }