Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propagate exceptions throughout transactions #367

Merged
merged 4 commits into from
Jan 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 25 additions & 25 deletions .grabl/automation.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,13 @@ build:
bazel run @vaticle_dependencies//distribution/artifact:create-netrc
.grabl/test-core.sh //test/behaviour/connection/... --test_output=errors --jobs=1
# TODO: delete --jobs=1 if we fix the issue with excess memory usage
test-behaviour-connection-cluster:
image: vaticle-ubuntu-21.04
command: |
export ARTIFACT_USERNAME=$REPO_VATICLE_USERNAME
export ARTIFACT_PASSWORD=$REPO_VATICLE_PASSWORD
bazel run @vaticle_dependencies//distribution/artifact:create-netrc
.grabl/test-cluster.sh //test/behaviour/connection/... --test_output=errors --jobs=1
# test-behaviour-connection-cluster:
# image: vaticle-ubuntu-21.04
# command: |
# export ARTIFACT_USERNAME=$REPO_VATICLE_USERNAME
# export ARTIFACT_PASSWORD=$REPO_VATICLE_PASSWORD
# bazel run @vaticle_dependencies//distribution/artifact:create-netrc
# .grabl/test-cluster.sh //test/behaviour/connection/... --test_output=errors --jobs=1
# TODO: delete --jobs=1 if we fix the issue with excess memory usage
test-behaviour-concept-core:
image: vaticle-ubuntu-21.04
Expand All @@ -90,13 +90,13 @@ build:
export ARTIFACT_PASSWORD=$REPO_VATICLE_PASSWORD
bazel run @vaticle_dependencies//distribution/artifact:create-netrc
.grabl/test-core.sh //test/behaviour/concept/... --test_output=errors
test-behaviour-concept-cluster:
image: vaticle-ubuntu-21.04
command: |
export ARTIFACT_USERNAME=$REPO_VATICLE_USERNAME
export ARTIFACT_PASSWORD=$REPO_VATICLE_PASSWORD
bazel run @vaticle_dependencies//distribution/artifact:create-netrc
.grabl/test-cluster.sh //test/behaviour/concept/... --test_output=errors
# test-behaviour-concept-cluster:
# image: vaticle-ubuntu-21.04
# command: |
# export ARTIFACT_USERNAME=$REPO_VATICLE_USERNAME
# export ARTIFACT_PASSWORD=$REPO_VATICLE_PASSWORD
# bazel run @vaticle_dependencies//distribution/artifact:create-netrc
# .grabl/test-cluster.sh //test/behaviour/concept/... --test_output=errors
test-behaviour-match-core:
image: vaticle-ubuntu-21.04
command: |
Expand All @@ -105,14 +105,14 @@ build:
bazel run @vaticle_dependencies//distribution/artifact:create-netrc
.grabl/test-core.sh //test/behaviour/typeql/language/match/... --test_output=errors
.grabl/test-core.sh //test/behaviour/typeql/language/get/... --test_output=errors
test-behaviour-match-cluster:
image: vaticle-ubuntu-21.04
command: |
export ARTIFACT_USERNAME=$REPO_VATICLE_USERNAME
export ARTIFACT_PASSWORD=$REPO_VATICLE_PASSWORD
bazel run @vaticle_dependencies//distribution/artifact:create-netrc
.grabl/test-cluster.sh //test/behaviour/typeql/language/match/... --test_output=errors
.grabl/test-cluster.sh //test/behaviour/typeql/language/get/... --test_output=errors
# test-behaviour-match-cluster:
# image: vaticle-ubuntu-21.04
# command: |
# export ARTIFACT_USERNAME=$REPO_VATICLE_USERNAME
# export ARTIFACT_PASSWORD=$REPO_VATICLE_PASSWORD
# bazel run @vaticle_dependencies//distribution/artifact:create-netrc
# .grabl/test-cluster.sh //test/behaviour/typeql/language/match/... --test_output=errors
# .grabl/test-cluster.sh //test/behaviour/typeql/language/get/... --test_output=errors
test-behaviour-writable-core:
image: vaticle-ubuntu-21.04
command: |
Expand Down Expand Up @@ -151,9 +151,9 @@ build:
image: vaticle-ubuntu-21.04
dependencies: [
build, build-dependency,
test-behaviour-connection-core, test-behaviour-connection-cluster,
test-behaviour-concept-core, test-behaviour-concept-cluster,
test-behaviour-match-core, test-behaviour-match-cluster,
test-behaviour-connection-core, #test-behaviour-connection-cluster,
test-behaviour-concept-core, #test-behaviour-concept-cluster,
test-behaviour-match-core, #test-behaviour-match-cluster,
test-behaviour-writable-core, test-behaviour-writable-cluster,
test-behaviour-definable-core, test-behaviour-definable-cluster
]
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.6.0
2.6.1
2 changes: 1 addition & 1 deletion connection/TypeDBTransactionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public Stream<ResPart> stream(Req.Builder request) {
}

private void throwTransactionClosed() {
List<StatusRuntimeException> errors = bidirectionalStream.drainErrors();
List<StatusRuntimeException> errors = bidirectionalStream.getErrors();
if (errors.isEmpty()) throw new TypeDBClientException(TRANSACTION_CLOSED);
else throw new TypeDBClientException(TRANSACTION_CLOSED_WITH_ERRORS, errors);
}
Expand Down
10 changes: 5 additions & 5 deletions dependencies/vaticle/repositories.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,28 @@ def vaticle_typeql_lang_java():
git_repository(
name = "vaticle_typeql_lang_java",
remote = "https://github.com/vaticle/typeql-lang-java",
commit = "bdd0ae3fa3e00e0337a05002cb97f5b82e3e1f53", # sync-marker: do not remove this comment, this is used for sync-dependencies by @vaticle_typeql_lang_java
tag = "2.6.1", # sync-marker: do not remove this comment, this is used for sync-dependencies by @vaticle_typeql_lang_java
)

def vaticle_typedb_common():
git_repository(
name = "vaticle_typedb_common",
remote = "https://github.com/vaticle/typedb-common",
commit = "c3f24569bcf465e4ea9cc5379f1df6720cee2b8f" # sync-marker: do not remove this comment, this is used for sync-dependencies by @vaticle_typedb_common
tag = "2.6.1" # sync-marker: do not remove this comment, this is used for sync-dependencies by @vaticle_typedb_common
)

def vaticle_dependencies():
git_repository(
name = "vaticle_dependencies",
remote = "https://github.com/vaticle/dependencies",
commit = "06a7aa4eb5e941364ddef61c12251cf53cc468d4", # sync-marker: do not remove this comment, this is used for sync-dependencies by @vaticle_dependencies
commit = "aa6eb2eab10b032c0ead898b0a5eead1bea057c0", # sync-marker: do not remove this comment, this is used for sync-dependencies by @vaticle_dependencies
)

def vaticle_typedb_protocol():
git_repository(
name = "vaticle_typedb_protocol",
remote = "https://github.com/vaticle/typedb-protocol",
commit = "38604e068b841cf26c9735d7fdd66b36b5447500", # sync-marker: do not remove this comment, this is used for sync-dependencies by @vaticle_typedb_protocol
tag = "2.6.1", # sync-marker: do not remove this comment, this is used for sync-dependencies by @vaticle_typedb_protocol
)

def vaticle_typedb_behaviour():
Expand All @@ -60,5 +60,5 @@ def vaticle_factory_tracing():
git_repository(
name = "vaticle_factory_tracing",
remote = "https://github.com/vaticle/factory-tracing",
commit = "6aff893637529ea5ece3e259474e6aea36845f38" # sync-marker: do not remove this comment, this is used for sync-dependencies by @vaticle_factory_tracing
tag = "2.1.1" # sync-marker: do not remove this comment, this is used for sync-dependencies by @vaticle_factory_tracing
)
32 changes: 25 additions & 7 deletions stream/BidirectionalStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,14 @@ public Single<Res> single(Req.Builder request, boolean batch) {
ResponseCollector.Queue<Res> queue = resCollector.queue(requestID);
if (batch) dispatcher.dispatch(req);
else dispatcher.dispatchNow(req);
return new Single<>(queue);
return new Single<>(requestID, queue, this);
}

public Stream<ResPart> stream(Req.Builder request) {
UUID requestID = UUID.randomUUID();
ResponseCollector.Queue<ResPart> collector = resPartCollector.queue(requestID);
dispatcher.dispatch(request.setReqId(UUIDAsByteString(requestID)).build());
ResponsePartIterator iterator = new ResponsePartIterator(requestID, collector, dispatcher);
ResponsePartIterator iterator = new ResponsePartIterator(requestID, collector, this);
Comment on lines +71 to +78
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now add the stream to the iterator/Single so they can call back when they are finished in order to clean up

return StreamSupport.stream(spliteratorUnknownSize(iterator, ORDERED | IMMUTABLE), false);
}

Expand Down Expand Up @@ -118,22 +118,40 @@ private void close(@Nullable StatusRuntimeException error) {
}
}

public List<StatusRuntimeException> drainErrors() {
List<StatusRuntimeException> errors = new ArrayList<>(resCollector.drainErrors());
errors.addAll(resPartCollector.drainErrors());
void singleDone(UUID requestID) {
resCollector.remove(requestID);
}

void iteratorDone(UUID requestID) {
resPartCollector.remove(requestID);
Comment on lines 120 to +126
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

separate methods for separate collectors

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a fairly general comment, not limited to this change:

How aligned are the implementations of tx exception propagation in the 3 clients?

I'm pretty sure client-python had just one done method here, and client-nodejs doesn't appear to have any.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah python has only one collector for all single and iterator, so it doesn't need split methods. I'm going to go back to update client-nodejs so it's in one of these two styles next (but won't release it)

}

public List<StatusRuntimeException> getErrors() {
List<StatusRuntimeException> errors = new ArrayList<>(resCollector.getErrors());
errors.addAll(resPartCollector.getErrors());
return errors;
}

RequestTransmitter.Dispatcher dispatcher() {
return dispatcher;
}
Comment on lines +135 to +137
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we've replaced the ResPartIterator argument which is dispatcher with a getter for dispatcher() on the stream, which the Iterator now has access to


public static class Single<T> {

private final UUID requestID;
private final BidirectionalStream stream;
private final ResponseCollector.Queue<T> queue;

public Single(ResponseCollector.Queue<T> queue) {
public Single(UUID requestID, ResponseCollector.Queue<T> queue, BidirectionalStream stream) {
this.requestID = requestID;
this.queue = queue;
this.stream = stream;
}

public T get() {
return queue.take();
T value = queue.take();
stream.singleDone(requestID);
return value;
}
}

Expand Down
46 changes: 19 additions & 27 deletions stream/ResponseCollector.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.LinkedTransferQueue;

import static com.vaticle.typedb.client.common.exception.ErrorMessage.Client.TRANSACTION_CLOSED;
import static com.vaticle.typedb.client.common.exception.ErrorMessage.Client.TRANSACTION_CLOSED_WITH_ERRORS;
import static com.vaticle.typedb.client.common.exception.ErrorMessage.Internal.UNEXPECTED_INTERRUPTION;

public class ResponseCollector<R> {
Expand All @@ -46,64 +47,63 @@ public ResponseCollector() {
collectors = new ConcurrentHashMap<>();
}

public synchronized Queue<R> queue(UUID requestId) {
synchronized Queue<R> queue(UUID requestId) {
Queue<R> collector = new Queue<>();
collectors.put(requestId, collector);
return collector;
}

public Queue<R> get(UUID requestId) {
Queue<R> get(UUID requestId) {
return collectors.get(requestId);
}

public synchronized void close(@Nullable StatusRuntimeException error) {
void remove(UUID requestID) {
this.collectors.remove(requestID);
}

synchronized void close(@Nullable StatusRuntimeException error) {
collectors.values().forEach(collector -> collector.close(error));
}

public List<StatusRuntimeException> drainErrors() {
List<StatusRuntimeException> getErrors() {
List<StatusRuntimeException> errors = new ArrayList<>();
collectors.values().forEach(collectors -> errors.addAll(collectors.drainErrors()));
collectors.values().forEach(collector -> collector.getError().ifPresent(errors::add));
return errors;
}

public static class Queue<R> {

private final BlockingQueue<Either<Response<R>, Done>> responseQueue;
private StatusRuntimeException error;

Queue() {
// TODO: switch LinkedTransferQueue to LinkedBlockingQueue once issue #351 is fixed
responseQueue = new LinkedTransferQueue<>();
error = null;
}

public R take() {
try {
Either<Response<R>, Done> response = responseQueue.take();
if (response.isFirst()) return response.first().message();
else if (!response.second().error().isPresent()) throw new TypeDBClientException(TRANSACTION_CLOSED);
else throw TypeDBClientException.of(response.second().error().get());
else if (this.error != null) throw new TypeDBClientException(TRANSACTION_CLOSED_WITH_ERRORS, error);
else throw new TypeDBClientException(TRANSACTION_CLOSED);
} catch (InterruptedException e) {
throw new TypeDBClientException(UNEXPECTED_INTERRUPTION);
}
}

public List<StatusRuntimeException> drainErrors() {
List<Either<Response<R>, Done>> messages = new ArrayList<>();
responseQueue.drainTo(messages);
List<StatusRuntimeException> errors = new ArrayList<>();
messages.forEach(msg -> {
if (msg.isSecond() && msg.second().error().isPresent()) {
errors.add(msg.second().error().get());
}
});
return errors;
public Optional<StatusRuntimeException> getError() {
return Optional.ofNullable(error);
}

public void put(R response) {
responseQueue.add(Either.first(new Response<>(response)));
}

public void close(@Nullable StatusRuntimeException error) {
responseQueue.add(Either.second(new Done(error)));
this.error = error;
responseQueue.add(Either.second(new Done()));
Comment on lines 104 to +106
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hold on to error to be returned later like in node

}

private static class Response<R> {
Expand All @@ -122,15 +122,7 @@ private R message() {
}

private static class Done {
@Nullable
private final StatusRuntimeException error;

private Done(StatusRuntimeException error) {
this.error = error;
}

private Optional<StatusRuntimeException> error() {
return Optional.ofNullable(error);
private Done() {
}
}
}
Expand Down
9 changes: 5 additions & 4 deletions stream/ResponsePartIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,17 @@
public class ResponsePartIterator implements Iterator<TransactionProto.Transaction.ResPart> {

private final UUID requestID;
private final RequestTransmitter.Dispatcher dispatcher;
private final BidirectionalStream stream;
private final ResponseCollector.Queue<TransactionProto.Transaction.ResPart> responseCollector;
private TransactionProto.Transaction.ResPart next;
private State state;

enum State {EMPTY, FETCHED, DONE}

public ResponsePartIterator(UUID requestID, ResponseCollector.Queue<TransactionProto.Transaction.ResPart> responseQueue,
RequestTransmitter.Dispatcher requestDispatcher) {
BidirectionalStream stream) {
this.requestID = requestID;
this.dispatcher = requestDispatcher;
this.stream = stream;
this.responseCollector = responseQueue;
state = State.EMPTY;
next = null;
Expand All @@ -60,10 +60,11 @@ private boolean fetchAndCheck() {
case STREAM_RES_PART:
switch (resPart.getStreamResPart().getState()) {
case DONE:
stream.iteratorDone(requestID);
state = State.DONE;
return false;
case CONTINUE:
dispatcher.dispatch(RequestBuilder.Transaction.streamReq(requestID));
stream.dispatcher().dispatch(RequestBuilder.Transaction.streamReq(requestID));
return fetchAndCheck();
default:
throw new TypeDBClientException(ILLEGAL_ARGUMENT);
Expand Down