Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't delete response collectors in a transaction #369

Merged
merged 3 commits into from
Jan 27, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion common/exception/ErrorMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public static class Client extends ErrorMessage {
public static final Client MISSING_RESPONSE =
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new Client(9, "Unexpected empty response for request ID '%s'.");
public static final Client UNKNOWN_REQUEST_ID =
new Client(10, "Received a response with unknown request id '%s'.");
new Client(10, "Received a response with unknown request id '%s':\n%s");
public static final Client CLUSTER_NO_PRIMARY_REPLICA_YET =
new Client(11, "No replica has been marked as the primary replica for latest known term '%d'.");
public static final Client CLUSTER_UNABLE_TO_CONNECT =
Expand Down
26 changes: 5 additions & 21 deletions stream/BidirectionalStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
import io.grpc.stub.StreamObserver;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -71,7 +69,7 @@ public Single<Res> single(Req.Builder request, boolean batch) {
ResponseCollector.Queue<Res> queue = resCollector.queue(requestID);
if (batch) dispatcher.dispatch(req);
else dispatcher.dispatchNow(req);
return new Single<>(requestID, queue, this);
return new Single<>(queue);
}

public Stream<ResPart> stream(Req.Builder request) {
Expand All @@ -90,14 +88,14 @@ private void collect(Res res) {
UUID requestID = byteStringAsUUID(res.getReqId());
ResponseCollector.Queue<Res> collector = resCollector.get(requestID);
if (collector != null) collector.put(res);
else throw new TypeDBClientException(UNKNOWN_REQUEST_ID, requestID);
else throw new TypeDBClientException(UNKNOWN_REQUEST_ID, requestID, res);
}

private void collect(ResPart resPart) {
UUID requestID = byteStringAsUUID(resPart.getReqId());
ResponseCollector.Queue<ResPart> collector = resPartCollector.get(requestID);
if (collector != null) collector.put(resPart);
else throw new TypeDBClientException(UNKNOWN_REQUEST_ID, requestID);
else throw new TypeDBClientException(UNKNOWN_REQUEST_ID, requestID, resPart);
}

private static UUID byteStringAsUUID(ByteString byteString) {
Expand All @@ -122,14 +120,6 @@ private void close(@Nullable StatusRuntimeException error) {
}
}

void singleDone(UUID requestID) {
resCollector.remove(requestID);
}

void iteratorDone(UUID requestID) {
resPartCollector.remove(requestID);
}

public Optional<StatusRuntimeException> getError() {
return Optional.ofNullable(error);
}
Expand All @@ -140,20 +130,14 @@ RequestTransmitter.Dispatcher dispatcher() {

public static class Single<T> {

private final UUID requestID;
private final BidirectionalStream stream;
private final ResponseCollector.Queue<T> queue;

public Single(UUID requestID, ResponseCollector.Queue<T> queue, BidirectionalStream stream) {
this.requestID = requestID;
public Single(ResponseCollector.Queue<T> queue) {
this.queue = queue;
this.stream = stream;
}

public T get() {
T value = queue.take();
stream.singleDone(requestID);
return value;
return queue.take();
}
}

Expand Down
7 changes: 0 additions & 7 deletions stream/ResponseCollector.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@
import io.grpc.StatusRuntimeException;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -57,10 +54,6 @@ Queue<R> get(UUID requestId) {
return collectors.get(requestId);
}

void remove(UUID requestID) {
this.collectors.remove(requestID);
}

synchronized void close(@Nullable StatusRuntimeException error) {
collectors.values().forEach(collector -> collector.close(error));
}
Expand Down
1 change: 0 additions & 1 deletion stream/ResponsePartIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ private boolean fetchAndCheck() {
case STREAM_RES_PART:
switch (resPart.getStreamResPart().getState()) {
case DONE:
stream.iteratorDone(requestID);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we still need the bidirectional stream in this class at all?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nvm!!

state = State.DONE;
return false;
case CONTINUE:
Expand Down
20 changes: 20 additions & 0 deletions test/integration/ClientQueryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import com.vaticle.typedb.client.api.TypeDBSession;
import com.vaticle.typedb.client.api.TypeDBTransaction;
import com.vaticle.typedb.client.api.answer.ConceptMap;
import com.vaticle.typedb.client.api.concept.type.AttributeType;
import com.vaticle.typedb.client.api.concept.type.EntityType;
import com.vaticle.typedb.client.api.logic.Explanation;
import com.vaticle.typedb.common.test.server.TypeDBCoreRunner;
import com.vaticle.typeql.lang.TypeQL;
Expand All @@ -42,6 +44,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -518,6 +521,23 @@ public void testSimpleExplanation() {
}, READ, TypeDBOptions.core().infer(true).explain(true));
}

@Test
public void testStreaming() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With these options (the default in TypeDB at time of writing), the server may respond with: 50 answers -> CONTINUE -> 1 answer [compensating for latency] -> DONE. The client will respond to CONTINUE with STREAM to keep iterating, and the server responds to STREAM with a 2nd DONE message.

This is expected and should be handled correctly (ie: ignored) by the client.

localhostTypeDBTX(tx -> {
for (int i = 0; i < 51; i++) {
tx.query().define(String.format("define person sub entity, owns name%d; name%d sub attribute, value string;", i, i));
}
tx.commit();
}, TypeDBSession.Type.SCHEMA);
localhostTypeDBTX(tx -> {
for (int i = 0; i < 50; i++) {
EntityType.Remote concept = tx.concepts().getEntityType("person").asRemote(tx);
List<? extends AttributeType> attributeTypes = concept.getOwns(false).collect(toList());
Optional<ConceptMap> conceptMap = tx.query().match("match $x sub thing; limit 1;").findFirst();
}
}, READ, TypeDBOptions.core().prefetch(true).prefetchSize(50));
}

private String[] lionNames() {
return new String[]{"male-partner", "female-partner", "young-lion"};
}
Expand Down