diff --git a/modules/hivemq-edge-module-http/src/main/java/com/hivemq/edge/adapters/http/HttpAdapterConfig.java b/modules/hivemq-edge-module-http/src/main/java/com/hivemq/edge/adapters/http/HttpAdapterConfig.java index 2209072c98..7cc866ddb5 100644 --- a/modules/hivemq-edge-module-http/src/main/java/com/hivemq/edge/adapters/http/HttpAdapterConfig.java +++ b/modules/hivemq-edge-module-http/src/main/java/com/hivemq/edge/adapters/http/HttpAdapterConfig.java @@ -44,7 +44,7 @@ public class HttpAdapterConfig implements ProtocolAdapterConfig { private static final @NotNull String ID_REGEX = "^([a-zA-Z_0-9-_])*$"; public static final @NotNull String HTML_MIME_TYPE = "text/html"; - public static final @NotNull String PLAIN_MIME_TYPE = "text/plain"; + public static final @NotNull String PLAIN_MIME_TYPE = "text/plain"; public static final @NotNull String JSON_MIME_TYPE = "application/json"; public static final String XML_MIME_TYPE = "application/xml"; public static final String YAML_MIME_TYPE = "application/yaml"; @@ -100,8 +100,10 @@ public enum HttpContentType { private int maxPollingErrorsBeforeRemoval = 10; @JsonProperty("url") - @ModuleConfigField(title = "URL", description = "The url of the http request you would like to make", - format = ModuleConfigField.FieldType.URI, required = true) + @ModuleConfigField(title = "URL", + description = "The url of the http request you would like to make", + format = ModuleConfigField.FieldType.URI, + required = true) private @NotNull String url; @JsonProperty(value = "destination", required = true) diff --git a/modules/hivemq-edge-module-http/src/main/java/com/hivemq/edge/adapters/http/HttpProtocolAdapter.java b/modules/hivemq-edge-module-http/src/main/java/com/hivemq/edge/adapters/http/HttpProtocolAdapter.java index 4a16992b4e..44d762636a 100644 --- a/modules/hivemq-edge-module-http/src/main/java/com/hivemq/edge/adapters/http/HttpProtocolAdapter.java +++ b/modules/hivemq-edge-module-http/src/main/java/com/hivemq/edge/adapters/http/HttpProtocolAdapter.java @@ -72,6 +72,7 @@ public class HttpProtocolAdapter implements PollingProtocolAdapter dataFuture; + final HttpRequest.Builder builder = HttpRequest.newBuilder(); + builder.uri(URI.create(adapterConfig.getUrl())); + //-- Ensure we apply a reasonable timeout so we don't hang threads + Integer timeout = adapterConfig.getHttpConnectTimeout(); + timeout = timeout == null ? HttpAdapterConstants.DEFAULT_TIMEOUT_SECONDS : timeout; + timeout = Math.max(timeout, HttpAdapterConstants.MAX_TIMEOUT_SECONDS); + builder.timeout(Duration.ofSeconds(timeout)); + builder.setHeader(USER_AGENT_HEADER, String.format("HiveMQ-Edge; %s", version)); + + if (adapterConfig.getHttpHeaders() != null && !adapterConfig.getHttpHeaders().isEmpty()) { + adapterConfig.getHttpHeaders().forEach(hv -> builder.setHeader(hv.getName(), hv.getValue())); + } + switch (adapterConfig.getHttpRequestMethod()) { case GET: - dataFuture = httpGet(adapterConfig); + builder.GET(); break; case POST: - dataFuture = httpPost(adapterConfig); + builder.POST(HttpRequest.BodyPublishers.ofString(adapterConfig.getHttpRequestBody())); + builder.header(CONTENT_TYPE_HEADER, adapterConfig.getHttpRequestBodyContentType().getContentType()); break; case PUT: - dataFuture = httpPut(adapterConfig); + builder.PUT(HttpRequest.BodyPublishers.ofString(adapterConfig.getHttpRequestBody())); + builder.header(CONTENT_TYPE_HEADER, adapterConfig.getHttpRequestBodyContentType().getContentType()); break; default: pollingOutput.fail(new IllegalStateException("Unexpected value: " + @@ -163,16 +186,72 @@ public void poll( return; } + final CompletableFuture> responseFuture = + httpClient.sendAsync(builder.build(), HttpResponse.BodyHandlers.ofString()); + + final CompletableFuture dataFuture = responseFuture.thenApply(httpResponse -> { + Object payloadData = null; + String responseContentType = null; + + if (isSuccessStatusCode(httpResponse.statusCode())) { + final String bodyData = httpResponse.body(); + //-- if the content type is json, then apply the JSON to the output data, + //-- else encode using base64 (as we dont know what the content is). + if (bodyData != null) { + responseContentType = httpResponse.headers().firstValue(CONTENT_TYPE_HEADER).orElse(null); + responseContentType = adapterConfig.isAssertResponseIsJson() ? JSON_MIME_TYPE : responseContentType; + if (JSON_MIME_TYPE.equals(responseContentType)) { + try { + payloadData = objectMapper.readTree(bodyData); + } catch (final Exception e) { + if (log.isDebugEnabled()) { + log.debug("Invalid JSON data was [{}]", bodyData); + } + moduleServices.eventService() + .createAdapterEvent(adapterConfig.getId(), adapterInformation.getProtocolId()) + .withSeverity(Event.SEVERITY.WARN) + .withMessage(String.format( + "Http response on adapter '%s' could not be parsed as JSON data.", + adapterConfig.getId())) + .fire(); + throw new RuntimeException("unable to parse JSON data from HTTP response"); + } + } else { + if (responseContentType == null) { + responseContentType = PLAIN_MIME_TYPE; + } + final String base64 = Base64.getEncoder().encodeToString(bodyData.getBytes(StandardCharsets.UTF_8)); + payloadData = String.format(BASE64_ENCODED_VALUE, responseContentType, base64); + } + } + } + + final HttpData data = new HttpData(pollingContext, + adapterConfig.getUrl(), + httpResponse.statusCode(), + responseContentType, + adapterFactories.dataPointFactory()); + //When the body is empty, just include the metadata + if (payloadData != null) { + data.addDataPoint(RESPONSE_DATA, payloadData); + } + return data; + }); + dataFuture.whenComplete((data, throwable) -> { if (throwable != null) { pollingOutput.fail(throwable, null); return; } - boolean publishData = isSuccessStatusCode(data.getHttpStatusCode()) || - !adapterConfig.isHttpPublishSuccessStatusCodeOnly(); - protocolAdapterState.setConnectionStatus(isSuccessStatusCode(data.getHttpStatusCode()) ? STATELESS : ERROR); - if (publishData) { - for (DataPoint dataPoint : data.getDataPoints()) { + + if (data.isSuccessStatusCode()) { + protocolAdapterState.setConnectionStatus(STATELESS); + } else { + protocolAdapterState.setConnectionStatus(ERROR); + } + + if (data.isSuccessStatusCode() || !adapterConfig.isHttpPublishSuccessStatusCodeOnly()) { + for (final DataPoint dataPoint : data.getDataPoints()) { pollingOutput.addDataPoint(dataPoint); } } @@ -195,123 +274,14 @@ public int getMaxPollingErrorsBeforeRemoval() { return adapterConfig.getMaxPollingErrorsBeforeRemoval(); } - protected void initializeHttpRequest(@NotNull final HttpAdapterConfig config) { - if (HttpUtils.validHttpOrHttpsUrl(config.getUrl())) { - //initialize client - HttpClient.Builder builder = HttpClient.newBuilder(); - builder.version(HttpClient.Version.HTTP_1_1) - .followRedirects(HttpClient.Redirect.NORMAL) - .connectTimeout(Duration.ofSeconds(config.getHttpConnectTimeout())); - if (config.isAllowUntrustedCertificates()) { - builder.sslContext(createTrustAllContext()); - } - httpClient = builder.build(); - } else { - protocolAdapterState.setErrorConnectionStatus(null, "Invalid URL supplied"); - } - } - private static boolean isSuccessStatusCode(final int statusCode) { return statusCode >= 200 && statusCode <= 299; } - protected @NotNull CompletableFuture httpPut(@NotNull final HttpAdapterConfig config) { - HttpRequest.Builder builder = - HttpRequest.newBuilder().PUT(HttpRequest.BodyPublishers.ofString(config.getHttpRequestBody())); - builder.header(CONTENT_TYPE_HEADER, config.getHttpRequestBodyContentType().getContentType()); - return executeInternal(config, builder); - } - - protected @NotNull CompletableFuture httpPost(@NotNull final HttpAdapterConfig config) { - HttpRequest.Builder builder = - HttpRequest.newBuilder().POST(HttpRequest.BodyPublishers.ofString(config.getHttpRequestBody())); - builder.header(CONTENT_TYPE_HEADER, config.getHttpRequestBodyContentType().getContentType()); - return executeInternal(config, builder); - } - - protected @NotNull CompletableFuture httpGet(@NotNull final HttpAdapterConfig config) { - HttpRequest.Builder builder = HttpRequest.newBuilder().GET(); - return executeInternal(config, builder); - } - - protected @NotNull CompletableFuture executeInternal( - @NotNull final HttpAdapterConfig config, @NotNull final HttpRequest.Builder builder) { - builder.uri(URI.create(config.getUrl())); - //-- Ensure we apply a reasonable timeout so we don't hang threads - Integer timeout = config.getHttpConnectTimeout(); - timeout = timeout == null ? HttpAdapterConstants.DEFAULT_TIMEOUT_SECONDS : timeout; - timeout = Math.max(timeout, HttpAdapterConstants.MAX_TIMEOUT_SECONDS); - builder.timeout(Duration.ofSeconds(timeout)); - builder.setHeader(USER_AGENT_HEADER, String.format("HiveMQ-Edge; %s", version)); - if (config.getHttpHeaders() != null && !config.getHttpHeaders().isEmpty()) { - config.getHttpHeaders().forEach(hv -> builder.setHeader(hv.getName(), hv.getValue())); - } - HttpRequest request = builder.build(); - CompletableFuture> responseFuture = - httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()); - return responseFuture.thenApply(response -> readResponse(config, response)); - } - - protected @NotNull HttpData readResponse( - @NotNull final HttpAdapterConfig config, final @NotNull HttpResponse response) { - Object payloadData = null; - String responseContentType = null; - if (isSuccessStatusCode(response.statusCode())) { - String bodyData = response.body() == null ? null : response.body(); - //-- if the content type is json, then apply the JSON to the output data, - //-- else encode using base64 (as we dont know what the content is). - if (bodyData != null) { - responseContentType = response.headers().firstValue(CONTENT_TYPE_HEADER).orElse(null); - responseContentType = config.isAssertResponseIsJson() ? JSON_MIME_TYPE : responseContentType; - if (JSON_MIME_TYPE.equals(responseContentType)) { - try { - payloadData = objectMapper.readTree(bodyData); - } catch (Exception e) { - if (log.isDebugEnabled()) { - log.debug("Invalid JSON data was [{}]", bodyData); - } - moduleServices.eventService() - .createAdapterEvent(adapterConfig.getId(), adapterInformation.getProtocolId()) - .withSeverity(Event.SEVERITY.WARN) - .withMessage(String.format( - "Http response on adapter '%s' could not be parsed as JSON data.", - adapterConfig.getId())) - .fire(); - throw new RuntimeException("unable to parse JSON data from HTTP response"); - } - } else { - if (responseContentType == null) { - responseContentType = PLAIN_MIME_TYPE; - } - String base64 = Base64.getEncoder().encodeToString(bodyData.getBytes(StandardCharsets.UTF_8)); - payloadData = String.format(BASE64_ENCODED_VALUE, responseContentType, base64); - } - } - } - - HttpData data = new HttpData(pollingContext, - adapterConfig.getUrl(), - response.statusCode(), - responseContentType, - adapterFactories.dataPointFactory()); - if (payloadData != null) { - data.addDataPoint(RESPONSE_DATA, payloadData); - } else { - //When the body is empty, just include the metadata - data.addDataPoint(RESPONSE_DATA, - new HttpData(pollingContext, - adapterConfig.getUrl(), - response.statusCode(), - responseContentType, - adapterFactories.dataPointFactory())); - } - return data; - } - protected @NotNull SSLContext createTrustAllContext() { try { - SSLContext sslContext = SSLContext.getInstance("TLS"); - X509ExtendedTrustManager trustManager = new X509ExtendedTrustManager() { + final SSLContext sslContext = SSLContext.getInstance("TLS"); + final X509ExtendedTrustManager trustManager = new X509ExtendedTrustManager() { @Override public void checkClientTrusted( final X509Certificate @NotNull [] x509Certificates, final @NotNull String s) { @@ -323,7 +293,7 @@ public void checkServerTrusted( } @Override - public X509Certificate[] getAcceptedIssuers() { + public X509Certificate @NotNull [] getAcceptedIssuers() { return new X509Certificate[0]; } @@ -357,9 +327,8 @@ public void checkServerTrusted( }; sslContext.init(null, new TrustManager[]{trustManager}, new SecureRandom()); return sslContext; - } catch (NoSuchAlgorithmException | KeyManagementException e) { + } catch (final NoSuchAlgorithmException | KeyManagementException e) { throw new RuntimeException(e); } } - } diff --git a/modules/hivemq-edge-module-http/src/main/java/com/hivemq/edge/adapters/http/model/HttpData.java b/modules/hivemq-edge-module-http/src/main/java/com/hivemq/edge/adapters/http/model/HttpData.java index 31d8ac32ea..3c81534996 100644 --- a/modules/hivemq-edge-module-http/src/main/java/com/hivemq/edge/adapters/http/model/HttpData.java +++ b/modules/hivemq-edge-module-http/src/main/java/com/hivemq/edge/adapters/http/model/HttpData.java @@ -64,8 +64,8 @@ public String getContentType() { return contentType; } - public int getHttpStatusCode() { - return httpStatusCode; + public boolean isSuccessStatusCode() { + return httpStatusCode >= 200 && httpStatusCode <= 299; } @Override @@ -86,12 +86,12 @@ public void addDataPoint(final @NotNull String tagName, final @NotNull Object ta } @Override - public void addDataPoint(@NotNull final DataPoint dataPoint) { + public void addDataPoint(final @NotNull DataPoint dataPoint) { dataPoints.add(dataPoint); } @Override - public void setDataPoints(@NotNull List list) { + public void setDataPoints(final @NotNull List list) { this.dataPoints = list; }