Skip to content

Commit

Permalink
Await ReadyForQuery before emitting errors from transactional contr…
Browse files Browse the repository at this point in the history
…ol methods.

commitTransaction, rollbackTransaction and other methods now await completion of the exchange before emitting error signals to properly synchronize completion.

Previously, error signals were emitted before updating the transaction state which could lead to invalid cleanup states if e.g. the commit failed.

[resolves #541]

Signed-off-by: Mark Paluch <[email protected]>
  • Loading branch information
mp911de committed Aug 25, 2022
1 parent 67806b6 commit 3bf6a42
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 10 deletions.
2 changes: 1 addition & 1 deletion src/main/java/io/r2dbc/postgresql/ExceptionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ static ExceptionFactory withSql(String sql) {
* @return the {@link R2dbcException}.
* @see ErrorResponse
*/
private static R2dbcException createException(ErrorResponse response, String sql) {
static R2dbcException createException(ErrorResponse response, String sql) {
return createException(new ErrorDetails(response.getFields()), sql);
}

Expand Down
30 changes: 25 additions & 5 deletions src/main/java/io/r2dbc/postgresql/PostgresqlConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@
import io.r2dbc.postgresql.codec.Codecs;
import io.r2dbc.postgresql.message.backend.BackendMessage;
import io.r2dbc.postgresql.message.backend.CommandComplete;
import io.r2dbc.postgresql.message.backend.ErrorResponse;
import io.r2dbc.postgresql.message.backend.NotificationResponse;
import io.r2dbc.postgresql.util.Assert;
import io.r2dbc.postgresql.util.Operators;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.IsolationLevel;
import io.r2dbc.spi.Option;
import io.r2dbc.spi.R2dbcException;
import io.r2dbc.spi.TransactionDefinition;
import io.r2dbc.spi.ValidationDepth;
import org.reactivestreams.Publisher;
Expand Down Expand Up @@ -184,6 +186,8 @@ public Mono<Void> cancelRequest() {

@Override
public Mono<Void> commitTransaction() {

AtomicReference<R2dbcException> ref = new AtomicReference<>();
return useTransactionStatus(transactionStatus -> {
if (IDLE != transactionStatus) {
return Flux.from(exchange("COMMIT"))
Expand All @@ -198,13 +202,17 @@ public Mono<Void> commitTransaction() {
// See discussion in pgsql-hackers: https://www.postgresql.org/message-id/b9fb50dc-0f6e-15fb-6555-8ddb86f4aa71%40postgresfriends.org

if ("ROLLBACK".equalsIgnoreCase(message.getCommand())) {
sink.error(new ExceptionFactory.PostgresqlRollbackException(ErrorDetails.fromMessage("The database returned ROLLBACK, so the transaction cannot be committed. Transaction" +
" " +
"failure is not known (check server logs?)"), "COMMIT"));
ErrorDetails details = ErrorDetails.fromMessage("The database returned ROLLBACK, so the transaction cannot be committed. Transaction " +
"failure is not known (check server logs?)");
ref.set(new ExceptionFactory.PostgresqlRollbackException(details, "COMMIT"));
return;
}

sink.next(message);
}).doOnComplete(() -> {
if (ref.get() != null) {
throw ref.get();
}
});
} else {
this.logger.debug(this.connectionContext.getMessage("Skipping commit transaction because status is {}"), transactionStatus);
Expand Down Expand Up @@ -442,9 +450,21 @@ private <T> Mono<T> withTransactionStatus(Function<TransactionStatus, T> f) {

@SuppressWarnings("unchecked")
private <T> Flux<T> exchange(String sql) {
ExceptionFactory exceptionFactory = ExceptionFactory.withSql(sql);
AtomicReference<R2dbcException> ref = new AtomicReference<>();
return (Flux<T>) SimpleQueryMessageFlow.exchange(this.client, sql)
.handle(exceptionFactory::handleErrorResponse);
.handle((backendMessage, synchronousSink) -> {

if (backendMessage instanceof ErrorResponse) {
ref.set(ExceptionFactory.createException((ErrorResponse) backendMessage, sql));
} else {
synchronousSink.next(backendMessage);
}
})
.doOnComplete(() -> {
if (ref.get() != null) {
throw ref.get();
}
});
}

private void cleanupIsolationLevel() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@

import io.r2dbc.postgresql.api.PostgresqlConnection;
import io.r2dbc.postgresql.api.PostgresqlResult;
import io.r2dbc.postgresql.client.Client;
import io.r2dbc.postgresql.client.TransactionStatus;
import io.r2dbc.spi.R2dbcBadGrammarException;
import io.r2dbc.spi.R2dbcRollbackException;
import org.awaitility.Awaitility;
import io.r2dbc.spi.R2dbcException;
import org.junit.jupiter.api.Test;
import reactor.test.StepVerifier;

import java.lang.reflect.Field;

import static org.assertj.core.api.Assertions.assertThat;

/**
Expand All @@ -37,12 +40,26 @@ void commitShouldRecoverFromFailedTransaction() {
this.connection.beginTransaction().as(StepVerifier::create).verifyComplete();
this.connection.createStatement("error").execute().flatMap(PostgresqlResult::getRowsUpdated).as(StepVerifier::create).verifyError(R2dbcBadGrammarException.class);

this.connection.commitTransaction().as(StepVerifier::create).verifyError(R2dbcRollbackException.class);
this.connection.commitTransaction().as(StepVerifier::create).verifyErrorSatisfies(throwable -> {
assertThat(throwable).isInstanceOf(R2dbcException.class);

Client client = extractClient();
assertThat(client.getTransactionStatus()).isEqualTo(TransactionStatus.IDLE);
});

Awaitility.await().until(() -> this.connection.isAutoCommit());
assertThat(this.connection.isAutoCommit()).isTrue();
}

private Client extractClient() {
try {
Field field = io.r2dbc.postgresql.PostgresqlConnection.class.getDeclaredField("client");
field.setAccessible(true);
return (Client) field.get(this.connection);
} catch (ReflectiveOperationException e) {
throw new RuntimeException(e);
}
}

@Test
void rollbackShouldRecoverFromFailedTransaction() {

Expand Down

0 comments on commit 3bf6a42

Please sign in to comment.