diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractReactiveSession.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractReactiveSession.java index 079598c789..6bb412dc07 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractReactiveSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractReactiveSession.java @@ -49,7 +49,7 @@ public AbstractReactiveSession(NetworkSession session) { abstract Publisher closeTransaction(S transaction, boolean commit); - public Publisher beginTransaction(TransactionConfig config) { + Publisher doBeginTransaction(TransactionConfig config) { return createSingleItemPublisher( () -> { CompletableFuture txFuture = new CompletableFuture<>(); @@ -115,7 +115,7 @@ public Set lastBookmarks() { return session.lastBookmarks(); } - public Publisher close() { + Publisher doClose() { return createEmptyPublisher(session::closeAsync); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractReactiveTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractReactiveTransaction.java index b9a20174b4..97939c87db 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractReactiveTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/AbstractReactiveTransaction.java @@ -31,19 +31,19 @@ protected AbstractReactiveTransaction(UnmanagedTransaction tx) { this.tx = tx; } - public Publisher commit() { + Publisher doCommit() { return createEmptyPublisher(tx::commitAsync); } - public Publisher rollback() { + Publisher doRollback() { return createEmptyPublisher(tx::rollbackAsync); } - public Publisher close() { + Publisher doClose() { return close(false); } - public Publisher isOpen() { + Publisher doIsOpen() { return Mono.just(tx.isOpen()); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/BaseReactiveQueryRunner.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/BaseReactiveQueryRunner.java index 15513acc5c..ac08cf13d2 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/BaseReactiveQueryRunner.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/BaseReactiveQueryRunner.java @@ -18,7 +18,10 @@ */ package org.neo4j.driver.internal.reactive; +import static reactor.adapter.JdkFlowAdapter.publisherToFlowPublisher; + import java.util.Map; +import java.util.concurrent.Flow.Publisher; import org.neo4j.driver.Query; import org.neo4j.driver.Record; import org.neo4j.driver.Value; @@ -27,7 +30,6 @@ import org.neo4j.driver.internal.value.MapValue; import org.neo4j.driver.reactive.ReactiveQueryRunner; import org.neo4j.driver.reactive.ReactiveResult; -import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; interface BaseReactiveQueryRunner extends ReactiveQueryRunner { @@ -37,7 +39,7 @@ default Publisher run(String queryStr, Value parameters) { Query query = new Query(queryStr, parameters); return run(query); } catch (Throwable t) { - return Mono.error(t); + return publisherToFlowPublisher(Mono.error(t)); } } @@ -57,7 +59,7 @@ default Publisher run(String queryStr) { Query query = new Query(queryStr); return run(query); } catch (Throwable t) { - return Mono.error(t); + return publisherToFlowPublisher(Mono.error(t)); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/DelegatingReactiveTransactionContext.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/DelegatingReactiveTransactionContext.java index cf2a805321..75914986b3 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/DelegatingReactiveTransactionContext.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/DelegatingReactiveTransactionContext.java @@ -19,13 +19,13 @@ package org.neo4j.driver.internal.reactive; import java.util.Map; +import java.util.concurrent.Flow.Publisher; import org.neo4j.driver.Query; import org.neo4j.driver.Record; import org.neo4j.driver.Value; import org.neo4j.driver.reactive.ReactiveResult; import org.neo4j.driver.reactive.ReactiveTransaction; import org.neo4j.driver.reactive.ReactiveTransactionContext; -import org.reactivestreams.Publisher; final class DelegatingReactiveTransactionContext implements ReactiveTransactionContext { private final ReactiveTransaction delegate; diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveResult.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveResult.java index 8ca3e5b40b..01aafc1e34 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveResult.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveResult.java @@ -19,16 +19,17 @@ package org.neo4j.driver.internal.reactive; import static org.neo4j.driver.internal.util.ErrorUtil.newResultConsumedError; +import static reactor.adapter.JdkFlowAdapter.publisherToFlowPublisher; import static reactor.core.publisher.FluxSink.OverflowStrategy.IGNORE; import java.util.List; +import java.util.concurrent.Flow.Publisher; import java.util.function.BiConsumer; import org.neo4j.driver.Record; import org.neo4j.driver.internal.cursor.RxResultCursor; import org.neo4j.driver.internal.util.Futures; import org.neo4j.driver.reactive.ReactiveResult; import org.neo4j.driver.summary.ResultSummary; -import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; @@ -47,7 +48,7 @@ public List keys() { @Override public Publisher records() { - return Flux.create( + return publisherToFlowPublisher(Flux.create( sink -> { if (cursor.isDone()) { sink.error(newResultConsumedError()); @@ -57,24 +58,25 @@ public Publisher records() { sink.onRequest(cursor::request); } }, - IGNORE); + IGNORE)); } @Override public Publisher consume() { - return Mono.create(sink -> cursor.summaryAsync().whenComplete((summary, summaryCompletionError) -> { - Throwable error = Futures.completionExceptionCause(summaryCompletionError); - if (summary != null) { - sink.success(summary); - } else { - sink.error(error); - } - })); + return publisherToFlowPublisher( + Mono.create(sink -> cursor.summaryAsync().whenComplete((summary, summaryCompletionError) -> { + Throwable error = Futures.completionExceptionCause(summaryCompletionError); + if (summary != null) { + sink.success(summary); + } else { + sink.error(error); + } + }))); } @Override public Publisher isOpen() { - return Mono.just(!cursor.isDone()); + return publisherToFlowPublisher(Mono.just(!cursor.isDone())); } /** diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveSession.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveSession.java index 2435152b14..6f21d40a47 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveSession.java @@ -18,9 +18,13 @@ */ package org.neo4j.driver.internal.reactive; +import static reactor.adapter.JdkFlowAdapter.flowPublisherToFlux; +import static reactor.adapter.JdkFlowAdapter.publisherToFlowPublisher; + import java.util.HashSet; import java.util.Set; import java.util.concurrent.CompletionStage; +import java.util.concurrent.Flow.Publisher; import org.neo4j.driver.AccessMode; import org.neo4j.driver.Bookmark; import org.neo4j.driver.Query; @@ -33,7 +37,6 @@ import org.neo4j.driver.reactive.ReactiveSession; import org.neo4j.driver.reactive.ReactiveTransaction; import org.neo4j.driver.reactive.ReactiveTransactionCallback; -import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; public class InternalReactiveSession extends AbstractReactiveSession @@ -48,22 +51,31 @@ ReactiveTransaction createTransaction(UnmanagedTransaction unmanagedTransaction) } @Override - Publisher closeTransaction(ReactiveTransaction transaction, boolean commit) { + org.reactivestreams.Publisher closeTransaction(ReactiveTransaction transaction, boolean commit) { return ((InternalReactiveTransaction) transaction).close(commit); } + @Override + public Publisher beginTransaction(TransactionConfig config) { + return publisherToFlowPublisher(doBeginTransaction(config)); + } + @Override public Publisher executeRead( ReactiveTransactionCallback> callback, TransactionConfig config) { - return runTransaction( - AccessMode.READ, tx -> callback.execute(new DelegatingReactiveTransactionContext(tx)), config); + return publisherToFlowPublisher(runTransaction( + AccessMode.READ, + tx -> flowPublisherToFlux(callback.execute(new DelegatingReactiveTransactionContext(tx))), + config)); } @Override public Publisher executeWrite( ReactiveTransactionCallback> callback, TransactionConfig config) { - return runTransaction( - AccessMode.WRITE, tx -> callback.execute(new DelegatingReactiveTransactionContext(tx)), config); + return publisherToFlowPublisher(runTransaction( + AccessMode.WRITE, + tx -> flowPublisherToFlux(callback.execute(new DelegatingReactiveTransactionContext(tx))), + config)); } @Override @@ -80,7 +92,7 @@ public Publisher run(Query query, TransactionConfig config) { cursorStage = Futures.failedFuture(t); } - return Mono.fromCompletionStage(cursorStage) + return publisherToFlowPublisher(Mono.fromCompletionStage(cursorStage) .onErrorResume(error -> Mono.fromCompletionStage(session.releaseConnectionAsync()) .onErrorMap(releaseError -> Futures.combineErrors(error, releaseError)) .then(Mono.error(error))) @@ -96,11 +108,16 @@ public Publisher run(Query query, TransactionConfig config) { } return publisher; }) - .map(InternalReactiveResult::new); + .map(InternalReactiveResult::new)); } @Override public Set lastBookmarks() { return new HashSet<>(session.lastBookmarks()); } + + @Override + public Publisher close() { + return publisherToFlowPublisher(doClose()); + } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveTransaction.java index adebf16337..c5a089cded 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalReactiveTransaction.java @@ -18,14 +18,16 @@ */ package org.neo4j.driver.internal.reactive; +import static reactor.adapter.JdkFlowAdapter.publisherToFlowPublisher; + import java.util.concurrent.CompletionStage; +import java.util.concurrent.Flow.Publisher; import org.neo4j.driver.Query; import org.neo4j.driver.internal.async.UnmanagedTransaction; import org.neo4j.driver.internal.cursor.RxResultCursor; import org.neo4j.driver.internal.util.Futures; import org.neo4j.driver.reactive.ReactiveResult; import org.neo4j.driver.reactive.ReactiveTransaction; -import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; public class InternalReactiveTransaction extends AbstractReactiveTransaction @@ -43,7 +45,7 @@ public Publisher run(Query query) { cursorStage = Futures.failedFuture(t); } - return Mono.fromCompletionStage(cursorStage) + return publisherToFlowPublisher(Mono.fromCompletionStage(cursorStage) .flatMap(cursor -> { Mono publisher; Throwable runError = cursor.getRunError(); @@ -55,7 +57,7 @@ public Publisher run(Query query) { } return publisher; }) - .map(InternalReactiveResult::new); + .map(InternalReactiveResult::new)); } /** @@ -66,6 +68,26 @@ public Publisher run(Query query) { * @return {@code RESET} response publisher */ public Publisher interrupt() { - return Mono.fromCompletionStage(tx.interruptAsync()); + return publisherToFlowPublisher(Mono.fromCompletionStage(tx.interruptAsync())); + } + + @Override + public Publisher commit() { + return publisherToFlowPublisher(doCommit()); + } + + @Override + public Publisher rollback() { + return publisherToFlowPublisher(doRollback()); + } + + @Override + public Publisher close() { + return publisherToFlowPublisher(doClose()); + } + + @Override + public Publisher isOpen() { + return publisherToFlowPublisher(doIsOpen()); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java index f6aa28938e..52aa03337e 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxSession.java @@ -18,8 +18,6 @@ */ package org.neo4j.driver.internal.reactive; -import static org.neo4j.driver.internal.reactive.RxUtils.createEmptyPublisher; - import java.util.Map; import java.util.concurrent.CompletableFuture; import org.neo4j.driver.AccessMode; @@ -54,6 +52,11 @@ Publisher closeTransaction(RxTransaction transaction, boolean commit) { return ((InternalRxTransaction) transaction).close(commit); } + @Override + public Publisher beginTransaction(TransactionConfig config) { + return doBeginTransaction(config); + } + @Override public Publisher readTransaction(RxTransactionWork> work) { return readTransaction(work, TransactionConfig.empty()); @@ -128,6 +131,6 @@ public Bookmark lastBookmark() { @Override public Publisher close() { - return createEmptyPublisher(session::closeAsync); + return doClose(); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java index e1eee6b45f..dcde113bd4 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java @@ -25,6 +25,7 @@ import org.neo4j.driver.internal.util.Futures; import org.neo4j.driver.reactive.RxResult; import org.neo4j.driver.reactive.RxTransaction; +import org.reactivestreams.Publisher; @Deprecated public class InternalRxTransaction extends AbstractReactiveTransaction implements RxTransaction { @@ -53,4 +54,24 @@ public RxResult run(Query query) { return cursorFuture; }); } + + @Override + public Publisher commit() { + return doCommit(); + } + + @Override + public Publisher rollback() { + return doRollback(); + } + + @Override + public Publisher close() { + return doClose(); + } + + @Override + public Publisher isOpen() { + return doIsOpen(); + } } diff --git a/driver/src/main/java/org/neo4j/driver/reactive/ReactiveQueryRunner.java b/driver/src/main/java/org/neo4j/driver/reactive/ReactiveQueryRunner.java index 5a15f3f14d..f45c3c21d2 100644 --- a/driver/src/main/java/org/neo4j/driver/reactive/ReactiveQueryRunner.java +++ b/driver/src/main/java/org/neo4j/driver/reactive/ReactiveQueryRunner.java @@ -19,11 +19,11 @@ package org.neo4j.driver.reactive; import java.util.Map; +import java.util.concurrent.Flow.Publisher; import org.neo4j.driver.Query; import org.neo4j.driver.Record; import org.neo4j.driver.Value; import org.neo4j.driver.Values; -import org.reactivestreams.Publisher; /** * Common interface for components that can execute Neo4j queries using Reactive API. diff --git a/driver/src/main/java/org/neo4j/driver/reactive/ReactiveResult.java b/driver/src/main/java/org/neo4j/driver/reactive/ReactiveResult.java index e6a047b33d..ac7a07235a 100644 --- a/driver/src/main/java/org/neo4j/driver/reactive/ReactiveResult.java +++ b/driver/src/main/java/org/neo4j/driver/reactive/ReactiveResult.java @@ -19,13 +19,13 @@ package org.neo4j.driver.reactive; import java.util.List; +import java.util.concurrent.Flow.Publisher; +import java.util.concurrent.Flow.Subscriber; +import java.util.concurrent.Flow.Subscription; import org.neo4j.driver.Query; import org.neo4j.driver.Record; import org.neo4j.driver.exceptions.ResultConsumedException; import org.neo4j.driver.summary.ResultSummary; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; /** * A reactive result provides a reactive way to execute query on the server and receives records back. This reactive result consists of a result key publisher, diff --git a/driver/src/main/java/org/neo4j/driver/reactive/ReactiveSession.java b/driver/src/main/java/org/neo4j/driver/reactive/ReactiveSession.java index 0dbc744284..6b220a643e 100644 --- a/driver/src/main/java/org/neo4j/driver/reactive/ReactiveSession.java +++ b/driver/src/main/java/org/neo4j/driver/reactive/ReactiveSession.java @@ -21,6 +21,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CompletionStage; +import java.util.concurrent.Flow.Publisher; import org.neo4j.driver.AccessMode; import org.neo4j.driver.Bookmark; import org.neo4j.driver.Query; @@ -28,7 +29,6 @@ import org.neo4j.driver.Session; import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.Values; -import org.reactivestreams.Publisher; /** * A reactive session is the same as {@link Session} except it provides a reactive API. diff --git a/driver/src/main/java/org/neo4j/driver/reactive/ReactiveTransaction.java b/driver/src/main/java/org/neo4j/driver/reactive/ReactiveTransaction.java index fc693a644f..f1c37bf6ea 100644 --- a/driver/src/main/java/org/neo4j/driver/reactive/ReactiveTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/reactive/ReactiveTransaction.java @@ -18,8 +18,8 @@ */ package org.neo4j.driver.reactive; +import java.util.concurrent.Flow.Publisher; import org.neo4j.driver.Transaction; -import org.reactivestreams.Publisher; /** * Same as {@link Transaction} except this reactive transaction exposes a reactive API. diff --git a/driver/src/main/java/org/neo4j/driver/reactive/RxQueryRunner.java b/driver/src/main/java/org/neo4j/driver/reactive/RxQueryRunner.java index 5ac04ff0d3..2757f82e49 100644 --- a/driver/src/main/java/org/neo4j/driver/reactive/RxQueryRunner.java +++ b/driver/src/main/java/org/neo4j/driver/reactive/RxQueryRunner.java @@ -31,6 +31,7 @@ * @see RxSession * @see RxTransaction * @since 4.0 + * @deprecated superseded by {@link ReactiveQueryRunner} */ @Deprecated public interface RxQueryRunner { diff --git a/driver/src/main/java/org/neo4j/driver/reactive/RxResult.java b/driver/src/main/java/org/neo4j/driver/reactive/RxResult.java index 772fd49862..b245c8fa27 100644 --- a/driver/src/main/java/org/neo4j/driver/reactive/RxResult.java +++ b/driver/src/main/java/org/neo4j/driver/reactive/RxResult.java @@ -38,6 +38,7 @@ * @see Subscriber * @see Subscription * @since 4.0 + * @deprecated superseded by {@link ReactiveResult} */ @Deprecated public interface RxResult { diff --git a/driver/src/main/java/org/neo4j/driver/reactive/RxSession.java b/driver/src/main/java/org/neo4j/driver/reactive/RxSession.java index cf2d5f22f4..3d2583e4ca 100644 --- a/driver/src/main/java/org/neo4j/driver/reactive/RxSession.java +++ b/driver/src/main/java/org/neo4j/driver/reactive/RxSession.java @@ -35,6 +35,7 @@ * @see RxTransaction * @see Publisher * @since 4.0 + * @deprecated superseded by {@link ReactiveSession} */ @Deprecated public interface RxSession extends RxQueryRunner { diff --git a/driver/src/main/java/org/neo4j/driver/reactive/RxTransaction.java b/driver/src/main/java/org/neo4j/driver/reactive/RxTransaction.java index 77ef0ca01b..aa5f4cff7a 100644 --- a/driver/src/main/java/org/neo4j/driver/reactive/RxTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/reactive/RxTransaction.java @@ -27,6 +27,7 @@ * @see RxSession * @see Publisher * @since 4.0 + * @deprecated superseded by {@link ReactiveTransaction} */ @Deprecated public interface RxTransaction extends RxQueryRunner { diff --git a/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionWork.java b/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionWork.java index e76cac0467..82bfbbca90 100644 --- a/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionWork.java +++ b/driver/src/main/java/org/neo4j/driver/reactive/RxTransactionWork.java @@ -22,9 +22,9 @@ * Callback that executes operations against a given {@link RxTransaction}. To be used with {@link RxSession#readTransaction(RxTransactionWork)} and {@link * RxSession#writeTransaction(RxTransactionWork)} methods. * - * @param the return type of this work. + * @param the return type of this work * @since 4.0 - * @deprecated superseded by {@link ReactiveTransactionCallback}. + * @deprecated superseded by {@link ReactiveTransactionCallback} */ @Deprecated public interface RxTransactionWork { diff --git a/driver/src/test/java/org/neo4j/driver/internal/reactive/DelegatingReactiveTransactionContextTest.java b/driver/src/test/java/org/neo4j/driver/internal/reactive/DelegatingReactiveTransactionContextTest.java index 84e59ad698..9fbc1d3ecd 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/reactive/DelegatingReactiveTransactionContextTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/reactive/DelegatingReactiveTransactionContextTest.java @@ -22,9 +22,11 @@ import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.then; import static org.mockito.Mockito.mock; +import static reactor.adapter.JdkFlowAdapter.publisherToFlowPublisher; import java.util.Collections; import java.util.Map; +import java.util.concurrent.Flow.Publisher; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.neo4j.driver.Query; @@ -32,7 +34,6 @@ import org.neo4j.driver.Value; import org.neo4j.driver.reactive.ReactiveResult; import org.neo4j.driver.reactive.ReactiveTransaction; -import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; public class DelegatingReactiveTransactionContextTest { @@ -50,11 +51,10 @@ void shouldDelegateRunWithValueParams() { // GIVEN String query = "something"; Value params = mock(Value.class); - Publisher expected = Mono.empty(); + Publisher expected = publisherToFlowPublisher(Mono.empty()); given(transaction.run(query, params)).willReturn(expected); // WHEN - @SuppressWarnings("ReactiveStreamsUnusedPublisher") Publisher actual = context.run(query, params); // THEN @@ -67,11 +67,10 @@ void shouldDelegateRunWithMapParams() { // GIVEN String query = "something"; Map params = Collections.emptyMap(); - Publisher expected = Mono.empty(); + Publisher expected = publisherToFlowPublisher(Mono.empty()); given(transaction.run(query, params)).willReturn(expected); // WHEN - @SuppressWarnings("ReactiveStreamsUnusedPublisher") Publisher actual = context.run(query, params); // THEN @@ -84,11 +83,10 @@ void shouldDelegateRunWithRecordParams() { // GIVEN String query = "something"; Record params = mock(Record.class); - Publisher expected = Mono.empty(); + Publisher expected = publisherToFlowPublisher(Mono.empty()); given(transaction.run(query, params)).willReturn(expected); // WHEN - @SuppressWarnings("ReactiveStreamsUnusedPublisher") Publisher actual = context.run(query, params); // THEN @@ -100,11 +98,10 @@ void shouldDelegateRunWithRecordParams() { void shouldDelegateRun() { // GIVEN String query = "something"; - Publisher expected = Mono.empty(); + Publisher expected = publisherToFlowPublisher(Mono.empty()); given(transaction.run(query)).willReturn(expected); // WHEN - @SuppressWarnings("ReactiveStreamsUnusedPublisher") Publisher actual = context.run(query); // THEN @@ -116,11 +113,10 @@ void shouldDelegateRun() { void shouldDelegateRunWithQueryType() { // GIVEN Query query = mock(Query.class); - Publisher expected = Mono.empty(); + Publisher expected = publisherToFlowPublisher(Mono.empty()); given(transaction.run(query)).willReturn(expected); // WHEN - @SuppressWarnings("ReactiveStreamsUnusedPublisher") Publisher actual = context.run(query); // THEN diff --git a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalReactiveSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalReactiveSessionTest.java index 175ad56354..be0770611d 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalReactiveSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalReactiveSessionTest.java @@ -35,11 +35,14 @@ import static org.neo4j.driver.TransactionConfig.empty; import static org.neo4j.driver.Values.parameters; import static org.neo4j.driver.internal.util.Futures.completedWithNull; +import static reactor.adapter.JdkFlowAdapter.flowPublisherToFlux; +import static reactor.adapter.JdkFlowAdapter.publisherToFlowPublisher; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.Flow.Publisher; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Stream; @@ -64,7 +67,6 @@ import org.neo4j.driver.reactive.ReactiveSession; import org.neo4j.driver.reactive.ReactiveTransaction; import org.neo4j.driver.reactive.ReactiveTransactionCallback; -import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -100,7 +102,7 @@ void shouldDelegateRun(Function> runR InternalReactiveSession rxSession = new InternalReactiveSession(session); // When - Publisher result = runReturnOne.apply(rxSession); + var result = flowPublisherToFlux(runReturnOne.apply(rxSession)); // Then verify(session).runRx(any(Query.class), any(TransactionConfig.class)); @@ -121,7 +123,7 @@ void shouldReleaseConnectionIfFailedToRun(Function result = runReturnOne.apply(rxSession); + var result = flowPublisherToFlux(runReturnOne.apply(rxSession)); // Then StepVerifier.create(result).expectErrorMatches(t -> error == t).verify(); @@ -140,7 +142,7 @@ void shouldDelegateBeginTx(Function rxTx = beginTx.apply(rxSession); + var rxTx = flowPublisherToFlux(beginTx.apply(rxSession)); StepVerifier.create(Mono.from(rxTx)).expectNextCount(1).verifyComplete(); // Then @@ -161,7 +163,7 @@ void shouldReleaseConnectionIfFailedToBeginTx(Function rxTx = beginTx.apply(rxSession); + var rxTx = flowPublisherToFlux(beginTx.apply(rxSession)); CompletableFuture txFuture = Mono.from(rxTx).toFuture(); // Then @@ -186,8 +188,8 @@ void shouldRetryOnError() { InternalRxSession rxSession = new InternalRxSession(session); // When - Publisher strings = - rxSession.readTransaction(t -> Flux.just("a").then(Mono.error(new RuntimeException("Errored")))); + var strings = rxSession.readTransaction( + t -> Flux.just("a").then(Mono.error(new RuntimeException("Errored")))); StepVerifier.create(Flux.from(strings)) // we lost the "a"s too as the user only see the last failure .expectError(RuntimeException.class) @@ -216,7 +218,7 @@ void shouldObtainResultIfRetrySucceed() { // When AtomicInteger count = new AtomicInteger(); - Publisher strings = rxSession.readTransaction(t -> { + var strings = rxSession.readTransaction(t -> { // we fail for the first few retries, and then success on the last run. if (count.getAndIncrement() == retryCount) { return Flux.just("a"); @@ -272,7 +274,7 @@ void shouldDelegateClose() { InternalRxSession rxSession = new InternalRxSession(session); // When - Publisher mono = rxSession.close(); + var mono = rxSession.close(); // Then StepVerifier.create(mono).verifyComplete(); @@ -289,8 +291,9 @@ void shouldDelegateExecuteReadToRetryLogic(ExecuteVariation executeVariation) { RetryLogic logic = mock(RetryLogic.class); String expected = ""; given(networkSession.retryLogic()).willReturn(logic); - ReactiveTransactionCallback> tc = (ignored) -> Mono.justOrEmpty(expected); - given(logic.retryRx(any())).willReturn(tc.execute(null)); + ReactiveTransactionCallback> tc = + (ignored) -> publisherToFlowPublisher(Mono.justOrEmpty(expected)); + given(logic.retryRx(any())).willReturn(flowPublisherToFlux(tc.execute(null))); TransactionConfig config = TransactionConfig.builder().build(); // WHEN @@ -299,7 +302,7 @@ void shouldDelegateExecuteReadToRetryLogic(ExecuteVariation executeVariation) { : (executeVariation.explicitTxConfig ? session.executeWrite(tc, config) : session.executeWrite(tc)); // THEN - assertEquals(expected, Mono.from(actual).block()); + assertEquals(expected, Mono.from(flowPublisherToFlux(actual)).block()); then(networkSession).should().retryLogic(); then(logic).should().retryRx(any()); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalReactiveTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalReactiveTransactionTest.java index c8c426a72d..a4ab8117d0 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalReactiveTransactionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/reactive/InternalReactiveTransactionTest.java @@ -23,6 +23,7 @@ import static org.mockito.BDDMockito.then; import static org.mockito.Mockito.mock; import static org.neo4j.driver.internal.util.Futures.failedFuture; +import static reactor.adapter.JdkFlowAdapter.flowPublisherToFlux; import org.junit.jupiter.api.Test; import org.neo4j.driver.internal.async.UnmanagedTransaction; @@ -39,7 +40,9 @@ void shouldDelegateInterrupt() { tx = new InternalReactiveTransaction(utx); // When - StepVerifier.create(tx.interrupt()).expectComplete().verify(); + StepVerifier.create(flowPublisherToFlux(tx.interrupt())) + .expectComplete() + .verify(); // Then then(utx).should().interruptAsync(); @@ -54,7 +57,9 @@ void shouldDelegateInterruptAndReportError() { tx = new InternalReactiveTransaction(utx); // When - StepVerifier.create(tx.interrupt()).expectErrorMatches(ar -> ar == e).verify(); + StepVerifier.create(flowPublisherToFlux(tx.interrupt())) + .expectErrorMatches(ar -> ar == e) + .verify(); // Then then(utx).should().interruptAsync(); diff --git a/driver/src/test/java/org/neo4j/driver/tck/reactive/ReactiveResultPublisherVerificationIT.java b/driver/src/test/java/org/neo4j/driver/tck/reactive/ReactiveResultPublisherVerificationIT.java index a7c202e4bd..c88f84757b 100644 --- a/driver/src/test/java/org/neo4j/driver/tck/reactive/ReactiveResultPublisherVerificationIT.java +++ b/driver/src/test/java/org/neo4j/driver/tck/reactive/ReactiveResultPublisherVerificationIT.java @@ -18,6 +18,8 @@ */ package org.neo4j.driver.tck.reactive; +import static reactor.adapter.JdkFlowAdapter.flowPublisherToFlux; + import java.time.Duration; import org.neo4j.driver.Driver; import org.neo4j.driver.reactive.ReactiveResult; @@ -65,13 +67,13 @@ public long maxElementsFromPublisher() { @Override public Publisher createPublisher(long elements) { ReactiveSession session = driver.reactiveSession(); - return Mono.fromDirect(session.run("RETURN 1")); + return Mono.from(flowPublisherToFlux(session.run("RETURN 1"))); } @Override public Publisher createFailedPublisher() { ReactiveSession session = driver.reactiveSession(); // Please note that this publisher fails on run stage. - return Mono.fromDirect(session.run("RETURN 5/0")); + return Mono.from(flowPublisherToFlux(session.run("RETURN 5/0"))); } } diff --git a/driver/src/test/java/org/neo4j/driver/tck/reactive/ReactiveResultRecordPublisherVerificationIT.java b/driver/src/test/java/org/neo4j/driver/tck/reactive/ReactiveResultRecordPublisherVerificationIT.java index bd0b7d6f6d..e634234bb0 100644 --- a/driver/src/test/java/org/neo4j/driver/tck/reactive/ReactiveResultRecordPublisherVerificationIT.java +++ b/driver/src/test/java/org/neo4j/driver/tck/reactive/ReactiveResultRecordPublisherVerificationIT.java @@ -19,6 +19,7 @@ package org.neo4j.driver.tck.reactive; import static org.neo4j.driver.Values.parameters; +import static reactor.adapter.JdkFlowAdapter.flowPublisherToFlux; import java.time.Duration; import org.neo4j.driver.Driver; @@ -72,14 +73,15 @@ public long maxElementsFromPublisher() { @Override public Publisher createPublisher(long elements) { ReactiveSession session = driver.reactiveSession(); - return Mono.fromDirect(session.run(QUERY, parameters("numberOfRecords", elements))) - .flatMapMany(r -> Flux.from(r.records())); + return Mono.fromDirect(flowPublisherToFlux(session.run(QUERY, parameters("numberOfRecords", elements)))) + .flatMapMany(r -> Flux.from(flowPublisherToFlux(r.records()))); } @Override public Publisher createFailedPublisher() { ReactiveSession session = driver.reactiveSession(); // Please note that this publisher fails on run stage. - return Mono.fromDirect(session.run("RETURN 5/0")).flatMapMany(r -> Flux.from(r.records())); + return Mono.fromDirect(flowPublisherToFlux(session.run("RETURN 5/0"))) + .flatMapMany(r -> Flux.from(flowPublisherToFlux(r.records()))); } } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/ReactiveBufferedSubscriber.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/ReactiveBufferedSubscriber.java deleted file mode 100644 index ad00c54e45..0000000000 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/ReactiveBufferedSubscriber.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Copyright (c) "Neo4j" - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package neo4j.org.testkit.backend; - -public class ReactiveBufferedSubscriber {} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/ReactiveTransactionContextAdapter.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/ReactiveTransactionContextAdapter.java index 7e57c63c5a..68df30d9ff 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/ReactiveTransactionContextAdapter.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/ReactiveTransactionContextAdapter.java @@ -19,13 +19,13 @@ package neo4j.org.testkit.backend; import java.util.Map; +import java.util.concurrent.Flow.Publisher; import org.neo4j.driver.Query; import org.neo4j.driver.Record; import org.neo4j.driver.Value; import org.neo4j.driver.reactive.ReactiveResult; import org.neo4j.driver.reactive.ReactiveTransaction; import org.neo4j.driver.reactive.ReactiveTransactionContext; -import org.reactivestreams.Publisher; public class ReactiveTransactionContextAdapter implements ReactiveTransaction { private final ReactiveTransactionContext delegate; diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/AbstractResultNext.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/AbstractResultNext.java index d5aa376f4d..21c5033fb5 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/AbstractResultNext.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/AbstractResultNext.java @@ -18,6 +18,8 @@ */ package neo4j.org.testkit.backend.messages; +import static org.reactivestreams.FlowAdapters.toFlowSubscriber; + import java.util.concurrent.CompletionStage; import neo4j.org.testkit.backend.RxBufferedSubscriber; import neo4j.org.testkit.backend.TestkitState; @@ -79,7 +81,7 @@ public Mono processReactive(TestkitState testkitState) { RxBufferedSubscriber subscriberInstance = new RxBufferedSubscriber<>(getFetchSize(resultHolder)); resultHolder.setSubscriber(subscriberInstance); - resultHolder.getResult().records().subscribe(subscriberInstance); + resultHolder.getResult().records().subscribe(toFlowSubscriber(subscriberInstance)); return subscriberInstance; }); return subscriber diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultConsume.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultConsume.java index c47146ae13..962542788a 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultConsume.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultConsume.java @@ -18,6 +18,8 @@ */ package neo4j.org.testkit.backend.messages.requests; +import static reactor.adapter.JdkFlowAdapter.flowPublisherToFlux; + import java.util.HashMap; import java.util.List; import java.util.Map; @@ -78,8 +80,8 @@ public Mono processRx(TestkitState testkitState) { public Mono processReactive(TestkitState testkitState) { return testkitState .getReactiveResultHolder(data.getResultId()) - .flatMap( - resultHolder -> Mono.fromDirect(resultHolder.getResult().consume())) + .flatMap(resultHolder -> Mono.fromDirect( + flowPublisherToFlux(resultHolder.getResult().consume()))) .map(this::createResponse); } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionBeginTransaction.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionBeginTransaction.java index 608a273814..4bbde0053c 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionBeginTransaction.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionBeginTransaction.java @@ -18,6 +18,8 @@ */ package neo4j.org.testkit.backend.messages.requests; +import static reactor.adapter.JdkFlowAdapter.flowPublisherToFlux; + import java.time.Duration; import java.util.Map; import java.util.Optional; @@ -112,7 +114,7 @@ public Mono processReactive(TestkitState testkitState) { configureTimeout(builder); - return Mono.fromDirect(session.beginTransaction(builder.build())) + return Mono.fromDirect(flowPublisherToFlux(session.beginTransaction(builder.build()))) .map(tx -> transaction(testkitState.addReactiveTransactionHolder( new ReactiveTransactionHolder(sessionHolder, tx)))); }); diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionClose.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionClose.java index 92cffd57a5..269391fe56 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionClose.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionClose.java @@ -18,6 +18,8 @@ */ package neo4j.org.testkit.backend.messages.requests; +import static reactor.adapter.JdkFlowAdapter.flowPublisherToFlux; + import java.util.concurrent.CompletionStage; import lombok.Getter; import lombok.Setter; @@ -58,8 +60,8 @@ public Mono processRx(TestkitState testkitState) { public Mono processReactive(TestkitState testkitState) { return testkitState .getReactiveSessionHolder(data.getSessionId()) - .flatMap(sessionHolder -> - Mono.fromDirect(sessionHolder.getSession().close())) + .flatMap(sessionHolder -> Mono.fromDirect( + flowPublisherToFlux(sessionHolder.getSession().close()))) .then(Mono.just(createResponse())); } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionReadTransaction.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionReadTransaction.java index 13a00fd509..b657d61ed8 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionReadTransaction.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionReadTransaction.java @@ -18,6 +18,9 @@ */ package neo4j.org.testkit.backend.messages.requests; +import static reactor.adapter.JdkFlowAdapter.flowPublisherToFlux; +import static reactor.adapter.JdkFlowAdapter.publisherToFlowPublisher; + import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; @@ -102,16 +105,17 @@ public Mono processReactive(TestkitState testkitState) { return testkitState .getReactiveSessionHolder(data.getSessionId()) .flatMap(sessionHolder -> { - ReactiveTransactionCallback> workWrapper = tx -> { + ReactiveTransactionCallback> workWrapper = tx -> { String txId = testkitState.addReactiveTransactionHolder(new ReactiveTransactionHolder( sessionHolder, new ReactiveTransactionContextAdapter(tx))); testkitState.getResponseWriter().accept(retryableTry(txId)); CompletableFuture tryResult = new CompletableFuture<>(); sessionHolder.setTxWorkFuture(tryResult); - return Mono.fromCompletionStage(tryResult); + return publisherToFlowPublisher(Mono.fromCompletionStage(tryResult)); }; - return Mono.fromDirect(sessionHolder.getSession().executeRead(workWrapper)); + return Mono.fromDirect( + flowPublisherToFlux(sessionHolder.getSession().executeRead(workWrapper))); }) .then(Mono.just(retryableDone())); } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionRun.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionRun.java index 8624fd3e39..967ee3453e 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionRun.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionRun.java @@ -18,6 +18,8 @@ */ package neo4j.org.testkit.backend.messages.requests; +import static reactor.adapter.JdkFlowAdapter.flowPublisherToFlux; + import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import java.time.Duration; import java.util.List; @@ -130,7 +132,7 @@ public Mono processReactive(TestkitState testkitState) { Optional.ofNullable(data.getTxMeta()).ifPresent(transactionConfig::withMetadata); configureTimeout(transactionConfig); - return Mono.fromDirect(session.run(query, transactionConfig.build())) + return Mono.fromDirect(flowPublisherToFlux(session.run(query, transactionConfig.build()))) .map(result -> { String id = testkitState.addReactiveResultHolder(new ReactiveResultHolder(sessionHolder, result)); diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionWriteTransaction.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionWriteTransaction.java index 6effd93b95..7f26847b0a 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionWriteTransaction.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionWriteTransaction.java @@ -18,6 +18,9 @@ */ package neo4j.org.testkit.backend.messages.requests; +import static reactor.adapter.JdkFlowAdapter.flowPublisherToFlux; +import static reactor.adapter.JdkFlowAdapter.publisherToFlowPublisher; + import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -103,16 +106,17 @@ public Mono processReactive(TestkitState testkitState) { return testkitState .getReactiveSessionHolder(data.getSessionId()) .flatMap(sessionHolder -> { - ReactiveTransactionCallback> workWrapper = tx -> { + ReactiveTransactionCallback> workWrapper = tx -> { String txId = testkitState.addReactiveTransactionHolder(new ReactiveTransactionHolder( sessionHolder, new ReactiveTransactionContextAdapter(tx))); testkitState.getResponseWriter().accept(retryableTry(txId)); CompletableFuture tryResult = new CompletableFuture<>(); sessionHolder.setTxWorkFuture(tryResult); - return Mono.fromCompletionStage(tryResult); + return publisherToFlowPublisher(Mono.fromCompletionStage(tryResult)); }; - return Mono.fromDirect(sessionHolder.getSession().executeWrite(workWrapper)); + return Mono.fromDirect( + flowPublisherToFlux(sessionHolder.getSession().executeWrite(workWrapper))); }) .then(Mono.just(retryableDone())); } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionClose.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionClose.java index 8a75db4f10..9adb2e8999 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionClose.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionClose.java @@ -18,6 +18,8 @@ */ package neo4j.org.testkit.backend.messages.requests; +import static reactor.adapter.JdkFlowAdapter.flowPublisherToFlux; + import java.util.concurrent.CompletionStage; import lombok.Getter; import lombok.Setter; @@ -63,7 +65,7 @@ public Mono processReactive(TestkitState testkitState) { return testkitState .getReactiveTransactionHolder(data.getTxId()) .map(AbstractTransactionHolder::getTransaction) - .flatMap(tx -> Mono.fromDirect(tx.close())) + .flatMap(tx -> Mono.fromDirect(flowPublisherToFlux(tx.close()))) .then(Mono.just(createResponse(data.getTxId()))); } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionCommit.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionCommit.java index 520220f0e1..7299cd58e0 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionCommit.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionCommit.java @@ -18,6 +18,8 @@ */ package neo4j.org.testkit.backend.messages.requests; +import static reactor.adapter.JdkFlowAdapter.flowPublisherToFlux; + import java.util.concurrent.CompletionStage; import lombok.Getter; import lombok.Setter; @@ -57,7 +59,8 @@ public Mono processRx(TestkitState testkitState) { public Mono processReactive(TestkitState testkitState) { return testkitState .getReactiveTransactionHolder(data.getTxId()) - .flatMap(tx -> Mono.fromDirect(tx.getTransaction().commit())) + .flatMap(tx -> + Mono.fromDirect(flowPublisherToFlux(tx.getTransaction().commit()))) .then(Mono.just(createResponse(data.getTxId()))); } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRollback.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRollback.java index 1a7fd399be..d4da1568df 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRollback.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRollback.java @@ -18,6 +18,8 @@ */ package neo4j.org.testkit.backend.messages.requests; +import static reactor.adapter.JdkFlowAdapter.flowPublisherToFlux; + import java.util.concurrent.CompletionStage; import lombok.Getter; import lombok.Setter; @@ -57,7 +59,8 @@ public Mono processRx(TestkitState testkitState) { public Mono processReactive(TestkitState testkitState) { return testkitState .getReactiveTransactionHolder(data.getTxId()) - .flatMap(tx -> Mono.fromDirect(tx.getTransaction().rollback())) + .flatMap(tx -> + Mono.fromDirect(flowPublisherToFlux(tx.getTransaction().rollback()))) .then(Mono.just(createResponse(data.getTxId()))); } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRun.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRun.java index b7d07c4fb7..22d8da6bca 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRun.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRun.java @@ -18,6 +18,8 @@ */ package neo4j.org.testkit.backend.messages.requests; +import static reactor.adapter.JdkFlowAdapter.flowPublisherToFlux; + import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import java.util.Collections; import java.util.List; @@ -85,10 +87,12 @@ public Mono processReactive(TestkitState testkitState) { ReactiveTransaction tx = transactionHolder.getTransaction(); Map params = data.getParams() != null ? data.getParams() : Collections.emptyMap(); - return Mono.fromDirect(tx.run(data.getCypher(), params)).map(result -> { - String id = testkitState.addReactiveResultHolder(new ReactiveResultHolder(transactionHolder, result)); - return createResponse(id, result.keys()); - }); + return Mono.fromDirect(flowPublisherToFlux(tx.run(data.getCypher(), params))) + .map(result -> { + String id = testkitState.addReactiveResultHolder( + new ReactiveResultHolder(transactionHolder, result)); + return createResponse(id, result.keys()); + }); }); }