Skip to content

Commit

Permalink
Add suggested changes and add ack mechanism for HTTP adapter
Browse files Browse the repository at this point in the history
Signed-off-by: Matthias Kaemmer <[email protected]>
  • Loading branch information
mattkaem committed Jun 5, 2024
1 parent aabafce commit 0fa5e87
Show file tree
Hide file tree
Showing 11 changed files with 102 additions and 46 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2016, 2023 Contributors to the Eclipse Foundation
* Copyright (c) 2016 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -28,6 +28,7 @@
import org.eclipse.hono.adapter.auth.device.DeviceCredentials;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.command.AbstractCommandContext;
import org.eclipse.hono.client.command.Command;
import org.eclipse.hono.client.command.CommandContext;
import org.eclipse.hono.client.command.CommandResponse;
Expand Down Expand Up @@ -706,13 +707,25 @@ private void doUploadMessage(
endpoint, tenant, deviceId);
if (commandContext != null) {
commandContext.getTracingSpan().log("forwarded command to device in HTTP response body");
commandContext.accept();
final Command command = commandContext.getCommand();
if (command.isAckRequired() && command.isValid()
&& commandContext instanceof AbstractCommandContext<?> abstractCommandContext) {
abstractCommandContext
.sendDeliverySuccessCommandResponseMessage(HttpURLConnection.HTTP_ACCEPTED,
"Command successfully forwarded to device",
currentSpan,
command.getCorrelationId(),
command.getMessagingType())
.onComplete(v -> commandContext.accept());
} else {
commandContext.accept();
}
metrics.reportCommand(
commandContext.getCommand().isOneWay() ? Direction.ONE_WAY : Direction.REQUEST,
command.isOneWay() ? Direction.ONE_WAY : Direction.REQUEST,
tenant,
tenantTracker.result(),
ProcessingOutcome.FORWARDED,
commandContext.getCommand().getPayloadSize(),
command.getPayloadSize(),
getMicrometerSample(commandContext));
}
currentSpan.finish();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2016, 2024 Contributors to the Eclipse Foundation
* Copyright (c) 2016 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2016, 2024 Contributors to the Eclipse Foundation
* Copyright (c) 2016 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2021, 2024 Contributors to the Eclipse Foundation
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2023, 2024 Contributors to the Eclipse Foundation
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -138,6 +138,9 @@ private static PubSubBasedCommand getCommand(final PubsubMessage pubsubMessage,
final StringJoiner validationErrorJoiner = new StringJoiner(", ");
final boolean responseRequired = PubSubMessageHelper.isResponseRequired(attributes);
final boolean ackRequired = PubSubMessageHelper.isAckRequired(attributes);
if (responseRequired && ackRequired) {
validationErrorJoiner.add("response-required and ack-required must not both true");
}
final String correlationId = PubSubMessageHelper.getCorrelationId(attributes)
.filter(id -> !id.isEmpty())
.orElseGet(() -> {
Expand Down Expand Up @@ -176,7 +179,7 @@ public PubsubMessage getPubsubMessage() {

@Override
public boolean isAckRequired() {
return !responseRequired && ackRequired;
return ackRequired;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void release(final Throwable error) {
final ServiceInvocationException mappedError = StatusCodeMapper.toServerError(error);
final int status = mappedError.getErrorCode();
Tags.HTTP_STATUS.set(span, status);
if (isRequestResponseCommand() && !(error instanceof CommandAlreadyProcessedException)
if ((isRequestResponseCommand() || isAckRequiredCommand()) && !(error instanceof CommandAlreadyProcessedException)
&& !(error instanceof CommandToBeReprocessedException)) {
final String errorMessage = Optional
.ofNullable(ServiceInvocationException.getErrorMessageForExternalClient(mappedError))
Expand Down Expand Up @@ -90,7 +90,7 @@ public void modify(final boolean deliveryFailed, final boolean undeliverableHere
TracingHelper.logError(span, String.format("command for device handled with outcome 'modified' %s %s",
deliveryFailedReason, undeliverableHereReason));
Tags.HTTP_STATUS.set(span, status);
if (isRequestResponseCommand()) {
if (isRequestResponseCommand() || isAckRequiredCommand()) {
final String error = String.format("command not processed %s %s", deliveryFailedReason,
undeliverableHereReason);
final String correlationId = getCorrelationId();
Expand All @@ -112,7 +112,7 @@ public void reject(final Throwable error) {
TracingHelper.logError(getTracingSpan(), "client error trying to deliver or process command", error);
final Span span = getTracingSpan();
Tags.HTTP_STATUS.set(span, status);
if (isRequestResponseCommand()) {
if (isRequestResponseCommand() || isAckRequiredCommand()) {
final String nonNullCause = Optional.ofNullable(error.getMessage()).orElse("Command message rejected");
final String correlationId = getCorrelationId();
sendDeliveryFailureCommandResponseMessage(status, nonNullCause, span, null, correlationId,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2023, 2024 Contributors to the Eclipse Foundation
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -118,6 +118,15 @@ protected boolean isRequestResponseCommand() {
return !command.isOneWay();
}

/**
* Checks if the command is an ack-required command.
*
* @return True if it is an ack-required command, false otherwise.
*/
protected boolean isAckRequiredCommand() {
return command.isAckRequired();
}

/**
* Sends a command response if the command response message represents an error message.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2020, 2024 Contributors to the Eclipse Foundation
* Copyright (c) 2020 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2023, 2024 Contributors to the Eclipse Foundation
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2016, 2024 Contributors to the Eclipse Foundation
* Copyright (c) 2016 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down
Loading

0 comments on commit 0fa5e87

Please sign in to comment.