Skip to content

Commit

Permalink
Ensure isolation level is applied to subsequent transactions after se…
Browse files Browse the repository at this point in the history
…tting (#202)

Motivation:

Aligning Connection#setTransactionIsolationLevel Behavior with r2dbc-spi
Specification. See #192
for more.

Modification:

Change session isolation level as well after invoking
`Connection#setTransactionIsolationLevel`.

Result:

All subsequent transactions will be applied the isolation level set by
`Connection#setTransactionIsolationLevel`.
  • Loading branch information
JohnNiang authored Jan 19, 2024
1 parent b3a0fc9 commit 0b3814c
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 4 deletions.
22 changes: 18 additions & 4 deletions src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public final class MySqlConnection implements Connection, Lifecycle, ConnectionS

private final MySqlConnectionMetadata metadata;

private final IsolationLevel sessionLevel;
private volatile IsolationLevel sessionLevel;

private final QueryCache queryCache;

Expand Down Expand Up @@ -298,13 +298,27 @@ public IsolationLevel getTransactionIsolationLevel() {
return currentLevel;
}

/**
* Gets session transaction isolation level(Only for testing).
*
* @return session transaction isolation level.
*/
IsolationLevel getSessionTransactionIsolationLevel() {
return sessionLevel;
}

@Override
public Mono<Void> setTransactionIsolationLevel(IsolationLevel isolationLevel) {
requireNonNull(isolationLevel, "isolationLevel must not be null");

// Set next transaction isolation level.
return QueryFlow.executeVoid(client, "SET TRANSACTION ISOLATION LEVEL " + isolationLevel.asSql())
.doOnSuccess(ignored -> setIsolationLevel(isolationLevel));
// Set subsequent transaction isolation level.
return QueryFlow.executeVoid(client, "SET SESSION TRANSACTION ISOLATION LEVEL " + isolationLevel.asSql())
.doOnSuccess(ignored -> {
this.sessionLevel = isolationLevel;
if (!this.isInTransaction()) {
this.currentLevel = isolationLevel;
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.asyncer.r2dbc.mysql.ConnectionContext;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import java.util.Objects;
import reactor.core.publisher.Mono;

import java.nio.charset.Charset;
Expand Down Expand Up @@ -67,6 +68,23 @@ public Mono<ByteBuf> encode(ByteBufAllocator allocator, ConnectionContext contex
});
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TextQueryMessage that = (TextQueryMessage) o;
return Objects.equals(sql, that.sql);
}

@Override
public int hashCode() {
return Objects.hash(sql);
}

@Override
public String toString() {
return "TextQueryMessage{sql=REDACTED}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,51 @@ void transactionDefinitionIsolationLevel() {
}));
}

@Test
void setTransactionLevelNotInTransaction() {
complete(connection ->
// check initial session isolation level
Mono.fromSupplier(connection::getTransactionIsolationLevel)
.doOnSuccess(it -> assertThat(it).isEqualTo(REPEATABLE_READ))
.then(connection.beginTransaction())
.doOnSuccess(ignored -> assertThat(connection.isInTransaction()).isTrue())
.then(Mono.fromSupplier(connection::getTransactionIsolationLevel))
.doOnSuccess(it -> assertThat(it).isEqualTo(REPEATABLE_READ))
.then(connection.rollbackTransaction())
.then(connection.setTransactionIsolationLevel(READ_COMMITTED))
// ensure that session isolation level is changed
.then(Mono.fromSupplier(connection::getTransactionIsolationLevel))
.doOnSuccess(it -> assertThat(it).isEqualTo(READ_COMMITTED))
.then(connection.beginTransaction())
.doOnSuccess(ignored -> assertThat(connection.isInTransaction()).isTrue())
// ensure transaction isolation level applies to subsequent transactions
.then(Mono.fromSupplier(connection::getTransactionIsolationLevel))
.doOnSuccess(it -> assertThat(it).isEqualTo(READ_COMMITTED))
);
}

@Test
void setTransactionLevelInTransaction() {
complete(connection ->
// check initial session transaction isolation level
Mono.fromSupplier(connection::getTransactionIsolationLevel)
.doOnSuccess(it -> assertThat(it).isEqualTo(REPEATABLE_READ))
.then(connection.beginTransaction())
.then(connection.setTransactionIsolationLevel(READ_COMMITTED))
// ensure that current transaction isolation level is not changed
.then(Mono.fromSupplier(connection::getTransactionIsolationLevel))
.doOnSuccess(it -> assertThat(it).isNotEqualTo(READ_COMMITTED))
.then(connection.rollbackTransaction())
.doOnSuccess(ignored -> assertThat(connection.isInTransaction()).isFalse())
// ensure that session isolation level is changed after rollback
.then(Mono.fromSupplier(connection::getTransactionIsolationLevel))
.doOnSuccess(it -> assertThat(it).isEqualTo(READ_COMMITTED))
// ensure transaction isolation level applies to subsequent transactions
.then(connection.beginTransaction())
.doOnSuccess(ignored -> assertThat(connection.isInTransaction()).isTrue())
);
}

@Test
void transactionDefinition() {
// The WITH CONSISTENT SNAPSHOT phrase can only be used with the REPEATABLE READ isolation level.
Expand Down
19 changes: 19 additions & 0 deletions src/test/java/io/asyncer/r2dbc/mysql/MySqlConnectionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,20 @@
import io.asyncer.r2dbc.mysql.cache.Caches;
import io.asyncer.r2dbc.mysql.client.Client;
import io.asyncer.r2dbc.mysql.codec.Codecs;
import io.asyncer.r2dbc.mysql.message.client.ClientMessage;
import io.asyncer.r2dbc.mysql.message.client.TextQueryMessage;
import io.r2dbc.spi.IsolationLevel;
import org.assertj.core.api.ThrowableTypeAssert;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
* Unit tests for {@link MySqlConnection}.
Expand Down Expand Up @@ -125,6 +132,18 @@ void badSetTransactionIsolationLevel() {
assertThatIllegalArgumentException().isThrownBy(() -> noPrepare.setTransactionIsolationLevel(null));
}

@Test
void shouldSetTransactionIsolationLevelSuccessfully() {
ClientMessage message = new TextQueryMessage("SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE");
when(client.exchange(eq(message), any())).thenReturn(Flux.empty());

noPrepare.setTransactionIsolationLevel(IsolationLevel.SERIALIZABLE)
.as(StepVerifier::create)
.verifyComplete();

assertThat(noPrepare.getSessionTransactionIsolationLevel()).isEqualTo(IsolationLevel.SERIALIZABLE);
}

@SuppressWarnings("ConstantConditions")
@Test
void badValidate() {
Expand Down

0 comments on commit 0b3814c

Please sign in to comment.