Skip to content

Commit

Permalink
feat(webhook): Add support for document upload
Browse files Browse the repository at this point in the history
  • Loading branch information
johnBgood committed Nov 14, 2024
1 parent 9bceee7 commit c49c998
Show file tree
Hide file tree
Showing 23 changed files with 362 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.EvictingQueue;
import io.camunda.connector.api.inbound.Activity;
import io.camunda.connector.api.inbound.CorrelationResult;
import io.camunda.connector.api.inbound.Health;
import io.camunda.connector.api.inbound.InboundConnectorContext;
import io.camunda.connector.api.inbound.InboundConnectorDefinition;
import io.camunda.connector.api.inbound.Severity;
import io.camunda.connector.api.inbound.*;
import io.camunda.connector.api.secret.SecretProvider;
import io.camunda.connector.api.validation.ValidationProvider;
import io.camunda.connector.feel.FeelEngineWrapperException;
Expand Down Expand Up @@ -101,6 +96,11 @@ public InboundConnectorContextImpl(
logs);
}

@Override
public ActivationCheckResult canActivate(Object variables) {
return correlationHandler.canActivate(connectorDetails.connectorElements(), variables);
}

@Override
public CorrelationResult correlateWithResult(Object variables) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,7 @@
package io.camunda.connector.runtime.core.inbound;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.camunda.connector.api.inbound.Activity;
import io.camunda.connector.api.inbound.CorrelationResult;
import io.camunda.connector.api.inbound.Health;
import io.camunda.connector.api.inbound.InboundConnectorContext;
import io.camunda.connector.api.inbound.InboundConnectorDefinition;
import io.camunda.connector.api.inbound.InboundIntermediateConnectorContext;
import io.camunda.connector.api.inbound.ProcessInstanceContext;
import io.camunda.connector.api.inbound.*;
import io.camunda.connector.api.validation.ValidationProvider;
import io.camunda.connector.runtime.core.inbound.correlation.InboundCorrelationHandler;
import io.camunda.connector.runtime.core.inbound.correlation.MessageCorrelationPoint.BoundaryEventCorrelationPoint;
Expand Down Expand Up @@ -93,6 +87,11 @@ public CorrelationResult correlateWithResult(Object variables) {
return inboundContext.correlateWithResult(variables);
}

@Override
public ActivationCheckResult canActivate(Object variables) {
return inboundContext.canActivate(variables);
}

@Override
public void cancel(final Throwable exception) {
inboundContext.cancel(exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.camunda.connector.runtime.core.inbound.correlation;

import io.camunda.connector.api.error.ConnectorInputException;
import io.camunda.connector.api.inbound.ActivationCheckResult;
import io.camunda.connector.api.inbound.CorrelationResult;
import io.camunda.connector.api.inbound.CorrelationResult.Failure;
import io.camunda.connector.api.inbound.CorrelationResult.Failure.ActivationConditionNotMet;
Expand Down Expand Up @@ -70,29 +71,24 @@ public CorrelationResult correlate(List<InboundConnectorElement> elements, Objec
public CorrelationResult correlate(
List<InboundConnectorElement> elements, Object variables, String messageId) {

final List<InboundConnectorElement> matchingElements;
final ActivationCheckResult activationCheckResult;
try {
matchingElements =
elements.stream().filter(e -> isActivationConditionMet(e, variables)).toList();
activationCheckResult = canActivate(elements, variables);
} catch (ConnectorInputException e) {
LOG.info("Failed to evaluate activation condition", e);
return new CorrelationResult.Failure.InvalidInput(
"Failed to evaluate activation condition against the provided input", e);
}

if (matchingElements.isEmpty()) {
var discardUnmatchedEvents =
elements.stream()
.map(InboundConnectorElement::consumeUnmatchedEvents)
.anyMatch(e -> e.equals(Boolean.TRUE));
return new ActivationConditionNotMet(discardUnmatchedEvents);
}
if (matchingElements.size() > 1) {
return new Failure.InvalidInput("Multiple connectors are activated for the same input", null);
}

var activatedElement = matchingElements.getFirst();
return correlateInternal(activatedElement, variables, messageId);
return switch (activationCheckResult) {
case ActivationCheckResult.Failure.NoMatchingElement noMatchingElement ->
new ActivationConditionNotMet(noMatchingElement.discardUnmatchedEvents());
case ActivationCheckResult.Failure.TooManyMatchingElements ignored ->

Check notice

Code scanning / CodeQL

Unread local variable Note

Variable 'TooManyMatchingElements ignored' is never read.
new Failure.InvalidInput("Multiple connectors are activated for the same input", null);
case ActivationCheckResult.Success.CanActivate canActivate ->
correlateInternal(
findMatchingElement(elements, canActivate.activatedElement()), variables, messageId);
};
}

protected CorrelationResult correlateInternal(
Expand Down Expand Up @@ -243,6 +239,36 @@ private CorrelationResult publishMessage(
return result;
}

private List<InboundConnectorElement> getMatchingElements(
List<InboundConnectorElement> elements, Object variables) {
return elements.stream().filter(e -> isActivationConditionMet(e, variables)).toList();
}

private InboundConnectorElement findMatchingElement(
List<InboundConnectorElement> elements, ProcessElementContext contentElement) {
return elements.stream()
.filter(e -> e.element().elementId().equals(contentElement.getElement().elementId()))
.findFirst()
.get();
}

public ActivationCheckResult canActivate(List<InboundConnectorElement> elements, Object context) {
var matchingElements = getMatchingElements(elements, context);

if (matchingElements.isEmpty()) {
var discardUnmatchedEvents =
elements.stream()
.map(InboundConnectorElement::consumeUnmatchedEvents)
.anyMatch(e -> e.equals(Boolean.TRUE));
return new ActivationCheckResult.Failure.NoMatchingElement(discardUnmatchedEvents);
}
if (matchingElements.size() > 1) {
return new ActivationCheckResult.Failure.TooManyMatchingElements();
}
return new ActivationCheckResult.Success.CanActivate(
processElementContextFactory.createContext(matchingElements.getFirst()));
}

protected boolean isActivationConditionMet(InboundConnectorElement definition, Object context) {

var maybeCondition = definition.activationCondition();
Expand Down
5 changes: 5 additions & 0 deletions connector-runtime/connector-runtime-spring/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@
<artifactId>spring-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public MeteredInboundCorrelationHandler(
}

@Override
protected boolean isActivationConditionMet(InboundConnectorElement def, Object context) {
public boolean isActivationConditionMet(InboundConnectorElement def, Object context) {
boolean isConditionMet = super.isActivationConditionMet(def, context);
if (!isConditionMet) {
metricsRecorder.increase(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@
import static org.springframework.web.bind.annotation.RequestMethod.PUT;

import io.camunda.connector.api.error.ConnectorException;
import io.camunda.connector.api.inbound.ActivationCheckResult;
import io.camunda.connector.api.inbound.CorrelationFailureHandlingStrategy.ForwardErrorToUpstream;
import io.camunda.connector.api.inbound.CorrelationFailureHandlingStrategy.Ignore;
import io.camunda.connector.api.inbound.CorrelationResult;
import io.camunda.connector.api.inbound.CorrelationResult.Success.MessagePublished;
import io.camunda.connector.api.inbound.CorrelationResult.Success.ProcessInstanceCreated;
import io.camunda.connector.api.inbound.InboundConnectorContext;
import io.camunda.connector.api.inbound.webhook.MappedHttpRequest;
import io.camunda.connector.api.inbound.webhook.WebhookConnectorException;
import io.camunda.connector.api.inbound.webhook.WebhookConnectorException.WebhookSecurityException;
Expand All @@ -40,11 +42,14 @@
import io.camunda.connector.feel.FeelEngineWrapperException;
import io.camunda.connector.runtime.inbound.executable.RegisteredExecutable;
import io.camunda.connector.runtime.inbound.webhook.model.HttpServletRequestWebhookProcessingPayload;
import io.camunda.document.DocumentMetadata;
import io.camunda.document.reference.DocumentReference;
import io.camunda.document.store.DocumentCreationRequest;
import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.Part;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -107,7 +112,11 @@ public ResponseEntity<?> inbound(
connector -> {
WebhookProcessingPayload payload =
new HttpServletRequestWebhookProcessingPayload(
httpServletRequest, params, headers, bodyAsByteArray);
httpServletRequest,
params,
headers,
bodyAsByteArray,
getParts(httpServletRequest));
return processWebhook(connector, payload);
})
.orElseGet(() -> ResponseEntity.notFound().build());
Expand All @@ -128,9 +137,12 @@ private ResponseEntity<?> processWebhook(
// when verification was skipped
// Step 2: trigger and correlate
var webhookResult = connectorHook.triggerWebhook(payload);
var ctxData = toWebhookTriggerResultContext(webhookResult);
// create documents if the connector is activable
var documentReferences = createDocuments(connector.context(), payload);
var ctxData = toWebhookTriggerResultContext(webhookResult, documentReferences);
// correlate
var correlationResult = connector.context().correlateWithResult(ctxData);
response = buildResponse(webhookResult, correlationResult);
response = buildResponse(webhookResult, documentReferences, correlationResult);
}
} catch (Exception e) {
LOG.info("Webhook: {} failed with exception", connector.context().getDefinition(), e);
Expand All @@ -139,6 +151,30 @@ private ResponseEntity<?> processWebhook(
return response;
}

private List<DocumentReference> createDocuments(
InboundConnectorContext context, WebhookProcessingPayload payload) {
if (!(context.canActivate(payload) instanceof ActivationCheckResult.Success)) {
return List.of();
}

return payload.parts().stream()
.map(
part -> {
Map<String, Object> metadata = new HashMap<>();
if (part.submittedFileName() != null) {
metadata.put(DocumentMetadata.FILE_NAME, part.submittedFileName());
}
metadata.put(DocumentMetadata.CONTENT_TYPE, part.contentType());
return context
.createDocument(
DocumentCreationRequest.from(part.inputStream())
.metadata(new DocumentMetadata(metadata))
.build())
.reference();
})
.toList();
}

protected ResponseEntity<?> verify(
WebhookConnectorExecutable connectorHook, WebhookProcessingPayload payload) {
WebhookHttpResponse verificationResponse = connectorHook.verify(payload);
Expand All @@ -150,15 +186,18 @@ protected ResponseEntity<?> verify(
}

private ResponseEntity<?> buildResponse(
WebhookResult webhookResult, CorrelationResult correlationResult) {
WebhookResult webhookResult,
List<DocumentReference> documentReferences,
CorrelationResult correlationResult) {
ResponseEntity<?> response;
if (correlationResult instanceof CorrelationResult.Success success) {
response = buildSuccessfulResponse(webhookResult, success);
response = buildSuccessfulResponse(webhookResult, documentReferences, success);
} else {
if (correlationResult instanceof CorrelationResult.Failure failure) {
switch (failure.handlingStrategy()) {
case ForwardErrorToUpstream ignored -> response = buildErrorResponse(failure);
case Ignore ignored -> response = buildSuccessfulResponse(webhookResult, null);
case Ignore ignored ->

Check notice

Code scanning / CodeQL

Unread local variable Note

Variable 'Ignore ignored' is never read.
response = buildSuccessfulResponse(webhookResult, documentReferences, null);
}
} else {
throw new IllegalStateException("Illegal correlation result : " + correlationResult);
Expand All @@ -178,10 +217,13 @@ private ResponseEntity<?> buildErrorResponse(CorrelationResult.Failure failure)
}

private ResponseEntity<?> buildSuccessfulResponse(
WebhookResult webhookResult, CorrelationResult.Success correlationResult) {
WebhookResult webhookResult,
List<DocumentReference> documentReferences,
CorrelationResult.Success correlationResult) {
ResponseEntity<?> response;
if (webhookResult.response() != null) {
var processVariablesContext = toWebhookResultContext(webhookResult, correlationResult);
var processVariablesContext =
toWebhookResultContext(webhookResult, documentReferences, correlationResult);
var httpResponseData = webhookResult.response().apply(processVariablesContext);
if (httpResponseData != null) {
response = toResponseEntity(httpResponseData);
Expand Down Expand Up @@ -217,18 +259,53 @@ protected ResponseEntity<?> buildErrorResponse(Exception e) {
return response;
}

private Collection<io.camunda.connector.api.inbound.webhook.Part> getParts(
HttpServletRequest httpServletRequest) {
try {
return httpServletRequest.getParts().stream()
.map(InboundWebhookRestController::mapToCamundaPart)
.filter(Objects::nonNull)
.toList();
} catch (IOException e) {
LOG.error("Failed to get parts from request", e);
throw new RuntimeException("Failed to get parts from request", e);
} catch (ServletException e) {
LOG.debug("The request is not multipart/form-data, silently ignoring", e);
return List.of();
} catch (IllegalStateException e) {
LOG.error("Size limits are exceeded or no multipart configuration is provided", e);
throw new RuntimeException(
"Size limits are exceeded or no multipart configuration is provided", e);
}
}

private static io.camunda.connector.api.inbound.webhook.Part mapToCamundaPart(Part part) {
try {
return new io.camunda.connector.api.inbound.webhook.Part(
part.getName(),
part.getSubmittedFileName(),
part.getInputStream(),
part.getContentType());
} catch (IOException e) {
LOG.warn("Failed to process part: {}", part.getName(), e);
return null;
}
}

// This will be used to correlate data returned from connector.
// In other words, we pass this data to Zeebe.
private WebhookTriggerResultContext toWebhookTriggerResultContext(WebhookResult processedResult) {
WebhookTriggerResultContext ctx = new WebhookTriggerResultContext(null, null);
private WebhookTriggerResultContext toWebhookTriggerResultContext(
WebhookResult processedResult, List<DocumentReference> documentReferences) {
WebhookTriggerResultContext ctx = new WebhookTriggerResultContext(null, null, List.of());
if (processedResult != null) {
ctx =
new WebhookTriggerResultContext(
new MappedHttpRequest(
Optional.ofNullable(processedResult.request().body()).orElse(emptyMap()),
Optional.ofNullable(processedResult.request().headers()).orElse(emptyMap()),
Optional.ofNullable(processedResult.request().params()).orElse(emptyMap())),
Optional.ofNullable(processedResult.connectorData()).orElse(emptyMap()));
Optional.ofNullable(processedResult.connectorData()).orElse(emptyMap()),
documentReferences);
}
return ctx;
}
Expand All @@ -237,7 +314,9 @@ private WebhookTriggerResultContext toWebhookTriggerResultContext(WebhookResult
// In other words, depending on the response body expression,
// this data may be returned to the webhook caller.
private WebhookResultContext toWebhookResultContext(
WebhookResult processedResult, CorrelationResult.Success correlationResult) {
WebhookResult processedResult,
List<DocumentReference> documents,
CorrelationResult.Success correlationResult) {
WebhookResultContext ctx = new WebhookResultContext(null, null, null);
if (processedResult != null) {
Object correlation = null;
Expand All @@ -252,7 +331,8 @@ private WebhookResultContext toWebhookResultContext(
Optional.ofNullable(processedResult.request().headers()).orElse(emptyMap()),
Optional.ofNullable(processedResult.request().params()).orElse(emptyMap())),
Optional.ofNullable(processedResult.connectorData()).orElse(emptyMap()),
Optional.ofNullable(correlation).orElse(emptyMap()));
Optional.ofNullable(correlation).orElse(emptyMap()),
documents);
}
return ctx;
}
Expand Down
Loading

0 comments on commit c49c998

Please sign in to comment.