Skip to content

Commit

Permalink
Connection changes for opensearch for opensearch-project#1985,2264.
Browse files Browse the repository at this point in the history
Signed-off-by: mallikagogoi7 <[email protected]>
  • Loading branch information
mallikagogoi7 committed May 25, 2023
1 parent 3ce5b53 commit f5dbff3
Show file tree
Hide file tree
Showing 24 changed files with 1,578 additions and 11 deletions.
15 changes: 13 additions & 2 deletions data-prepper-plugins/opensearch-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,21 @@
*/
dependencies {
implementation project(path: ':data-prepper-api')
implementation libs.armeria.core
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-csv'
implementation 'org.apache.httpcomponents.core5:httpcore5-h2:5.2.1'
implementation 'org.apache.httpcomponents.client5:httpclient5:5.2.1'
implementation group: 'com.googlecode.json-simple', name: 'json-simple', version: '1.1.1'
implementation "org.apache.commons:commons-lang3:3.12.0"
implementation group: 'org.opensearch.client', name: 'opensearch-rest-client', version: '2.6.0'
implementation 'org.opensearch.client:opensearch-java:2.4.0'
implementation 'software.amazon.awssdk:aws-sdk-java:2.17.148'
implementation 'co.elastic.clients:elasticsearch-java:8.7.0'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.14.2'
implementation 'software.amazon.awssdk:s3'
implementation 'software.amazon.awssdk:sts'
implementation project(path: ':data-prepper-core')
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1'
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.source.opensearch;

import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* Accumulates {@link Record} objects before placing them into a Data Prepper
* {@link Buffer}. This class is not thread-safe and should only be used by one
* thread at a time.
*
* @param <T> Type of record to accumulate
*/
public class BufferAccumulator<T extends Record<?>> {
private static final Logger LOG = LoggerFactory.getLogger(BufferAccumulator.class);

private static final int MAX_FLUSH_RETRIES_ON_IO_EXCEPTION = Integer.MAX_VALUE;
private static final Duration INITIAL_FLUSH_RETRY_DELAY_ON_IO_EXCEPTION = Duration.ofSeconds(5);

private final Buffer<T> buffer;
private final int numberOfRecordsToAccumulate;
private final int bufferTimeoutMillis;
private int totalWritten = 0;

private final Collection<T> recordsAccumulated;

private BufferAccumulator(final Buffer<T> buffer, final int numberOfRecordsToAccumulate, final Duration bufferTimeout) {
this.buffer = Objects.requireNonNull(buffer, "buffer must be non-null.");
this.numberOfRecordsToAccumulate = numberOfRecordsToAccumulate;
Objects.requireNonNull(bufferTimeout, "bufferTimeout must be non-null.");
this.bufferTimeoutMillis = (int) bufferTimeout.toMillis();

if(numberOfRecordsToAccumulate < 1)
throw new IllegalArgumentException("numberOfRecordsToAccumulate must be greater than zero.");

recordsAccumulated = new ArrayList<>(numberOfRecordsToAccumulate);
}

static <T extends Record<?>> BufferAccumulator<T> create(final Buffer<T> buffer, final int recordsToAccumulate, final Duration bufferTimeout) {
return new BufferAccumulator<T>(buffer, recordsToAccumulate, bufferTimeout);
}

void add(final T record) throws Exception {
recordsAccumulated.add(record);
if (recordsAccumulated.size() >= numberOfRecordsToAccumulate) {
flush();
}
}

void flush() throws Exception {
try {
flushAccumulatedToBuffer();
} catch (final TimeoutException timeoutException) {
flushWithBackoff();
}
}

private boolean flushWithBackoff() throws Exception{
final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
long nextDelay = INITIAL_FLUSH_RETRY_DELAY_ON_IO_EXCEPTION.toMillis();
boolean flushedSuccessfully;

for (int retryCount = 0; retryCount < MAX_FLUSH_RETRIES_ON_IO_EXCEPTION; retryCount++) {
final ScheduledFuture<Boolean> flushBufferFuture = scheduledExecutorService.schedule(() -> {
try {
flushAccumulatedToBuffer();
return true;
} catch (final TimeoutException e) {
return false;
}
}, nextDelay, TimeUnit.MILLISECONDS);

try {
flushedSuccessfully = flushBufferFuture.get();
if (flushedSuccessfully) {
LOG.info("Successfully flushed the buffer accumulator on retry attempt {}", retryCount + 1);
scheduledExecutorService.shutdownNow();
return true;
}
} catch (final ExecutionException e) {
LOG.warn("Retrying of flushing the buffer accumulator hit an exception: {}", e.getMessage());
scheduledExecutorService.shutdownNow();
throw e;
} catch (final InterruptedException e) {
LOG.warn("Retrying of flushing the buffer accumulator was interrupted: {}", e.getMessage());
scheduledExecutorService.shutdownNow();
throw e;
}
}

LOG.warn("Flushing the bufferAccumulator failed after {} attempts", MAX_FLUSH_RETRIES_ON_IO_EXCEPTION);
scheduledExecutorService.shutdownNow();
return false;
}

private void flushAccumulatedToBuffer() throws Exception {
final int currentRecordCountAccumulated = recordsAccumulated.size();
if (currentRecordCountAccumulated > 0) {
buffer.writeAll(recordsAccumulated, bufferTimeoutMillis);
recordsAccumulated.clear();
totalWritten += currentRecordCountAccumulated;
}
}

/**
* Gets the total number of records written to the buffer.
*
* @return the total number of records written
*/
public int getTotalWritten() {
return totalWritten;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.source.opensearch;

import java.net.URL;
import java.util.List;

import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider;
import org.apache.hc.core5.http.HttpHost;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponseInterceptor;
import org.apache.http.entity.ContentType;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.RestClient;
import org.opensearch.client.opensearch.OpenSearchClient;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import org.opensearch.client.transport.OpenSearchTransport;
import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder;

/**
* used for creating connection
*/
public class OpenSearchClientBuilder {

/**
* This method create opensearch client based on host information, which will be used to call opensearch apis
* @param url
* @return
*/
public OpenSearchClient createOpenSearchClient(final URL url){
final HttpHost host = new HttpHost(url.getProtocol(), url.getHost(), url.getPort());
final BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
final OpenSearchTransport transport = ApacheHttpClient5TransportBuilder
.builder(host)
.setMapper(new org.opensearch.client.json.jackson.JacksonJsonpMapper())
.build();
return new OpenSearchClient(transport);
}

/**
* This method create Elasticsearch client based on host information, which will be used to call opensearch apis
* @param url
* @return
*/
public ElasticsearchClient createElasticSearchClient(final URL url) {
final String HEADER_NAME = "X-Elastic-Product";
final String HEADER_VALUE = "Elasticsearch";

RestClient client = org.elasticsearch.client.RestClient.builder(new org.apache.http.HttpHost(url.getHost(), url.getPort())).
setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
.setDefaultHeaders(List.of(new BasicHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString())))
.addInterceptorLast((HttpResponseInterceptor) (response, context) -> response.addHeader(HEADER_NAME, HEADER_VALUE))).build();
JacksonJsonpMapper jacksonJsonpMapper = new JacksonJsonpMapper();
ElasticsearchTransport transport = new RestClientTransport(client, jacksonJsonpMapper);
return new ElasticsearchClient(transport);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,22 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.plugins.source.opensearch.service.ElasticSearchService;
import org.opensearch.dataprepper.plugins.source.opensearch.service.HostsService;
import org.opensearch.dataprepper.plugins.source.opensearch.service.OpenSearchService;

import java.time.Duration;

@DataPrepperPlugin(name="opensearch", pluginType = Source.class , pluginConfigurationType =OpenSearchSourceConfiguration.class )
public class OpenSearchSource implements Source<Record<Event>> {

static final Duration DEFAULT_BUFFER_TIMEOUT = Duration.ofSeconds(10);
static final int DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE = 100;

private final OpenSearchSourceConfiguration openSearchSourceConfiguration;

private OpenSearchSourceService openSearchSourceService;

@DataPrepperPluginConstructor
public OpenSearchSource(final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
this.openSearchSourceConfiguration = openSearchSourceConfiguration;
Expand All @@ -26,15 +36,22 @@ public void start(Buffer<Record<Event>> buffer) {
if (buffer == null) {
throw new IllegalStateException("Buffer provided is null");
}
startProcess(openSearchSourceConfiguration);
}

private void startProcess(final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
//Yet to implement
HostsService hostsService = new HostsService();
OpenSearchClientBuilder clientBuilder = new OpenSearchClientBuilder();
OpenSearchService openSearchService = new OpenSearchService(clientBuilder);
ElasticSearchService elasticSearchService = new ElasticSearchService(clientBuilder);
BufferAccumulator bufferAccumulator = BufferAccumulator.create(buffer,DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE, DEFAULT_BUFFER_TIMEOUT);
openSearchSourceService = new OpenSearchSourceService(
openSearchSourceConfiguration,
hostsService,
openSearchService,
elasticSearchService,
bufferAccumulator);
openSearchSourceService.processHosts();
}

@Override
public void stop() {
// Yet to implement
openSearchSourceService.stop();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.source.opensearch;

import org.opensearch.dataprepper.plugins.source.opensearch.configuration.IndexParametersConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.model.ServiceInfo;
import org.opensearch.dataprepper.plugins.source.opensearch.service.ElasticSearchService;
import org.opensearch.dataprepper.plugins.source.opensearch.service.HostsService;
import org.opensearch.dataprepper.plugins.source.opensearch.service.OpenSearchService;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.OpenSearchTimerWorker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Timer;

/**
* Service that will call both worker classes
*/
public class OpenSearchSourceService {
private static final Logger LOG = LoggerFactory.getLogger(OpenSearchSourceService.class);

private final OpenSearchService openSearchService;

private final ElasticSearchService elasticSearchService;

private final HostsService hostsService;

private final OpenSearchSourceConfiguration sourceConfig;

private final BufferAccumulator bufferAccumulator;

private final Timer timer = new Timer();

public OpenSearchSourceService(final OpenSearchSourceConfiguration sourceConfig,
final HostsService hostsService,
final OpenSearchService openSearchService,
final ElasticSearchService elasticSearchService,
final BufferAccumulator bufferAccumulator){
this.sourceConfig = sourceConfig;
this.hostsService = hostsService;
this.openSearchService = openSearchService;
this.elasticSearchService =elasticSearchService;
this.bufferAccumulator = bufferAccumulator;
}

public void processHosts(){
sourceConfig.getHosts().forEach(host ->{
final ServiceInfo serviceInfo = hostsService.findServiceDetailsByUrl(host);
IndexParametersConfiguration index = sourceConfig.getIndexParametersConfiguration();
timer.scheduleAtFixedRate(new OpenSearchTimerWorker(openSearchService,elasticSearchService,
sourceConfig,bufferAccumulator,serviceInfo,host),
sourceConfig.getSchedulingParameterConfiguration().getStartTime().getSecond(),
sourceConfig.getSchedulingParameterConfiguration().getRate().toMillis());
});
}
public void stop(){
timer.cancel();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.opensearch.codec;

import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.json.stream.JsonParser;
import org.opensearch.client.json.JsonpDeserializerBase;
import org.opensearch.client.json.JsonpMapper;
import org.opensearch.client.json.jackson.JacksonJsonpParser;
import java.io.IOException;
import java.util.EnumSet;

public class JacksonValueParser<T> extends JsonpDeserializerBase<T> {

private final ObjectMapper objectMapper = new ObjectMapper();
private final Class<T> clazz;
public JacksonValueParser(Class<T> clazz) {
super(EnumSet.allOf(JsonParser.Event.class));
this.clazz = clazz;
}
@Override
public T deserialize(JsonParser parser, JsonpMapper mapper, JsonParser.Event event) {

if (!(parser instanceof JacksonJsonpParser)) {
throw new IllegalArgumentException("Jackson's ObjectMapper can only be used with the JacksonJsonpProvider");
}
com.fasterxml.jackson.core.JsonParser jkParser = ((JacksonJsonpParser) parser).jacksonParser();

try {
return objectMapper.readValue(jkParser, clazz);
} catch (IOException e) {
throw new RuntimeException(e);
}

}
}
Loading

0 comments on commit f5dbff3

Please sign in to comment.