Skip to content

Commit

Permalink
Added path support for HTTP source (#2277)
Browse files Browse the repository at this point in the history
* Added path support for HTTP source

Signed-off-by: Asif Sohail Mohammed <[email protected]>
  • Loading branch information
asifsmohammed authored Feb 27, 2023
1 parent 88401f2 commit e291070
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 7 deletions.
3 changes: 2 additions & 1 deletion data-prepper-plugins/http-source/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ This is a source plugin that supports HTTP protocol. Currently ONLY support Json


## Usages
Currently, we are exposing `/log/ingest` URI path for http log ingestion. Example `.yaml` configuration:
To get started with HTTP source, create the following `pipeline.yaml` configuration:
```yaml
source:
http:
Expand All @@ -22,6 +22,7 @@ source:
## Configurations

* port (Optional) => An `int` between 0 and 65535 represents the port source is running on. Default is ```2021```.
* path (Optional) => A `string` which represents the URI path for log ingestion, and it should start with `/`. Path can contain `${pipelineName}` placeholder which will be replaced with pipeline name. Default value is `/log/ingest`.
* health_check_service (Optional) => A `boolean` that determines if a `/health` endpoint on the defined port will be home to a health check. Default is `false`
* unauthenticated_health_check (Optional) => A `boolean` that determines if the health endpoint will require authentication. This option is ignored if no authentication is defined. Default is `false`
* request_timeout (Optional) => An `int` larger than 0 represents request timeout in millis. Default is ```10_000```.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@
@DataPrepperPlugin(name = "http", pluginType = Source.class, pluginConfigurationType = HTTPSourceConfig.class)
public class HTTPSource implements Source<Record<Log>> {
private static final Logger LOG = LoggerFactory.getLogger(HTTPSource.class);
private static final String PIPELINE_NAME_PLACEHOLDER = "${pipelineName}";
public static final String REGEX_HEALTH = "regex:^/(?!health$).*$";

private final HTTPSourceConfig sourceConfig;
private final CertificateProviderFactory certificateProviderFactory;
private final ArmeriaHttpAuthenticationProvider authenticationProvider;
private final String pipelineName;
private Server server;
private final PluginMetrics pluginMetrics;
private static final String HTTP_HEALTH_CHECK_PATH = "/health";
Expand All @@ -54,7 +56,8 @@ public HTTPSource(final HTTPSourceConfig sourceConfig, final PluginMetrics plugi
final PipelineDescription pipelineDescription) {
this.sourceConfig = sourceConfig;
this.pluginMetrics = pluginMetrics;
certificateProviderFactory = new CertificateProviderFactory(sourceConfig);
this.pipelineName = pipelineDescription.getPipelineName();
this.certificateProviderFactory = new CertificateProviderFactory(sourceConfig);
final PluginModel authenticationConfiguration = sourceConfig.getAuthentication();
final PluginSetting authenticationPluginSetting;

Expand All @@ -70,7 +73,7 @@ public HTTPSource(final HTTPSourceConfig sourceConfig, final PluginMetrics plugi
authenticationPluginSetting =
new PluginSetting(ArmeriaHttpAuthenticationProvider.UNAUTHENTICATED_PLUGIN_NAME, Collections.emptyMap());
}
authenticationPluginSetting.setPipelineName(pipelineDescription.getPipelineName());
authenticationPluginSetting.setPipelineName(pipelineName);
authenticationProvider = pluginFactory.loadPlugin(ArmeriaHttpAuthenticationProvider.class, authenticationPluginSetting);
}

Expand Down Expand Up @@ -119,10 +122,11 @@ public void start(final Buffer<Record<Log>> buffer) {
final LogThrottlingStrategy logThrottlingStrategy = new LogThrottlingStrategy(
maxPendingRequests, blockingTaskExecutor.getQueue());
final LogThrottlingRejectHandler logThrottlingRejectHandler = new LogThrottlingRejectHandler(maxPendingRequests, pluginMetrics);
// TODO: allow customization on URI path for log ingestion
sb.decorator(HTTPSourceConfig.DEFAULT_LOG_INGEST_URI, ThrottlingService.newDecorator(logThrottlingStrategy, logThrottlingRejectHandler));

final String httpSourcePath = sourceConfig.getPath().replace(PIPELINE_NAME_PLACEHOLDER, pipelineName);
sb.decorator(httpSourcePath, ThrottlingService.newDecorator(logThrottlingStrategy, logThrottlingRejectHandler));
final LogHTTPService logHTTPService = new LogHTTPService(sourceConfig.getBufferTimeoutInMillis(), buffer, pluginMetrics);
sb.annotatedService(HTTPSourceConfig.DEFAULT_LOG_INGEST_URI, logHTTPService);
sb.annotatedService(httpSourcePath, logHTTPService);

if (sourceConfig.hasHealthCheckService()) {
LOG.info("HTTP source health check is enabled");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ public class HTTPSourceConfig {
@Max(65535)
private int port = DEFAULT_PORT;

@JsonProperty("path")
private String path = DEFAULT_LOG_INGEST_URI;

@JsonProperty("request_timeout")
@Min(2)
private int requestTimeoutInMillis = DEFAULT_REQUEST_TIMEOUT_MS;
Expand Down Expand Up @@ -89,6 +92,11 @@ public boolean isSslCertAndKeyFileInS3() {
sslKeyFile.toLowerCase().startsWith(S3_PREFIX);
}

@AssertTrue(message = "path should start with /")
boolean isPathValid() {
return path.startsWith("/");
}

@AssertTrue(message = "ssl_certificate_file cannot be a empty or null when ssl is enabled")
boolean isSslCertificateFileValid() {
if (ssl && !useAcmCertificateForSsl) {
Expand Down Expand Up @@ -131,6 +139,10 @@ public int getPort() {
return port;
}

public String getPath() {
return path;
}

public int getRequestTimeoutInMillis() {
return requestTimeoutInMillis;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ void testDefault() {

// When/Then
assertEquals(HTTPSourceConfig.DEFAULT_PORT, sourceConfig.getPort());
assertEquals(HTTPSourceConfig.DEFAULT_LOG_INGEST_URI, sourceConfig.getPath());
assertEquals(HTTPSourceConfig.DEFAULT_REQUEST_TIMEOUT_MS, sourceConfig.getRequestTimeoutInMillis());
assertEquals(HTTPSourceConfig.DEFAULT_THREAD_COUNT, sourceConfig.getThreadCount());
assertEquals(HTTPSourceConfig.DEFAULT_MAX_CONNECTION_COUNT, sourceConfig.getMaxConnectionCount());
Expand Down Expand Up @@ -226,6 +227,25 @@ void isAcmCertificateArnValid_should_return_true_if_ssl_is_true_and_acm_is_true_
}
}

@Test
void getPath_should_return_correct_path() throws NoSuchFieldException, IllegalAccessException {
final HTTPSourceConfig objectUnderTest = new HTTPSourceConfig();

reflectivelySetField(objectUnderTest, "path", "/my/custom/path");

assertThat(objectUnderTest.isPathValid(), equalTo(true));
assertThat(objectUnderTest.getPath(), equalTo("/my/custom/path"));
}

@Test
void isPathValid_should_return_false_for_invalid_path() throws NoSuchFieldException, IllegalAccessException {
final HTTPSourceConfig objectUnderTest = new HTTPSourceConfig();

reflectivelySetField(objectUnderTest, "path", "my/custom/path");

assertThat(objectUnderTest.isPathValid(), equalTo(false));
}

private void reflectivelySetField(final HTTPSourceConfig httpSourceConfig, final String fieldName, final Object value) throws NoSuchFieldException, IllegalAccessException {
final Field field = HTTPSourceConfig.class.getDeclaredField(fieldName);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ public void setUp() {

sourceConfig = mock(HTTPSourceConfig.class);
lenient().when(sourceConfig.getPort()).thenReturn(2021);
lenient().when(sourceConfig.getPath()).thenReturn(HTTPSourceConfig.DEFAULT_LOG_INGEST_URI);
lenient().when(sourceConfig.getRequestTimeoutInMillis()).thenReturn(10_000);
lenient().when(sourceConfig.getThreadCount()).thenReturn(200);
lenient().when(sourceConfig.getMaxConnectionCount()).thenReturn(500);
Expand Down Expand Up @@ -512,6 +513,7 @@ public void testServerStartCertFileSuccess() throws IOException {
void testHTTPSJsonResponse() {
reset(sourceConfig);
when(sourceConfig.getPort()).thenReturn(2021);
when(sourceConfig.getPath()).thenReturn(HTTPSourceConfig.DEFAULT_LOG_INGEST_URI);
when(sourceConfig.getThreadCount()).thenReturn(200);
when(sourceConfig.getMaxConnectionCount()).thenReturn(500);
when(sourceConfig.getMaxPendingRequests()).thenReturn(1024);
Expand All @@ -536,6 +538,38 @@ void testHTTPSJsonResponse() {
.whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.OK)).join();
}

@Test
void testHTTPSJsonResponse_with_custom_path_along_with_placeholder() {
reset(sourceConfig);
when(sourceConfig.getPort()).thenReturn(2021);
when(sourceConfig.getPath()).thenReturn("/${pipelineName}/test");
when(sourceConfig.getThreadCount()).thenReturn(200);
when(sourceConfig.getMaxConnectionCount()).thenReturn(500);
when(sourceConfig.getMaxPendingRequests()).thenReturn(1024);
when(sourceConfig.getRequestTimeoutInMillis()).thenReturn(200);
when(sourceConfig.isSsl()).thenReturn(true);

when(sourceConfig.getSslCertificateFile()).thenReturn(TEST_SSL_CERTIFICATE_FILE);
when(sourceConfig.getSslKeyFile()).thenReturn(TEST_SSL_KEY_FILE);
HTTPSourceUnderTest = new HTTPSource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription);

testBuffer = getBuffer();
HTTPSourceUnderTest.start(testBuffer);

final String path = "/" + TEST_PIPELINE_NAME + "/test";

WebClient.builder().factory(ClientFactory.insecure()).build().execute(RequestHeaders.builder()
.scheme(SessionProtocol.HTTPS)
.authority("127.0.0.1:2021")
.method(HttpMethod.POST)
.path(path)
.contentType(MediaType.JSON_UTF_8)
.build(),
HttpData.ofUtf8("[{\"log\": \"somelog\"}]"))
.aggregate()
.whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.OK)).join();
}

@Test
public void testDoubleStart() {
// starting server
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ private void sendHttpRequestToSource(final int port, final HttpData httpData) {
.scheme(SessionProtocol.HTTP)
.authority(String.format("127.0.0.1:%d", port))
.method(HttpMethod.POST)
.path("/log/ingest")
.path("/grok-pipeline/logs")
.contentType(MediaType.JSON_UTF_8)
.build(),
httpData)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
grok-pipeline:
source:
http:
path: "/${pipelineName}/logs"
processor:
- grok:
match:
Expand Down

0 comments on commit e291070

Please sign in to comment.