Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: add metrics into HTTP source #380

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions data-prepper-plugins/http-source/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ This is a source plugin that supports HTTP protocol. Currently ONLY support Json


## Usages
Example `.yaml` configuration:
Currently, we are exposing `/log/ingest` URI path for http log ingestion. Example `.yaml` configuration:
```
source:
- http:
Expand All @@ -27,7 +27,18 @@ source:

## Metrics

TBD
### Counter
- `requestsReceived`: measures total number of requests received by `/log/ingest` endpoint.
- `requestsRejected`: measures total number of requests rejected by HTTP source plugin.
- `successRequests`: measures total number of requests successfully processed by HTTP source plugin.
- `badRequests`: measures total number of requests with invalid content type or format processed by HTTP source plugin.
- `requestTimeouts`: measures total number of requests that time out in the HTTP source server.

### Timer
- `requestProcessDuration`: measures latency of requests processed by the HTTP source plugin in seconds.

### Distribution Summary
- `payloadSize`: measures the distribution of incoming requests payload sizes in bytes.

## Developer Guide
This plugin is compatible with Java 14. See
Expand Down
1 change: 1 addition & 0 deletions data-prepper-plugins/http-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ dependencies {
implementation project(':data-prepper-plugins:common')
implementation "com.linecorp.armeria:armeria:1.9.2"
implementation "commons-io:commons-io:2.11.0"
testImplementation project(':data-prepper-api').sourceSets.test.output
testImplementation 'org.assertj:assertj-core:3.20.2'
testImplementation "org.hamcrest:hamcrest:2.2"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

package com.amazon.dataprepper.plugins.source.loghttp;

import com.amazon.dataprepper.metrics.PluginMetrics;
import com.amazon.dataprepper.model.PluginType;
import com.amazon.dataprepper.model.annotations.DataPrepperPlugin;
import com.amazon.dataprepper.model.buffer.Buffer;
Expand Down Expand Up @@ -39,10 +40,12 @@ public class HTTPSource implements Source<Record<String>> {
private final HTTPSourceConfig sourceConfig;
private final CertificateProviderFactory certificateProviderFactory;
private Server server;
private final PluginMetrics pluginMetrics;

public HTTPSource(final PluginSetting pluginSetting) {
sourceConfig = HTTPSourceConfig.buildConfig(pluginSetting);
certificateProviderFactory = new CertificateProviderFactory(sourceConfig);
pluginMetrics = PluginMetrics.fromPluginSetting(pluginSetting);
}

@Override
Expand Down Expand Up @@ -75,10 +78,10 @@ public void start(Buffer<Record<String>> buffer) {
final int maxPendingRequests = sourceConfig.getMaxPendingRequests();
final LogThrottlingStrategy logThrottlingStrategy = new LogThrottlingStrategy(
maxPendingRequests, blockingTaskExecutor.getQueue());
final LogThrottlingRejectHandler logThrottlingRejectHandler = new LogThrottlingRejectHandler(maxPendingRequests);
final LogThrottlingRejectHandler logThrottlingRejectHandler = new LogThrottlingRejectHandler(maxPendingRequests, pluginMetrics);
// TODO: allow customization on URI path for log ingestion
sb.decorator(HTTPSourceConfig.DEFAULT_LOG_INGEST_URI, ThrottlingService.newDecorator(logThrottlingStrategy, logThrottlingRejectHandler));
final LogHTTPService logHTTPService = new LogHTTPService(requestTimeoutInMillis, buffer);
final LogHTTPService logHTTPService = new LogHTTPService(requestTimeoutInMillis, buffer, pluginMetrics);
sb.annotatedService(HTTPSourceConfig.DEFAULT_LOG_INGEST_URI, logHTTPService);
// TODO: attach HealthCheckService

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,20 @@

package com.amazon.dataprepper.plugins.source.loghttp;

import com.amazon.dataprepper.metrics.PluginMetrics;
import com.amazon.dataprepper.model.buffer.Buffer;
import com.amazon.dataprepper.model.record.Record;
import com.amazon.dataprepper.plugins.source.loghttp.codec.JsonCodec;
import com.linecorp.armeria.common.AggregatedHttpRequest;
import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.MediaType;
import com.linecorp.armeria.server.annotation.Blocking;
import com.linecorp.armeria.server.annotation.Post;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Timer;

import java.io.IOException;
import java.util.List;
Expand All @@ -30,37 +35,65 @@
*/
@Blocking
public class LogHTTPService {
public static final String REQUESTS_RECEIVED = "requestsReceived";
public static final String REQUEST_TIMEOUTS = "requestTimeouts";
public static final String SUCCESS_REQUESTS = "successRequests";
public static final String BAD_REQUESTS = "badRequests";
public static final String PAYLOAD_SIZE = "payloadSize";
public static final String REQUEST_PROCESS_DURATION = "requestProcessDuration";

// TODO: support other data-types as request body, e.g. json_lines, msgpack
private final JsonCodec jsonCodec = new JsonCodec();
private final Buffer<Record<String>> buffer;
private final int bufferWriteTimeoutInMillis;
private final Counter requestsReceivedCounter;
private final Counter requestTimeoutsCounter;
private final Counter successRequestsCounter;
private final Counter badRequestsCounter;
private final DistributionSummary payloadSizeSummary;
private final Timer requestProcessDuration;

public LogHTTPService(final int bufferWriteTimeoutInMillis, final Buffer<Record<String>> buffer) {
public LogHTTPService(final int bufferWriteTimeoutInMillis,
final Buffer<Record<String>> buffer,
final PluginMetrics pluginMetrics) {
this.buffer = buffer;
this.bufferWriteTimeoutInMillis = bufferWriteTimeoutInMillis;

requestsReceivedCounter = pluginMetrics.counter(REQUESTS_RECEIVED);
requestTimeoutsCounter = pluginMetrics.counter(REQUEST_TIMEOUTS);
successRequestsCounter = pluginMetrics.counter(SUCCESS_REQUESTS);
badRequestsCounter = pluginMetrics.counter(BAD_REQUESTS);
payloadSizeSummary = pluginMetrics.summary(PAYLOAD_SIZE);
requestProcessDuration = pluginMetrics.timer(REQUEST_PROCESS_DURATION);
}

@Post
public HttpResponse doPost(final AggregatedHttpRequest aggregatedHttpRequest) {
return processRequest(aggregatedHttpRequest);
return requestProcessDuration.record(() -> processRequest(aggregatedHttpRequest));
}

private HttpResponse processRequest(final AggregatedHttpRequest aggregatedHttpRequest) {
requestsReceivedCounter.increment();

List<String> jsonList;
final HttpData content = aggregatedHttpRequest.content();
payloadSizeSummary.record(content.length());
try {
jsonList = jsonCodec.parse(aggregatedHttpRequest.content());
jsonList = jsonCodec.parse(content);
} catch (IOException e) {
badRequestsCounter.increment();
return HttpResponse.of(HttpStatus.BAD_REQUEST, MediaType.ANY_TYPE, "Bad request data format. Needs to be json array.");
}
for (String json: jsonList) {
try {
// TODO: switch to new API writeAll once ready
buffer.write(new Record<>(json), bufferWriteTimeoutInMillis);
} catch (TimeoutException e) {
requestTimeoutsCounter.increment();
return HttpResponse.of(HttpStatus.REQUEST_TIMEOUT, MediaType.ANY_TYPE, e.getMessage());
}
}
successRequestsCounter.increment();
return HttpResponse.of(HttpStatus.OK);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,34 @@

package com.amazon.dataprepper.plugins.source.loghttp;

import com.amazon.dataprepper.metrics.PluginMetrics;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.MediaType;
import com.linecorp.armeria.server.Service;
import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.armeria.server.throttling.ThrottlingRejectHandler;
import io.micrometer.core.instrument.Counter;

import javax.annotation.Nullable;

public class LogThrottlingRejectHandler implements ThrottlingRejectHandler<HttpRequest, HttpResponse> {
public static final String REQUESTS_REJECTED = "requestsRejected";

private final int maxPendingRequests;
private final Counter rejectedRequestsCounter;

public LogThrottlingRejectHandler(final int maxPendingRequests) {
public LogThrottlingRejectHandler(final int maxPendingRequests, final PluginMetrics pluginMetrics) {
this.maxPendingRequests = maxPendingRequests;

rejectedRequestsCounter = pluginMetrics.counter(REQUESTS_REJECTED);
}

@Override
public HttpResponse handleRejected(final Service<HttpRequest, HttpResponse> delegate, final ServiceRequestContext ctx,
final HttpRequest req, final @Nullable Throwable cause) throws Exception {
rejectedRequestsCounter.increment();
return HttpResponse.of(HttpStatus.TOO_MANY_REQUESTS, MediaType.ANY_TYPE,
"The number of pending requests in the work queue reaches max_pending_requests:%d. Please retry later",
maxPendingRequests
Expand Down
Loading