Skip to content

Commit

Permalink
Apply fixes/improvements regarding code inspection findings.
Browse files Browse the repository at this point in the history
Signed-off-by: Carsten Lohmann <[email protected]>
  • Loading branch information
calohmn committed May 19, 2022
1 parent 3eb369a commit b814ba9
Show file tree
Hide file tree
Showing 14 changed files with 29 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,11 @@ protected Buffer getPayload(final JsonObject loraMessage) {
switch (encodingType) {
case ENCODING_TYPE_HEX:
return payload
.filter(String.class::isInstance)
.map(String.class::cast)
.map(s -> Buffer.buffer(BaseEncoding.base16().decode(s.toUpperCase())))
.orElseThrow(() -> new LoraProviderMalformedPayloadException("message does not contain HEX encoded payload property"));
default:
case ENCODING_TYPE_BASE64:
return payload
.filter(String.class::isInstance)
.map(String.class::cast)
.map(s -> Buffer.buffer(Base64.getDecoder().decode(s)))
.orElseThrow(() -> new LoraProviderMalformedPayloadException("message does not contain BASE64 encoded payload property"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ protected String getDevEui(final JsonObject loraMessage) {

Objects.requireNonNull(loraMessage);
return LoraUtils.getChildObject(loraMessage, FIELD_ORBIWISE_DEVICE_EUI, String.class)
.filter(String.class::isInstance)
.map(String.class::cast)
.map(String::toUpperCase)
.orElseThrow(() -> new LoraProviderMalformedPayloadException("message does not contain String valued device ID property"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -962,8 +962,7 @@ private Future<Void> uploadMessage(

}).recover(t -> {

if (ClientErrorException.class.isInstance(t)) {
final ClientErrorException e = (ClientErrorException) t;
if (t instanceof ClientErrorException e) {
log.debug("cannot process message [endpoint: {}] from device [tenantId: {}, deviceId: {}]: {} - {}",
endpoint, tenantObject.getTenantId(), deviceId, e.getErrorCode(), e.getMessage());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -541,19 +541,17 @@ protected void handleSendMessageTimeout(

private Future<ProtonDelivery> mapUnacceptedOutcomeToErrorResult(final ProtonDelivery delivery) {
final DeliveryState remoteState = delivery.getRemoteState();
if (Accepted.class.isInstance(remoteState)) {
if (remoteState instanceof Accepted) {
throw new IllegalStateException("delivery is expected to be rejected, released or modified here, not accepted");
}
ServiceInvocationException e = null;
if (Rejected.class.isInstance(remoteState)) {
final Rejected rejected = (Rejected) remoteState;
if (remoteState instanceof Rejected rejected) {
e = Optional.ofNullable(rejected.getError())
.map(ErrorConverter::fromTransferError)
.orElseGet(() -> new ClientErrorException(HttpURLConnection.HTTP_BAD_REQUEST));
} else if (Released.class.isInstance(remoteState)) {
} else if (remoteState instanceof Released) {
e = new MessageNotProcessedException();
} else if (Modified.class.isInstance(remoteState)) {
final Modified modified = (Modified) remoteState;
} else if (remoteState instanceof Modified modified) {
if (modified.getUndeliverableHere()) {
e = new MessageUndeliverableException();
} else {
Expand Down Expand Up @@ -593,14 +591,13 @@ protected final void logUpdatedDeliveryState(final Span currentSpan, final Messa
final String messageId = message.getMessageId() != null ? message.getMessageId().toString() : "";
final String messageAddress = getMessageAddress(message);
final DeliveryState remoteState = delivery.getRemoteState();
if (Accepted.class.isInstance(remoteState)) {
if (remoteState instanceof Accepted) {
log.trace("message [ID: {}, address: {}] accepted by peer", messageId, messageAddress);
currentSpan.log("message accepted by peer");
Tags.HTTP_STATUS.set(currentSpan, HttpURLConnection.HTTP_ACCEPTED);
} else {
final Map<String, Object> events = new HashMap<>();
if (Rejected.class.isInstance(remoteState)) {
final Rejected rejected = (Rejected) delivery.getRemoteState();
if (remoteState instanceof Rejected rejected) {
Tags.HTTP_STATUS.set(currentSpan, HttpURLConnection.HTTP_BAD_REQUEST);
if (rejected.getError() == null) {
logMessageSendingError("message [ID: {}, address: {}] rejected by peer", messageId, messageAddress);
Expand All @@ -611,13 +608,12 @@ protected final void logUpdatedDeliveryState(final Span currentSpan, final Messa
events.put(Fields.MESSAGE, String.format("message rejected by peer: %s, %s",
rejected.getError().getCondition(), rejected.getError().getDescription()));
}
} else if (Released.class.isInstance(remoteState)) {
} else if (remoteState instanceof Released) {
logMessageSendingError("message [ID: {}, address: {}] not accepted by peer, remote state: {}",
messageId, messageAddress, remoteState.getClass().getSimpleName());
Tags.HTTP_STATUS.set(currentSpan, HttpURLConnection.HTTP_UNAVAILABLE);
events.put(Fields.MESSAGE, "message not accepted by peer, remote state: " + remoteState);
} else if (Modified.class.isInstance(remoteState)) {
final Modified modified = (Modified) delivery.getRemoteState();
} else if (remoteState instanceof Modified modified) {
logMessageSendingError("message [ID: {}, address: {}] not accepted by peer, remote state: {}",
messageId, messageAddress, modified);
Tags.HTTP_STATUS.set(currentSpan, modified.getUndeliverableHere() ? HttpURLConnection.HTTP_NOT_FOUND
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -647,8 +647,7 @@ private Future<R> sendRequest(
final Promise<R> failedResult = Promise.promise();
final DeliveryState remoteState = deliveryUpdated.getRemoteState();
sample.completed(remoteState);
if (Rejected.class.isInstance(remoteState)) {
final Rejected rejected = (Rejected) remoteState;
if (remoteState instanceof Rejected rejected) {
if (rejected.getError() != null) {
LOG.debug("service did not accept request [target address: {}, subject: {}, correlation ID: {}]: {}",
requestTargetAddress, request.getSubject(), correlationId, rejected.getError());
Expand All @@ -660,7 +659,7 @@ private Future<R> sendRequest(
failedResult.fail(new ClientErrorException(HttpURLConnection.HTTP_BAD_REQUEST));
cancelRequest(correlationId, failedResult.future());
}
} else if (Accepted.class.isInstance(remoteState)) {
} else if (remoteState instanceof Accepted) {
LOG.trace("service has accepted request [target address: {}, subject: {}, correlation ID: {}]",
requestTargetAddress, request.getSubject(), correlationId);
currentSpan.log("request accepted by peer");
Expand All @@ -673,15 +672,14 @@ private Future<R> sendRequest(
requestTargetAddress, request.getSubject(), correlationId);
}
}
} else if (Released.class.isInstance(remoteState)) {
} else if (remoteState instanceof Released) {
LOG.debug("service did not accept request [target address: {}, subject: {}, correlation ID: {}], remote state: {}",
requestTargetAddress, request.getSubject(), correlationId, remoteState);
failedResult.fail(new ServerErrorException(HttpURLConnection.HTTP_UNAVAILABLE));
cancelRequest(correlationId, failedResult.future());
} else if (Modified.class.isInstance(remoteState)) {
} else if (remoteState instanceof Modified modified) {
LOG.debug("service did not accept request [target address: {}, subject: {}, correlation ID: {}], remote state: {}",
requestTargetAddress, request.getSubject(), correlationId, remoteState);
final Modified modified = (Modified) deliveryUpdated.getRemoteState();
failedResult.fail(modified.getUndeliverableHere() ? new ClientErrorException(HttpURLConnection.HTTP_NOT_FOUND)
: new ServerErrorException(HttpURLConnection.HTTP_UNAVAILABLE));
cancelRequest(correlationId, failedResult.future());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,23 +144,22 @@ private void updateDelivery(final DeliveryState deliveryState) {
deliveryState, delivery.getRemoteState());
TracingHelper.logError(getTracingSpan(), msg);
LOG.info("{} [{}]", msg, getCommand());
} else if (Accepted.class.isInstance(deliveryState)) {
} else if (deliveryState instanceof Accepted) {
LOG.trace("accepted command message [{}]", getCommand());
span.log("accepted command for device");

} else if (Released.class.isInstance(deliveryState)) {
} else if (deliveryState instanceof Released) {
LOG.debug("released command message [{}]", getCommand());
TracingHelper.logError(span, "released command for device");

} else if (Modified.class.isInstance(deliveryState)) {
final Modified modified = (Modified) deliveryState;
} else if (deliveryState instanceof Modified modified) {
LOG.debug("modified command message [{}]", getCommand());
TracingHelper.logError(span, "modified command for device"
+ (Boolean.TRUE.equals(modified.getDeliveryFailed()) ? "; delivery failed" : "")
+ (Boolean.TRUE.equals(modified.getUndeliverableHere()) ? "; undeliverable here" : ""));

} else if (Rejected.class.isInstance(deliveryState)) {
final ErrorCondition errorCondition = ((Rejected) deliveryState).getError();
} else if (deliveryState instanceof Rejected rejected) {
final ErrorCondition errorCondition = rejected.getError();
LOG.debug("rejected command message [error: {}, command: {}]", errorCondition, getCommand());
TracingHelper.logError(span, "rejected command for device"
+ ((errorCondition != null && errorCondition.getDescription() != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,15 @@ public Future<Void> sendCommand(
final DeliveryState remoteState = delivery.getRemoteState();
LOG.trace("command [{}] sent to downstream peer; remote state of delivery: {}",
commandContext.getCommand(), remoteState);
if (Accepted.class.isInstance(remoteState)) {
if (remoteState instanceof Accepted) {
commandContext.accept();
} else if (Rejected.class.isInstance(remoteState)) {
final Rejected rejected = (Rejected) remoteState;
} else if (remoteState instanceof Rejected rejected) {
commandContext.reject(Optional.ofNullable(rejected.getError())
.map(ErrorCondition::getDescription)
.orElse(null));
} else if (Released.class.isInstance(remoteState)) {
} else if (remoteState instanceof Released) {
commandContext.release();
} else if (Modified.class.isInstance(remoteState)) {
final Modified modified = (Modified) remoteState;
} else if (remoteState instanceof Modified modified) {
commandContext.modify(modified.getDeliveryFailed(), modified.getUndeliverableHere());
}
return (Void) null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public class KafkaBasedInternalCommandConsumer implements InternalCommandConsume
private final Supplier<Future<AsyncHandlingAutoCommitKafkaConsumer<Buffer>>> consumerCreator;
private final Supplier<Future<Admin>> kafkaAdminClientCreator;
private final String adapterInstanceId;
private final Duration pollTimeout;
private final CommandHandlers commandHandlers;
private final Tracer tracer;
private final CommandResponseSender commandResponseSender;
Expand All @@ -99,8 +100,6 @@ public class KafkaBasedInternalCommandConsumer implements InternalCommandConsume
private Admin adminClient;
private Context context;
private KafkaClientMetricsSupport metricsSupport;

private final Duration pollTimeout;
private long retryCreateTopicTimerId;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ protected void consumeData() {
Optional.ofNullable(telemetryConsumer)
.map(MessageConsumer::close)
.ifPresent(closeFutures::add);
Optional.ofNullable(client)
Optional.of(client)
.map(Lifecycle::stop)
.ifPresent(closeFutures::add);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ public enum QoS {

static final String TAG_NAME = "qos";

private Tag tag;
private final Tag tag;

QoS() {
this.tag = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public JdbcProperties(final JdbcOptions options) {
setDriverClass(options.driverClass());
options.maximumPoolSize().ifPresent(this::setMaximumPoolSize);
options.password().ifPresent(this::setPassword);
options.tableName().ifPresent(this::setTableName);;
options.tableName().ifPresent(this::setTableName);
setUrl(options.url());
options.username().ifPresent(this::setUsername);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ public final Future<OperationResult<Id>> createTenant(
.compose(ok -> processCreateTenant(tenantIdValue, tenantObj, span))
.onSuccess(result -> notificationSender.handle(new TenantChangeNotification(LifecycleChange.CREATE,
tenantIdValue, Instant.now(), tenantObj.isEnabled())))
.recover(t -> DeviceRegistryUtils.mapError(t, tenantId.get()));
.recover(t -> DeviceRegistryUtils.mapError(t, tenantIdValue));
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion site/documentation/content/api/Metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ Additional tags used for metrics reported by protocol adapters are:
| *direction* | `one-way`, `request`, `response` | The direction in which a Command &amp; Control message is being sent:<br>`one-way` indicates a command sent to a device for which the sending application doesn't expect to receive a response.<br>`request` indicates a command request message sent to a device.<br>`response` indicates a command response received from a device. |
| *qos* | `0`, `1`, `unknown` | The quality of service used for a telemetry or event message.<br>`0` indicates *at most once*,<br>`1` indicates *at least once* and<br> `none` indicates unknown delivery semantics. |
| *status* | `forwarded`, `unprocessable`, `undeliverable` | The processing status of a message.<br>`forwarded` indicates that the message has been forwarded to a downstream consumer<br>`unprocessable` indicates that the message has not been processed not forwarded, e.g. because the message was malformed<br>`undeliverable` indicates that the message could not be forwarded, e.g. because there is no downstream consumer or due to an infrastructure problem |
| *tenant* | *string* | The identifier of the tenant that the metric is being reported for |
| *tenant* | *string* | The identifier of the tenant that the metric is being reported for. |
| *ttd* | `command`, `expired`, `none` | A status indicating the outcome of processing a TTD value contained in a message received from a device.<br>`command` indicates that a command for the device has been included in the response to the device's request for uploading the message.<br>`expired` indicates that a response without a command has been sent to the device.<br>`none` indicates that either no TTD value has been specified by the device or that the protocol adapter does not support it. |
| *type* | `telemetry`, `event` | The type of (downstream) message that the metric is being reported for. |

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
public final class ConfigMappingSupport {

private ConfigMappingSupport() {
// prevent instantation
// prevent instantiation
}

/**
Expand Down

0 comments on commit b814ba9

Please sign in to comment.