From 3b8b5f8fcc440b032de62e6b4e5673e1397b89dc Mon Sep 17 00:00:00 2001 From: Joshua Send Date: Mon, 14 Mar 2022 11:50:36 +0000 Subject: [PATCH] Query iterator only throws exeption once (#372) ## What is the goal of this PR? We revert previous changes from #368 and #364, which made query queues and iterators throw the same error idempotently if there was one. However, this goes counter to standard usages of iterators and queues, which are not meant to behave idempotently (each item is only returned once, and if they have an error they should no longer be used). ## What are the changes implemented in this PR? * remove idempotent error state of collectors and queues, which back query iterators --- connection/TypeDBTransactionImpl.java | 1 - stream/BidirectionalStream.java | 2 +- stream/ResponseCollector.java | 20 ++++++++++++-------- stream/ResponsePartIterator.java | 11 +++++------ 4 files changed, 18 insertions(+), 16 deletions(-) diff --git a/connection/TypeDBTransactionImpl.java b/connection/TypeDBTransactionImpl.java index 9a9606a83c..946a934a05 100644 --- a/connection/TypeDBTransactionImpl.java +++ b/connection/TypeDBTransactionImpl.java @@ -56,7 +56,6 @@ public class TypeDBTransactionImpl implements TypeDBTransaction.Extended { private final QueryManager queryMgr; private final BidirectionalStream bidirectionalStream; - TypeDBTransactionImpl(TypeDBSessionImpl session, ByteString sessionId, Type type, TypeDBOptions options) { this.type = type; this.options = options; diff --git a/stream/BidirectionalStream.java b/stream/BidirectionalStream.java index ede0a2530e..7c50db3e8e 100644 --- a/stream/BidirectionalStream.java +++ b/stream/BidirectionalStream.java @@ -76,7 +76,7 @@ public Stream stream(Req.Builder request) { UUID requestID = UUID.randomUUID(); ResponseCollector.Queue collector = resPartCollector.queue(requestID); dispatcher.dispatch(request.setReqId(UUIDAsByteString(requestID)).build()); - ResponsePartIterator iterator = new ResponsePartIterator(requestID, collector, this); + ResponsePartIterator iterator = new ResponsePartIterator(requestID, collector, dispatcher); return StreamSupport.stream(spliteratorUnknownSize(iterator, ORDERED | IMMUTABLE), false); } diff --git a/stream/ResponseCollector.java b/stream/ResponseCollector.java index 88384e4105..e5ab841e5e 100644 --- a/stream/ResponseCollector.java +++ b/stream/ResponseCollector.java @@ -26,6 +26,7 @@ import io.grpc.StatusRuntimeException; import javax.annotation.Nullable; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; @@ -33,7 +34,6 @@ import java.util.concurrent.LinkedTransferQueue; import static com.vaticle.typedb.client.common.exception.ErrorMessage.Client.TRANSACTION_CLOSED; -import static com.vaticle.typedb.client.common.exception.ErrorMessage.Client.TRANSACTION_CLOSED_WITH_ERRORS; import static com.vaticle.typedb.client.common.exception.ErrorMessage.Internal.UNEXPECTED_INTERRUPTION; public class ResponseCollector { @@ -61,20 +61,18 @@ synchronized void close(@Nullable StatusRuntimeException error) { public static class Queue { private final BlockingQueue, Done>> responseQueue; - private StatusRuntimeException error; Queue() { // TODO: switch LinkedTransferQueue to LinkedBlockingQueue once issue #351 is fixed responseQueue = new LinkedTransferQueue<>(); - error = null; } public R take() { try { Either, Done> response = responseQueue.take(); if (response.isFirst()) return response.first().message(); - else if (this.error != null) throw new TypeDBClientException(TRANSACTION_CLOSED_WITH_ERRORS, error); - else throw new TypeDBClientException(TRANSACTION_CLOSED); + else if (!response.second().error().isPresent()) throw new TypeDBClientException(TRANSACTION_CLOSED); + else throw TypeDBClientException.of(response.second().error().get()); } catch (InterruptedException e) { throw new TypeDBClientException(UNEXPECTED_INTERRUPTION); } @@ -85,8 +83,7 @@ public void put(R response) { } public void close(@Nullable StatusRuntimeException error) { - this.error = error; - responseQueue.add(Either.second(new Done())); + responseQueue.add(Either.second(new Done(error))); } private static class Response { @@ -105,7 +102,14 @@ private R message() { } private static class Done { - private Done() { + private final StatusRuntimeException error; + + private Done(@Nullable StatusRuntimeException error) { + this.error = error; + } + + private Optional error() { + return Optional.ofNullable(error); } } } diff --git a/stream/ResponsePartIterator.java b/stream/ResponsePartIterator.java index af71d9b4a0..1269553c2d 100644 --- a/stream/ResponsePartIterator.java +++ b/stream/ResponsePartIterator.java @@ -36,7 +36,7 @@ public class ResponsePartIterator implements Iterator { private final UUID requestID; - private final BidirectionalStream stream; + private final RequestTransmitter.Dispatcher dispatcher; private final ResponseCollector.Queue responseCollector; private TransactionProto.Transaction.ResPart next; private State state; @@ -44,10 +44,10 @@ public class ResponsePartIterator implements Iterator responseQueue, - BidirectionalStream stream) { + RequestTransmitter.Dispatcher dispatcher) { this.requestID = requestID; - this.stream = stream; this.responseCollector = responseQueue; + this.dispatcher = dispatcher; state = State.EMPTY; next = null; } @@ -63,7 +63,7 @@ private boolean fetchAndCheck() { state = State.DONE; return false; case CONTINUE: - stream.dispatcher().dispatch(RequestBuilder.Transaction.streamReq(requestID)); + dispatcher.dispatch(RequestBuilder.Transaction.streamReq(requestID)); return fetchAndCheck(); default: throw new TypeDBClientException(ILLEGAL_ARGUMENT); @@ -91,8 +91,7 @@ public boolean hasNext() { @Override public TransactionProto.Transaction.ResPart next() { - if (stream.getError().isPresent()) throw TypeDBClientException.of(stream.getError().get()); - else if (!hasNext()) throw new NoSuchElementException(); + if (!hasNext()) throw new NoSuchElementException(); else { state = State.EMPTY; return next;