Skip to content

Commit

Permalink
Opensearch connection related changes for opensearch-project#1985,2264.
Browse files Browse the repository at this point in the history
Signed-off-by: mallikagogoi7 <[email protected]>
  • Loading branch information
mallikagogoi7 committed May 19, 2023
1 parent d64b48c commit 1db102d
Show file tree
Hide file tree
Showing 16 changed files with 875 additions and 0 deletions.
25 changes: 25 additions & 0 deletions data-prepper-plugins/opensearch-source/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
dependencies {
implementation project(path: ':data-prepper-api')
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.14.2'
implementation 'software.amazon.awssdk:s3'
implementation 'software.amazon.awssdk:sts'
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
implementation 'org.opensearch.client:opensearch-java:2.4.0'
implementation 'co.elastic.clients:elasticsearch-java:8.7.0'
implementation 'org.apache.httpcomponents.core5:httpcore5-h2:5.2.1'
implementation 'org.apache.httpcomponents.client5:httpclient5:5.2.1'
implementation group: 'com.googlecode.json-simple', name: 'json-simple', version: '1.1.1'
implementation "org.apache.commons:commons-lang3:3.12.0"
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1'

}

test {
useJUnitPlatform()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.source.opensearch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Random;

public class BackoffService {

private static final Logger LOG = LoggerFactory.getLogger(BackoffService.class);

private int numberOfRetries;

private int numberOfTriesLeft;

private long defaultTimeToWait = 1000;

private long timeToWait;

private final Random random = new Random();

public BackoffService(int numberOfRetries) {
this.numberOfRetries = numberOfRetries;
this.numberOfTriesLeft = numberOfRetries;
this.timeToWait = defaultTimeToWait;
}

public boolean shouldRetry() {
return numberOfTriesLeft > 0;
}

public void errorOccured() {
numberOfTriesLeft--;
if (!shouldRetry()) {
LOG.info("RETRY FAILED");
}
waitUntilNextTry();
timeToWait += random.nextInt(1000);
}

public void waitUntilNextTry() {
try {
Thread.sleep(timeToWait);
}catch(InterruptedException e) {
LOG.error("InterruptedException", e);
}
}

public long getTimeToWait() {
return this.timeToWait;
}

public void doNotRetry() {
numberOfTriesLeft = 0;
}

public void reset() {
this.numberOfTriesLeft = numberOfRetries;
this.timeToWait = defaultTimeToWait;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.source.opensearch;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.Source;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@DataPrepperPlugin(name="opensearch", pluginType = Source.class , pluginConfigurationType =OpenSearchSourceConfiguration.class )
public class OpenSearchSource implements Source<Record<Event>> {

private static final Logger LOG = LoggerFactory.getLogger(OpenSearchSource.class);

private static final String OPEN_SEARCH = "opensearch";

private OpenSearchClient openSearchClient;

private ElasticsearchClient elasticsearchClient;

private final OpenSearchSourceConfiguration openSearchSourceConfiguration;

@DataPrepperPluginConstructor
public OpenSearchSource(final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
this.openSearchSourceConfiguration = openSearchSourceConfiguration;
}

@Override
public void start(final Buffer<Record<Event>> buffer) {
if (buffer == null) {
throw new IllegalStateException("Buffer provided is null");
}
startProcess(openSearchSourceConfiguration,buffer);
}

private void startProcess(final OpenSearchSourceConfiguration openSearchSourceConfiguration,final Buffer<Record<Event>> buffer) {
PrepareConnection prepareConnection = new PrepareConnection();

try {
SourceInfo sourceInfo = new SourceInfo();
SourceInfoProvider sourceInfoProvider = new SourceInfoProvider();
String datasource = sourceInfoProvider.getSourceInfo(openSearchSourceConfiguration);
sourceInfo.setDataSource(datasource);
LOG.info("Datasource is : {} ", sourceInfo.getDataSource());
sourceInfo = sourceInfoProvider.checkStatus(openSearchSourceConfiguration, sourceInfo);
if (Boolean.TRUE.equals(sourceInfo.getHealthStatus())) {
if (OPEN_SEARCH.equalsIgnoreCase(datasource)) {
openSearchClient = prepareConnection.prepareOpensearchConnection(openSearchSourceConfiguration);
} else {
elasticsearchClient = prepareConnection.prepareElasticSearchConnection(openSearchSourceConfiguration);

}

} else {
BackoffService backoff = new BackoffService(openSearchSourceConfiguration.getMaxRetries());
backoff.waitUntilNextTry();
while (backoff.shouldRetry()) {
sourceInfo = sourceInfoProvider.checkStatus(openSearchSourceConfiguration,sourceInfo);
if (Boolean.TRUE.equals(sourceInfo.getHealthStatus())) {
backoff.doNotRetry();
break;
} else {
backoff.errorOccured();
}
}

}
} catch ( Exception e ){
LOG.error("Error occured while starting the process",e);
}


}

@Override
public void stop() {
LOG.info("Process stopped");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.source.opensearch;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotNull;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.AwsAuthenticationConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.ConnectionConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.IndexParametersConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.QueryParameterConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SchedulingParameterConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SearchConfiguration;

import java.util.List;

public class OpenSearchSourceConfiguration {

@JsonProperty("max_retries")
private Integer maxRetries;

@NotNull
@JsonProperty("hosts")
private List<String> hosts;

@JsonProperty("username")
private String username;

@JsonProperty("password")
private String password;

@JsonProperty("connection")
private ConnectionConfiguration connectionConfiguration;

@JsonProperty("indices")
private IndexParametersConfiguration indexParametersConfiguration;

@JsonProperty("aws")
private AwsAuthenticationConfiguration awsAuthenticationOptions;

@JsonProperty("scheduling")
private SchedulingParameterConfiguration schedulingParameterConfiguration;

@JsonProperty("query")
private QueryParameterConfiguration queryParameterConfiguration;

@JsonProperty("search_options")
private SearchConfiguration searchConfiguration;

public Integer getMaxRetries() {
return maxRetries;
}

public List<String> getHosts() { return hosts; }
public void setHosts(List<String> hosts) { this.hosts = hosts; }

public String getUsername() {
return username;
}

public String getPassword() {
return password;
}

public ConnectionConfiguration getConnectionConfiguration() {
return connectionConfiguration;
}

public IndexParametersConfiguration getIndexParametersConfiguration() {
return indexParametersConfiguration;
}

public AwsAuthenticationConfiguration getAwsAuthenticationOptions() {
return awsAuthenticationOptions;
}

public SchedulingParameterConfiguration getSchedulingParameterConfiguration() {
return schedulingParameterConfiguration;
}

public QueryParameterConfiguration getQueryParameterConfiguration() {
return queryParameterConfiguration;
}

public SearchConfiguration getSearchConfiguration() {
return searchConfiguration;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.opensearch;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider;
import org.apache.hc.core5.http.HttpHost;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponseInterceptor;
import org.apache.http.entity.ContentType;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.RestClient;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.transport.OpenSearchTransport;
import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder;

import java.net.MalformedURLException;
import java.net.URL;
import java.util.List;


public class PrepareConnection {

private static final String HEADER_NAME = "X-Elastic-Product";

private static final String HEADER_VALUE = "Elasticsearch";

/**
* @param openSearchSourceConfiguration
* @return
* @throws MalformedURLException This method create opensearch client based on host information, which will be used to call opensearch apis
*/
public OpenSearchClient prepareOpensearchConnection(final OpenSearchSourceConfiguration openSearchSourceConfiguration) throws MalformedURLException {
URL urlLink = getHostDetails(openSearchSourceConfiguration.getHosts().get(0));
final HttpHost host = new HttpHost(urlLink.getProtocol(), urlLink.getHost(), urlLink.getPort());
final BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
final OpenSearchTransport transport = ApacheHttpClient5TransportBuilder
.builder(host)
.setMapper(new org.opensearch.client.json.jackson.JacksonJsonpMapper())
.build();
return new OpenSearchClient(transport);
}

/**
* @param openSearchSourceConfiguration
* @return
* @throws MalformedURLException This method create Elasticsearch client based on host information, which will be used to call opensearch apis
*/
public ElasticsearchClient prepareElasticSearchConnection(final OpenSearchSourceConfiguration openSearchSourceConfiguration) throws MalformedURLException {
URL urlLink = getHostDetails(openSearchSourceConfiguration.getHosts().get(0));
RestClient client = org.elasticsearch.client.RestClient.builder(new org.apache.http.HttpHost(urlLink.getHost(), urlLink.getPort())).
setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
.setDefaultHeaders(List.of(new BasicHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString())))
.addInterceptorLast((HttpResponseInterceptor) (response, context) -> response.addHeader(HEADER_NAME, HEADER_VALUE))).build();
JacksonJsonpMapper jacksonJsonpMapper = new JacksonJsonpMapper();
ElasticsearchTransport transport = new RestClientTransport(client, jacksonJsonpMapper);
return new ElasticsearchClient(transport);
}

private URL getHostDetails(final String url) throws MalformedURLException {
URL urlLink = new URL(url);
return urlLink;
}
}




Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.opensearch;

public class SourceInfo {

private String osVersion;

private String dataSource;

private Boolean healthStatus = true;

public String getOsVersion() {
return osVersion;
}

public String getDataSource() {
return dataSource;
}

public void setDataSource(String dataSource) {
this.dataSource = dataSource;
}

public void setOsVersion(String osVersion) {
this.osVersion = osVersion;
}

public Boolean getHealthStatus() {
return healthStatus;
}

public void setHealthStatus(Boolean healthStatus) {
this.healthStatus = healthStatus;
}
}
Loading

0 comments on commit 1db102d

Please sign in to comment.