From d44eadc609f695379ef95558e87a87c64082bee0 Mon Sep 17 00:00:00 2001 From: Dmitriy Tverdiakov Date: Mon, 18 Jan 2021 23:38:17 +0200 Subject: [PATCH] Updating RX API stress test This update aims to fix the instability in the RX API stress test by ensuring that the transactions are always finalized. --- .../neo4j/driver/stress/RxWriteQueryInTx.java | 39 ++++++++++++++----- .../stress/RxWriteQueryWithRetries.java | 25 ++++++++---- 2 files changed, 46 insertions(+), 18 deletions(-) diff --git a/driver/src/test/java/org/neo4j/driver/stress/RxWriteQueryInTx.java b/driver/src/test/java/org/neo4j/driver/stress/RxWriteQueryInTx.java index 1d2f9752f6..799e608cb7 100644 --- a/driver/src/test/java/org/neo4j/driver/stress/RxWriteQueryInTx.java +++ b/driver/src/test/java/org/neo4j/driver/stress/RxWriteQueryInTx.java @@ -18,16 +18,20 @@ */ package org.neo4j.driver.stress; +import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; -import org.neo4j.driver.AccessMode; import org.neo4j.driver.Driver; import org.neo4j.driver.internal.util.Futures; import org.neo4j.driver.reactive.RxSession; import org.neo4j.driver.reactive.RxTransaction; +import org.neo4j.driver.summary.ResultSummary; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -45,17 +49,32 @@ public RxWriteQueryInTx( AbstractStressTestBase stressTest, Driver driver, bo public CompletionStage execute( C context ) { CompletableFuture queryFinished = new CompletableFuture<>(); - RxSession session = newSession( AccessMode.WRITE, context ); - Flux.usingWhen( session.beginTransaction(), tx -> tx.run( "CREATE ()" ).consume(), - RxTransaction::commit, ( tx, error ) -> tx.rollback(), null ).subscribe( - summary -> { - context.setBookmark( session.lastBookmark() ); - assertEquals( 1, summary.counters().nodesCreated() ); + + Function> sessionToResultSummaryPublisher = ( RxSession session ) -> Flux.usingWhen( + Mono.from( session.beginTransaction() ), + tx -> tx.run( "CREATE ()" ).consume(), + RxTransaction::commit, + ( tx, error ) -> tx.rollback(), + RxTransaction::rollback + ); + + AtomicInteger createdNodesNum = new AtomicInteger(); + Flux.usingWhen( + Mono.fromSupplier( driver::rxSession ), + sessionToResultSummaryPublisher, + session -> Mono.empty(), + ( session, error ) -> session.close(), + RxSession::close + ).subscribe( + resultSummary -> createdNodesNum.addAndGet( resultSummary.counters().nodesCreated() ), + error -> handleError( Futures.completionExceptionCause( error ), context, queryFinished ), + () -> + { + assertEquals( 1, createdNodesNum.get() ); context.nodeCreated(); queryFinished.complete( null ); - }, error -> { - handleError( Futures.completionExceptionCause( error ), context, queryFinished ); - } ); + } + ); return queryFinished; } diff --git a/driver/src/test/java/org/neo4j/driver/stress/RxWriteQueryWithRetries.java b/driver/src/test/java/org/neo4j/driver/stress/RxWriteQueryWithRetries.java index ef209988d6..15c6edd571 100644 --- a/driver/src/test/java/org/neo4j/driver/stress/RxWriteQueryWithRetries.java +++ b/driver/src/test/java/org/neo4j/driver/stress/RxWriteQueryWithRetries.java @@ -23,15 +23,13 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.atomic.AtomicInteger; -import org.neo4j.driver.AccessMode; import org.neo4j.driver.Driver; import org.neo4j.driver.internal.util.Futures; import org.neo4j.driver.reactive.RxSession; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; public class RxWriteQueryWithRetries extends AbstractRxQuery { @@ -47,13 +45,24 @@ public RxWriteQueryWithRetries( AbstractStressTestBase stressTest, Driver dri public CompletionStage execute( C context ) { CompletableFuture queryFinished = new CompletableFuture<>(); - Flux.usingWhen( Mono.fromSupplier( () -> newSession( AccessMode.WRITE, context ) ), - session -> session.writeTransaction( tx -> tx.run( "CREATE ()" ).consume() ), RxSession::close ) - .subscribe( summary -> { - assertEquals( 1, summary.counters().nodesCreated() ); + + AtomicInteger createdNodesNum = new AtomicInteger(); + Flux.usingWhen( + Mono.fromSupplier( driver::rxSession ), + session -> session.writeTransaction( tx -> tx.run( "CREATE ()" ).consume() ), + session -> Mono.empty(), + ( session, error ) -> session.close(), + RxSession::close + ).subscribe( + resultSummary -> createdNodesNum.addAndGet( resultSummary.counters().nodesCreated() ), + error -> handleError( Futures.completionExceptionCause( error ), context, queryFinished ), + () -> + { + assertEquals( 1, createdNodesNum.get() ); context.nodeCreated(); queryFinished.complete( null ); - }, error -> handleError( Futures.completionExceptionCause( error ), context, queryFinished ) ); + } + ); return queryFinished; }