Skip to content

Commit

Permalink
JBTM-3964 test for parallel recovery scans
Browse files Browse the repository at this point in the history
  • Loading branch information
mmusgrov committed Dec 12, 2024
1 parent a725ff2 commit c5b5dfb
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public List<LRAData> getAllRecovering() {
}

public void addTransaction(LongRunningAction lra) {
lras.put(lra.getId(), lra);
lras.putIfAbsent(lra.getId(), lra);
}

public void finished(LongRunningAction transaction, boolean fromHierarchy) {
Expand Down Expand Up @@ -211,6 +211,11 @@ public void recover() {
getRM().recover();
}

// perform a recovery scan to load any recovering LRAs from the store
public void scan() {
getRM().periodicWorkSecondPass(); // periodicWorkFirstPass is a no-op
}

public boolean updateRecoveryURI(URI lraId, String compensatorUrl, String recoveryURI, boolean persist) {
assert recoveryURI != null;
assert compensatorUrl != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ private synchronized void recoverTransactions() {
}

private void doRecoverTransaction(Uid recoverUid) {
// Retrieve the transaction status from its original process.
// Retrieve the transaction status from its original process // TODO remove because it is not needed
int theStatus = _transactionStatusConnectionMgr.getTransactionStatus(_transactionType, recoverUid);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.narayana.lra.Current;
import io.narayana.lra.LRAConstants;
import jakarta.ws.rs.ServiceUnavailableException;
import org.eclipse.microprofile.lra.annotation.LRAStatus;
Expand Down Expand Up @@ -163,6 +164,57 @@ public void after() {
server.stop();
}

// test that it is safe to run periodic recovery scans on the LRARecoveryModule in parallel
@Test
public void testParallelScan() {
final int LRA_COUNT = 3; // number of LRAs to add to the store
final int SCAN_COUNT = 100; // number of parallel periodic recovery scans

URI[] ids = new URI[LRA_COUNT];
URI[] ids2 = new URI[SCAN_COUNT];

try {
// start LRA_COUNT LRAs so that there is something already in the store when the scans run
IntStream.range(0, LRA_COUNT)
.forEach(i -> {
try {
ids[i] = lraClient.startLRA(testName.getMethodName() + i);
Current.pop(); // disassociate the LRA from the current thread
assertNotNull(ids[i]);
} catch (WebApplicationException e) {
fail(String.format("%s: step %d failed with HTTP status %d (%s)",
testName.getMethodName(), i, e.getResponse().getStatus(), e.getMessage()));
}
});

// run SCAN_COUNT recovery passes and LRAs in parallel to verify parallelism support
IntStream.range(0, SCAN_COUNT)
.parallel()
.forEach(i -> {
try {
// start an LRA while a scan is in progress to test parallelism
ids2[i] = lraClient.startLRA(testName.getMethodName() + i);
service.scan();
} finally {
lraClient.cancelLRA(ids2[i]);
}
});
} finally {
// it's clearer and faster to use a standard for loop instead of the more elegant Java Streams API
// Stream.of(ids).filter(Objects::nonNull).forEach(lra -> lraClient.cancelLRA(lra))
for (int i = 0; i < LRA_COUNT; i++) {
if (ids[i] != null) {
try {
lraClient.cancelLRA(ids[i]);
} catch (WebApplicationException e) {
System.out.printf("%s: cancel %s failed with %s%n",
testName.getMethodName(), ids[i], e.getMessage());
}
}
}
}
}

@Test
public void joinWithVersionTest() {
URI lraId = lraClient.startLRA(testName.getMethodName());
Expand Down

0 comments on commit c5b5dfb

Please sign in to comment.