diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/AbstractTestkitRequestWithTransactionConfig.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/AbstractTestkitRequestWithTransactionConfig.java new file mode 100644 index 0000000000..e34890aa6b --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/AbstractTestkitRequestWithTransactionConfig.java @@ -0,0 +1,75 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend.messages.requests; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import java.time.Duration; +import java.util.Map; +import java.util.Optional; +import lombok.Getter; +import lombok.Setter; +import neo4j.org.testkit.backend.CustomDriverError; +import neo4j.org.testkit.backend.messages.requests.deserializer.TestkitCypherParamDeserializer; +import org.neo4j.driver.TransactionConfig; + +@Setter +@Getter +abstract class AbstractTestkitRequestWithTransactionConfig< + T extends AbstractTestkitRequestWithTransactionConfig.TransactionConfigBody> + implements TestkitRequest { + protected T data; + + protected TransactionConfig buildTxConfig() { + return configureTx(TransactionConfig.builder()).build(); + } + + private TransactionConfig.Builder configureTx(TransactionConfig.Builder builder) { + return configureTxMetadata(configureTxTimeout(builder)); + } + + private TransactionConfig.Builder configureTxMetadata(TransactionConfig.Builder builder) { + data.getTxMeta().ifPresent(builder::withMetadata); + return builder; + } + + private TransactionConfig.Builder configureTxTimeout(TransactionConfig.Builder builder) { + try { + data.getTimeout().ifPresent((timeout) -> builder.withTimeout(Duration.ofMillis(timeout))); + } catch (IllegalArgumentException e) { + throw new CustomDriverError(e); + } + return builder; + } + + @Setter + abstract static class TransactionConfigBody { + protected Integer timeout; + + @JsonDeserialize(using = TestkitCypherParamDeserializer.class) + protected Map txMeta; + + Optional getTimeout() { + return Optional.ofNullable(timeout); + } + + Optional> getTxMeta() { + return Optional.ofNullable(txMeta); + } + } +} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionBeginTransaction.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionBeginTransaction.java index 7b8c1dcf34..6df030315c 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionBeginTransaction.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionBeginTransaction.java @@ -20,13 +20,9 @@ import static reactor.adapter.JdkFlowAdapter.flowPublisherToFlux; -import java.time.Duration; -import java.util.Map; -import java.util.Optional; import java.util.concurrent.CompletionStage; import lombok.Getter; import lombok.Setter; -import neo4j.org.testkit.backend.CustomDriverError; import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.holder.AsyncTransactionHolder; import neo4j.org.testkit.backend.holder.ReactiveTransactionHolder; @@ -43,35 +39,14 @@ import org.neo4j.driver.reactive.RxSession; import reactor.core.publisher.Mono; -@Setter -@Getter -public class SessionBeginTransaction implements TestkitRequest { - private SessionBeginTransactionBody data; - - private void configureTimeout(TransactionConfig.Builder builder) { - if (data.getTimeoutPresent()) { - try { - if (data.getTimeout() != null) { - builder.withTimeout(Duration.ofMillis(data.getTimeout())); - } else { - builder.withDefaultTimeout(); - } - } catch (IllegalArgumentException e) { - throw new CustomDriverError(e); - } - } - } - +public class SessionBeginTransaction + extends AbstractTestkitRequestWithTransactionConfig { @Override public TestkitResponse process(TestkitState testkitState) { SessionHolder sessionHolder = testkitState.getSessionHolder(data.getSessionId()); Session session = sessionHolder.getSession(); - TransactionConfig.Builder builder = TransactionConfig.builder(); - Optional.ofNullable(data.txMeta).ifPresent(builder::withMetadata); - configureTimeout(builder); - - org.neo4j.driver.Transaction transaction = session.beginTransaction(builder.build()); + org.neo4j.driver.Transaction transaction = session.beginTransaction(buildTxConfig()); return transaction(testkitState.addTransactionHolder(new TransactionHolder(sessionHolder, transaction))); } @@ -80,11 +55,8 @@ public CompletionStage processAsync(TestkitState testkitState) return testkitState.getAsyncSessionHolder(data.getSessionId()).thenCompose(sessionHolder -> { AsyncSession session = sessionHolder.getSession(); TransactionConfig.Builder builder = TransactionConfig.builder(); - Optional.ofNullable(data.txMeta).ifPresent(builder::withMetadata); - - configureTimeout(builder); - return session.beginTransactionAsync(builder.build()) + return session.beginTransactionAsync(buildTxConfig()) .thenApply(tx -> transaction( testkitState.addAsyncTransactionHolder(new AsyncTransactionHolder(sessionHolder, tx)))); }); @@ -96,11 +68,8 @@ public Mono processRx(TestkitState testkitState) { return testkitState.getRxSessionHolder(data.getSessionId()).flatMap(sessionHolder -> { RxSession session = sessionHolder.getSession(); TransactionConfig.Builder builder = TransactionConfig.builder(); - Optional.ofNullable(data.txMeta).ifPresent(builder::withMetadata); - configureTimeout(builder); - - return Mono.fromDirect(session.beginTransaction(builder.build())) + return Mono.fromDirect(session.beginTransaction(buildTxConfig())) .map(tx -> transaction( testkitState.addRxTransactionHolder(new RxTransactionHolder(sessionHolder, tx)))); }); @@ -111,11 +80,8 @@ public Mono processReactive(TestkitState testkitState) { return testkitState.getReactiveSessionHolder(data.getSessionId()).flatMap(sessionHolder -> { ReactiveSession session = sessionHolder.getSession(); TransactionConfig.Builder builder = TransactionConfig.builder(); - Optional.ofNullable(data.txMeta).ifPresent(builder::withMetadata); - - configureTimeout(builder); - return Mono.fromDirect(flowPublisherToFlux(session.beginTransaction(builder.build()))) + return Mono.fromDirect(flowPublisherToFlux(session.beginTransaction(buildTxConfig()))) .map(tx -> transaction(testkitState.addReactiveTransactionHolder( new ReactiveTransactionHolder(sessionHolder, tx)))); }); @@ -126,11 +92,8 @@ public Mono processReactiveStreams(TestkitState testkitState) { return testkitState.getReactiveSessionStreamsHolder(data.getSessionId()).flatMap(sessionHolder -> { var session = sessionHolder.getSession(); TransactionConfig.Builder builder = TransactionConfig.builder(); - Optional.ofNullable(data.txMeta).ifPresent(builder::withMetadata); - configureTimeout(builder); - - return Mono.fromDirect(session.beginTransaction(builder.build())) + return Mono.fromDirect(session.beginTransaction(buildTxConfig())) .map(tx -> transaction(testkitState.addReactiveTransactionStreamsHolder( new ReactiveTransactionStreamsHolder(sessionHolder, tx)))); }); @@ -144,15 +107,8 @@ private Transaction transaction(String txId) { @Getter @Setter - public static class SessionBeginTransactionBody { + public static class SessionBeginTransactionBody + extends AbstractTestkitRequestWithTransactionConfig.TransactionConfigBody { private String sessionId; - private Map txMeta; - private Integer timeout; - private Boolean timeoutPresent = false; - - public void setTimeout(Integer timeout) { - this.timeout = timeout; - timeoutPresent = true; - } } } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionReadTransaction.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionReadTransaction.java index 2d13c08e81..60167f1e6c 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionReadTransaction.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionReadTransaction.java @@ -47,17 +47,14 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; -@Setter -@Getter -public class SessionReadTransaction implements TestkitRequest { - private SessionReadTransactionBody data; - +public class SessionReadTransaction + extends AbstractTestkitRequestWithTransactionConfig { @Override @SuppressWarnings("deprecation") public TestkitResponse process(TestkitState testkitState) { SessionHolder sessionHolder = testkitState.getSessionHolder(data.getSessionId()); Session session = sessionHolder.getSession(); - session.readTransaction(handle(testkitState, sessionHolder)); + session.readTransaction(handle(testkitState, sessionHolder), buildTxConfig()); return retryableDone(); } @@ -78,7 +75,7 @@ public CompletionStage processAsync(TestkitState testkitState) return txWorkFuture; }; - return session.readTransactionAsync(workWrapper); + return session.readTransactionAsync(workWrapper, buildTxConfig()); }) .thenApply(nothing -> retryableDone()); } @@ -97,7 +94,7 @@ public Mono processRx(TestkitState testkitState) { return Mono.fromCompletionStage(tryResult); }; - return Mono.fromDirect(sessionHolder.getSession().readTransaction(workWrapper)); + return Mono.fromDirect(sessionHolder.getSession().readTransaction(workWrapper, buildTxConfig())); }) .then(Mono.just(retryableDone())); } @@ -117,7 +114,7 @@ public Mono processReactive(TestkitState testkitState) { }; return Mono.fromDirect( - flowPublisherToFlux(sessionHolder.getSession().executeRead(workWrapper))); + flowPublisherToFlux(sessionHolder.getSession().executeRead(workWrapper, buildTxConfig()))); }) .then(Mono.just(retryableDone())); } @@ -137,7 +134,7 @@ public Mono processReactiveStreams(TestkitState testkitState) { return Mono.fromCompletionStage(tryResult); }; - return Mono.fromDirect(sessionHolder.getSession().executeRead(workWrapper)); + return Mono.fromDirect(sessionHolder.getSession().executeRead(workWrapper, buildTxConfig())); }) .then(Mono.just(retryableDone())); } @@ -177,7 +174,8 @@ private RetryableDone retryableDone() { @Setter @Getter - public static class SessionReadTransactionBody { + public static class SessionReadTransactionBody + extends AbstractTestkitRequestWithTransactionConfig.TransactionConfigBody { private String sessionId; } } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionRun.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionRun.java index 2e0eb237c4..01758e8610 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionRun.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionRun.java @@ -21,14 +21,12 @@ import static reactor.adapter.JdkFlowAdapter.flowPublisherToFlux; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; -import java.time.Duration; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletionStage; import lombok.Getter; import lombok.Setter; -import neo4j.org.testkit.backend.CustomDriverError; import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.holder.ReactiveResultHolder; import neo4j.org.testkit.backend.holder.ReactiveResultStreamsHolder; @@ -41,32 +39,13 @@ import neo4j.org.testkit.backend.messages.responses.TestkitResponse; import org.neo4j.driver.Query; import org.neo4j.driver.Session; -import org.neo4j.driver.TransactionConfig; import org.neo4j.driver.async.AsyncSession; import org.neo4j.driver.reactive.ReactiveSession; import org.neo4j.driver.reactive.RxResult; import org.neo4j.driver.reactive.RxSession; import reactor.core.publisher.Mono; -@Setter -@Getter -public class SessionRun implements TestkitRequest { - private SessionRunBody data; - - private void configureTimeout(TransactionConfig.Builder builder) { - if (data.getTimeoutPresent()) { - try { - if (data.getTimeout() != null) { - builder.withTimeout(Duration.ofMillis(data.getTimeout())); - } else { - builder.withDefaultTimeout(); - } - } catch (IllegalArgumentException e) { - throw new CustomDriverError(e); - } - } - } - +public class SessionRun extends AbstractTestkitRequestWithTransactionConfig { @Override public TestkitResponse process(TestkitState testkitState) { SessionHolder sessionHolder = testkitState.getSessionHolder(data.getSessionId()); @@ -74,10 +53,7 @@ public TestkitResponse process(TestkitState testkitState) { Query query = Optional.ofNullable(data.params) .map(params -> new Query(data.cypher, data.params)) .orElseGet(() -> new Query(data.cypher)); - TransactionConfig.Builder transactionConfig = TransactionConfig.builder(); - Optional.ofNullable(data.getTxMeta()).ifPresent(transactionConfig::withMetadata); - configureTimeout(transactionConfig); - org.neo4j.driver.Result result = session.run(query, transactionConfig.build()); + org.neo4j.driver.Result result = session.run(query, buildTxConfig()); String id = testkitState.addResultHolder(new ResultHolder(sessionHolder, result)); return createResponse(id, result.keys()); @@ -90,11 +66,8 @@ public CompletionStage processAsync(TestkitState testkitState) Query query = Optional.ofNullable(data.params) .map(params -> new Query(data.cypher, data.params)) .orElseGet(() -> new Query(data.cypher)); - TransactionConfig.Builder transactionConfig = TransactionConfig.builder(); - Optional.ofNullable(data.getTxMeta()).ifPresent(transactionConfig::withMetadata); - configureTimeout(transactionConfig); - return session.runAsync(query, transactionConfig.build()).thenApply(resultCursor -> { + return session.runAsync(query, buildTxConfig()).thenApply(resultCursor -> { String id = testkitState.addAsyncResultHolder(new ResultCursorHolder(sessionHolder, resultCursor)); return createResponse(id, resultCursor.keys()); }); @@ -109,11 +82,8 @@ public Mono processRx(TestkitState testkitState) { Query query = Optional.ofNullable(data.params) .map(params -> new Query(data.cypher, data.params)) .orElseGet(() -> new Query(data.cypher)); - TransactionConfig.Builder transactionConfig = TransactionConfig.builder(); - Optional.ofNullable(data.getTxMeta()).ifPresent(transactionConfig::withMetadata); - configureTimeout(transactionConfig); - RxResult result = session.run(query, transactionConfig.build()); + RxResult result = session.run(query, buildTxConfig()); String id = testkitState.addRxResultHolder(new RxResultHolder(sessionHolder, result)); // The keys() method causes RUN message exchange. @@ -129,11 +99,8 @@ public Mono processReactive(TestkitState testkitState) { Query query = Optional.ofNullable(data.params) .map(params -> new Query(data.cypher, data.params)) .orElseGet(() -> new Query(data.cypher)); - TransactionConfig.Builder transactionConfig = TransactionConfig.builder(); - Optional.ofNullable(data.getTxMeta()).ifPresent(transactionConfig::withMetadata); - configureTimeout(transactionConfig); - return Mono.fromDirect(flowPublisherToFlux(session.run(query, transactionConfig.build()))) + return Mono.fromDirect(flowPublisherToFlux(session.run(query, buildTxConfig()))) .map(result -> { String id = testkitState.addReactiveResultHolder(new ReactiveResultHolder(sessionHolder, result)); @@ -149,16 +116,12 @@ public Mono processReactiveStreams(TestkitState testkitState) { Query query = Optional.ofNullable(data.params) .map(params -> new Query(data.cypher, data.params)) .orElseGet(() -> new Query(data.cypher)); - TransactionConfig.Builder transactionConfig = TransactionConfig.builder(); - Optional.ofNullable(data.getTxMeta()).ifPresent(transactionConfig::withMetadata); - configureTimeout(transactionConfig); - return Mono.fromDirect(session.run(query, transactionConfig.build())) - .map(result -> { - String id = testkitState.addReactiveResultStreamsHolder( - new ReactiveResultStreamsHolder(sessionHolder, result)); - return createResponse(id, result.keys()); - }); + return Mono.fromDirect(session.run(query, buildTxConfig())).map(result -> { + String id = testkitState.addReactiveResultStreamsHolder( + new ReactiveResultStreamsHolder(sessionHolder, result)); + return createResponse(id, result.keys()); + }); }); } @@ -170,19 +133,11 @@ private Result createResponse(String resultId, List keys) { @Setter @Getter - public static class SessionRunBody { + public static class SessionRunBody extends AbstractTestkitRequestWithTransactionConfig.TransactionConfigBody { @JsonDeserialize(using = TestkitCypherParamDeserializer.class) private Map params; private String sessionId; private String cypher; - private Map txMeta; - private Integer timeout; - private Boolean timeoutPresent = false; - - public void setTimeout(Integer timeout) { - this.timeout = timeout; - timeoutPresent = true; - } } } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionWriteTransaction.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionWriteTransaction.java index 88abf48685..d942af21e4 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionWriteTransaction.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionWriteTransaction.java @@ -21,7 +21,6 @@ import static reactor.adapter.JdkFlowAdapter.flowPublisherToFlux; import static reactor.adapter.JdkFlowAdapter.publisherToFlowPublisher; -import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; @@ -48,17 +47,14 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; -@Setter -@Getter -public class SessionWriteTransaction implements TestkitRequest { - private SessionWriteTransactionBody data; - +public class SessionWriteTransaction + extends AbstractTestkitRequestWithTransactionConfig { @Override @SuppressWarnings("deprecation") public TestkitResponse process(TestkitState testkitState) { SessionHolder sessionHolder = testkitState.getSessionHolder(data.getSessionId()); Session session = sessionHolder.getSession(); - session.writeTransaction(handle(testkitState, sessionHolder)); + session.writeTransaction(handle(testkitState, sessionHolder), buildTxConfig()); return retryableDone(); } @@ -79,7 +75,7 @@ public CompletionStage processAsync(TestkitState testkitState) return tryResult; }; - return session.writeTransactionAsync(workWrapper); + return session.writeTransactionAsync(workWrapper, buildTxConfig()); }) .thenApply(nothing -> retryableDone()); } @@ -98,7 +94,7 @@ public Mono processRx(TestkitState testkitState) { return Mono.fromCompletionStage(tryResult); }; - return Mono.fromDirect(sessionHolder.getSession().writeTransaction(workWrapper)); + return Mono.fromDirect(sessionHolder.getSession().writeTransaction(workWrapper, buildTxConfig())); }) .then(Mono.just(retryableDone())); } @@ -118,7 +114,7 @@ public Mono processReactive(TestkitState testkitState) { }; return Mono.fromDirect( - flowPublisherToFlux(sessionHolder.getSession().executeWrite(workWrapper))); + flowPublisherToFlux(sessionHolder.getSession().executeWrite(workWrapper, buildTxConfig()))); }) .then(Mono.just(retryableDone())); } @@ -138,7 +134,7 @@ public Mono processReactiveStreams(TestkitState testkitState) { return Mono.fromCompletionStage(tryResult); }; - return Mono.fromDirect(sessionHolder.getSession().executeWrite(workWrapper)); + return Mono.fromDirect(sessionHolder.getSession().executeWrite(workWrapper, buildTxConfig())); }) .then(Mono.just(retryableDone())); } @@ -178,9 +174,8 @@ private RetryableDone retryableDone() { @Setter @Getter - public static class SessionWriteTransactionBody { + public static class SessionWriteTransactionBody + extends AbstractTestkitRequestWithTransactionConfig.TransactionConfigBody { private String sessionId; - private Map txMeta; - private String timeout; } }