Skip to content

Commit

Permalink
JBTM-3087 Ensure the LRA implementation is up to date (including new …
Browse files Browse the repository at this point in the history
…context propagation semantics)
  • Loading branch information
mmusgrov committed Apr 18, 2019
1 parent 626655c commit 364d90e
Show file tree
Hide file tree
Showing 20 changed files with 290 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
*/
package io.narayana.lra.client;

import org.eclipse.microprofile.lra.client.GenericLRAException;
import io.narayana.lra.GenericLRAException;

import javax.ws.rs.core.Response;
import javax.ws.rs.ext.ExceptionMapper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
*/
package io.narayana.lra.client;

import org.eclipse.microprofile.lra.client.IllegalLRAStateException;
import io.narayana.lra.IllegalLRAStateException;

import javax.ws.rs.core.Response;
import javax.ws.rs.ext.ExceptionMapper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
*/
package io.narayana.lra.client;

import org.eclipse.microprofile.lra.client.InvalidLRAIdException;
import io.narayana.lra.InvalidLRAIdException;

import javax.ws.rs.core.Response;
import javax.ws.rs.ext.ExceptionMapper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
import javax.ws.rs.core.Response;

import io.narayana.lra.Current;
import io.narayana.lra.GenericLRAException;
import io.narayana.lra.IllegalLRAStateException;
import org.eclipse.microprofile.lra.annotation.Compensate;

import io.narayana.lra.logging.LRALogger;
Expand All @@ -72,10 +74,8 @@
import org.eclipse.microprofile.lra.annotation.LRAStatus;
import org.eclipse.microprofile.lra.annotation.ws.rs.Leave;
import org.eclipse.microprofile.lra.annotation.Status;
import org.eclipse.microprofile.lra.client.GenericLRAException;
import org.eclipse.microprofile.lra.client.IllegalLRAStateException;

import static org.eclipse.microprofile.lra.annotation.ws.rs.LRA.LRA_HTTP_HEADER;
import static org.eclipse.microprofile.lra.annotation.ws.rs.LRA.LRA_HTTP_CONTEXT_HEADER;

/**
* A utility class for controlling the lifecycle of Long Running Actions (LRAs) but the prefered mechanism is to use
Expand Down Expand Up @@ -286,7 +286,7 @@ public URI startLRA(URI parentLRA, String clientID, Long timeout, ChronoUnit uni
}

// validate that there is an LRAInfo response header holding the LRAInfo id
Object lraObject = Current.getLast(response.getHeaders().get(LRA_HTTP_HEADER));
Object lraObject = Current.getLast(response.getHeaders().get(LRA_HTTP_CONTEXT_HEADER));

if (lraObject == null) {
LRALogger.i18NLogger.error_nullLraOnCreation(response);
Expand Down Expand Up @@ -358,7 +358,7 @@ public void leaveLRA(URI lraId, String body) throws GenericLRAException {

response = getTarget().path(String.format(leaveFormat, getLRAId(lraId.toString())))
.request()
.header(LRA_HTTP_HEADER, lraId)
.header(LRA_HTTP_CONTEXT_HEADER, lraId)
.put(Entity.entity(body, MediaType.TEXT_PLAIN));

if (Response.Status.OK.getStatusCode() != response.getStatus()) {
Expand Down Expand Up @@ -636,7 +636,7 @@ private URI enlistCompensator(URI uri, long timelimit, String linkHeader, String
.queryParam(TIMELIMIT_PARAM_NAME, timelimit)
.request()
.header(LINK_TEXT, linkHeader)
.header(LRA_HTTP_HEADER, lraId)
.header(LRA_HTTP_CONTEXT_HEADER, lraId)
.put(Entity.entity(compensatorData == null ? linkHeader : compensatorData, MediaType.TEXT_PLAIN));

if (response.getStatus() == Response.Status.PRECONDITION_FAILED.getStatusCode()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
package io.narayana.lra.coordinator.api;

import io.narayana.lra.Current;
import io.narayana.lra.GenericLRAException;
import io.narayana.lra.IllegalLRAStateException;
import io.narayana.lra.InvalidLRAIdException;
import io.narayana.lra.coordinator.domain.model.LRAData;
import io.narayana.lra.coordinator.domain.model.LRAStatusHolder;
import io.narayana.lra.coordinator.domain.model.Transaction;
Expand Down Expand Up @@ -69,9 +72,6 @@
import java.util.Map;

import org.eclipse.microprofile.lra.annotation.LRAStatus;
import org.eclipse.microprofile.lra.client.GenericLRAException;
import org.eclipse.microprofile.lra.client.IllegalLRAStateException;
import org.eclipse.microprofile.lra.client.InvalidLRAIdException;

import static io.narayana.lra.LRAConstants.CLIENT_ID_PARAM_NAME;
import static io.narayana.lra.LRAConstants.COMPENSATE;
Expand All @@ -84,7 +84,7 @@
import static io.narayana.lra.LRAConstants.TIMELIMIT_PARAM_NAME;
import static java.util.stream.Collectors.toList;
import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
import static org.eclipse.microprofile.lra.annotation.ws.rs.LRA.LRA_HTTP_HEADER;
import static org.eclipse.microprofile.lra.annotation.ws.rs.LRA.LRA_HTTP_CONTEXT_HEADER;
import static org.eclipse.microprofile.lra.annotation.ws.rs.LRA.LRA_HTTP_RECOVERY_HEADER;

@ApplicationScoped
Expand Down Expand Up @@ -238,7 +238,7 @@ public Response startLRA(
@QueryParam(TIMELIMIT_PARAM_NAME) @DefaultValue("0") Long timelimit,
@ApiParam(value = "The enclosing LRA if this new LRA is nested", required = false)
@QueryParam(PARENT_LRA_PARAM_NAME) @DefaultValue("") String parentLRA,
@HeaderParam(LRA_HTTP_HEADER) String parentId) throws WebApplicationException, InvalidLRAIdException {
@HeaderParam(LRA_HTTP_CONTEXT_HEADER) String parentId) throws WebApplicationException, InvalidLRAIdException {

URI parentLRAUrl = null;

Expand Down Expand Up @@ -280,7 +280,7 @@ public Response startLRA(

return Response.status(Response.Status.CREATED)
.entity(lraId)
.header(LRA_HTTP_HEADER, Current.getContexts())
.header(LRA_HTTP_CONTEXT_HEADER, Current.getContexts())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
package io.narayana.lra.coordinator.api;

import io.narayana.lra.Current;
import org.eclipse.microprofile.lra.client.GenericLRAException;
import io.narayana.lra.GenericLRAException;

import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerRequestFilter;
Expand All @@ -35,7 +35,7 @@
import java.net.URI;
import java.net.URISyntaxException;

import static org.eclipse.microprofile.lra.annotation.ws.rs.LRA.LRA_HTTP_HEADER;
import static org.eclipse.microprofile.lra.annotation.ws.rs.LRA.LRA_HTTP_CONTEXT_HEADER;

@Provider
public class CoordinatorContainerFilter implements ContainerRequestFilter, ContainerResponseFilter {
Expand All @@ -44,19 +44,19 @@ public void filter(ContainerRequestContext requestContext) throws IOException {
MultivaluedMap<String, String> headers = requestContext.getHeaders();
URI lraId = null;

if (headers.containsKey(LRA_HTTP_HEADER)) {
if (headers.containsKey(LRA_HTTP_CONTEXT_HEADER)) {
try {
lraId = new URI(Current.getLast(headers.get(LRA_HTTP_HEADER)));
lraId = new URI(Current.getLast(headers.get(LRA_HTTP_CONTEXT_HEADER)));
} catch (URISyntaxException e) {
String msg = String.format("header %s contains an invalid URL %s",
LRA_HTTP_HEADER, Current.getLast(headers.get(LRA_HTTP_HEADER)));
LRA_HTTP_CONTEXT_HEADER, Current.getLast(headers.get(LRA_HTTP_CONTEXT_HEADER)));

throw new GenericLRAException(null, Response.Status.PRECONDITION_FAILED.getStatusCode(), msg, e);
}
}

if (!headers.containsKey(LRA_HTTP_HEADER)) {
Object lraContext = requestContext.getProperty(LRA_HTTP_HEADER);
if (!headers.containsKey(LRA_HTTP_CONTEXT_HEADER)) {
Object lraContext = requestContext.getProperty(LRA_HTTP_CONTEXT_HEADER);

if (lraContext != null) {
lraId = (URI) lraContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
*/
package io.narayana.lra.coordinator.api;

import io.narayana.lra.GenericLRAException;
import io.narayana.lra.coordinator.domain.model.LRAStatusHolder;
import io.narayana.lra.coordinator.domain.service.LRAService;
import io.narayana.lra.logging.LRALogger;
Expand All @@ -29,7 +30,6 @@
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import org.eclipse.microprofile.lra.client.GenericLRAException;
import org.jboss.logging.Logger;

import javax.enterprise.context.ApplicationScoped;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@
import com.arjuna.ats.arjuna.state.OutputObjectState;

import io.narayana.lra.Current;
import io.narayana.lra.GenericLRAException;
import io.narayana.lra.InvalidLRAIdException;
import io.narayana.lra.coordinator.domain.service.LRAService;
import io.narayana.lra.logging.LRALogger;
import org.eclipse.microprofile.lra.annotation.LRAStatus;
import org.eclipse.microprofile.lra.client.GenericLRAException;
import org.eclipse.microprofile.lra.client.InvalidLRAIdException;
import org.eclipse.microprofile.lra.annotation.ParticipantStatus;

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
Expand All @@ -58,7 +59,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import static org.eclipse.microprofile.lra.annotation.ws.rs.LRA.LRA_HTTP_HEADER;
import static org.eclipse.microprofile.lra.annotation.ws.rs.LRA.LRA_HTTP_CONTEXT_HEADER;
import static org.eclipse.microprofile.lra.annotation.ws.rs.LRA.LRA_HTTP_RECOVERY_HEADER;

public class LRARecord extends AbstractRecord implements Comparable<AbstractRecord> {
Expand All @@ -82,7 +83,7 @@ public class LRARecord extends AbstractRecord implements Comparable<AbstractReco
private String compensatorData;
private ScheduledFuture<?> scheduledAbort;
private LRAService lraService;
private LRAStatus status;
private ParticipantStatus status;
boolean accepted;

public LRARecord() {
Expand All @@ -106,6 +107,8 @@ public LRARecord() {

if (parseException[0] != null) {
throw new InvalidLRAIdException(lraId, "Invalid link URI", parseException[0]);
} else if (compensateURI == null) {
throw new InvalidLRAIdException(lraId, "Invalid link URI: missing compensator");
}
} else {
this.compensateURI = new URI(String.format("%s/compensate", linkURI));
Expand Down Expand Up @@ -253,7 +256,7 @@ private int tryDoEnd(boolean compensate) {
scheduledAbort = null;
}

if (LRAStatus.Cancelling.equals(status)) {
if (ParticipantStatus.Compensating.equals(status)) {
compensate = true;
}

Expand All @@ -263,16 +266,16 @@ private int tryDoEnd(boolean compensate) {
}

endPath = compensateURI; // we are going to ask the participant to compensate
status = LRAStatus.Cancelling;
status = ParticipantStatus.Compensating;
} else {
if (isCompelete() || completeURI == null) {
status = LRAStatus.Closed;
status = ParticipantStatus.Completed;

return TwoPhaseOutcome.FINISH_OK; // the participant has already completed
}

endPath = completeURI; // we are going to ask the participant to complete
status = LRAStatus.Closing;
status = ParticipantStatus.Completing;
}

// NB trying to compensate when already completed is allowed (for nested LRAs)
Expand Down Expand Up @@ -300,9 +303,9 @@ private int tryDoEnd(boolean compensate) {
try {
// ask the participant to complete or compensate
Future<Response> asyncResponse = target.request()
.header(LRA_HTTP_HEADER, lraId.toASCIIString())
.header(LRA_HTTP_CONTEXT_HEADER, lraId.toASCIIString())
.header(LRA_HTTP_RECOVERY_HEADER, recoveryURI.toASCIIString())
.property(LRA_HTTP_HEADER, lraId) // make the context available to the jaxrs filters
.property(LRA_HTTP_CONTEXT_HEADER, lraId) // make the context available to the jaxrs filters
.async()
.put(Entity.entity(compensatorData, MediaType.APPLICATION_JSON));
// the catch block below catches any Timeout exception
Expand Down Expand Up @@ -365,7 +368,7 @@ private int tryDoEnd(boolean compensate) {
}

if (compensate) {
status = LRAStatus.Cancelling; // recovery will figure out the status via the status url
status = ParticipantStatus.Compensating; // recovery will figure out the status via the status url

/*
* We are mapping compensate onto Abort. TwoPhaseCoordinator uses presumed abort
Expand All @@ -375,7 +378,7 @@ private int tryDoEnd(boolean compensate) {
return TwoPhaseOutcome.HEURISTIC_HAZARD;
}

status = LRAStatus.Closing; // recovery will figure out the status via the status url
status = ParticipantStatus.Completing; // recovery will figure out the status via the status url

return TwoPhaseOutcome.FINISH_ERROR;
}
Expand All @@ -388,14 +391,14 @@ private int tryDoEnd(boolean compensate) {

private void updateStatus(boolean compensate) {
if (compensate) {
status = accepted ? LRAStatus.Cancelling : LRAStatus.Cancelled;
status = accepted ? ParticipantStatus.Compensating : ParticipantStatus.Compensated;
} else {
status = accepted ? LRAStatus.Closing : LRAStatus.Closed;
status = accepted ? ParticipantStatus.Completing : ParticipantStatus.Completed;
}
}

private int reportFailure(boolean compensate, String endPath) {
status = compensate ? LRAStatus.FailedToCancel : LRAStatus.FailedToClose;
status = compensate ? ParticipantStatus.FailedToCompensate : ParticipantStatus.FailedToComplete;

LRALogger.logger.warnf("LRARecord: participant %s reported a failure to %s",
endPath, compensate ? COMPENSATE_REL : COMPLETE_REL);
Expand Down Expand Up @@ -449,9 +452,9 @@ private int retryGetEndStatus(URI endPath, boolean compensate) {

// since this method is called from the recovery thread do not block
Future<Response> asyncResponse = target.request()
.header(LRA_HTTP_HEADER, lraId.toASCIIString())
.header(LRA_HTTP_CONTEXT_HEADER, lraId.toASCIIString())
.header(LRA_HTTP_RECOVERY_HEADER, recoveryURI.toASCIIString())
.property(LRA_HTTP_HEADER, lraId) // make the context available to the jaxrs filters
.property(LRA_HTTP_CONTEXT_HEADER, lraId) // make the context available to the jaxrs filters
.async()
.get();

Expand All @@ -468,35 +471,31 @@ private int retryGetEndStatus(URI endPath, boolean compensate) {
// the participant is available again and has reported its status
String s = response.readEntity(String.class);

status = LRAStatus.valueOf(s);
status = ParticipantStatus.valueOf(s);

switch (status) {
case Closed:
case Cancelled:
case Completed:
case Compensated:
return TwoPhaseOutcome.FINISH_OK;
case Closing:
case Cancelling:
case Completing:
case Compensating:
// still in progress - make sure recovery keeps retrying it
return TwoPhaseOutcome.HEURISTIC_HAZARD;
case FailedToCancel:
case FailedToClose:
case FailedToCompensate:
case FailedToComplete:
// the participant could not finish - log a warning and forget
LRALogger.logger.warnf(
"LRARecord.doEnd(compensate %b) get status %s did not finish: %s: WILL NOT RETRY",
compensate, target.getUri(), status);

if (forgetURI != null) {
forgetURI = statusURI; // forget is equivalent to HTTP delete on the status URI
}

if (forgetURI != null) {
try {
// let the participant know he can clean up
WebTarget target2 = client.target(forgetURI);
Future<Response> asyncResponse2 = target2.request()
.header(LRA_HTTP_HEADER, lraId.toASCIIString())
.header(LRA_HTTP_CONTEXT_HEADER, lraId.toASCIIString())
.header(LRA_HTTP_RECOVERY_HEADER, recoveryURI.toASCIIString())
.property(LRA_HTTP_HEADER, lraId) // make the context available to the jaxrs filters
.property(LRA_HTTP_CONTEXT_HEADER, lraId) // make the context available to the jaxrs filters
.async()
.delete();

Expand Down Expand Up @@ -612,11 +611,11 @@ boolean forget() {
}

private boolean isCompelete() {
return status != null && status == LRAStatus.Closed;
return status != null && status == ParticipantStatus.Completed;
}

private boolean isCompensated() {
return status != null && status == LRAStatus.Cancelled;
return status != null && status == ParticipantStatus.Compensated;
}

String getResponseData() {
Expand Down Expand Up @@ -657,7 +656,7 @@ public boolean restore_state(InputObjectState os, int t) {
unpackStatus(os);
participantPath = os.unpackString();
compensatorData = os.unpackString();
accepted = status == LRAStatus.Closing || status == LRAStatus.Cancelling;
accepted = status == ParticipantStatus.Completing || status == ParticipantStatus.Compensating;
} catch (IOException | URISyntaxException e) {
return false;
}
Expand All @@ -676,7 +675,7 @@ private void packStatus(OutputObjectState os) throws IOException {
}

private void unpackStatus(InputObjectState os) throws IOException {
status = os.unpackBoolean() ? LRAStatus.values()[os.unpackInt()] : null;
status = os.unpackBoolean() ? ParticipantStatus.values()[os.unpackInt()] : null;
}

private void packURI(OutputObjectState os, URI url) throws IOException {
Expand Down
Loading

0 comments on commit 364d90e

Please sign in to comment.