diff --git a/data-prepper-plugins/http-source/README.md b/data-prepper-plugins/http-source/README.md index 19b6a98397..50087de671 100644 --- a/data-prepper-plugins/http-source/README.md +++ b/data-prepper-plugins/http-source/README.md @@ -5,7 +5,7 @@ This is a source plugin that supports HTTP protocol. Currently ONLY support Json ## Usages -Currently, we are exposing `/log/ingest` URI path for http log ingestion. Example `.yaml` configuration: +To get started with HTTP source, create the following `pipeline.yaml` configuration: ```yaml source: http: @@ -22,6 +22,7 @@ source: ## Configurations * port (Optional) => An `int` between 0 and 65535 represents the port source is running on. Default is ```2021```. +* path (Optional) => A `string` which represents the URI path for log ingestion, and it should start with `/`. Path can contain `${pipelineName}` placeholder which will be replaced with pipeline name. Default value is `/log/ingest`. * health_check_service (Optional) => A `boolean` that determines if a `/health` endpoint on the defined port will be home to a health check. Default is `false` * unauthenticated_health_check (Optional) => A `boolean` that determines if the health endpoint will require authentication. This option is ignored if no authentication is defined. Default is `false` * request_timeout (Optional) => An `int` larger than 0 represents request timeout in millis. Default is ```10_000```. diff --git a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSource.java b/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSource.java index efe025aecb..a98198480c 100644 --- a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSource.java +++ b/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSource.java @@ -40,11 +40,13 @@ @DataPrepperPlugin(name = "http", pluginType = Source.class, pluginConfigurationType = HTTPSourceConfig.class) public class HTTPSource implements Source> { private static final Logger LOG = LoggerFactory.getLogger(HTTPSource.class); + private static final String PIPELINE_NAME_PLACEHOLDER = "${pipelineName}"; public static final String REGEX_HEALTH = "regex:^/(?!health$).*$"; private final HTTPSourceConfig sourceConfig; private final CertificateProviderFactory certificateProviderFactory; private final ArmeriaHttpAuthenticationProvider authenticationProvider; + private final String pipelineName; private Server server; private final PluginMetrics pluginMetrics; private static final String HTTP_HEALTH_CHECK_PATH = "/health"; @@ -54,7 +56,8 @@ public HTTPSource(final HTTPSourceConfig sourceConfig, final PluginMetrics plugi final PipelineDescription pipelineDescription) { this.sourceConfig = sourceConfig; this.pluginMetrics = pluginMetrics; - certificateProviderFactory = new CertificateProviderFactory(sourceConfig); + this.pipelineName = pipelineDescription.getPipelineName(); + this.certificateProviderFactory = new CertificateProviderFactory(sourceConfig); final PluginModel authenticationConfiguration = sourceConfig.getAuthentication(); final PluginSetting authenticationPluginSetting; @@ -70,7 +73,7 @@ public HTTPSource(final HTTPSourceConfig sourceConfig, final PluginMetrics plugi authenticationPluginSetting = new PluginSetting(ArmeriaHttpAuthenticationProvider.UNAUTHENTICATED_PLUGIN_NAME, Collections.emptyMap()); } - authenticationPluginSetting.setPipelineName(pipelineDescription.getPipelineName()); + authenticationPluginSetting.setPipelineName(pipelineName); authenticationProvider = pluginFactory.loadPlugin(ArmeriaHttpAuthenticationProvider.class, authenticationPluginSetting); } @@ -119,10 +122,11 @@ public void start(final Buffer> buffer) { final LogThrottlingStrategy logThrottlingStrategy = new LogThrottlingStrategy( maxPendingRequests, blockingTaskExecutor.getQueue()); final LogThrottlingRejectHandler logThrottlingRejectHandler = new LogThrottlingRejectHandler(maxPendingRequests, pluginMetrics); - // TODO: allow customization on URI path for log ingestion - sb.decorator(HTTPSourceConfig.DEFAULT_LOG_INGEST_URI, ThrottlingService.newDecorator(logThrottlingStrategy, logThrottlingRejectHandler)); + + final String httpSourcePath = sourceConfig.getPath().replace(PIPELINE_NAME_PLACEHOLDER, pipelineName); + sb.decorator(httpSourcePath, ThrottlingService.newDecorator(logThrottlingStrategy, logThrottlingRejectHandler)); final LogHTTPService logHTTPService = new LogHTTPService(sourceConfig.getBufferTimeoutInMillis(), buffer, pluginMetrics); - sb.annotatedService(HTTPSourceConfig.DEFAULT_LOG_INGEST_URI, logHTTPService); + sb.annotatedService(httpSourcePath, logHTTPService); if (sourceConfig.hasHealthCheckService()) { LOG.info("HTTP source health check is enabled"); diff --git a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceConfig.java b/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceConfig.java index 88c6806153..f93275586a 100644 --- a/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceConfig.java +++ b/data-prepper-plugins/http-source/src/main/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceConfig.java @@ -35,6 +35,9 @@ public class HTTPSourceConfig { @Max(65535) private int port = DEFAULT_PORT; + @JsonProperty("path") + private String path = DEFAULT_LOG_INGEST_URI; + @JsonProperty("request_timeout") @Min(2) private int requestTimeoutInMillis = DEFAULT_REQUEST_TIMEOUT_MS; @@ -89,6 +92,11 @@ public boolean isSslCertAndKeyFileInS3() { sslKeyFile.toLowerCase().startsWith(S3_PREFIX); } + @AssertTrue(message = "path should start with /") + boolean isPathValid() { + return path.startsWith("/"); + } + @AssertTrue(message = "ssl_certificate_file cannot be a empty or null when ssl is enabled") boolean isSslCertificateFileValid() { if (ssl && !useAcmCertificateForSsl) { @@ -131,6 +139,10 @@ public int getPort() { return port; } + public String getPath() { + return path; + } + public int getRequestTimeoutInMillis() { return requestTimeoutInMillis; } diff --git a/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceConfigTest.java b/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceConfigTest.java index 6f19d6bf87..ce1bceb25a 100644 --- a/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceConfigTest.java +++ b/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceConfigTest.java @@ -24,6 +24,7 @@ void testDefault() { // When/Then assertEquals(HTTPSourceConfig.DEFAULT_PORT, sourceConfig.getPort()); + assertEquals(HTTPSourceConfig.DEFAULT_LOG_INGEST_URI, sourceConfig.getPath()); assertEquals(HTTPSourceConfig.DEFAULT_REQUEST_TIMEOUT_MS, sourceConfig.getRequestTimeoutInMillis()); assertEquals(HTTPSourceConfig.DEFAULT_THREAD_COUNT, sourceConfig.getThreadCount()); assertEquals(HTTPSourceConfig.DEFAULT_MAX_CONNECTION_COUNT, sourceConfig.getMaxConnectionCount()); @@ -226,6 +227,25 @@ void isAcmCertificateArnValid_should_return_true_if_ssl_is_true_and_acm_is_true_ } } + @Test + void getPath_should_return_correct_path() throws NoSuchFieldException, IllegalAccessException { + final HTTPSourceConfig objectUnderTest = new HTTPSourceConfig(); + + reflectivelySetField(objectUnderTest, "path", "/my/custom/path"); + + assertThat(objectUnderTest.isPathValid(), equalTo(true)); + assertThat(objectUnderTest.getPath(), equalTo("/my/custom/path")); + } + + @Test + void isPathValid_should_return_false_for_invalid_path() throws NoSuchFieldException, IllegalAccessException { + final HTTPSourceConfig objectUnderTest = new HTTPSourceConfig(); + + reflectivelySetField(objectUnderTest, "path", "my/custom/path"); + + assertThat(objectUnderTest.isPathValid(), equalTo(false)); + } + private void reflectivelySetField(final HTTPSourceConfig httpSourceConfig, final String fieldName, final Object value) throws NoSuchFieldException, IllegalAccessException { final Field field = HTTPSourceConfig.class.getDeclaredField(fieldName); try { diff --git a/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceTest.java b/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceTest.java index 54b64971b9..18d39813b0 100644 --- a/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceTest.java +++ b/data-prepper-plugins/http-source/src/test/java/org/opensearch/dataprepper/plugins/source/loghttp/HTTPSourceTest.java @@ -170,6 +170,7 @@ public void setUp() { sourceConfig = mock(HTTPSourceConfig.class); lenient().when(sourceConfig.getPort()).thenReturn(2021); + lenient().when(sourceConfig.getPath()).thenReturn(HTTPSourceConfig.DEFAULT_LOG_INGEST_URI); lenient().when(sourceConfig.getRequestTimeoutInMillis()).thenReturn(10_000); lenient().when(sourceConfig.getThreadCount()).thenReturn(200); lenient().when(sourceConfig.getMaxConnectionCount()).thenReturn(500); @@ -512,6 +513,7 @@ public void testServerStartCertFileSuccess() throws IOException { void testHTTPSJsonResponse() { reset(sourceConfig); when(sourceConfig.getPort()).thenReturn(2021); + when(sourceConfig.getPath()).thenReturn(HTTPSourceConfig.DEFAULT_LOG_INGEST_URI); when(sourceConfig.getThreadCount()).thenReturn(200); when(sourceConfig.getMaxConnectionCount()).thenReturn(500); when(sourceConfig.getMaxPendingRequests()).thenReturn(1024); @@ -536,6 +538,38 @@ void testHTTPSJsonResponse() { .whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.OK)).join(); } + @Test + void testHTTPSJsonResponse_with_custom_path_along_with_placeholder() { + reset(sourceConfig); + when(sourceConfig.getPort()).thenReturn(2021); + when(sourceConfig.getPath()).thenReturn("/${pipelineName}/test"); + when(sourceConfig.getThreadCount()).thenReturn(200); + when(sourceConfig.getMaxConnectionCount()).thenReturn(500); + when(sourceConfig.getMaxPendingRequests()).thenReturn(1024); + when(sourceConfig.getRequestTimeoutInMillis()).thenReturn(200); + when(sourceConfig.isSsl()).thenReturn(true); + + when(sourceConfig.getSslCertificateFile()).thenReturn(TEST_SSL_CERTIFICATE_FILE); + when(sourceConfig.getSslKeyFile()).thenReturn(TEST_SSL_KEY_FILE); + HTTPSourceUnderTest = new HTTPSource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription); + + testBuffer = getBuffer(); + HTTPSourceUnderTest.start(testBuffer); + + final String path = "/" + TEST_PIPELINE_NAME + "/test"; + + WebClient.builder().factory(ClientFactory.insecure()).build().execute(RequestHeaders.builder() + .scheme(SessionProtocol.HTTPS) + .authority("127.0.0.1:2021") + .method(HttpMethod.POST) + .path(path) + .contentType(MediaType.JSON_UTF_8) + .build(), + HttpData.ofUtf8("[{\"log\": \"somelog\"}]")) + .aggregate() + .whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.OK)).join(); + } + @Test public void testDoubleStart() { // starting server diff --git a/e2e-test/log/src/integrationTest/java/org/opensearch/dataprepper/integration/log/EndToEndBasicLogTest.java b/e2e-test/log/src/integrationTest/java/org/opensearch/dataprepper/integration/log/EndToEndBasicLogTest.java index 4e26b2ffd7..0f7e98c1ce 100644 --- a/e2e-test/log/src/integrationTest/java/org/opensearch/dataprepper/integration/log/EndToEndBasicLogTest.java +++ b/e2e-test/log/src/integrationTest/java/org/opensearch/dataprepper/integration/log/EndToEndBasicLogTest.java @@ -96,7 +96,7 @@ private void sendHttpRequestToSource(final int port, final HttpData httpData) { .scheme(SessionProtocol.HTTP) .authority(String.format("127.0.0.1:%d", port)) .method(HttpMethod.POST) - .path("/log/ingest") + .path("/grok-pipeline/logs") .contentType(MediaType.JSON_UTF_8) .build(), httpData) diff --git a/e2e-test/log/src/integrationTest/resources/basic-grok-e2e-pipeline.yml b/e2e-test/log/src/integrationTest/resources/basic-grok-e2e-pipeline.yml index 8313a5662b..b016c28b70 100644 --- a/e2e-test/log/src/integrationTest/resources/basic-grok-e2e-pipeline.yml +++ b/e2e-test/log/src/integrationTest/resources/basic-grok-e2e-pipeline.yml @@ -1,6 +1,7 @@ grok-pipeline: source: http: + path: "/${pipelineName}/logs" processor: - grok: match: