Skip to content

Commit

Permalink
FINERACT-1909: Retry feature for commands - deadlock
Browse files Browse the repository at this point in the history
  • Loading branch information
marta-jankovics authored and adamsaghy committed Nov 20, 2023
1 parent 3acb2b3 commit ee79c58
Show file tree
Hide file tree
Showing 11 changed files with 200 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ protected JournalEntry() {
//
}

public JournalEntry(final Office office, final PaymentDetail paymentDetail, final GLAccount glAccount, final String currencyCode,
protected JournalEntry(final Office office, final PaymentDetail paymentDetail, final GLAccount glAccount, final String currencyCode,
final String transactionId, final boolean manualEntry, final LocalDate transactionDate, final Integer type,
final BigDecimal amount, final String description, final Integer entityType, final Long entityId, final String referenceNumber,
final Long loanTransactionId, final Long savingsTransactionId, final Long clientTransactionId, final Long shareTransactionId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ private List<BatchResponse> callInTransaction(Consumer<TransactionTemplate> tran
} catch (RuntimeException ex) {
status.setRollbackOnly();
return buildErrorResponses(ex, responseList);
} finally {
BatchRequestContextHolder.setEnclosingTransaction(Optional.empty());
}
});
} catch (TransactionException | NonTransientDataAccessException ex) {
Expand Down Expand Up @@ -329,7 +331,7 @@ private BatchResponse buildErrorResponse(Throwable ex, BatchRequest request) {
String body = null;
Set<Header> headers = new HashSet<>();
if (ex != null) {
ErrorInfo errorInfo = errorHandler.handle(errorHandler.getMappable(ex));
ErrorInfo errorInfo = errorHandler.handle(ErrorHandler.getMappable(ex));
statusCode = errorInfo.getStatusCode();
body = errorInfo.getMessage();
headers = Optional.ofNullable(errorInfo.getHeaders()).orElse(new HashSet<>());
Expand Down Expand Up @@ -358,7 +360,7 @@ private List<BatchResponse> buildErrorResponses(Throwable ex, @NotNull List<Batc
String body = null;
Set<Header> headers = new HashSet<>();
if (ex != null) {
ErrorInfo errorInfo = errorHandler.handle(errorHandler.getMappable(ex));
ErrorInfo errorInfo = errorHandler.handle(ErrorHandler.getMappable(ex));
statusCode = errorInfo.getStatusCode();
body = errorInfo.getMessage();
headers = Optional.ofNullable(errorInfo.getHeaders()).orElse(new HashSet<>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,6 @@ public CommandSource saveInitialSameTransaction(CommandWrapper wrapper, JsonComm
@NotNull
private CommandSource saveInitial(CommandWrapper wrapper, JsonCommand jsonCommand, AppUser maker, String idempotencyKey) {
CommandSource initialCommandSource = getInitialCommandSource(wrapper, jsonCommand, maker, idempotencyKey);
if (initialCommandSource.getCommandJson() == null) {
initialCommandSource.setCommandJson("{}");
}
return commandSourceRepository.saveAndFlush(initialCommandSource);
}

Expand All @@ -84,19 +81,22 @@ private CommandSource saveResult(@NotNull CommandSource commandSource) {
}

public ErrorInfo generateErrorInfo(Throwable t) {
return errorHandler.handle(errorHandler.getMappable(t));
return errorHandler.handle(ErrorHandler.getMappable(t));
}

@Transactional(propagation = Propagation.REQUIRES_NEW)
public CommandSource getCommandSource(Long commandSourceId) {
return commandSourceRepository.findById(commandSourceId).orElseThrow(() -> new CommandNotFoundException(commandSourceId));
}

@Transactional(propagation = Propagation.REQUIRED)
public CommandSource findCommandSource(CommandWrapper wrapper, String idempotencyKey) {
return commandSourceRepository.findByActionNameAndEntityNameAndIdempotencyKey(wrapper.actionName(), wrapper.entityName(),
idempotencyKey);
}

private CommandSource getInitialCommandSource(CommandWrapper wrapper, JsonCommand jsonCommand, AppUser maker, String idempotencyKey) {
@Transactional(propagation = Propagation.REQUIRED)
public CommandSource getInitialCommandSource(CommandWrapper wrapper, JsonCommand jsonCommand, AppUser maker, String idempotencyKey) {
CommandSource commandSourceResult;
if (jsonCommand.commandId() != null) {
commandSourceResult = commandSourceRepository.findById(jsonCommand.commandId())
Expand All @@ -105,6 +105,9 @@ private CommandSource getInitialCommandSource(CommandWrapper wrapper, JsonComman
} else {
commandSourceResult = CommandSource.fullEntryFrom(wrapper, jsonCommand, maker, idempotencyKey, UNDER_PROCESSING.getValue());
}
if (commandSourceResult.getCommandJson() == null) {
commandSourceResult.setCommandJson("{}");
}
return commandSourceResult;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,29 +106,34 @@ public CommandProcessingResult executeCommand(final CommandWrapper wrapper, fina
boolean sameTransaction = BatchRequestContextHolder.getEnclosingTransaction().isPresent();
if (commandSource == null) {
AppUser user = context.authenticatedUser(wrapper);
commandSource = sameTransaction ? commandSourceService.saveInitialSameTransaction(wrapper, command, user, idempotencyKey)
: commandSourceService.saveInitialNewTransaction(wrapper, command, user, idempotencyKey);
storeCommandIdInContext(commandSource); // Store command id as a request attribute
if (sameTransaction) {
commandSource = commandSourceService.getInitialCommandSource(wrapper, command, user, idempotencyKey);
} else {
commandSource = commandSourceService.saveInitialNewTransaction(wrapper, command, user, idempotencyKey);
storeCommandIdInContext(commandSource); // Store command id as a request attribute
}
}
setIdempotencyKeyStoreFlag(true);

final CommandProcessingResult result;
try {
result = findCommandHandler(wrapper).processCommand(command);
} catch (Throwable t) { // NOSONAR
ErrorInfo errorInfo = commandSourceService.generateErrorInfo(t);
RuntimeException mappable = ErrorHandler.getMappable(t);
ErrorInfo errorInfo = commandSourceService.generateErrorInfo(mappable);
commandSource.setResultStatusCode(errorInfo.getStatusCode());
commandSource.setResult(errorInfo.getMessage());
commandSource.setStatus(ERROR);
commandSource = sameTransaction ? commandSourceService.saveResultSameTransaction(commandSource)
: commandSourceService.saveResultNewTransaction(commandSource);
publishHookErrorEvent(wrapper, command, errorInfo);
throw t;
if (!sameTransaction) { // TODO: temporary solution
commandSource = commandSourceService.saveResultNewTransaction(commandSource);
}
publishHookErrorEvent(wrapper, command, errorInfo); // TODO must be performed in a new transaction
throw mappable;
}

commandSource.updateForAudit(result);
commandSource.setResult(toApiJsonSerializer.serializeResult(result));
commandSource.setResultStatusCode(SC_OK);
commandSource.setResult(toApiJsonSerializer.serializeResult(result));
commandSource.setStatus(PROCESSED);

boolean isRollback = !isApprovedByChecker && (result.isRollbackTransaction()
Expand All @@ -139,6 +144,9 @@ public CommandProcessingResult executeCommand(final CommandWrapper wrapper, fina
}

commandSource = commandSourceService.saveResultSameTransaction(commandSource);
if (sameTransaction) {
storeCommandIdInContext(commandSource); // Store command id as a request attribute
}

if (isRollback) {
/*
Expand All @@ -147,14 +155,14 @@ public CommandProcessingResult executeCommand(final CommandWrapper wrapper, fina
* when checker approves the transaction
*/
commandSource.setTransactionId(command.getTransactionId());
// TODO: this should be removed together with lines 133-135
// TODO: this should be removed together with lines 147-149
commandSource.setCommandJson(command.json()); // Set back CommandSource json data
throw new RollbackTransactionAsCommandIsNotApprovedByCheckerException(commandSource);
}

result.setRollbackTransaction(null);
publishHookEvent(wrapper.entityName(), wrapper.actionName(), command, result);

publishHookEvent(wrapper.entityName(), wrapper.actionName(), command, result); // TODO must be performed in a
// new transaction
return result;
}

Expand Down Expand Up @@ -211,7 +219,7 @@ private CommandProcessingResult fallbackExecuteCommand(Exception e) {
if (e instanceof RollbackTransactionAsCommandIsNotApprovedByCheckerException ex) {
return logCommand(ex.getCommandSourceResult());
}
throw errorHandler.getMappable(e);
throw ErrorHandler.getMappable(e);
}

private NewCommandSourceHandler findCommandHandler(final CommandWrapper wrapper) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ public static ApiGlobalErrorResponse locked(String type, String identifier) {
if (identifier != null) {
details += " [" + identifier + ']';
}
String msg = "The server is currently unable to handle the request due to concurrent modification" + details + ", please try again";
String msg = "The server is currently unable to handle the request due to concurrent modification " + details
+ ", please try again";
return create(SC_LOCKED, "error.msg.platform.service." + type + ".conflict", msg, msg);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import jakarta.ws.rs.core.MultivaluedMap;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.ext.ExceptionMapper;
import java.sql.SQLException;
import java.text.ParseException;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -35,6 +37,7 @@
import lombok.extern.slf4j.Slf4j;
import net.fortuna.ical4j.validate.ValidationException;
import org.apache.commons.collections4.SetUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.fineract.batch.domain.Header;
import org.apache.fineract.batch.exception.ErrorInfo;
import org.apache.fineract.infrastructure.core.data.ApiParameterError;
Expand All @@ -45,6 +48,8 @@
import org.springframework.context.ApplicationContext;
import org.springframework.core.NestedRuntimeException;
import org.springframework.dao.NonTransientDataAccessException;
import org.springframework.dao.PessimisticLockingFailureException;
import org.springframework.lang.Nullable;
import org.springframework.stereotype.Component;

/**
Expand All @@ -60,14 +65,55 @@
@AllArgsConstructor
public final class ErrorHandler {

private static final Gson JSON_HELPER = GoogleGsonSerializerHelper.createGsonBuilder(true).create();

private enum PessimisticLockingFailureCode {

ROLLBACK("40"), // Transaction rollback
DEADLOCK("60"), // Oracle: deadlock
HY00("HY", "Lock wait timeout exceeded"), // MySql deadlock HY00
;

private final String code;
private final String msg;

PessimisticLockingFailureCode(String code, String msg) {
this.code = code;
this.msg = msg;
}

PessimisticLockingFailureCode(String code) {
this(code, null);
}

private static Throwable match(Throwable t) {
Throwable rootCause = ExceptionUtils.getRootCause(t);
return rootCause instanceof SQLException sqle && Arrays.stream(values()).anyMatch(e -> e.matches(sqle)) ? rootCause : null;
}

private boolean matches(SQLException ex) {
return code.equals(getSqlClassCode(ex)) && (msg == null || ex.getMessage().contains(msg));
}

@Nullable
private static String getSqlClassCode(SQLException ex) {
String sqlState = ex.getSQLState();
if (sqlState == null) {
SQLException nestedEx = ex.getNextException();
if (nestedEx != null) {
sqlState = nestedEx.getSQLState();
}
}
return sqlState != null && sqlState.length() > 2 ? sqlState.substring(0, 2) : sqlState;
}
}

@Autowired
private final ApplicationContext ctx;

@Autowired
private final DefaultExceptionMapper defaultExceptionMapper;

private static final Gson JSON_HELPER = GoogleGsonSerializerHelper.createGsonBuilder(true).create();

@NotNull
public <T extends RuntimeException> ExceptionMapper<T> findMostSpecificExceptionHandler(T exception) {
Class<?> clazz = exception.getClass();
Expand Down Expand Up @@ -120,8 +166,13 @@ public static RuntimeException getMappable(@NotNull Throwable t, String msgCode,
String msg = defaultMsg == null ? t.getMessage() : defaultMsg;
String codePfx = "error.msg" + (param == null ? "" : ("." + param));
Object[] args = defaultMsgArgs == null ? new Object[] { t } : defaultMsgArgs;

Throwable cause;
if ((cause = PessimisticLockingFailureCode.match(t)) != null) {
return new PessimisticLockingFailureException(msg, cause); // deadlock
}
if (t instanceof NestedRuntimeException nre) {
Throwable cause = nre.getMostSpecificCause();
cause = nre.getMostSpecificCause();
msg = defaultMsg == null ? cause.getMessage() : defaultMsg;
if (nre instanceof NonTransientDataAccessException) {
msgCode = msgCode == null ? codePfx + ".data.integrity.issue" : msgCode;
Expand Down Expand Up @@ -150,7 +201,7 @@ public static RuntimeException getMappable(@NotNull Throwable t, String msgCode,
return new RuntimeException(msg, t);
}

private <T> Set<T> createSet(T[] array) {
private static <T> Set<T> createSet(T[] array) {
if (array == null) {
return Set.of();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,14 @@ public class ConcurrencyFailureExceptionMapper implements FineractExceptionMappe
@Override
public Response toResponse(final ConcurrencyFailureException exception) {
log.warn("Exception: {}, Message: {}", exception.getClass().getName(), exception.getMessage());
String type = "unknown";
String identifier = "unknown";
String type;
String identifier;
if (exception instanceof ObjectOptimisticLockingFailureException olex) {
type = olex.getPersistentClassName();
identifier = olex.getIdentifier() == null ? null : String.valueOf(olex.getIdentifier());
} else {
type = "lock";
identifier = null;
}
final ApiGlobalErrorResponse dataIntegrityError = ApiGlobalErrorResponse.locked(type, identifier);
return Response.status(SC_LOCKED).entity(dataIntegrityError).type(MediaType.APPLICATION_JSON).build();
Expand Down
6 changes: 3 additions & 3 deletions fineract-provider/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ resilience4j.retry.instances.executeCommand.max-attempts=${FINERACT_COMMAND_PROC
resilience4j.retry.instances.executeCommand.wait-duration=${FINERACT_COMMAND_PROCESSING_RETRY_WAIT_DURATION:1s}
resilience4j.retry.instances.executeCommand.enable-exponential-backoff=${FINERACT_COMMAND_PROCESSING_RETRY_ENABLE_EXPONENTIAL_BACKOFF:true}
resilience4j.retry.instances.executeCommand.exponential-backoff-multiplier=${FINERACT_COMMAND_PROCESSING_RETRY_EXPONENTIAL_BACKOFF_MULTIPLIER:2}
resilience4j.retry.instances.executeCommand.retryExceptions=${FINERACT_COMMAND_PROCESSING_RETRY_EXCEPTIONS:org.springframework.dao.CannotAcquireLockException,org.springframework.orm.ObjectOptimisticLockingFailureException,org.eclipse.persistence.exceptions.OptimisticLockException,org.apache.fineract.infrastructure.core.exception.IdempotentCommandProcessUnderProcessingException}
resilience4j.retry.instances.executeCommand.retryExceptions=${FINERACT_COMMAND_PROCESSING_RETRY_EXCEPTIONS:org.springframework.dao.ConcurrencyFailureException,org.eclipse.persistence.exceptions.OptimisticLockException,org.apache.fineract.infrastructure.core.exception.IdempotentCommandProcessUnderProcessingException}

resilience4j.retry.instances.processJobDetailForExecution.max-attempts=${FINERACT_PROCESS_JOB_DETAIL_RETRY_MAX_ATTEMPTS:3}
resilience4j.retry.instances.processJobDetailForExecution.wait-duration=${FINERACT_PROCESS_JOB_DETAIL_RETRY_WAIT_DURATION:1s}
Expand All @@ -331,10 +331,10 @@ resilience4j.retry.instances.recalculateInterest.max-attempts=${FINERACT_PROCESS
resilience4j.retry.instances.recalculateInterest.wait-duration=${FINERACT_PROCESS_RECALCULATE_INTEREST_RETRY_WAIT_DURATION:1s}
resilience4j.retry.instances.recalculateInterest.enable-exponential-backoff=${FINERACT_PROCESS_RECALCULATE_INTEREST_RETRY_ENABLE_EXPONENTIAL_BACKOFF:true}
resilience4j.retry.instances.recalculateInterest.exponential-backoff-multiplier=${FINERACT_PROCESS_RECALCULATE_INTEREST_RETRY_EXPONENTIAL_BACKOFF_MULTIPLIER:2}
resilience4j.retry.instances.recalculateInterest.retryExceptions=${FINERACT_PROCESS_RECALCULATE_INTEREST_RETRY_EXCEPTIONS:org.springframework.dao.CannotAcquireLockException,org.springframework.orm.ObjectOptimisticLockingFailureException,org.eclipse.persistence.exceptions.OptimisticLockException}
resilience4j.retry.instances.recalculateInterest.retryExceptions=${FINERACT_PROCESS_RECALCULATE_INTEREST_RETRY_EXCEPTIONS:org.springframework.dao.ConcurrencyFailureException,org.eclipse.persistence.exceptions.OptimisticLockException}

resilience4j.retry.instances.postInterest.max-attempts=${FINERACT_PROCESS_POST_INTEREST_RETRY_MAX_ATTEMPTS:3}
resilience4j.retry.instances.postInterest.wait-duration=${FINERACT_PROCESS_POST_INTEREST_RETRY_WAIT_DURATION:1s}
resilience4j.retry.instances.postInterest.enable-exponential-backoff=${FINERACT_PROCESS_POST_INTEREST_RETRY_ENABLE_EXPONENTIAL_BACKOFF:true}
resilience4j.retry.instances.postInterest.exponential-backoff-multiplier=${FINERACT_PROCESS_POST_INTEREST_RETRY_EXPONENTIAL_BACKOFF_MULTIPLIER:2}
resilience4j.retry.instances.postInterest.retryExceptions=${FINERACT_PROCESS_POST_INTEREST_RETRY_EXCEPTIONS:org.springframework.dao.CannotAcquireLockException,org.springframework.orm.ObjectOptimisticLockingFailureException,org.eclipse.persistence.exceptions.OptimisticLockException}
resilience4j.retry.instances.postInterest.retryExceptions=${FINERACT_PROCESS_POST_INTEREST_RETRY_EXCEPTIONS:org.springframework.dao.ConcurrencyFailureException,org.eclipse.persistence.exceptions.OptimisticLockException}
Loading

0 comments on commit ee79c58

Please sign in to comment.