Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for optimistic locking / check-and-set in the new Redis API #27381

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions docs/src/main/asciidoc/redis-reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,55 @@ TransactionResult result = ds.withTransaction(tx -> {

IMPORTANT: You cannot use the pub/sub feature from within a transaction.

==== Using optimistic locking

To use optimistic locking, you need to use a variant of the `withTransaction` method, allowing the execution of code before the transaction starts.
In other words, it will be executed as follows:

[source]
----
WATCH key

// Pre-transaction block
// ....
// Produce a result

MULTI
// In transaction code, receive the result produced by the pre-transaction block.
EXEC
----

For example, if you need to update a value in a hash only if the field exists, you will use the following API:

[source, java]
----
OptimisticLockingTransactionResult<Boolean> result = blocking.withTransaction(ds -> {
// The pre-transaction block:
HashCommands<String, String, String> hashCommands = ds.hash(String.class);
return hashCommands.hexists(key, "field"); // Produce a result (boolean in this case)
},
(exists, tx) -> { // The transactional block, receives the result and the transactional data source
if (exists) {
tx.hash(String.class).hset(key, "field", "new value");
} else {
tx.discard();
}
},
key); // The watched key
----

If one of the watched keys is touched before or during the execution of the pre-transaction or transactional blocks, the transaction is aborted.
The pre-transactional block produces a result that the transactional block can use.
This construct is necessary because, within a transaction, the commands do not produce a result.
Results can only be retrieved after the execution of the transaction.

The pre-transaction and transactional blocks are invoked on the same Redis connection.
Consequently, the pre-transaction block must use the passed data source to execute commands.
Thus, the commands are emitted from that connection.
These commands must not modify the watched keys.

The transaction is aborted if the pre-transaction block throws an exception (or produces a failure when using the reactive API).

==== Executing custom commands

To execute a custom command, or a command not supported by the API, use the following approach:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.quarkus.redis.datasource;

import java.util.function.BiFunction;
import java.util.function.Function;

import io.quarkus.redis.datasource.bitmap.ReactiveBitMapCommands;
Expand All @@ -12,6 +13,7 @@
import io.quarkus.redis.datasource.set.ReactiveSetCommands;
import io.quarkus.redis.datasource.sortedset.ReactiveSortedSetCommands;
import io.quarkus.redis.datasource.string.ReactiveStringCommands;
import io.quarkus.redis.datasource.transactions.OptimisticLockingTransactionResult;
import io.quarkus.redis.datasource.transactions.ReactiveTransactionalRedisDataSource;
import io.quarkus.redis.datasource.transactions.TransactionResult;
import io.quarkus.redis.datasource.transactions.TransactionalRedisDataSource;
Expand Down Expand Up @@ -45,9 +47,9 @@ public interface ReactiveRedisDataSource {
* Retrieves a {@link RedisDataSource} enqueuing commands in a Redis Transaction ({@code MULTI}).
* Note that transaction acquires a single connection, and all the commands are enqueued in this connection.
* The commands are only executed when the passed block emits the {@code null} item.
*
* <p>
* The results of the commands are retrieved using the produced {@link TransactionResult}.
*
* <p>
* The user can discard a transaction using the {@link TransactionalRedisDataSource#discard()} method.
* In this case, the produced {@link TransactionResult} will be empty.
*
Expand All @@ -60,9 +62,9 @@ public interface ReactiveRedisDataSource {
* Retrieves a {@link RedisDataSource} enqueuing commands in a Redis Transaction ({@code MULTI}).
* Note that transaction acquires a single connection, and all the commands are enqueued in this connection.
* The commands are only executed when the passed block emits the {@code null} item.
*
* <p>
* The results of the commands are retrieved using the produced {@link TransactionResult}.
*
* <p>
* The user can discard a transaction using the {@link TransactionalRedisDataSource#discard()} method.
* In this case, the produced {@link TransactionResult} will be empty.
*
Expand All @@ -73,6 +75,46 @@ public interface ReactiveRedisDataSource {
*/
Uni<TransactionResult> withTransaction(Function<ReactiveTransactionalRedisDataSource, Uni<Void>> tx, String... watchedKeys);

/**
* Retrieves a {@link RedisDataSource} enqueuing commands in a Redis Transaction ({@code MULTI}).
* Note that transaction acquires a single connection, and all the commands are enqueued in this connection.
* The commands are only executed when the passed block emits the {@code null} item.
* <p>
* This variant also allows executing code before the transaction gets started but after the key being watched:
*
* <pre>
* WATCH key
* // preTxBlock
* element = ZRANGE k 0 0
* // TxBlock
* MULTI
* ZREM k element
* EXEC
* </pre>
* <p>
* The {@code preTxBlock} returns a {@link Uni Uni&lt;I&gt;}. The produced value is received by the {@code tx} block,
* which can use that value to execute the appropriate operation in the transaction. The produced value can also be
* retrieved from the produced {@link OptimisticLockingTransactionResult}. Commands issued in the {@code preTxBlock }
* must used the passed (single-connection) {@link ReactiveRedisDataSource} instance.
* <p>
* If the {@code preTxBlock} throws an exception or emits a failure, the transaction is not executed, and the returned
* {@link OptimisticLockingTransactionResult} is empty.
* <p>
* This construct allows implementing operation relying on optimistic locking.
* The results of the commands are retrieved using the produced {@link OptimisticLockingTransactionResult}.
* <p>
* The user can discard a transaction using the {@link TransactionalRedisDataSource#discard()} method.
* In this case, the produced {@link OptimisticLockingTransactionResult} will be empty.
*
* @param tx the consumer receiving the transactional redis data source. The enqueued commands are only executed
* at the end of the block.
* @param watchedKeys the keys to watch during the execution of the transaction. If one of these key is modified before
* the completion of the transaction, the transaction is discarded.
*/
<I> Uni<OptimisticLockingTransactionResult<I>> withTransaction(Function<ReactiveRedisDataSource, Uni<I>> preTxBlock,
BiFunction<I, ReactiveTransactionalRedisDataSource, Uni<Void>> tx,
String... watchedKeys);

/**
* Execute the command <a href="https://redis.io/commands/select">SELECT</a>.
* Summary: Change the selected database for the current connection
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.quarkus.redis.datasource;

import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

import io.quarkus.redis.datasource.bitmap.BitMapCommands;
import io.quarkus.redis.datasource.geo.GeoCommands;
Expand All @@ -12,6 +14,7 @@
import io.quarkus.redis.datasource.set.SetCommands;
import io.quarkus.redis.datasource.sortedset.SortedSetCommands;
import io.quarkus.redis.datasource.string.StringCommands;
import io.quarkus.redis.datasource.transactions.OptimisticLockingTransactionResult;
import io.quarkus.redis.datasource.transactions.TransactionResult;
import io.quarkus.redis.datasource.transactions.TransactionalRedisDataSource;
import io.vertx.mutiny.redis.client.Command;
Expand Down Expand Up @@ -68,6 +71,46 @@ public interface RedisDataSource {
*/
TransactionResult withTransaction(Consumer<TransactionalRedisDataSource> tx, String... watchedKeys);

/**
* Retrieves a {@link RedisDataSource} enqueuing commands in a Redis Transaction ({@code MULTI}).
* Note that transaction acquires a single connection, and all the commands are enqueued in this connection.
* The commands are only executed when the passed block emits the {@code null} item.
* <p>
* This variant also allows executing code before the transaction gets started but after the key being watched:
*
* <pre>
* WATCH key
* // preTxBlock
* element = ZRANGE k 0 0
* // TxBlock
* MULTI
* ZREM k element
* EXEC
* </pre>
* <p>
* The {@code preTxBlock} returns a {@link I}. The produced value is received by the {@code tx} block,
* which can use that value to execute the appropriate operation in the transaction. The produced value can also be
* retrieved from the produced {@link OptimisticLockingTransactionResult}. Commands issued in the {@code preTxBlock }
* must used the passed (single-connection) {@link RedisDataSource} instance.
* <p>
* If the {@code preTxBlock} throws an exception, the transaction is not executed, and the returned
* {@link OptimisticLockingTransactionResult} is empty.
* <p>
* This construct allows implementing operation relying on optimistic locking.
* The results of the commands are retrieved using the produced {@link OptimisticLockingTransactionResult}.
* <p>
* The user can discard a transaction using the {@link TransactionalRedisDataSource#discard()} method.
* In this case, the produced {@link OptimisticLockingTransactionResult} will be empty.
*
* @param tx the consumer receiving the transactional redis data source. The enqueued commands are only executed
* at the end of the block.
* @param watchedKeys the keys to watch during the execution of the transaction. If one of these key is modified before
* the completion of the transaction, the transaction is discarded.
*/
<I> OptimisticLockingTransactionResult<I> withTransaction(Function<RedisDataSource, I> preTxBlock,
BiConsumer<I, TransactionalRedisDataSource> tx,
String... watchedKeys);

/**
* Execute the command <a href="https://redis.io/commands/select">SELECT</a>.
* Summary: Change the selected database for the current connection
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.quarkus.redis.datasource.transactions;

/**
* A structure holding the result of the commands executed in a transaction. Note that the result are ordered, and the
* (0-based) index of the command must be used to retrieve the result of a specific command.
*
* In addition, it provides the rerult from the pre-transaction block.
*/
public interface OptimisticLockingTransactionResult<I> extends TransactionResult {

/**
* Retrieves the result from the pre-transaction block
*
* @return the value produces by the pre-transaction block
*/
I getPreTransactionResult();

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import static io.quarkus.redis.runtime.datasource.ReactiveRedisDataSourceImpl.toTransactionResult;

import java.time.Duration;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

import io.quarkus.redis.datasource.ReactiveRedisDataSource;
import io.quarkus.redis.datasource.RedisDataSource;
Expand All @@ -17,6 +19,7 @@
import io.quarkus.redis.datasource.set.SetCommands;
import io.quarkus.redis.datasource.sortedset.SortedSetCommands;
import io.quarkus.redis.datasource.string.StringCommands;
import io.quarkus.redis.datasource.transactions.OptimisticLockingTransactionResult;
import io.quarkus.redis.datasource.transactions.TransactionResult;
import io.quarkus.redis.datasource.transactions.TransactionalRedisDataSource;
import io.vertx.mutiny.redis.client.Command;
Expand Down Expand Up @@ -62,7 +65,6 @@ public TransactionResult withTransaction(Consumer<TransactionalRedisDataSource>
} else {
return toTransactionResult(null, th);
}

} finally {
connection.closeAndAwait();
}
Expand Down Expand Up @@ -98,6 +100,40 @@ public TransactionResult withTransaction(Consumer<TransactionalRedisDataSource>
}
}

@Override
public <I> OptimisticLockingTransactionResult<I> withTransaction(Function<RedisDataSource, I> preTxBlock,
BiConsumer<I, TransactionalRedisDataSource> tx, String... watchedKeys) {
RedisConnection connection = reactive.redis.connect().await().atMost(timeout);
ReactiveRedisDataSourceImpl dataSource = new ReactiveRedisDataSourceImpl(reactive.redis, connection);
TransactionHolder th = new TransactionHolder();
BlockingTransactionalRedisDataSourceImpl source = new BlockingTransactionalRedisDataSourceImpl(
new ReactiveTransactionalRedisDataSourceImpl(dataSource, th), timeout);

try {
Request cmd = Request.cmd(Command.WATCH);
for (String watchedKey : watchedKeys) {
cmd.arg(watchedKey);
}
connection.send(cmd).await().atMost(timeout);

I input = preTxBlock.apply(new BlockingRedisDataSourceImpl(reactive.redis, connection, timeout));

connection.send(Request.cmd(Command.MULTI)).await().atMost(timeout);

tx.accept(input, source);
if (!source.discarded()) {
Response response = connection.send(Request.cmd(Command.EXEC)).await().atMost(timeout);
// exec produce null is the transaction has been discarded
return toTransactionResult(response, input, th);
} else {
return toTransactionResult(null, input, th);
}

} finally {
connection.closeAndAwait();
}
}

@Override
public void withConnection(Consumer<RedisDataSource> consumer) {
if (connection != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package io.quarkus.redis.runtime.datasource;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

import io.quarkus.redis.datasource.transactions.OptimisticLockingTransactionResult;

public class OptimisticLockingTransactionResultImpl<I> implements OptimisticLockingTransactionResult<I> {

private final List<Object> results = new ArrayList<>();
private final boolean discarded;
private final I input;

public OptimisticLockingTransactionResultImpl(boolean discarded, I input, List<Object> res) {
this.results.addAll(res);
this.discarded = discarded;
this.input = input;
}

public static <I> OptimisticLockingTransactionResult<I> discarded(I input) {
return new OptimisticLockingTransactionResultImpl<>(true, input, Collections.emptyList());
}

@Override
public boolean discarded() {
return discarded;
}

@Override
public int size() {
return results.size();
}

@Override
public boolean isEmpty() {
return results.isEmpty();
}

@SuppressWarnings("unchecked")
@Override
public <T> T get(int index) {
return (T) results.get(index);
}

@Override
public Iterator<Object> iterator() {
return results.iterator();
}

@Override
public I getPreTransactionResult() {
return input;
}
}
Loading