Skip to content

Commit

Permalink
refactor HttpProtocolAdapter wip#1
Browse files Browse the repository at this point in the history
  • Loading branch information
yannick committed Jul 19, 2024
1 parent 02d78a8 commit 485ac15
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 141 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class HttpAdapterConfig implements ProtocolAdapterConfig {
private static final @NotNull String ID_REGEX = "^([a-zA-Z_0-9-_])*$";

public static final @NotNull String HTML_MIME_TYPE = "text/html";
public static final @NotNull String PLAIN_MIME_TYPE = "text/plain";
public static final @NotNull String PLAIN_MIME_TYPE = "text/plain";
public static final @NotNull String JSON_MIME_TYPE = "application/json";
public static final String XML_MIME_TYPE = "application/xml";
public static final String YAML_MIME_TYPE = "application/yaml";
Expand Down Expand Up @@ -100,8 +100,10 @@ public enum HttpContentType {
private int maxPollingErrorsBeforeRemoval = 10;

@JsonProperty("url")
@ModuleConfigField(title = "URL", description = "The url of the http request you would like to make",
format = ModuleConfigField.FieldType.URI, required = true)
@ModuleConfigField(title = "URL",
description = "The url of the http request you would like to make",
format = ModuleConfigField.FieldType.URI,
required = true)
private @NotNull String url;

@JsonProperty(value = "destination", required = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public class HttpProtocolAdapter implements PollingProtocolAdapter<HttpPollingCo
private static final @NotNull String CONTENT_TYPE_HEADER = "Content-Type";
private static final @NotNull String BASE64_ENCODED_VALUE = "data:%s;base64,%s";
private static final @NotNull String USER_AGENT_HEADER = "User-Agent";
private static final @NotNull String RESPONSE_DATA = "httpResponseData";

private final @NotNull ProtocolAdapterInformation adapterInformation;
private final @NotNull HttpAdapterConfig adapterConfig;
Expand All @@ -83,9 +84,7 @@ public class HttpProtocolAdapter implements PollingProtocolAdapter<HttpPollingCo
private final @NotNull HttpPollingContextImpl pollingContext;

private volatile @Nullable HttpClient httpClient = null;
protected @NotNull
final Object lock = new Object();
static final @NotNull String RESPONSE_DATA = "httpResponseData";
private final @NotNull Object lock = new Object();
private final @NotNull ObjectMapper objectMapper = new ObjectMapper();

public HttpProtocolAdapter(
Expand All @@ -107,24 +106,34 @@ public HttpProtocolAdapter(

@Override
public void start(
@NotNull final ProtocolAdapterStartInput input, @NotNull final ProtocolAdapterStartOutput output) {
final @NotNull ProtocolAdapterStartInput input, final @NotNull ProtocolAdapterStartOutput output) {
try {
protocolAdapterState.setConnectionStatus(STATELESS);
if (httpClient == null) {
synchronized (lock) {
if (httpClient == null) {
initializeHttpRequest(adapterConfig);
synchronized (lock) {
if (httpClient == null) {
if (HttpUtils.validHttpOrHttpsUrl(adapterConfig.getUrl())) {
//initialize client
final HttpClient.Builder builder = HttpClient.newBuilder();
builder.version(HttpClient.Version.HTTP_1_1)
.followRedirects(HttpClient.Redirect.NORMAL)
.connectTimeout(Duration.ofSeconds(adapterConfig.getHttpConnectTimeout()));
if (adapterConfig.isAllowUntrustedCertificates()) {
builder.sslContext(createTrustAllContext());
}
httpClient = builder.build();
} else {
protocolAdapterState.setErrorConnectionStatus(null, "Invalid URL supplied");
}
}
}
output.startedSuccessfully();
} catch (Exception e) {
} catch (final Exception e) {
output.failStart(e, "Unable to start http protocol adapter.");
}
}

@Override
public void stop(@NotNull final ProtocolAdapterStopInput input, @NotNull final ProtocolAdapterStopOutput output) {
public void stop(final @NotNull ProtocolAdapterStopInput input, final @NotNull ProtocolAdapterStopOutput output) {
httpClient = null;
output.stoppedSuccessfully();
}
Expand All @@ -137,23 +146,37 @@ public void stop(@NotNull final ProtocolAdapterStopInput input, @NotNull final P

@Override
public void poll(
@NotNull final PollingInput pollingInput, @NotNull final PollingOutput pollingOutput) {
final @NotNull PollingInput pollingInput, final @NotNull PollingOutput pollingOutput) {

if (httpClient == null) {
pollingOutput.fail(new ProtocolAdapterException(), "No response was created, because the client is null.");
return;
}

final CompletableFuture<HttpData> dataFuture;
final HttpRequest.Builder builder = HttpRequest.newBuilder();
builder.uri(URI.create(adapterConfig.getUrl()));
//-- Ensure we apply a reasonable timeout so we don't hang threads
Integer timeout = adapterConfig.getHttpConnectTimeout();
timeout = timeout == null ? HttpAdapterConstants.DEFAULT_TIMEOUT_SECONDS : timeout;
timeout = Math.max(timeout, HttpAdapterConstants.MAX_TIMEOUT_SECONDS);
builder.timeout(Duration.ofSeconds(timeout));
builder.setHeader(USER_AGENT_HEADER, String.format("HiveMQ-Edge; %s", version));

if (adapterConfig.getHttpHeaders() != null && !adapterConfig.getHttpHeaders().isEmpty()) {
adapterConfig.getHttpHeaders().forEach(hv -> builder.setHeader(hv.getName(), hv.getValue()));
}

switch (adapterConfig.getHttpRequestMethod()) {
case GET:
dataFuture = httpGet(adapterConfig);
builder.GET();
break;
case POST:
dataFuture = httpPost(adapterConfig);
builder.POST(HttpRequest.BodyPublishers.ofString(adapterConfig.getHttpRequestBody()));
builder.header(CONTENT_TYPE_HEADER, adapterConfig.getHttpRequestBodyContentType().getContentType());
break;
case PUT:
dataFuture = httpPut(adapterConfig);
builder.PUT(HttpRequest.BodyPublishers.ofString(adapterConfig.getHttpRequestBody()));
builder.header(CONTENT_TYPE_HEADER, adapterConfig.getHttpRequestBodyContentType().getContentType());
break;
default:
pollingOutput.fail(new IllegalStateException("Unexpected value: " +
Expand All @@ -163,16 +186,72 @@ public void poll(
return;
}

final CompletableFuture<HttpResponse<String>> responseFuture =
httpClient.sendAsync(builder.build(), HttpResponse.BodyHandlers.ofString());

final CompletableFuture<HttpData> dataFuture = responseFuture.thenApply(httpResponse -> {
Object payloadData = null;
String responseContentType = null;

if (isSuccessStatusCode(httpResponse.statusCode())) {
final String bodyData = httpResponse.body();
//-- if the content type is json, then apply the JSON to the output data,
//-- else encode using base64 (as we dont know what the content is).
if (bodyData != null) {
responseContentType = httpResponse.headers().firstValue(CONTENT_TYPE_HEADER).orElse(null);
responseContentType = adapterConfig.isAssertResponseIsJson() ? JSON_MIME_TYPE : responseContentType;
if (JSON_MIME_TYPE.equals(responseContentType)) {
try {
payloadData = objectMapper.readTree(bodyData);
} catch (final Exception e) {
if (log.isDebugEnabled()) {
log.debug("Invalid JSON data was [{}]", bodyData);
}
moduleServices.eventService()
.createAdapterEvent(adapterConfig.getId(), adapterInformation.getProtocolId())
.withSeverity(Event.SEVERITY.WARN)
.withMessage(String.format(
"Http response on adapter '%s' could not be parsed as JSON data.",
adapterConfig.getId()))
.fire();
throw new RuntimeException("unable to parse JSON data from HTTP response");
}
} else {
if (responseContentType == null) {
responseContentType = PLAIN_MIME_TYPE;
}
final String base64 = Base64.getEncoder().encodeToString(bodyData.getBytes(StandardCharsets.UTF_8));
payloadData = String.format(BASE64_ENCODED_VALUE, responseContentType, base64);
}
}
}

final HttpData data = new HttpData(pollingContext,
adapterConfig.getUrl(),
httpResponse.statusCode(),
responseContentType,
adapterFactories.dataPointFactory());
//When the body is empty, just include the metadata
if (payloadData != null) {
data.addDataPoint(RESPONSE_DATA, payloadData);
}
return data;
});

dataFuture.whenComplete((data, throwable) -> {
if (throwable != null) {
pollingOutput.fail(throwable, null);
return;
}
boolean publishData = isSuccessStatusCode(data.getHttpStatusCode()) ||
!adapterConfig.isHttpPublishSuccessStatusCodeOnly();
protocolAdapterState.setConnectionStatus(isSuccessStatusCode(data.getHttpStatusCode()) ? STATELESS : ERROR);
if (publishData) {
for (DataPoint dataPoint : data.getDataPoints()) {

if (data.isSuccessStatusCode()) {
protocolAdapterState.setConnectionStatus(STATELESS);
} else {
protocolAdapterState.setConnectionStatus(ERROR);
}

if (data.isSuccessStatusCode() || !adapterConfig.isHttpPublishSuccessStatusCodeOnly()) {
for (final DataPoint dataPoint : data.getDataPoints()) {
pollingOutput.addDataPoint(dataPoint);
}
}
Expand All @@ -195,123 +274,14 @@ public int getMaxPollingErrorsBeforeRemoval() {
return adapterConfig.getMaxPollingErrorsBeforeRemoval();
}

protected void initializeHttpRequest(@NotNull final HttpAdapterConfig config) {
if (HttpUtils.validHttpOrHttpsUrl(config.getUrl())) {
//initialize client
HttpClient.Builder builder = HttpClient.newBuilder();
builder.version(HttpClient.Version.HTTP_1_1)
.followRedirects(HttpClient.Redirect.NORMAL)
.connectTimeout(Duration.ofSeconds(config.getHttpConnectTimeout()));
if (config.isAllowUntrustedCertificates()) {
builder.sslContext(createTrustAllContext());
}
httpClient = builder.build();
} else {
protocolAdapterState.setErrorConnectionStatus(null, "Invalid URL supplied");
}
}

private static boolean isSuccessStatusCode(final int statusCode) {
return statusCode >= 200 && statusCode <= 299;
}

protected @NotNull CompletableFuture<HttpData> httpPut(@NotNull final HttpAdapterConfig config) {
HttpRequest.Builder builder =
HttpRequest.newBuilder().PUT(HttpRequest.BodyPublishers.ofString(config.getHttpRequestBody()));
builder.header(CONTENT_TYPE_HEADER, config.getHttpRequestBodyContentType().getContentType());
return executeInternal(config, builder);
}

protected @NotNull CompletableFuture<HttpData> httpPost(@NotNull final HttpAdapterConfig config) {
HttpRequest.Builder builder =
HttpRequest.newBuilder().POST(HttpRequest.BodyPublishers.ofString(config.getHttpRequestBody()));
builder.header(CONTENT_TYPE_HEADER, config.getHttpRequestBodyContentType().getContentType());
return executeInternal(config, builder);
}

protected @NotNull CompletableFuture<HttpData> httpGet(@NotNull final HttpAdapterConfig config) {
HttpRequest.Builder builder = HttpRequest.newBuilder().GET();
return executeInternal(config, builder);
}

protected @NotNull CompletableFuture<HttpData> executeInternal(
@NotNull final HttpAdapterConfig config, @NotNull final HttpRequest.Builder builder) {
builder.uri(URI.create(config.getUrl()));
//-- Ensure we apply a reasonable timeout so we don't hang threads
Integer timeout = config.getHttpConnectTimeout();
timeout = timeout == null ? HttpAdapterConstants.DEFAULT_TIMEOUT_SECONDS : timeout;
timeout = Math.max(timeout, HttpAdapterConstants.MAX_TIMEOUT_SECONDS);
builder.timeout(Duration.ofSeconds(timeout));
builder.setHeader(USER_AGENT_HEADER, String.format("HiveMQ-Edge; %s", version));
if (config.getHttpHeaders() != null && !config.getHttpHeaders().isEmpty()) {
config.getHttpHeaders().forEach(hv -> builder.setHeader(hv.getName(), hv.getValue()));
}
HttpRequest request = builder.build();
CompletableFuture<HttpResponse<String>> responseFuture =
httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString());
return responseFuture.thenApply(response -> readResponse(config, response));
}

protected @NotNull HttpData readResponse(
@NotNull final HttpAdapterConfig config, final @NotNull HttpResponse<String> response) {
Object payloadData = null;
String responseContentType = null;
if (isSuccessStatusCode(response.statusCode())) {
String bodyData = response.body() == null ? null : response.body();
//-- if the content type is json, then apply the JSON to the output data,
//-- else encode using base64 (as we dont know what the content is).
if (bodyData != null) {
responseContentType = response.headers().firstValue(CONTENT_TYPE_HEADER).orElse(null);
responseContentType = config.isAssertResponseIsJson() ? JSON_MIME_TYPE : responseContentType;
if (JSON_MIME_TYPE.equals(responseContentType)) {
try {
payloadData = objectMapper.readTree(bodyData);
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("Invalid JSON data was [{}]", bodyData);
}
moduleServices.eventService()
.createAdapterEvent(adapterConfig.getId(), adapterInformation.getProtocolId())
.withSeverity(Event.SEVERITY.WARN)
.withMessage(String.format(
"Http response on adapter '%s' could not be parsed as JSON data.",
adapterConfig.getId()))
.fire();
throw new RuntimeException("unable to parse JSON data from HTTP response");
}
} else {
if (responseContentType == null) {
responseContentType = PLAIN_MIME_TYPE;
}
String base64 = Base64.getEncoder().encodeToString(bodyData.getBytes(StandardCharsets.UTF_8));
payloadData = String.format(BASE64_ENCODED_VALUE, responseContentType, base64);
}
}
}

HttpData data = new HttpData(pollingContext,
adapterConfig.getUrl(),
response.statusCode(),
responseContentType,
adapterFactories.dataPointFactory());
if (payloadData != null) {
data.addDataPoint(RESPONSE_DATA, payloadData);
} else {
//When the body is empty, just include the metadata
data.addDataPoint(RESPONSE_DATA,
new HttpData(pollingContext,
adapterConfig.getUrl(),
response.statusCode(),
responseContentType,
adapterFactories.dataPointFactory()));
}
return data;
}

protected @NotNull SSLContext createTrustAllContext() {
try {
SSLContext sslContext = SSLContext.getInstance("TLS");
X509ExtendedTrustManager trustManager = new X509ExtendedTrustManager() {
final SSLContext sslContext = SSLContext.getInstance("TLS");
final X509ExtendedTrustManager trustManager = new X509ExtendedTrustManager() {
@Override
public void checkClientTrusted(
final X509Certificate @NotNull [] x509Certificates, final @NotNull String s) {
Expand All @@ -323,7 +293,7 @@ public void checkServerTrusted(
}

@Override
public X509Certificate[] getAcceptedIssuers() {
public X509Certificate @NotNull [] getAcceptedIssuers() {
return new X509Certificate[0];
}

Expand Down Expand Up @@ -357,9 +327,8 @@ public void checkServerTrusted(
};
sslContext.init(null, new TrustManager[]{trustManager}, new SecureRandom());
return sslContext;
} catch (NoSuchAlgorithmException | KeyManagementException e) {
} catch (final NoSuchAlgorithmException | KeyManagementException e) {
throw new RuntimeException(e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ public String getContentType() {
return contentType;
}

public int getHttpStatusCode() {
return httpStatusCode;
public boolean isSuccessStatusCode() {
return httpStatusCode >= 200 && httpStatusCode <= 299;
}

@Override
Expand All @@ -86,12 +86,12 @@ public void addDataPoint(final @NotNull String tagName, final @NotNull Object ta
}

@Override
public void addDataPoint(@NotNull final DataPoint dataPoint) {
public void addDataPoint(final @NotNull DataPoint dataPoint) {
dataPoints.add(dataPoint);
}

@Override
public void setDataPoints(@NotNull List<DataPoint> list) {
public void setDataPoints(final @NotNull List<DataPoint> list) {
this.dataPoints = list;
}

Expand Down

0 comments on commit 485ac15

Please sign in to comment.