From 252d26c9a23bef31deac0e7300095dccb9c5acf2 Mon Sep 17 00:00:00 2001 From: Clement Escoffier Date: Fri, 19 Aug 2022 13:49:25 +0200 Subject: [PATCH] Add support for optimistic locking in the new Redis API. --- docs/src/main/asciidoc/redis-reference.adoc | 49 +++ .../datasource/ReactiveRedisDataSource.java | 50 ++- .../redis/datasource/RedisDataSource.java | 43 +++ .../OptimisticLockingTransactionResult.java | 18 + .../BlockingRedisDataSourceImpl.java | 38 +- ...ptimisticLockingTransactionResultImpl.java | 56 +++ .../ReactiveRedisDataSourceImpl.java | 68 +++- .../datasource/OptimisticLockingTest.java | 350 ++++++++++++++++++ 8 files changed, 659 insertions(+), 13 deletions(-) create mode 100644 extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/transactions/OptimisticLockingTransactionResult.java create mode 100644 extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/OptimisticLockingTransactionResultImpl.java create mode 100644 extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/OptimisticLockingTest.java diff --git a/docs/src/main/asciidoc/redis-reference.adoc b/docs/src/main/asciidoc/redis-reference.adoc index 6c0b3cfff7296..eb9ffcf2344c1 100644 --- a/docs/src/main/asciidoc/redis-reference.adoc +++ b/docs/src/main/asciidoc/redis-reference.adoc @@ -573,6 +573,55 @@ TransactionResult result = ds.withTransaction(tx -> { IMPORTANT: You cannot use the pub/sub feature from within a transaction. +==== Using optimistic locking + +To use optimistic locking, you need to use a variant of the `withTransaction` method, allowing the execution of code before the transaction starts. +In other words, it will be executed as follows: + +[source] +---- +WATCH key + +// Pre-transaction block +// .... +// Produce a result + +MULTI + // In transaction code, receive the result produced by the pre-transaction block. +EXEC +---- + +For example, if you need to update a value in a hash only if the field exists, you will use the following API: + +[source, java] +---- +OptimisticLockingTransactionResult result = blocking.withTransaction(ds -> { + // The pre-transaction block: + HashCommands hashCommands = ds.hash(String.class); + return hashCommands.hexists(key, "field"); // Produce a result (boolean in this case) +}, + (exists, tx) -> { // The transactional block, receives the result and the transactional data source + if (exists) { + tx.hash(String.class).hset(key, "field", "new value"); + } else { + tx.discard(); + } + }, + key); // The watched key +---- + +If one of the watched keys is touched before or during the execution of the pre-transaction or transactional blocks, the transaction is aborted. +The pre-transactional block produces a result that the transactional block can use. +This construct is necessary because, within a transaction, the commands do not produce a result. +Results can only be retrieved after the execution of the transaction. + +The pre-transaction and transactional blocks are invoked on the same Redis connection. +Consequently, the pre-transaction block must use the passed data source to execute commands. +Thus, the commands are emitted from that connection. +These commands must not modify the watched keys. + +The transaction is aborted if the pre-transaction block throws an exception (or produces a failure when using the reactive API). + ==== Executing custom commands To execute a custom command, or a command not supported by the API, use the following approach: diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/ReactiveRedisDataSource.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/ReactiveRedisDataSource.java index 643bbd7c740c4..7c1f87c2b814e 100644 --- a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/ReactiveRedisDataSource.java +++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/ReactiveRedisDataSource.java @@ -1,5 +1,6 @@ package io.quarkus.redis.datasource; +import java.util.function.BiFunction; import java.util.function.Function; import io.quarkus.redis.datasource.bitmap.ReactiveBitMapCommands; @@ -12,6 +13,7 @@ import io.quarkus.redis.datasource.set.ReactiveSetCommands; import io.quarkus.redis.datasource.sortedset.ReactiveSortedSetCommands; import io.quarkus.redis.datasource.string.ReactiveStringCommands; +import io.quarkus.redis.datasource.transactions.OptimisticLockingTransactionResult; import io.quarkus.redis.datasource.transactions.ReactiveTransactionalRedisDataSource; import io.quarkus.redis.datasource.transactions.TransactionResult; import io.quarkus.redis.datasource.transactions.TransactionalRedisDataSource; @@ -45,9 +47,9 @@ public interface ReactiveRedisDataSource { * Retrieves a {@link RedisDataSource} enqueuing commands in a Redis Transaction ({@code MULTI}). * Note that transaction acquires a single connection, and all the commands are enqueued in this connection. * The commands are only executed when the passed block emits the {@code null} item. - * + *

* The results of the commands are retrieved using the produced {@link TransactionResult}. - * + *

* The user can discard a transaction using the {@link TransactionalRedisDataSource#discard()} method. * In this case, the produced {@link TransactionResult} will be empty. * @@ -60,9 +62,9 @@ public interface ReactiveRedisDataSource { * Retrieves a {@link RedisDataSource} enqueuing commands in a Redis Transaction ({@code MULTI}). * Note that transaction acquires a single connection, and all the commands are enqueued in this connection. * The commands are only executed when the passed block emits the {@code null} item. - * + *

* The results of the commands are retrieved using the produced {@link TransactionResult}. - * + *

* The user can discard a transaction using the {@link TransactionalRedisDataSource#discard()} method. * In this case, the produced {@link TransactionResult} will be empty. * @@ -73,6 +75,46 @@ public interface ReactiveRedisDataSource { */ Uni withTransaction(Function> tx, String... watchedKeys); + /** + * Retrieves a {@link RedisDataSource} enqueuing commands in a Redis Transaction ({@code MULTI}). + * Note that transaction acquires a single connection, and all the commands are enqueued in this connection. + * The commands are only executed when the passed block emits the {@code null} item. + *

+ * This variant also allows executing code before the transaction gets started but after the key being watched: + * + *

+     *     WATCH key
+     *     // preTxBlock
+     *     element = ZRANGE k 0 0
+     *     // TxBlock
+     *     MULTI
+     *        ZREM k element
+     *     EXEC
+     * 
+ *

+ * The {@code preTxBlock} returns a {@link Uni Uni<I>}. The produced value is received by the {@code tx} block, + * which can use that value to execute the appropriate operation in the transaction. The produced value can also be + * retrieved from the produced {@link OptimisticLockingTransactionResult}. Commands issued in the {@code preTxBlock } + * must used the passed (single-connection) {@link ReactiveRedisDataSource} instance. + *

+ * If the {@code preTxBlock} throws an exception or emits a failure, the transaction is not executed, and the returned + * {@link OptimisticLockingTransactionResult} is empty. + *

+ * This construct allows implementing operation relying on optimistic locking. + * The results of the commands are retrieved using the produced {@link OptimisticLockingTransactionResult}. + *

+ * The user can discard a transaction using the {@link TransactionalRedisDataSource#discard()} method. + * In this case, the produced {@link OptimisticLockingTransactionResult} will be empty. + * + * @param tx the consumer receiving the transactional redis data source. The enqueued commands are only executed + * at the end of the block. + * @param watchedKeys the keys to watch during the execution of the transaction. If one of these key is modified before + * the completion of the transaction, the transaction is discarded. + */ + Uni> withTransaction(Function> preTxBlock, + BiFunction> tx, + String... watchedKeys); + /** * Execute the command SELECT. * Summary: Change the selected database for the current connection diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/RedisDataSource.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/RedisDataSource.java index 2f143e0009af6..585c7d7c616a3 100644 --- a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/RedisDataSource.java +++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/RedisDataSource.java @@ -1,6 +1,8 @@ package io.quarkus.redis.datasource; +import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.function.Function; import io.quarkus.redis.datasource.bitmap.BitMapCommands; import io.quarkus.redis.datasource.geo.GeoCommands; @@ -12,6 +14,7 @@ import io.quarkus.redis.datasource.set.SetCommands; import io.quarkus.redis.datasource.sortedset.SortedSetCommands; import io.quarkus.redis.datasource.string.StringCommands; +import io.quarkus.redis.datasource.transactions.OptimisticLockingTransactionResult; import io.quarkus.redis.datasource.transactions.TransactionResult; import io.quarkus.redis.datasource.transactions.TransactionalRedisDataSource; import io.vertx.mutiny.redis.client.Command; @@ -68,6 +71,46 @@ public interface RedisDataSource { */ TransactionResult withTransaction(Consumer tx, String... watchedKeys); + /** + * Retrieves a {@link RedisDataSource} enqueuing commands in a Redis Transaction ({@code MULTI}). + * Note that transaction acquires a single connection, and all the commands are enqueued in this connection. + * The commands are only executed when the passed block emits the {@code null} item. + *

+ * This variant also allows executing code before the transaction gets started but after the key being watched: + * + *

+     *     WATCH key
+     *     // preTxBlock
+     *     element = ZRANGE k 0 0
+     *     // TxBlock
+     *     MULTI
+     *        ZREM k element
+     *     EXEC
+     * 
+ *

+ * The {@code preTxBlock} returns a {@link I}. The produced value is received by the {@code tx} block, + * which can use that value to execute the appropriate operation in the transaction. The produced value can also be + * retrieved from the produced {@link OptimisticLockingTransactionResult}. Commands issued in the {@code preTxBlock } + * must used the passed (single-connection) {@link RedisDataSource} instance. + *

+ * If the {@code preTxBlock} throws an exception, the transaction is not executed, and the returned + * {@link OptimisticLockingTransactionResult} is empty. + *

+ * This construct allows implementing operation relying on optimistic locking. + * The results of the commands are retrieved using the produced {@link OptimisticLockingTransactionResult}. + *

+ * The user can discard a transaction using the {@link TransactionalRedisDataSource#discard()} method. + * In this case, the produced {@link OptimisticLockingTransactionResult} will be empty. + * + * @param tx the consumer receiving the transactional redis data source. The enqueued commands are only executed + * at the end of the block. + * @param watchedKeys the keys to watch during the execution of the transaction. If one of these key is modified before + * the completion of the transaction, the transaction is discarded. + */ + OptimisticLockingTransactionResult withTransaction(Function preTxBlock, + BiConsumer tx, + String... watchedKeys); + /** * Execute the command SELECT. * Summary: Change the selected database for the current connection diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/transactions/OptimisticLockingTransactionResult.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/transactions/OptimisticLockingTransactionResult.java new file mode 100644 index 0000000000000..2cde85dbb5301 --- /dev/null +++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/datasource/transactions/OptimisticLockingTransactionResult.java @@ -0,0 +1,18 @@ +package io.quarkus.redis.datasource.transactions; + +/** + * A structure holding the result of the commands executed in a transaction. Note that the result are ordered, and the + * (0-based) index of the command must be used to retrieve the result of a specific command. + * + * In addition, it provides the rerult from the pre-transaction block. + */ +public interface OptimisticLockingTransactionResult extends TransactionResult { + + /** + * Retrieves the result from the pre-transaction block + * + * @return the value produces by the pre-transaction block + */ + I getPreTransactionResult(); + +} diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/BlockingRedisDataSourceImpl.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/BlockingRedisDataSourceImpl.java index 2d8739f2c53ec..59b2bb61ad641 100644 --- a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/BlockingRedisDataSourceImpl.java +++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/BlockingRedisDataSourceImpl.java @@ -3,7 +3,9 @@ import static io.quarkus.redis.runtime.datasource.ReactiveRedisDataSourceImpl.toTransactionResult; import java.time.Duration; +import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.function.Function; import io.quarkus.redis.datasource.ReactiveRedisDataSource; import io.quarkus.redis.datasource.RedisDataSource; @@ -17,6 +19,7 @@ import io.quarkus.redis.datasource.set.SetCommands; import io.quarkus.redis.datasource.sortedset.SortedSetCommands; import io.quarkus.redis.datasource.string.StringCommands; +import io.quarkus.redis.datasource.transactions.OptimisticLockingTransactionResult; import io.quarkus.redis.datasource.transactions.TransactionResult; import io.quarkus.redis.datasource.transactions.TransactionalRedisDataSource; import io.vertx.mutiny.redis.client.Command; @@ -62,7 +65,6 @@ public TransactionResult withTransaction(Consumer } else { return toTransactionResult(null, th); } - } finally { connection.closeAndAwait(); } @@ -98,6 +100,40 @@ public TransactionResult withTransaction(Consumer } } + @Override + public OptimisticLockingTransactionResult withTransaction(Function preTxBlock, + BiConsumer tx, String... watchedKeys) { + RedisConnection connection = reactive.redis.connect().await().atMost(timeout); + ReactiveRedisDataSourceImpl dataSource = new ReactiveRedisDataSourceImpl(reactive.redis, connection); + TransactionHolder th = new TransactionHolder(); + BlockingTransactionalRedisDataSourceImpl source = new BlockingTransactionalRedisDataSourceImpl( + new ReactiveTransactionalRedisDataSourceImpl(dataSource, th), timeout); + + try { + Request cmd = Request.cmd(Command.WATCH); + for (String watchedKey : watchedKeys) { + cmd.arg(watchedKey); + } + connection.send(cmd).await().atMost(timeout); + + I input = preTxBlock.apply(new BlockingRedisDataSourceImpl(reactive.redis, connection, timeout)); + + connection.send(Request.cmd(Command.MULTI)).await().atMost(timeout); + + tx.accept(input, source); + if (!source.discarded()) { + Response response = connection.send(Request.cmd(Command.EXEC)).await().atMost(timeout); + // exec produce null is the transaction has been discarded + return toTransactionResult(response, input, th); + } else { + return toTransactionResult(null, input, th); + } + + } finally { + connection.closeAndAwait(); + } + } + @Override public void withConnection(Consumer consumer) { if (connection != null) { diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/OptimisticLockingTransactionResultImpl.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/OptimisticLockingTransactionResultImpl.java new file mode 100644 index 0000000000000..44b7547fbfb85 --- /dev/null +++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/OptimisticLockingTransactionResultImpl.java @@ -0,0 +1,56 @@ +package io.quarkus.redis.runtime.datasource; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +import io.quarkus.redis.datasource.transactions.OptimisticLockingTransactionResult; + +public class OptimisticLockingTransactionResultImpl implements OptimisticLockingTransactionResult { + + private final List results = new ArrayList<>(); + private final boolean discarded; + private final I input; + + public OptimisticLockingTransactionResultImpl(boolean discarded, I input, List res) { + this.results.addAll(res); + this.discarded = discarded; + this.input = input; + } + + public static OptimisticLockingTransactionResult discarded(I input) { + return new OptimisticLockingTransactionResultImpl<>(true, input, Collections.emptyList()); + } + + @Override + public boolean discarded() { + return discarded; + } + + @Override + public int size() { + return results.size(); + } + + @Override + public boolean isEmpty() { + return results.isEmpty(); + } + + @SuppressWarnings("unchecked") + @Override + public T get(int index) { + return (T) results.get(index); + } + + @Override + public Iterator iterator() { + return results.iterator(); + } + + @Override + public I getPreTransactionResult() { + return input; + } +} diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/ReactiveRedisDataSourceImpl.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/ReactiveRedisDataSourceImpl.java index b82e07cb4db56..11d0efee4a14e 100644 --- a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/ReactiveRedisDataSourceImpl.java +++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/ReactiveRedisDataSourceImpl.java @@ -6,6 +6,7 @@ import static io.smallrye.mutiny.helpers.ParameterValidation.positiveOrZero; import java.util.List; +import java.util.function.BiFunction; import java.util.function.Function; import io.quarkus.redis.datasource.ReactiveRedisDataSource; @@ -19,6 +20,7 @@ import io.quarkus.redis.datasource.set.ReactiveSetCommands; import io.quarkus.redis.datasource.sortedset.ReactiveSortedSetCommands; import io.quarkus.redis.datasource.string.ReactiveStringCommands; +import io.quarkus.redis.datasource.transactions.OptimisticLockingTransactionResult; import io.quarkus.redis.datasource.transactions.ReactiveTransactionalRedisDataSource; import io.quarkus.redis.datasource.transactions.TransactionResult; import io.smallrye.mutiny.Uni; @@ -86,20 +88,18 @@ public Uni withTransaction(Function { ReactiveRedisDataSourceImpl singleConnectionDS = new ReactiveRedisDataSourceImpl(redis, connection); - List watched = List.of(keys); TransactionHolder th = new TransactionHolder(); - Request request = Request.cmd(Command.WATCH); - for (String s : watched) { - request.arg(s); - } - return connection.send(request) + return watch(connection, keys) // WATCH keys .chain(() -> connection.send(Request.cmd(Command.MULTI)) .chain(x -> function .apply(new ReactiveTransactionalRedisDataSourceImpl(singleConnectionDS, th))) - .chain(ignored -> { - if (!th.discarded()) { + .onItemOrFailure().transformToUni((x, failure) -> { + if (!th.discarded() && failure == null) { return connection.send(Request.cmd(Command.EXEC)); } else { + if (!th.discarded()) { + return connection.send(Request.cmd(Command.DISCARD)); + } return Uni.createFrom().nullItem(); } }) @@ -108,6 +108,49 @@ public Uni withTransaction(Function watch(RedisConnection connection, String... keys) { + List watched = List.of(keys); + Request request = Request.cmd(Command.WATCH); + for (String s : watched) { + request.arg(s); + } + return connection.send(request) + .replaceWithVoid(); + } + + @Override + public Uni> withTransaction(Function> preTxBlock, + BiFunction> tx, String... watchedKeys) { + nonNull(tx, "tx"); + notNullOrEmpty(watchedKeys, "watchedKeys"); + doesNotContainNull(watchedKeys, "watchedKeys"); + nonNull(preTxBlock, "preTxBlock"); + + return redis.connect() + .onItem().transformToUni(connection -> { + ReactiveRedisDataSourceImpl singleConnectionDS = new ReactiveRedisDataSourceImpl(redis, connection); + TransactionHolder th = new TransactionHolder(); + return watch(connection, watchedKeys) // WATCH keys + .chain(x -> preTxBlock.apply(new ReactiveRedisDataSourceImpl(redis, connection)))// Execute the pre-tx-block + .chain(input -> connection.send(Request.cmd(Command.MULTI)) + .chain(x -> tx + .apply(input, new ReactiveTransactionalRedisDataSourceImpl(singleConnectionDS, th))) + .onItemOrFailure().transformToUni((x, failure) -> { + if (!th.discarded() && failure == null) { + return connection.send(Request.cmd(Command.EXEC)); + } else { + if (!th.discarded()) { + return connection.send(Request.cmd(Command.DISCARD)) + .replaceWithNull(); + } + return Uni.createFrom().nullItem(); + } + }) + .onTermination().call(connection::close) + .map(r -> toTransactionResult(r, input, th))); + }); + } + public static TransactionResult toTransactionResult(Response response, TransactionHolder th) { if (response == null) { // Discarded @@ -116,6 +159,15 @@ public static TransactionResult toTransactionResult(Response response, Transacti return new TransactionResultImpl(th.discarded(), th.map(response)); } + public static OptimisticLockingTransactionResult toTransactionResult(Response response, I input, + TransactionHolder th) { + if (response == null) { + // Discarded + return OptimisticLockingTransactionResultImpl.discarded(input); + } + return new OptimisticLockingTransactionResultImpl<>(th.discarded(), input, th.map(response)); + } + @Override public Uni execute(String command, String... args) { nonNull(command, "command"); diff --git a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/OptimisticLockingTest.java b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/OptimisticLockingTest.java new file mode 100644 index 0000000000000..c4a28a4aff81e --- /dev/null +++ b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/OptimisticLockingTest.java @@ -0,0 +1,350 @@ +package io.quarkus.redis.datasource; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.time.Duration; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.quarkus.redis.datasource.hash.HashCommands; +import io.quarkus.redis.datasource.transactions.OptimisticLockingTransactionResult; +import io.quarkus.redis.runtime.datasource.BlockingRedisDataSourceImpl; +import io.quarkus.redis.runtime.datasource.ReactiveRedisDataSourceImpl; +import io.smallrye.mutiny.Uni; + +public class OptimisticLockingTest extends DatasourceTestBase { + + private RedisDataSource blocking; + private ReactiveRedisDataSource reactive; + + @BeforeEach + void initialize() { + blocking = new BlockingRedisDataSourceImpl(redis, api, Duration.ofSeconds(60)); + reactive = new ReactiveRedisDataSourceImpl(redis, api); + } + + @AfterEach + public void clear() { + blocking.flushall(); + } + + @Test + public void hashPutIfPresent() { + OptimisticLockingTransactionResult result = blocking.withTransaction(ds -> { + HashCommands hashCommands = ds.hash(String.class); + return hashCommands.hexists(key, "field"); + }, + (i, tx) -> { + if (i) { + tx.hash(String.class).hset(key, "field", "Bar"); + } else { + tx.discard(); + } + }, + key); + + assertThat(result.isEmpty()).isTrue(); + assertThat(result.discarded()).isTrue(); + assertThat(result.getPreTransactionResult()).isFalse(); + + blocking.hash(String.class).hset(key, "field", "Foo"); + + result = blocking.withTransaction(ds -> { + HashCommands hashCommands = ds.hash(String.class); + return hashCommands.hexists(key, "field"); + }, + (i, tx) -> { + if (i) { + tx.hash(String.class).hset(key, "field", "Bar"); + } else { + tx.discard(); + } + }, + key); + + assertThat(result.isEmpty()).isFalse(); + assertThat(result.discarded()).isFalse(); + assertThat(result.getPreTransactionResult()).isTrue(); + + assertThat(blocking.hash(String.class).hget(key, "field")).isEqualTo("Bar"); + + } + + @Test + public void hashPutIfPresentReactive() { + OptimisticLockingTransactionResult result = reactive.withTransaction(ds -> { + var hashCommands = ds.hash(String.class); + return hashCommands.hexists(key, "field"); + }, + (i, tx) -> { + if (i) { + return tx.hash(String.class).hset(key, "field", "Bar") + .replaceWithVoid(); + } else { + return tx.discard(); + } + }, + key).await().indefinitely(); + + assertThat(result.isEmpty()).isTrue(); + assertThat(result.discarded()).isTrue(); + assertThat(result.getPreTransactionResult()).isFalse(); + + blocking.hash(String.class).hset(key, "field", "Foo"); + + result = reactive.withTransaction(ds -> { + var hashCommands = ds.hash(String.class); + return hashCommands.hexists(key, "field"); + }, + (i, tx) -> { + if (i) { + return tx.hash(String.class).hset(key, "field", "Bar") + .replaceWithVoid(); + } else { + return tx.discard(); + } + }, + key).await().indefinitely(); + + assertThat(result.isEmpty()).isFalse(); + assertThat(result.discarded()).isFalse(); + assertThat(result.getPreTransactionResult()).isTrue(); + + assertThat(blocking.hash(String.class).hget(key, "field")).isEqualTo("Bar"); + + } + + @Test + public void hashPutIfPresentWithModificationOfTheWatchKey() { + OptimisticLockingTransactionResult result = blocking.withTransaction(ds -> { + + // Using another connection - update the key + blocking.hash(String.class).hset(key, "another", "hello"); + + HashCommands hashCommands = ds.hash(String.class); + return hashCommands.hexists(key, "field"); + }, + (i, tx) -> { + if (i) { + tx.hash(String.class).hset(key, "field", "Bar"); + } else { + tx.discard(); + } + }, + key); + + assertThat(result.isEmpty()).isTrue(); + assertThat(result.discarded()).isTrue(); + assertThat(result.getPreTransactionResult()).isFalse(); + + blocking.hash(String.class).hset(key, "field", "Foo"); + + result = blocking.withTransaction(ds -> { + // Using another connection - update the key + blocking.hash(String.class).hset(key, "yet-another", "hello"); + + HashCommands hashCommands = ds.hash(String.class); + return hashCommands.hexists(key, "field"); + }, + (i, tx) -> { + if (i) { + tx.hash(String.class).hset(key, "field", "Bar"); + } else { + tx.discard(); + } + }, + key); + + assertThat(result.isEmpty()).isTrue(); + assertThat(result.discarded()).isTrue(); + assertThat(result.getPreTransactionResult()).isTrue(); + + assertThat(blocking.hash(String.class).hget(key, "field")).isEqualTo("Foo"); + + } + + @Test + public void hashPutIfPresentReactiveWithModificationOfTheWatchKey() { + OptimisticLockingTransactionResult result = reactive.withTransaction(ds -> { + // Using another connection - update the key + return reactive.hash(String.class).hset(key, "another", "hello") + .chain(() -> { + var hashCommands = ds.hash(String.class); + return hashCommands.hexists(key, "field"); + }); + }, + (i, tx) -> { + if (i) { + return tx.hash(String.class).hset(key, "field", "Bar") + .replaceWithVoid(); + } else { + return tx.discard(); + } + }, + key).await().indefinitely(); + + assertThat(result.isEmpty()).isTrue(); + assertThat(result.discarded()).isTrue(); + assertThat(result.getPreTransactionResult()).isFalse(); + + blocking.hash(String.class).hset(key, "field", "Foo"); + + result = reactive.withTransaction(ds -> { + // Using another connection - update the key + return reactive.hash(String.class).hset(key, "another", "hello") + .chain(() -> { + var hashCommands = ds.hash(String.class); + return hashCommands.hexists(key, "field"); + }); + }, + (i, tx) -> { + if (i) { + return tx.hash(String.class).hset(key, "field", "Bar") + .replaceWithVoid(); + } else { + return tx.discard(); + } + }, + key).await().indefinitely(); + + assertThat(result.isEmpty()).isTrue(); + assertThat(result.discarded()).isTrue(); + assertThat(result.getPreTransactionResult()).isTrue(); + + assertThat(blocking.hash(String.class).hget(key, "field")).isEqualTo("Foo"); + + } + + @Test + public void hashPutIfPresentPreBlockFailing() { + assertThatThrownBy(() -> blocking.withTransaction(ds -> { + Assertions.fail("expected"); + HashCommands hashCommands = ds.hash(String.class); + return hashCommands.hexists(key, "field"); + }, + (i, tx) -> { + if (i) { + tx.hash(String.class).hset(key, "field", "Bar"); + } else { + tx.discard(); + } + }, + key)).hasMessageContaining("expected"); + } + + @Test + public void hashPutIfPresentPreBlockProducingAFailure() { + assertThatThrownBy(() -> reactive. withTransaction(ds -> { + return Uni.createFrom().failure(new RuntimeException("expected")); + }, + (i, tx) -> { + if (i) { + return tx.hash(String.class).hset(key, "field", "Bar") + .replaceWithVoid(); + } else { + return tx.discard(); + } + }, + key).await().indefinitely()).hasMessageContaining("expected"); + } + + @Test + public void hashPutIfPresentPreBlockThrowingAnException() { + assertThatThrownBy(() -> reactive.withTransaction(ds -> { + Assertions.fail("expected"); + return Uni.createFrom().item(true); + }, + (i, tx) -> { + if (i) { + return tx.hash(String.class).hset(key, "field", "Bar") + .replaceWithVoid(); + } else { + return tx.discard(); + } + }, + key).await().indefinitely()).hasMessageContaining("expected"); + } + + @Test + public void testZpop() { + var list = blocking.sortedSet(String.class); + list.zadd(key, 1.0, "a"); + list.zadd(key, 2.0, "b"); + list.zadd(key, 3.0, "c"); + + var res = blocking.withTransaction(ds -> { + var elements = ds.sortedSet(String.class).zrange(key, 0, 0); + if (!elements.isEmpty()) { + return elements.get(0); + } + return null; + }, (element, tx) -> { + if (element == null) { + tx.discard(); + } else { + tx.sortedSet(String.class).zrem(key, element); + } + }, key); + + assertThat(res.discarded()).isFalse(); + assertThat(res.getPreTransactionResult()).isEqualTo("a"); + } + + @Test + public void testZpopReactive() { + var list = blocking.sortedSet(String.class); + list.zadd(key, 1.0, "a"); + list.zadd(key, 2.0, "b"); + list.zadd(key, 3.0, "c"); + + var res = reactive.withTransaction(ds -> { + return ds.sortedSet(String.class).zrange(key, 0, 0) + .map(elements -> { + if (elements.isEmpty()) { + return null; + } + return elements.get(0); + }); + }, (element, tx) -> { + if (element == null) { + return tx.discard(); + } else { + return tx.sortedSet(String.class).zrem(key, element); + } + }, key).await().indefinitely(); + + assertThat(res.discarded()).isFalse(); + assertThat(res.getPreTransactionResult()).isEqualTo("a"); + } + + @Test + public void testZpopWithKeyModification() { + var list = blocking.sortedSet(String.class); + list.zadd(key, 1.0, "a"); + list.zadd(key, 2.0, "b"); + list.zadd(key, 3.0, "c"); + + var res = blocking.withTransaction(ds -> { + blocking.sortedSet(String.class).zadd(key, 4.0, "d"); + var elements = ds.sortedSet(String.class).zrange(key, 0, 0); + if (!elements.isEmpty()) { + return elements.get(0); + } + return null; + }, (element, tx) -> { + if (element == null) { + tx.discard(); + } else { + tx.sortedSet(String.class).zrem(key, element); + } + }, key); + + assertThat(res.discarded()).isTrue(); + assertThat(res.getPreTransactionResult()).isEqualTo("a"); + } + +}