Skip to content

Commit

Permalink
Final code for opensearch for opensearch-project#1985,2264.
Browse files Browse the repository at this point in the history
Signed-off-by: mallikagogoi7 <[email protected]>
  • Loading branch information
mallikagogoi7 committed May 29, 2023
1 parent 40b73ad commit 5d5b765
Show file tree
Hide file tree
Showing 41 changed files with 2,396 additions and 302 deletions.
16 changes: 13 additions & 3 deletions data-prepper-plugins/opensearch-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,22 @@
*/
dependencies {
implementation project(path: ':data-prepper-api')
implementation libs.armeria.core
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-csv'
implementation 'org.apache.httpcomponents.core5:httpcore5-h2:5.2.1'
implementation 'org.apache.httpcomponents.client5:httpclient5:5.2.1'
implementation group: 'com.googlecode.json-simple', name: 'json-simple', version: '1.1.1'
implementation "org.apache.commons:commons-lang3:3.12.0"
implementation group: 'org.opensearch.client', name: 'opensearch-rest-client', version: '2.6.0'
implementation 'org.opensearch.client:opensearch-java:2.4.0'
implementation 'software.amazon.awssdk:aws-sdk-java:2.17.148'
implementation 'co.elastic.clients:elasticsearch-java:8.7.0'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.14.2'
implementation 'software.amazon.awssdk:s3'
implementation 'software.amazon.awssdk:sts'
implementation project(path: ':data-prepper-core')
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1'
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
testImplementation testLibs.mockito.inline
}

test {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.source.opensearch;

import java.net.URL;
import java.util.List;

import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider;
import org.apache.hc.core5.http.HttpHost;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponseInterceptor;
import org.apache.http.entity.ContentType;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.RestClient;
import org.opensearch.client.opensearch.OpenSearchClient;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import org.opensearch.client.transport.OpenSearchTransport;
import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder;
import org.opensearch.dataprepper.plugins.source.opensearch.client.HttpCustomClient;

/**
* used for creating connection
*/
public class OpenSearchClientBuilder {

/**
* This method create opensearch client based on host information, which will be used to call opensearch apis
* @param url
* @return
*/
public OpenSearchClient createOpenSearchClient(final URL url){
final HttpHost host = new HttpHost(url.getProtocol(), url.getHost(), url.getPort());
final BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
final OpenSearchTransport transport = ApacheHttpClient5TransportBuilder
.builder(host)
.setMapper(new org.opensearch.client.json.jackson.JacksonJsonpMapper())
.build();
return new OpenSearchClient(transport);
}

/**
* This method create Elasticsearch client based on host information, which will be used to call opensearch apis
* @param url
* @return
*/
public ElasticsearchClient createElasticSearchClient(final URL url) {
final String HEADER_NAME = "X-Elastic-Product";
final String HEADER_VALUE = "Elasticsearch";

RestClient client = org.elasticsearch.client.RestClient.builder(new org.apache.http.HttpHost(url.getHost(), url.getPort())).
setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
.setDefaultHeaders(List.of(new BasicHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString())))
.addInterceptorLast((HttpResponseInterceptor) (response, context) -> response.addHeader(HEADER_NAME, HEADER_VALUE))).build();
JacksonJsonpMapper jacksonJsonpMapper = new JacksonJsonpMapper();
ElasticsearchTransport transport = new RestClientTransport(client, jacksonJsonpMapper);
return new ElasticsearchClient(transport);
}

/**
* This will create a custom http client.
* @param url
* @return
*/
public HttpCustomClient createCustomHttpClient(final URL url){
return new HttpCustomClient(url);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,19 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.plugins.source.opensearch.service.ElasticSearchService;
import org.opensearch.dataprepper.plugins.source.opensearch.service.HostsService;
import org.opensearch.dataprepper.plugins.source.opensearch.service.OpenSearchService;

import java.time.Duration;

@DataPrepperPlugin(name="opensearch", pluginType = Source.class , pluginConfigurationType =OpenSearchSourceConfiguration.class )
public class OpenSearchSource implements Source<Record<Event>> {

private final OpenSearchSourceConfiguration openSearchSourceConfiguration;

private OpenSearchSourceService openSearchSourceService;

@DataPrepperPluginConstructor
public OpenSearchSource(final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
this.openSearchSourceConfiguration = openSearchSourceConfiguration;
Expand All @@ -26,16 +33,20 @@ public void start(Buffer<Record<Event>> buffer) {
if (buffer == null) {
throw new IllegalStateException("Buffer provided is null");
}
startProcess(openSearchSourceConfiguration);
}

private void startProcess(final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
// todo: implement
// Should leverage OpenSearchService to run the actual plugin core logic.
HostsService hostsService = new HostsService();
OpenSearchClientBuilder clientBuilder = new OpenSearchClientBuilder();
OpenSearchService openSearchService = new OpenSearchService(clientBuilder);
ElasticSearchService elasticSearchService = new ElasticSearchService(clientBuilder);
openSearchSourceService = new OpenSearchSourceService(
openSearchSourceConfiguration,
hostsService,
openSearchService,
elasticSearchService, buffer);
openSearchSourceService.processHosts();
}

@Override
public void stop() {
// Yet to implement
openSearchSourceService.stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,20 @@
package org.opensearch.dataprepper.plugins.source.opensearch;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.AwsAuthenticationConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.ConnectionConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.IndexParametersConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.QueryParameterConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SchedulingParameterConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SearchConfiguration;

import java.util.List;
import java.util.Objects;

public class OpenSearchSourceConfiguration {

/**
* 0 indicates infinite retries
*/
@JsonProperty("max_retries")
@Min(0)
private Integer maxRetries = 0;
private Integer maxRetries;

@NotNull
@JsonProperty("hosts")
Expand All @@ -38,23 +31,21 @@ public class OpenSearchSourceConfiguration {
private String password;

@JsonProperty("connection")
@Valid
private ConnectionConfiguration connectionConfiguration;

@JsonProperty("indices")
@Valid
private IndexParametersConfiguration indexParametersConfiguration;

@JsonProperty("aws")
@Valid
private AwsAuthenticationConfiguration awsAuthenticationOptions;

@JsonProperty("scheduling")
@Valid
private SchedulingParameterConfiguration schedulingParameterConfiguration;

@JsonProperty("query")
private QueryParameterConfiguration queryParameterConfiguration;

@JsonProperty("search_options")
@Valid
private SearchConfiguration searchConfiguration;

public Integer getMaxRetries() {
Expand Down Expand Up @@ -89,15 +80,12 @@ public SchedulingParameterConfiguration getSchedulingParameterConfiguration() {
return schedulingParameterConfiguration;
}

public SearchConfiguration getSearchConfiguration() {
return searchConfiguration;
public QueryParameterConfiguration getQueryParameterConfiguration() {
return queryParameterConfiguration;
}

@AssertTrue(message = "Either username and password, or aws options must be specified. Both cannot be set at once.")
boolean validateAwsConfigWithUsernameAndPassword() {

return !((Objects.nonNull(awsAuthenticationOptions) && (Objects.nonNull(username) || Objects.nonNull(password))) ||
(Objects.isNull(awsAuthenticationOptions) && Objects.isNull(username) && Objects.isNull(password)));
public SearchConfiguration getSearchConfiguration() {
return searchConfiguration;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.source.opensearch;

import com.linecorp.armeria.client.retry.Backoff;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.IndexParametersConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.model.ServiceInfo;
import org.opensearch.dataprepper.plugins.source.opensearch.service.ElasticSearchService;
import org.opensearch.dataprepper.plugins.source.opensearch.service.HostsService;
import org.opensearch.dataprepper.plugins.source.opensearch.service.OpenSearchService;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.OpenSearchTimerWorker;

import java.time.Duration;
import java.util.Timer;

/**
* Service that will call both worker classes
*/
public class OpenSearchSourceService {

private final OpenSearchService openSearchService;

private final ElasticSearchService elasticSearchService;

private final HostsService hostsService;

private final OpenSearchSourceConfiguration sourceConfig;

private final Timer timer = new Timer();

private final Buffer<Record<Event>> buffer;

static final long INITIAL_DELAY = Duration.ofSeconds(20).toMillis();
static final long MAXIMUM_DELAY = Duration.ofMinutes(5).toMillis();
static final double JITTER_RATE = 0.20;

public OpenSearchSourceService(final OpenSearchSourceConfiguration sourceConfig,
final HostsService hostsService,
final OpenSearchService openSearchService,
final ElasticSearchService elasticSearchService,
final Buffer<Record<Event>> buffer){
this.sourceConfig = sourceConfig;
this.hostsService = hostsService;
this.openSearchService = openSearchService;
this.elasticSearchService =elasticSearchService;
this.buffer = buffer;
}

public void processHosts(){
sourceConfig.getHosts().forEach(host ->{
final ServiceInfo serviceInfo = hostsService.findServiceDetailsByUrl(host);
IndexParametersConfiguration index = sourceConfig.getIndexParametersConfiguration();
final Backoff backoff = Backoff.exponential(INITIAL_DELAY, MAXIMUM_DELAY).withJitter(JITTER_RATE)
.withMaxAttempts(Integer.MAX_VALUE);
timer.scheduleAtFixedRate(new OpenSearchTimerWorker(openSearchService,elasticSearchService,
sourceConfig,buffer,serviceInfo,host, backoff),
sourceConfig.getSchedulingParameterConfiguration().getStartTime().getSecond(),
sourceConfig.getSchedulingParameterConfiguration().getRate().toMillis());
});
}
public void stop(){
timer.cancel();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package org.opensearch.dataprepper.plugins.source.opensearch.client;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.io.entity.StringEntity;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URL;

public class HttpCustomClient {

private URL url;

public HttpCustomClient(URL url) {
this.url = url;
}

private ObjectMapper mapper = new ObjectMapper();

public <T> T execute(final Class<T> responseType, final Object requestObject, final String httpMethod,final String uri) throws IOException {
StringEntity requestEntity = new StringEntity(mapper.writeValueAsString(requestObject));
URI httpUri = URI.create(url.getProtocol()+"://"+url.getAuthority()+"/"+uri);
HttpUriRequestBase operationRequest = new HttpUriRequestBase(httpMethod, httpUri);
operationRequest.setHeader("Accept", ContentType.APPLICATION_JSON);
operationRequest.setHeader("Content-type", ContentType.APPLICATION_JSON);
operationRequest.setEntity(requestEntity);

CloseableHttpResponse closeableHttpResponse = getCloseableHttpResponse(operationRequest);
T response = mapper.readValue(readBuffer(closeableHttpResponse).toString(), responseType);
return response;
}

private StringBuffer readBuffer(CloseableHttpResponse pitCloseableResponse) throws IOException {
StringBuffer result = new StringBuffer();
BufferedReader reader = new BufferedReader(new InputStreamReader(pitCloseableResponse.getEntity().getContent()));
String line = "";
while ((line = reader.readLine()) != null) {
result.append(line);
}
return result;
}

private CloseableHttpResponse getCloseableHttpResponse(final HttpUriRequestBase operationRequest) throws IOException {
CloseableHttpClient httpClient = HttpClients.createDefault();
CloseableHttpResponse pitResponse = httpClient.execute(operationRequest);
return pitResponse;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.opensearch.codec;

import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.json.stream.JsonParser;
import org.opensearch.client.json.JsonpDeserializerBase;
import org.opensearch.client.json.JsonpMapper;
import org.opensearch.client.json.jackson.JacksonJsonpParser;
import java.io.IOException;
import java.util.EnumSet;

public class JacksonValueParser<T> extends JsonpDeserializerBase<T> {

private final ObjectMapper objectMapper = new ObjectMapper();
private final Class<T> clazz;
public JacksonValueParser(Class<T> clazz) {
super(EnumSet.allOf(JsonParser.Event.class));
this.clazz = clazz;
}
@Override
public T deserialize(JsonParser parser, JsonpMapper mapper, JsonParser.Event event) {

if (!(parser instanceof JacksonJsonpParser)) {
throw new IllegalArgumentException("Jackson's ObjectMapper can only be used with the JacksonJsonpProvider");
}
com.fasterxml.jackson.core.JsonParser jkParser = ((JacksonJsonpParser) parser).jacksonParser();

try {
return objectMapper.readValue(jkParser, clazz);
} catch (IOException e) {
throw new RuntimeException(e);
}

}
}
Loading

0 comments on commit 5d5b765

Please sign in to comment.