Skip to content

Commit

Permalink
Update the new reactive API to use Flow API (#1295)
Browse files Browse the repository at this point in the history
The new reactive API should use Java Flow API, the legacy deprecated reactive API stays on Reactive Streams API.
  • Loading branch information
injectives authored Sep 1, 2022
1 parent 8c54468 commit e21be5c
Show file tree
Hide file tree
Showing 36 changed files with 205 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public AbstractReactiveSession(NetworkSession session) {

abstract Publisher<Void> closeTransaction(S transaction, boolean commit);

public Publisher<S> beginTransaction(TransactionConfig config) {
Publisher<S> doBeginTransaction(TransactionConfig config) {
return createSingleItemPublisher(
() -> {
CompletableFuture<S> txFuture = new CompletableFuture<>();
Expand Down Expand Up @@ -115,7 +115,7 @@ public Set<Bookmark> lastBookmarks() {
return session.lastBookmarks();
}

public <T> Publisher<T> close() {
<T> Publisher<T> doClose() {
return createEmptyPublisher(session::closeAsync);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,19 @@ protected AbstractReactiveTransaction(UnmanagedTransaction tx) {
this.tx = tx;
}

public <T> Publisher<T> commit() {
<T> Publisher<T> doCommit() {
return createEmptyPublisher(tx::commitAsync);
}

public <T> Publisher<T> rollback() {
<T> Publisher<T> doRollback() {
return createEmptyPublisher(tx::rollbackAsync);
}

public Publisher<Void> close() {
Publisher<Void> doClose() {
return close(false);
}

public Publisher<Boolean> isOpen() {
Publisher<Boolean> doIsOpen() {
return Mono.just(tx.isOpen());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
*/
package org.neo4j.driver.internal.reactive;

import static reactor.adapter.JdkFlowAdapter.publisherToFlowPublisher;

import java.util.Map;
import java.util.concurrent.Flow.Publisher;
import org.neo4j.driver.Query;
import org.neo4j.driver.Record;
import org.neo4j.driver.Value;
Expand All @@ -27,7 +30,6 @@
import org.neo4j.driver.internal.value.MapValue;
import org.neo4j.driver.reactive.ReactiveQueryRunner;
import org.neo4j.driver.reactive.ReactiveResult;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

interface BaseReactiveQueryRunner extends ReactiveQueryRunner {
Expand All @@ -37,7 +39,7 @@ default Publisher<ReactiveResult> run(String queryStr, Value parameters) {
Query query = new Query(queryStr, parameters);
return run(query);
} catch (Throwable t) {
return Mono.error(t);
return publisherToFlowPublisher(Mono.error(t));
}
}

Expand All @@ -57,7 +59,7 @@ default Publisher<ReactiveResult> run(String queryStr) {
Query query = new Query(queryStr);
return run(query);
} catch (Throwable t) {
return Mono.error(t);
return publisherToFlowPublisher(Mono.error(t));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
package org.neo4j.driver.internal.reactive;

import java.util.Map;
import java.util.concurrent.Flow.Publisher;
import org.neo4j.driver.Query;
import org.neo4j.driver.Record;
import org.neo4j.driver.Value;
import org.neo4j.driver.reactive.ReactiveResult;
import org.neo4j.driver.reactive.ReactiveTransaction;
import org.neo4j.driver.reactive.ReactiveTransactionContext;
import org.reactivestreams.Publisher;

final class DelegatingReactiveTransactionContext implements ReactiveTransactionContext {
private final ReactiveTransaction delegate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@
package org.neo4j.driver.internal.reactive;

import static org.neo4j.driver.internal.util.ErrorUtil.newResultConsumedError;
import static reactor.adapter.JdkFlowAdapter.publisherToFlowPublisher;
import static reactor.core.publisher.FluxSink.OverflowStrategy.IGNORE;

import java.util.List;
import java.util.concurrent.Flow.Publisher;
import java.util.function.BiConsumer;
import org.neo4j.driver.Record;
import org.neo4j.driver.internal.cursor.RxResultCursor;
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.reactive.ReactiveResult;
import org.neo4j.driver.summary.ResultSummary;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
Expand All @@ -47,7 +48,7 @@ public List<String> keys() {

@Override
public Publisher<Record> records() {
return Flux.create(
return publisherToFlowPublisher(Flux.create(
sink -> {
if (cursor.isDone()) {
sink.error(newResultConsumedError());
Expand All @@ -57,24 +58,25 @@ public Publisher<Record> records() {
sink.onRequest(cursor::request);
}
},
IGNORE);
IGNORE));
}

@Override
public Publisher<ResultSummary> consume() {
return Mono.create(sink -> cursor.summaryAsync().whenComplete((summary, summaryCompletionError) -> {
Throwable error = Futures.completionExceptionCause(summaryCompletionError);
if (summary != null) {
sink.success(summary);
} else {
sink.error(error);
}
}));
return publisherToFlowPublisher(
Mono.create(sink -> cursor.summaryAsync().whenComplete((summary, summaryCompletionError) -> {
Throwable error = Futures.completionExceptionCause(summaryCompletionError);
if (summary != null) {
sink.success(summary);
} else {
sink.error(error);
}
})));
}

@Override
public Publisher<Boolean> isOpen() {
return Mono.just(!cursor.isDone());
return publisherToFlowPublisher(Mono.just(!cursor.isDone()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@
*/
package org.neo4j.driver.internal.reactive;

import static reactor.adapter.JdkFlowAdapter.flowPublisherToFlux;
import static reactor.adapter.JdkFlowAdapter.publisherToFlowPublisher;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow.Publisher;
import org.neo4j.driver.AccessMode;
import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Query;
Expand All @@ -33,7 +37,6 @@
import org.neo4j.driver.reactive.ReactiveSession;
import org.neo4j.driver.reactive.ReactiveTransaction;
import org.neo4j.driver.reactive.ReactiveTransactionCallback;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

public class InternalReactiveSession extends AbstractReactiveSession<ReactiveTransaction>
Expand All @@ -48,22 +51,31 @@ ReactiveTransaction createTransaction(UnmanagedTransaction unmanagedTransaction)
}

@Override
Publisher<Void> closeTransaction(ReactiveTransaction transaction, boolean commit) {
org.reactivestreams.Publisher<Void> closeTransaction(ReactiveTransaction transaction, boolean commit) {
return ((InternalReactiveTransaction) transaction).close(commit);
}

@Override
public Publisher<ReactiveTransaction> beginTransaction(TransactionConfig config) {
return publisherToFlowPublisher(doBeginTransaction(config));
}

@Override
public <T> Publisher<T> executeRead(
ReactiveTransactionCallback<? extends Publisher<T>> callback, TransactionConfig config) {
return runTransaction(
AccessMode.READ, tx -> callback.execute(new DelegatingReactiveTransactionContext(tx)), config);
return publisherToFlowPublisher(runTransaction(
AccessMode.READ,
tx -> flowPublisherToFlux(callback.execute(new DelegatingReactiveTransactionContext(tx))),
config));
}

@Override
public <T> Publisher<T> executeWrite(
ReactiveTransactionCallback<? extends Publisher<T>> callback, TransactionConfig config) {
return runTransaction(
AccessMode.WRITE, tx -> callback.execute(new DelegatingReactiveTransactionContext(tx)), config);
return publisherToFlowPublisher(runTransaction(
AccessMode.WRITE,
tx -> flowPublisherToFlux(callback.execute(new DelegatingReactiveTransactionContext(tx))),
config));
}

@Override
Expand All @@ -80,7 +92,7 @@ public Publisher<ReactiveResult> run(Query query, TransactionConfig config) {
cursorStage = Futures.failedFuture(t);
}

return Mono.fromCompletionStage(cursorStage)
return publisherToFlowPublisher(Mono.fromCompletionStage(cursorStage)
.onErrorResume(error -> Mono.fromCompletionStage(session.releaseConnectionAsync())
.onErrorMap(releaseError -> Futures.combineErrors(error, releaseError))
.then(Mono.error(error)))
Expand All @@ -96,11 +108,16 @@ public Publisher<ReactiveResult> run(Query query, TransactionConfig config) {
}
return publisher;
})
.map(InternalReactiveResult::new);
.map(InternalReactiveResult::new));
}

@Override
public Set<Bookmark> lastBookmarks() {
return new HashSet<>(session.lastBookmarks());
}

@Override
public <T> Publisher<T> close() {
return publisherToFlowPublisher(doClose());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@
*/
package org.neo4j.driver.internal.reactive;

import static reactor.adapter.JdkFlowAdapter.publisherToFlowPublisher;

import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow.Publisher;
import org.neo4j.driver.Query;
import org.neo4j.driver.internal.async.UnmanagedTransaction;
import org.neo4j.driver.internal.cursor.RxResultCursor;
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.reactive.ReactiveResult;
import org.neo4j.driver.reactive.ReactiveTransaction;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

public class InternalReactiveTransaction extends AbstractReactiveTransaction
Expand All @@ -43,7 +45,7 @@ public Publisher<ReactiveResult> run(Query query) {
cursorStage = Futures.failedFuture(t);
}

return Mono.fromCompletionStage(cursorStage)
return publisherToFlowPublisher(Mono.fromCompletionStage(cursorStage)
.flatMap(cursor -> {
Mono<RxResultCursor> publisher;
Throwable runError = cursor.getRunError();
Expand All @@ -55,7 +57,7 @@ public Publisher<ReactiveResult> run(Query query) {
}
return publisher;
})
.map(InternalReactiveResult::new);
.map(InternalReactiveResult::new));
}

/**
Expand All @@ -66,6 +68,26 @@ public Publisher<ReactiveResult> run(Query query) {
* @return {@code RESET} response publisher
*/
public Publisher<Void> interrupt() {
return Mono.fromCompletionStage(tx.interruptAsync());
return publisherToFlowPublisher(Mono.fromCompletionStage(tx.interruptAsync()));
}

@Override
public <T> Publisher<T> commit() {
return publisherToFlowPublisher(doCommit());
}

@Override
public <T> Publisher<T> rollback() {
return publisherToFlowPublisher(doRollback());
}

@Override
public Publisher<Void> close() {
return publisherToFlowPublisher(doClose());
}

@Override
public Publisher<Boolean> isOpen() {
return publisherToFlowPublisher(doIsOpen());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.neo4j.driver.internal.reactive;

import static org.neo4j.driver.internal.reactive.RxUtils.createEmptyPublisher;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.neo4j.driver.AccessMode;
Expand Down Expand Up @@ -54,6 +52,11 @@ Publisher<Void> closeTransaction(RxTransaction transaction, boolean commit) {
return ((InternalRxTransaction) transaction).close(commit);
}

@Override
public Publisher<RxTransaction> beginTransaction(TransactionConfig config) {
return doBeginTransaction(config);
}

@Override
public <T> Publisher<T> readTransaction(RxTransactionWork<? extends Publisher<T>> work) {
return readTransaction(work, TransactionConfig.empty());
Expand Down Expand Up @@ -128,6 +131,6 @@ public Bookmark lastBookmark() {

@Override
public <T> Publisher<T> close() {
return createEmptyPublisher(session::closeAsync);
return doClose();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.reactive.RxResult;
import org.neo4j.driver.reactive.RxTransaction;
import org.reactivestreams.Publisher;

@Deprecated
public class InternalRxTransaction extends AbstractReactiveTransaction implements RxTransaction {
Expand Down Expand Up @@ -53,4 +54,24 @@ public RxResult run(Query query) {
return cursorFuture;
});
}

@Override
public <T> Publisher<T> commit() {
return doCommit();
}

@Override
public <T> Publisher<T> rollback() {
return doRollback();
}

@Override
public Publisher<Void> close() {
return doClose();
}

@Override
public Publisher<Boolean> isOpen() {
return doIsOpen();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
package org.neo4j.driver.reactive;

import java.util.Map;
import java.util.concurrent.Flow.Publisher;
import org.neo4j.driver.Query;
import org.neo4j.driver.Record;
import org.neo4j.driver.Value;
import org.neo4j.driver.Values;
import org.reactivestreams.Publisher;

/**
* Common interface for components that can execute Neo4j queries using Reactive API.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
package org.neo4j.driver.reactive;

import java.util.List;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import org.neo4j.driver.Query;
import org.neo4j.driver.Record;
import org.neo4j.driver.exceptions.ResultConsumedException;
import org.neo4j.driver.summary.ResultSummary;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
* A reactive result provides a reactive way to execute query on the server and receives records back. This reactive result consists of a result key publisher,
Expand Down
Loading

0 comments on commit e21be5c

Please sign in to comment.