diff --git a/driver/src/test/java/org/neo4j/driver/stress/AbstractStressTestBase.java b/driver/src/test/java/org/neo4j/driver/stress/AbstractStressTestBase.java index 9501747f85..b2cfe57c01 100644 --- a/driver/src/test/java/org/neo4j/driver/stress/AbstractStressTestBase.java +++ b/driver/src/test/java/org/neo4j/driver/stress/AbstractStressTestBase.java @@ -31,7 +31,7 @@ import java.lang.reflect.Method; import java.net.URI; import java.util.ArrayList; -import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -159,7 +159,7 @@ void asyncApiBigDataTest() throws Throwable } @Test - void rxApiBigDataTest() throws Throwable + void rxApiBigDataTest() { assertRxIsAvailable(); Bookmark bookmark = createNodesRx( bigDataTestBatchCount(), BIG_DATA_TEST_BATCH_SIZE, driver ); @@ -221,7 +221,17 @@ Config config() abstract C createContext(); - abstract List> createTestSpecificBlockingCommands(); + List> createTestSpecificBlockingCommands() { + return Collections.emptyList(); + } + + List> createTestSpecificAsyncCommands() { + return Collections.emptyList(); + } + + List> createTestSpecificRxCommands() { + return Collections.emptyList(); + } abstract boolean handleWriteFailure( Throwable error, C context ); @@ -245,23 +255,15 @@ private List> createBlockingCommands() { List> commands = new ArrayList<>(); - commands.add( new BlockingReadQuery<>( driver, false ) ); - commands.add( new BlockingReadQuery<>( driver, true ) ); - - commands.add( new BlockingReadQueryInTx<>( driver, false ) ); - commands.add( new BlockingReadQueryInTx<>( driver, true ) ); + commands.add( new BlockingReadQueryWithRetries<>( driver, false ) ); + commands.add( new BlockingReadQueryWithRetries<>( driver, true ) ); - commands.add( new BlockingWriteQuery<>( this, driver, false ) ); - commands.add( new BlockingWriteQuery<>( this, driver, true ) ); + commands.add( new BlockingWriteQueryWithRetries<>( this, driver, false ) ); + commands.add( new BlockingWriteQueryWithRetries<>( this, driver, true ) ); - commands.add( new BlockingWriteQueryInTx<>( this, driver, false ) ); - commands.add( new BlockingWriteQueryInTx<>( this, driver, true ) ); + commands.add( new BlockingWrongQueryWithRetries<>( driver ) ); - commands.add( new BlockingWrongQuery<>( driver ) ); - commands.add( new BlockingWrongQueryInTx<>( driver ) ); - - commands.add( new BlockingFailingQuery<>( driver ) ); - commands.add( new BlockingFailingQueryInTx<>( driver ) ); + commands.add( new BlockingFailingQueryWithRetries<>( driver ) ); commands.add( new FailedAuth<>( databaseUri(), config() ) ); @@ -299,29 +301,19 @@ private List> launchRxWorkerThreads( C context ) private List> createRxCommands() { - return Arrays.asList( - new RxReadQuery<>( driver, false ), - new RxReadQuery<>( driver, true ), - - new RxWriteQuery<>( this, driver, false ), - new RxWriteQuery<>( this, driver, true ), + List> commands = new ArrayList<>(); - new RxReadQueryInTx<>( driver, false ), - new RxReadQueryInTx<>( driver, true ), + commands.add( new RxReadQueryWithRetries<>( driver, false ) ); + commands.add( new RxReadQueryWithRetries<>( driver, true ) ); - new RxWriteQueryInTx<>( this, driver, false ), - new RxWriteQueryInTx<>( this, driver, true ), + commands.add( new RxWriteQueryWithRetries<>( this, driver, false ) ); + commands.add( new RxWriteQueryWithRetries<>( this, driver, true ) ); - new RxReadQueryWithRetries<>( driver, false ), - new RxReadQueryWithRetries<>( driver, false ), + commands.add( new RxFailingQueryWithRetries<>( driver ) ); - new RxWriteQueryWithRetries<>( this, driver, false ), - new RxWriteQueryWithRetries<>( this, driver, true ), + commands.addAll( createTestSpecificRxCommands() ); - new RxFailingQuery<>( driver ), - new RxFailingQueryInTx<>( driver ), - new RxFailingQueryWithRetries<>( driver ) - ); + return commands; } private Future launchRxWorkerThread( ExecutorService executor, List> commands, C context ) @@ -367,23 +359,15 @@ private List> createAsyncCommands() { List> commands = new ArrayList<>(); - commands.add( new AsyncReadQuery<>( driver, false ) ); - commands.add( new AsyncReadQuery<>( driver, true ) ); - - commands.add( new AsyncReadQueryInTx<>( driver, false ) ); - commands.add( new AsyncReadQueryInTx<>( driver, true ) ); - - commands.add( new AsyncWriteQuery<>( this, driver, false ) ); - commands.add( new AsyncWriteQuery<>( this, driver, true ) ); + commands.add( new AsyncReadQueryWithRetries<>( driver, false ) ); + commands.add( new AsyncReadQueryWithRetries<>( driver, true ) ); - commands.add( new AsyncWriteQueryInTx<>( this, driver, false ) ); - commands.add( new AsyncWriteQueryInTx<>( this, driver, true ) ); + commands.add( new AsyncWriteQueryWithRetries<>( this, driver, false ) ); + commands.add( new AsyncWriteQueryWithRetries<>( this, driver, true ) ); - commands.add( new AsyncWrongQuery<>( driver ) ); - commands.add( new AsyncWrongQueryInTx<>( driver ) ); + commands.add( new AsyncWrongQueryWithRetries<>( driver ) ); - commands.add( new AsyncFailingQuery<>( driver ) ); - commands.add( new AsyncFailingQueryInTx<>( driver ) ); + commands.add( new AsyncFailingQueryWithRetries<>( driver ) ); return commands; } diff --git a/driver/src/test/java/org/neo4j/driver/stress/AsyncFailingQueryWithRetries.java b/driver/src/test/java/org/neo4j/driver/stress/AsyncFailingQueryWithRetries.java new file mode 100644 index 0000000000..f441170f0d --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/stress/AsyncFailingQueryWithRetries.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.stress; + +import java.util.concurrent.CompletionStage; + +import org.neo4j.driver.AccessMode; +import org.neo4j.driver.Driver; +import org.neo4j.driver.async.AsyncSession; +import org.neo4j.driver.async.ResultCursor; +import org.neo4j.driver.internal.util.Futures; + +import static org.hamcrest.Matchers.is; +import static org.hamcrest.junit.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.neo4j.driver.internal.util.Matchers.arithmeticError; + +public class AsyncFailingQueryWithRetries extends AbstractAsyncQuery +{ + public AsyncFailingQueryWithRetries( Driver driver ) + { + super( driver, false ); + } + + @Override + public CompletionStage execute( C context ) + { + AsyncSession session = newSession( AccessMode.READ, context ); + + return session.readTransactionAsync( tx -> tx.runAsync( "UNWIND [10, 5, 0] AS x RETURN 10 / x" ) + .thenCompose( ResultCursor::listAsync ) + .handle( ( records, error ) -> + { + session.closeAsync(); + + assertNull( records ); + Throwable cause = Futures.completionExceptionCause( error ); + assertThat( cause, is( arithmeticError() ) ); + + return null; + } )); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/stress/AsyncReadQueryWithRetries.java b/driver/src/test/java/org/neo4j/driver/stress/AsyncReadQueryWithRetries.java new file mode 100644 index 0000000000..385dab9500 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/stress/AsyncReadQueryWithRetries.java @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.stress; + +import java.util.concurrent.CompletionStage; + +import org.neo4j.driver.AccessMode; +import org.neo4j.driver.Driver; +import org.neo4j.driver.Record; +import org.neo4j.driver.async.AsyncSession; +import org.neo4j.driver.async.ResultCursor; +import org.neo4j.driver.summary.ResultSummary; +import org.neo4j.driver.types.Node; + +import static org.junit.jupiter.api.Assertions.assertNotNull; + +public class AsyncReadQueryWithRetries extends AbstractAsyncQuery +{ + public AsyncReadQueryWithRetries( Driver driver, boolean useBookmark ) + { + super( driver, useBookmark ); + } + + @Override + public CompletionStage execute( C context ) + { + AsyncSession session = newSession( AccessMode.READ, context ); + + CompletionStage queryFinished = session.readTransactionAsync( + tx -> tx.runAsync( "MATCH (n) RETURN n LIMIT 1" ) + .thenCompose( + cursor -> cursor.nextAsync() + .thenCompose( record -> processAndGetSummary( record, cursor ) ) ) ); + + queryFinished.whenComplete( ( summary, error ) -> + { + if ( summary != null ) + { + context.readCompleted( summary ); + } + session.closeAsync(); + } ); + + return queryFinished.thenApply( summary -> null ); + } + + private CompletionStage processAndGetSummary( Record record, ResultCursor cursor ) + { + if ( record != null ) + { + Node node = record.get( 0 ).asNode(); + assertNotNull( node ); + } + return cursor.consumeAsync(); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/stress/AsyncWriteQueryWithRetries.java b/driver/src/test/java/org/neo4j/driver/stress/AsyncWriteQueryWithRetries.java new file mode 100644 index 0000000000..69ba462ea8 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/stress/AsyncWriteQueryWithRetries.java @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.stress; + +import java.util.concurrent.CompletionStage; + +import org.neo4j.driver.AccessMode; +import org.neo4j.driver.Driver; +import org.neo4j.driver.async.AsyncSession; +import org.neo4j.driver.async.ResultCursor; +import org.neo4j.driver.internal.util.Futures; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class AsyncWriteQueryWithRetries extends AbstractAsyncQuery +{ + private final AbstractStressTestBase stressTest; + + public AsyncWriteQueryWithRetries( AbstractStressTestBase stressTest, Driver driver, boolean useBookmark ) + { + super( driver, useBookmark ); + this.stressTest = stressTest; + } + + @Override + public CompletionStage execute( C context ) + { + AsyncSession session = newSession( AccessMode.WRITE, context ); + + return session.writeTransactionAsync( + tx -> tx.runAsync( "CREATE ()" ) + .thenCompose( ResultCursor::consumeAsync ) ) + .handle( ( summary, error ) -> + { + session.closeAsync(); + + if ( error != null ) + { + handleError( Futures.completionExceptionCause( error ), context ); + } + else + { + context.setBookmark( session.lastBookmark() ); + assertEquals( 1, summary.counters().nodesCreated() ); + context.nodeCreated(); + } + + return null; + } ); + } + + private void handleError( Throwable error, C context ) + { + if ( !stressTest.handleWriteFailure( error, context ) ) + { + throw new RuntimeException( error ); + } + } +} diff --git a/driver/src/test/java/org/neo4j/driver/stress/AsyncWrongQueryWithRetries.java b/driver/src/test/java/org/neo4j/driver/stress/AsyncWrongQueryWithRetries.java new file mode 100644 index 0000000000..fbe05e210d --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/stress/AsyncWrongQueryWithRetries.java @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.stress; + +import java.util.concurrent.CompletionStage; + +import org.neo4j.driver.AccessMode; +import org.neo4j.driver.Driver; +import org.neo4j.driver.async.AsyncSession; +import org.neo4j.driver.async.ResultCursor; +import org.neo4j.driver.exceptions.ClientException; +import org.neo4j.driver.exceptions.Neo4jException; +import org.neo4j.driver.internal.util.Futures; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.junit.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class AsyncWrongQueryWithRetries extends AbstractAsyncQuery +{ + public AsyncWrongQueryWithRetries( Driver driver ) + { + super( driver, false ); + } + + @Override + public CompletionStage execute( C context ) + { + AsyncSession session = newSession( AccessMode.READ, context ); + + return session.readTransactionAsync(tx -> tx.runAsync("RETURN Wrong" ) + .thenCompose( ResultCursor::nextAsync ) + .handle( ( record, error ) -> + { + session.closeAsync(); + assertNull( record ); + + Throwable cause = Futures.completionExceptionCause( error ); + assertNotNull( cause ); + assertThat( cause, instanceOf( ClientException.class ) ); + assertThat( ((Neo4jException) cause).code(), containsString( "SyntaxError" ) ); + + return null; + } )); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/stress/BlockingFailingQueryWithRetries.java b/driver/src/test/java/org/neo4j/driver/stress/BlockingFailingQueryWithRetries.java new file mode 100644 index 0000000000..678fa5250a --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/stress/BlockingFailingQueryWithRetries.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.stress; + +import org.neo4j.driver.AccessMode; +import org.neo4j.driver.Driver; +import org.neo4j.driver.Session; + +import static org.hamcrest.Matchers.is; +import static org.hamcrest.junit.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.neo4j.driver.internal.util.Matchers.arithmeticError; + +public class BlockingFailingQueryWithRetries extends AbstractBlockingQuery +{ + public BlockingFailingQueryWithRetries( Driver driver ) + { + super( driver, false ); + } + + @Override + public void execute( C context ) + { + try ( Session session = newSession( AccessMode.READ, context ) ) + { + Exception e = assertThrows( + Exception.class, + () -> session.readTransaction( + tx -> + { + tx.run( "UNWIND [10, 5, 0] AS x RETURN 10 / x" ).consume(); + return 1; + } ) ); + assertThat( e, is( arithmeticError() ) ); + } + } +} diff --git a/driver/src/test/java/org/neo4j/driver/stress/BlockingReadQueryWithRetries.java b/driver/src/test/java/org/neo4j/driver/stress/BlockingReadQueryWithRetries.java new file mode 100644 index 0000000000..ada27e6fb7 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/stress/BlockingReadQueryWithRetries.java @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.stress; + +import java.util.List; + +import org.neo4j.driver.AccessMode; +import org.neo4j.driver.Driver; +import org.neo4j.driver.Record; +import org.neo4j.driver.Result; +import org.neo4j.driver.Session; +import org.neo4j.driver.types.Node; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.neo4j.driver.internal.util.Iterables.single; + +public class BlockingReadQueryWithRetries extends AbstractBlockingQuery +{ + public BlockingReadQueryWithRetries( Driver driver, boolean useBookmark ) + { + super( driver, useBookmark ); + } + + @Override + public void execute( C context ) + { + try ( Session session = newSession( AccessMode.READ, context ) ) + { + session.readTransaction( + tx -> + { + Result result = tx.run( "MATCH (n) RETURN n LIMIT 1" ); + List records = result.list(); + if ( !records.isEmpty() ) + { + Record record = single( records ); + Node node = record.get( 0 ).asNode(); + assertNotNull( node ); + } + + context.readCompleted( result.consume() ); + return 1; + } ); + } + } +} diff --git a/driver/src/test/java/org/neo4j/driver/stress/BlockingWriteQueryUsingReadSessionInTx.java b/driver/src/test/java/org/neo4j/driver/stress/BlockingWriteQueryUsingReadSessionInTx.java deleted file mode 100644 index 2a41e65310..0000000000 --- a/driver/src/test/java/org/neo4j/driver/stress/BlockingWriteQueryUsingReadSessionInTx.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright (c) 2002-2020 "Neo4j," - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver.stress; - -import java.util.concurrent.atomic.AtomicReference; - -import org.neo4j.driver.AccessMode; -import org.neo4j.driver.Driver; -import org.neo4j.driver.Session; -import org.neo4j.driver.Result; -import org.neo4j.driver.Transaction; -import org.neo4j.driver.exceptions.ClientException; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; - -public class BlockingWriteQueryUsingReadSessionInTx extends AbstractBlockingQuery -{ - public BlockingWriteQueryUsingReadSessionInTx( Driver driver, boolean useBookmark ) - { - super( driver, useBookmark ); - } - - @Override - public void execute( C context ) - { - AtomicReference resultRef = new AtomicReference<>(); - assertThrows( ClientException.class, () -> - { - try ( Session session = newSession( AccessMode.READ, context ); - Transaction tx = beginTransaction( session, context ) ) - { - Result result = tx.run( "CREATE ()" ); - resultRef.set( result ); - tx.commit(); - } - } ); - assertNotNull( resultRef.get() ); - assertEquals( 0, resultRef.get().consume().counters().nodesCreated() ); - } -} diff --git a/driver/src/test/java/org/neo4j/driver/stress/BlockingWriteQueryUsingReadSession.java b/driver/src/test/java/org/neo4j/driver/stress/BlockingWriteQueryUsingReadSessionWithRetries.java similarity index 81% rename from driver/src/test/java/org/neo4j/driver/stress/BlockingWriteQueryUsingReadSession.java rename to driver/src/test/java/org/neo4j/driver/stress/BlockingWriteQueryUsingReadSessionWithRetries.java index 9c6437586b..39d527e040 100644 --- a/driver/src/test/java/org/neo4j/driver/stress/BlockingWriteQueryUsingReadSession.java +++ b/driver/src/test/java/org/neo4j/driver/stress/BlockingWriteQueryUsingReadSessionWithRetries.java @@ -22,17 +22,17 @@ import org.neo4j.driver.AccessMode; import org.neo4j.driver.Driver; -import org.neo4j.driver.Session; import org.neo4j.driver.Result; +import org.neo4j.driver.Session; import org.neo4j.driver.exceptions.ClientException; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; -public class BlockingWriteQueryUsingReadSession extends AbstractBlockingQuery +public class BlockingWriteQueryUsingReadSessionWithRetries extends AbstractBlockingQuery { - public BlockingWriteQueryUsingReadSession( Driver driver, boolean useBookmark ) + public BlockingWriteQueryUsingReadSessionWithRetries( Driver driver, boolean useBookmark ) { super( driver, useBookmark ); } @@ -45,8 +45,10 @@ public void execute( C context ) { try ( Session session = newSession( AccessMode.READ, context ) ) { - Result result = session.run( "CREATE ()" ); - resultRef.set( result ); + session.readTransaction( tx -> { + resultRef.set( tx.run( "CREATE ()" )); + return 1; + }); } } ); assertNotNull( resultRef.get() ); diff --git a/driver/src/test/java/org/neo4j/driver/stress/BlockingWriteQueryWithRetries.java b/driver/src/test/java/org/neo4j/driver/stress/BlockingWriteQueryWithRetries.java new file mode 100644 index 0000000000..f7c36cab65 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/stress/BlockingWriteQueryWithRetries.java @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.stress; + +import org.neo4j.driver.AccessMode; +import org.neo4j.driver.Driver; +import org.neo4j.driver.Session; +import org.neo4j.driver.summary.ResultSummary; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class BlockingWriteQueryWithRetries extends AbstractBlockingQuery +{ + private final AbstractStressTestBase stressTest; + + public BlockingWriteQueryWithRetries( AbstractStressTestBase stressTest, Driver driver, boolean useBookmark ) + { + super( driver, useBookmark ); + this.stressTest = stressTest; + } + + @Override + public void execute( C context ) + { + ResultSummary resultSummary = null; + Throwable queryError = null; + + try ( Session session = newSession( AccessMode.WRITE, context ) ) + { + resultSummary = session.writeTransaction( tx -> tx.run( "CREATE ()" ).consume() ); + context.setBookmark( session.lastBookmark() ); + } + catch ( Throwable error ) + { + queryError = error; + if ( !stressTest.handleWriteFailure( error, context ) ) + { + throw error; + } + } + + if ( queryError == null && resultSummary != null ) + { + assertEquals( 1, resultSummary.counters().nodesCreated() ); + context.nodeCreated(); + } + } +} diff --git a/driver/src/test/java/org/neo4j/driver/stress/BlockingWrongQueryWithRetries.java b/driver/src/test/java/org/neo4j/driver/stress/BlockingWrongQueryWithRetries.java new file mode 100644 index 0000000000..a376319f90 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/stress/BlockingWrongQueryWithRetries.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.stress; + +import org.neo4j.driver.AccessMode; +import org.neo4j.driver.Driver; +import org.neo4j.driver.Session; + +import static org.hamcrest.Matchers.is; +import static org.hamcrest.junit.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.neo4j.driver.internal.util.Matchers.syntaxError; + +public class BlockingWrongQueryWithRetries extends AbstractBlockingQuery +{ + public BlockingWrongQueryWithRetries( Driver driver ) + { + super( driver, false ); + } + + @Override + public void execute( C context ) + { + try ( Session session = newSession( AccessMode.READ, context ) ) + { + Exception e = assertThrows( Exception.class, () -> session.readTransaction( tx -> tx.run( "RETURN" ).consume() ) ); + assertThat( e, is( syntaxError( "Unexpected end of input" ) ) ); + } + } +} diff --git a/driver/src/test/java/org/neo4j/driver/stress/CausalClusteringStressIT.java b/driver/src/test/java/org/neo4j/driver/stress/CausalClusteringStressIT.java index 7f01d74331..d3b67e3c2c 100644 --- a/driver/src/test/java/org/neo4j/driver/stress/CausalClusteringStressIT.java +++ b/driver/src/test/java/org/neo4j/driver/stress/CausalClusteringStressIT.java @@ -59,17 +59,6 @@ Context createContext() return new Context(); } - @Override - List> createTestSpecificBlockingCommands() - { - return Arrays.asList( - new BlockingWriteQueryUsingReadSession<>( driver, false ), - new BlockingWriteQueryUsingReadSession<>( driver, true ), - new BlockingWriteQueryUsingReadSessionInTx<>( driver, false ), - new BlockingWriteQueryUsingReadSessionInTx<>( driver, true ) - ); - } - @Override boolean handleWriteFailure( Throwable error, Context context ) { @@ -101,6 +90,14 @@ void dumpLogs() clusterRule.dumpClusterLogs(); } + @Override + List> createTestSpecificBlockingCommands() + { + return Arrays.asList( + new BlockingWriteQueryUsingReadSessionWithRetries<>( driver, false ), + new BlockingWriteQueryUsingReadSessionWithRetries<>( driver, true ) ); + } + static class Context extends AbstractContext { final AtomicInteger leaderSwitches = new AtomicInteger(); diff --git a/driver/src/test/java/org/neo4j/driver/stress/SingleInstanceStressIT.java b/driver/src/test/java/org/neo4j/driver/stress/SingleInstanceStressIT.java index ef98847f0c..ef5537e184 100644 --- a/driver/src/test/java/org/neo4j/driver/stress/SingleInstanceStressIT.java +++ b/driver/src/test/java/org/neo4j/driver/stress/SingleInstanceStressIT.java @@ -21,7 +21,7 @@ import org.junit.jupiter.api.extension.RegisterExtension; import java.net.URI; -import java.util.Collections; +import java.util.Arrays; import java.util.List; import org.neo4j.driver.AuthToken; @@ -59,12 +59,6 @@ Context createContext() return new Context(); } - @Override - List> createTestSpecificBlockingCommands() - { - return Collections.emptyList(); - } - @Override boolean handleWriteFailure( Throwable error, Context context ) { @@ -81,6 +75,73 @@ void printStats( A context ) System.out.println( "Bookmark failures: " + context.getBookmarkFailures() ); } + @Override + List> createTestSpecificBlockingCommands() + { + return Arrays.asList( + new BlockingReadQuery<>( driver, false ), + new BlockingReadQuery<>( driver, true ), + + new BlockingReadQueryInTx<>( driver, false ), + new BlockingReadQueryInTx<>( driver, true ), + + new BlockingWriteQuery<>( this, driver, false ), + new BlockingWriteQuery<>( this, driver, true ), + + new BlockingWriteQueryInTx<>( this, driver, false ), + new BlockingWriteQueryInTx<>( this, driver, true ), + + new BlockingWrongQuery<>( driver ), + new BlockingWrongQueryInTx<>( driver ), + + new BlockingFailingQuery<>( driver ), + new BlockingFailingQueryInTx<>( driver ) ); + } + + @Override + List> createTestSpecificAsyncCommands() + { + return Arrays.asList( + new AsyncReadQuery<>( driver, false ), + new AsyncReadQuery<>( driver, true ), + + new AsyncReadQueryInTx<>( driver, false ), + new AsyncReadQueryInTx<>( driver, true ), + + new AsyncWriteQuery<>( this, driver, false ), + new AsyncWriteQuery<>( this, driver, true ), + + new AsyncWriteQueryInTx<>( this, driver, false ), + new AsyncWriteQueryInTx<>( this, driver, true ), + + new AsyncWrongQuery<>( driver ), + new AsyncWrongQueryInTx<>( driver ), + + new AsyncFailingQuery<>( driver ), + new AsyncFailingQueryInTx<>( driver ) ); + } + + @Override + List> createTestSpecificRxCommands() + { + return Arrays.asList( + new RxReadQuery<>( driver, false ), + new RxReadQuery<>( driver, true ), + + new RxWriteQuery<>( this, driver, false ), + new RxWriteQuery<>( this, driver, true ), + + new RxReadQueryInTx<>( driver, false ), + new RxReadQueryInTx<>( driver, true ), + + new RxWriteQueryInTx<>( this, driver, false ), + new RxWriteQueryInTx<>( this, driver, true ), + + new RxFailingQuery<>( driver ), + new RxFailingQueryInTx<>( driver ) + ); + } + static class Context extends AbstractContext { }