Skip to content

Commit

Permalink
Query iterator only throws exeption once (#372)
Browse files Browse the repository at this point in the history
## What is the goal of this PR?

We revert previous changes from #368 and #364, which made query queues and iterators throw the same error idempotently if there was one. However, this goes counter to standard usages of iterators and queues, which are not meant to behave idempotently (each item is only returned once, and if they have an error they should no longer be used). 

## What are the changes implemented in this PR?

* remove idempotent error state of collectors and queues, which back query iterators
  • Loading branch information
flyingsilverfin authored Mar 14, 2022
1 parent ed90afc commit 3b8b5f8
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 16 deletions.
1 change: 0 additions & 1 deletion connection/TypeDBTransactionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ public class TypeDBTransactionImpl implements TypeDBTransaction.Extended {
private final QueryManager queryMgr;

private final BidirectionalStream bidirectionalStream;

TypeDBTransactionImpl(TypeDBSessionImpl session, ByteString sessionId, Type type, TypeDBOptions options) {
this.type = type;
this.options = options;
Expand Down
2 changes: 1 addition & 1 deletion stream/BidirectionalStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public Stream<ResPart> stream(Req.Builder request) {
UUID requestID = UUID.randomUUID();
ResponseCollector.Queue<ResPart> collector = resPartCollector.queue(requestID);
dispatcher.dispatch(request.setReqId(UUIDAsByteString(requestID)).build());
ResponsePartIterator iterator = new ResponsePartIterator(requestID, collector, this);
ResponsePartIterator iterator = new ResponsePartIterator(requestID, collector, dispatcher);
return StreamSupport.stream(spliteratorUnknownSize(iterator, ORDERED | IMMUTABLE), false);
}

Expand Down
20 changes: 12 additions & 8 deletions stream/ResponseCollector.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@
import io.grpc.StatusRuntimeException;

import javax.annotation.Nullable;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedTransferQueue;

import static com.vaticle.typedb.client.common.exception.ErrorMessage.Client.TRANSACTION_CLOSED;
import static com.vaticle.typedb.client.common.exception.ErrorMessage.Client.TRANSACTION_CLOSED_WITH_ERRORS;
import static com.vaticle.typedb.client.common.exception.ErrorMessage.Internal.UNEXPECTED_INTERRUPTION;

public class ResponseCollector<R> {
Expand Down Expand Up @@ -61,20 +61,18 @@ synchronized void close(@Nullable StatusRuntimeException error) {
public static class Queue<R> {

private final BlockingQueue<Either<Response<R>, Done>> responseQueue;
private StatusRuntimeException error;

Queue() {
// TODO: switch LinkedTransferQueue to LinkedBlockingQueue once issue #351 is fixed
responseQueue = new LinkedTransferQueue<>();
error = null;
}

public R take() {
try {
Either<Response<R>, Done> response = responseQueue.take();
if (response.isFirst()) return response.first().message();
else if (this.error != null) throw new TypeDBClientException(TRANSACTION_CLOSED_WITH_ERRORS, error);
else throw new TypeDBClientException(TRANSACTION_CLOSED);
else if (!response.second().error().isPresent()) throw new TypeDBClientException(TRANSACTION_CLOSED);
else throw TypeDBClientException.of(response.second().error().get());
} catch (InterruptedException e) {
throw new TypeDBClientException(UNEXPECTED_INTERRUPTION);
}
Expand All @@ -85,8 +83,7 @@ public void put(R response) {
}

public void close(@Nullable StatusRuntimeException error) {
this.error = error;
responseQueue.add(Either.second(new Done()));
responseQueue.add(Either.second(new Done(error)));
}

private static class Response<R> {
Expand All @@ -105,7 +102,14 @@ private R message() {
}

private static class Done {
private Done() {
private final StatusRuntimeException error;

private Done(@Nullable StatusRuntimeException error) {
this.error = error;
}

private Optional<StatusRuntimeException> error() {
return Optional.ofNullable(error);
}
}
}
Expand Down
11 changes: 5 additions & 6 deletions stream/ResponsePartIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,18 @@
public class ResponsePartIterator implements Iterator<TransactionProto.Transaction.ResPart> {

private final UUID requestID;
private final BidirectionalStream stream;
private final RequestTransmitter.Dispatcher dispatcher;
private final ResponseCollector.Queue<TransactionProto.Transaction.ResPart> responseCollector;
private TransactionProto.Transaction.ResPart next;
private State state;

enum State {EMPTY, FETCHED, DONE}

public ResponsePartIterator(UUID requestID, ResponseCollector.Queue<TransactionProto.Transaction.ResPart> responseQueue,
BidirectionalStream stream) {
RequestTransmitter.Dispatcher dispatcher) {
this.requestID = requestID;
this.stream = stream;
this.responseCollector = responseQueue;
this.dispatcher = dispatcher;
state = State.EMPTY;
next = null;
}
Expand All @@ -63,7 +63,7 @@ private boolean fetchAndCheck() {
state = State.DONE;
return false;
case CONTINUE:
stream.dispatcher().dispatch(RequestBuilder.Transaction.streamReq(requestID));
dispatcher.dispatch(RequestBuilder.Transaction.streamReq(requestID));
return fetchAndCheck();
default:
throw new TypeDBClientException(ILLEGAL_ARGUMENT);
Expand Down Expand Up @@ -91,8 +91,7 @@ public boolean hasNext() {

@Override
public TransactionProto.Transaction.ResPart next() {
if (stream.getError().isPresent()) throw TypeDBClientException.of(stream.getError().get());
else if (!hasNext()) throw new NoSuchElementException();
if (!hasNext()) throw new NoSuchElementException();
else {
state = State.EMPTY;
return next;
Expand Down

0 comments on commit 3b8b5f8

Please sign in to comment.