Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added path support for HTTP source #2277

Merged
merged 2 commits into from
Feb 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion data-prepper-plugins/http-source/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ This is a source plugin that supports HTTP protocol. Currently ONLY support Json


## Usages
Currently, we are exposing `/log/ingest` URI path for http log ingestion. Example `.yaml` configuration:
To get started with HTTP source, create the following `pipeline.yaml` configuration:
```yaml
source:
http:
Expand All @@ -22,6 +22,7 @@ source:
## Configurations

* port (Optional) => An `int` between 0 and 65535 represents the port source is running on. Default is ```2021```.
* path (Optional) => A `string` which represents the URI path for log ingestion, and it should start with `/`. Path can contain `${pipelineName}` placeholder which will be replaced with pipeline name. Default value is `/log/ingest`.
* health_check_service (Optional) => A `boolean` that determines if a `/health` endpoint on the defined port will be home to a health check. Default is `false`
* unauthenticated_health_check (Optional) => A `boolean` that determines if the health endpoint will require authentication. This option is ignored if no authentication is defined. Default is `false`
* request_timeout (Optional) => An `int` larger than 0 represents request timeout in millis. Default is ```10_000```.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@
@DataPrepperPlugin(name = "http", pluginType = Source.class, pluginConfigurationType = HTTPSourceConfig.class)
public class HTTPSource implements Source<Record<Log>> {
private static final Logger LOG = LoggerFactory.getLogger(HTTPSource.class);
private static final String PIPELINE_NAME_PLACEHOLDER = "${pipelineName}";
public static final String REGEX_HEALTH = "regex:^/(?!health$).*$";

private final HTTPSourceConfig sourceConfig;
private final CertificateProviderFactory certificateProviderFactory;
private final ArmeriaHttpAuthenticationProvider authenticationProvider;
private final String pipelineName;
private Server server;
private final PluginMetrics pluginMetrics;
private static final String HTTP_HEALTH_CHECK_PATH = "/health";
Expand All @@ -54,7 +56,8 @@ public HTTPSource(final HTTPSourceConfig sourceConfig, final PluginMetrics plugi
final PipelineDescription pipelineDescription) {
this.sourceConfig = sourceConfig;
this.pluginMetrics = pluginMetrics;
certificateProviderFactory = new CertificateProviderFactory(sourceConfig);
this.pipelineName = pipelineDescription.getPipelineName();
this.certificateProviderFactory = new CertificateProviderFactory(sourceConfig);
final PluginModel authenticationConfiguration = sourceConfig.getAuthentication();
final PluginSetting authenticationPluginSetting;

Expand All @@ -70,7 +73,7 @@ public HTTPSource(final HTTPSourceConfig sourceConfig, final PluginMetrics plugi
authenticationPluginSetting =
new PluginSetting(ArmeriaHttpAuthenticationProvider.UNAUTHENTICATED_PLUGIN_NAME, Collections.emptyMap());
}
authenticationPluginSetting.setPipelineName(pipelineDescription.getPipelineName());
authenticationPluginSetting.setPipelineName(pipelineName);
authenticationProvider = pluginFactory.loadPlugin(ArmeriaHttpAuthenticationProvider.class, authenticationPluginSetting);
}

Expand Down Expand Up @@ -119,10 +122,11 @@ public void start(final Buffer<Record<Log>> buffer) {
final LogThrottlingStrategy logThrottlingStrategy = new LogThrottlingStrategy(
maxPendingRequests, blockingTaskExecutor.getQueue());
final LogThrottlingRejectHandler logThrottlingRejectHandler = new LogThrottlingRejectHandler(maxPendingRequests, pluginMetrics);
// TODO: allow customization on URI path for log ingestion
sb.decorator(HTTPSourceConfig.DEFAULT_LOG_INGEST_URI, ThrottlingService.newDecorator(logThrottlingStrategy, logThrottlingRejectHandler));

final String httpSourcePath = sourceConfig.getPath().replace(PIPELINE_NAME_PLACEHOLDER, pipelineName);
sb.decorator(httpSourcePath, ThrottlingService.newDecorator(logThrottlingStrategy, logThrottlingRejectHandler));
final LogHTTPService logHTTPService = new LogHTTPService(sourceConfig.getBufferTimeoutInMillis(), buffer, pluginMetrics);
sb.annotatedService(HTTPSourceConfig.DEFAULT_LOG_INGEST_URI, logHTTPService);
sb.annotatedService(httpSourcePath, logHTTPService);

if (sourceConfig.hasHealthCheckService()) {
LOG.info("HTTP source health check is enabled");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ public class HTTPSourceConfig {
@Max(65535)
private int port = DEFAULT_PORT;

@JsonProperty("path")
private String path = DEFAULT_LOG_INGEST_URI;

@JsonProperty("request_timeout")
@Min(2)
private int requestTimeoutInMillis = DEFAULT_REQUEST_TIMEOUT_MS;
Expand Down Expand Up @@ -89,6 +92,11 @@ public boolean isSslCertAndKeyFileInS3() {
sslKeyFile.toLowerCase().startsWith(S3_PREFIX);
}

@AssertTrue(message = "path should start with /")
boolean isPathValid() {
return path.startsWith("/");
}

@AssertTrue(message = "ssl_certificate_file cannot be a empty or null when ssl is enabled")
boolean isSslCertificateFileValid() {
if (ssl && !useAcmCertificateForSsl) {
Expand Down Expand Up @@ -131,6 +139,10 @@ public int getPort() {
return port;
}

public String getPath() {
return path;
}

public int getRequestTimeoutInMillis() {
return requestTimeoutInMillis;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ void testDefault() {

// When/Then
assertEquals(HTTPSourceConfig.DEFAULT_PORT, sourceConfig.getPort());
assertEquals(HTTPSourceConfig.DEFAULT_LOG_INGEST_URI, sourceConfig.getPath());
assertEquals(HTTPSourceConfig.DEFAULT_REQUEST_TIMEOUT_MS, sourceConfig.getRequestTimeoutInMillis());
assertEquals(HTTPSourceConfig.DEFAULT_THREAD_COUNT, sourceConfig.getThreadCount());
assertEquals(HTTPSourceConfig.DEFAULT_MAX_CONNECTION_COUNT, sourceConfig.getMaxConnectionCount());
Expand Down Expand Up @@ -226,6 +227,25 @@ void isAcmCertificateArnValid_should_return_true_if_ssl_is_true_and_acm_is_true_
}
}

@Test
void getPath_should_return_correct_path() throws NoSuchFieldException, IllegalAccessException {
final HTTPSourceConfig objectUnderTest = new HTTPSourceConfig();

reflectivelySetField(objectUnderTest, "path", "/my/custom/path");

assertThat(objectUnderTest.isPathValid(), equalTo(true));
assertThat(objectUnderTest.getPath(), equalTo("/my/custom/path"));
}

@Test
void isPathValid_should_return_false_for_invalid_path() throws NoSuchFieldException, IllegalAccessException {
final HTTPSourceConfig objectUnderTest = new HTTPSourceConfig();

reflectivelySetField(objectUnderTest, "path", "my/custom/path");

assertThat(objectUnderTest.isPathValid(), equalTo(false));
}

private void reflectivelySetField(final HTTPSourceConfig httpSourceConfig, final String fieldName, final Object value) throws NoSuchFieldException, IllegalAccessException {
final Field field = HTTPSourceConfig.class.getDeclaredField(fieldName);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ public void setUp() {

sourceConfig = mock(HTTPSourceConfig.class);
lenient().when(sourceConfig.getPort()).thenReturn(2021);
lenient().when(sourceConfig.getPath()).thenReturn(HTTPSourceConfig.DEFAULT_LOG_INGEST_URI);
lenient().when(sourceConfig.getRequestTimeoutInMillis()).thenReturn(10_000);
lenient().when(sourceConfig.getThreadCount()).thenReturn(200);
lenient().when(sourceConfig.getMaxConnectionCount()).thenReturn(500);
Expand Down Expand Up @@ -512,6 +513,7 @@ public void testServerStartCertFileSuccess() throws IOException {
void testHTTPSJsonResponse() {
reset(sourceConfig);
when(sourceConfig.getPort()).thenReturn(2021);
when(sourceConfig.getPath()).thenReturn(HTTPSourceConfig.DEFAULT_LOG_INGEST_URI);
when(sourceConfig.getThreadCount()).thenReturn(200);
when(sourceConfig.getMaxConnectionCount()).thenReturn(500);
when(sourceConfig.getMaxPendingRequests()).thenReturn(1024);
Expand All @@ -536,6 +538,38 @@ void testHTTPSJsonResponse() {
.whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.OK)).join();
}

@Test
void testHTTPSJsonResponse_with_custom_path_along_with_placeholder() {
reset(sourceConfig);
when(sourceConfig.getPort()).thenReturn(2021);
when(sourceConfig.getPath()).thenReturn("/${pipelineName}/test");
when(sourceConfig.getThreadCount()).thenReturn(200);
when(sourceConfig.getMaxConnectionCount()).thenReturn(500);
when(sourceConfig.getMaxPendingRequests()).thenReturn(1024);
when(sourceConfig.getRequestTimeoutInMillis()).thenReturn(200);
when(sourceConfig.isSsl()).thenReturn(true);

when(sourceConfig.getSslCertificateFile()).thenReturn(TEST_SSL_CERTIFICATE_FILE);
when(sourceConfig.getSslKeyFile()).thenReturn(TEST_SSL_KEY_FILE);
HTTPSourceUnderTest = new HTTPSource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription);

testBuffer = getBuffer();
HTTPSourceUnderTest.start(testBuffer);

final String path = "/" + TEST_PIPELINE_NAME + "/test";

WebClient.builder().factory(ClientFactory.insecure()).build().execute(RequestHeaders.builder()
.scheme(SessionProtocol.HTTPS)
.authority("127.0.0.1:2021")
.method(HttpMethod.POST)
.path(path)
.contentType(MediaType.JSON_UTF_8)
.build(),
HttpData.ofUtf8("[{\"log\": \"somelog\"}]"))
.aggregate()
.whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.OK)).join();
}

@Test
public void testDoubleStart() {
// starting server
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ private void sendHttpRequestToSource(final int port, final HttpData httpData) {
.scheme(SessionProtocol.HTTP)
.authority(String.format("127.0.0.1:%d", port))
.method(HttpMethod.POST)
.path("/log/ingest")
.path("/grok-pipeline/logs")
.contentType(MediaType.JSON_UTF_8)
.build(),
httpData)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
grok-pipeline:
source:
http:
path: "/${pipelineName}/logs"
processor:
- grok:
match:
Expand Down