Use buffer accumulator in opensearch source to back off and retry
Signed-off-by: Taylor Gray <[email protected]>
graytaylor0 committed Jun 8, 2023
1 parent a858cee commit 14068c0
Showing 9 changed files with 536 additions and 38 deletions.
org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchService.java
@@ -8,6 +8,7 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
+ import org.opensearch.dataprepper.plugins.source.opensearch.worker.BufferAccumulator;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.OpenSearchIndexPartitionCreationSupplier;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.PitWorker;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.ScrollWorker;
@@ -29,13 +30,15 @@ public class OpenSearchService {
private static final Logger LOG = LoggerFactory.getLogger(OpenSearchService.class);

static final Duration EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT = Duration.ofSeconds(30);
+ static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(30);

private final SearchAccessor searchAccessor;
private final OpenSearchSourceConfiguration openSearchSourceConfiguration;
private final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator;
private final Buffer<Record<Event>> buffer;
private final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier;
private final ScheduledExecutorService scheduledExecutorService;
+ private final BufferAccumulator<Record<Event>> bufferAccumulator;

private SearchWorker searchWorker;
private ScheduledFuture<?> searchWorkerFuture;
@@ -44,30 +47,34 @@ public static OpenSearchService createOpenSearchService(final SearchAccessor sea
final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator,
final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final Buffer<Record<Event>> buffer) {
- return new OpenSearchService(searchAccessor, sourceCoordinator, openSearchSourceConfiguration, buffer, Executors.newSingleThreadScheduledExecutor());
+ return new OpenSearchService(
+ searchAccessor, sourceCoordinator, openSearchSourceConfiguration, buffer, Executors.newSingleThreadScheduledExecutor(),
+ BufferAccumulator.create(buffer, openSearchSourceConfiguration.getSearchConfiguration().getBatchSize(), BUFFER_TIMEOUT));
}

private OpenSearchService(final SearchAccessor searchAccessor,
- final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator,
- final OpenSearchSourceConfiguration openSearchSourceConfiguration,
- final Buffer<Record<Event>> buffer,
- final ScheduledExecutorService scheduledExecutorService) {
+ final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator,
+ final OpenSearchSourceConfiguration openSearchSourceConfiguration,
+ final Buffer<Record<Event>> buffer,
+ final ScheduledExecutorService scheduledExecutorService,
+ final BufferAccumulator<Record<Event>> bufferAccumulator) {
this.searchAccessor = searchAccessor;
this.openSearchSourceConfiguration = openSearchSourceConfiguration;
this.buffer = buffer;
this.sourceCoordinator = sourceCoordinator;
this.sourceCoordinator.initialize();
this.openSearchIndexPartitionCreationSupplier = new OpenSearchIndexPartitionCreationSupplier(openSearchSourceConfiguration, (ClusterClientFactory) searchAccessor);
this.scheduledExecutorService = scheduledExecutorService;
+ this.bufferAccumulator = bufferAccumulator;
}

public void start() {
switch(searchAccessor.getSearchContextType()) {
case POINT_IN_TIME:
- searchWorker = new PitWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, buffer, openSearchIndexPartitionCreationSupplier);
+ searchWorker = new PitWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier);
break;
case SCROLL:
- searchWorker = new ScrollWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, buffer, openSearchIndexPartitionCreationSupplier);
+ searchWorker = new ScrollWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier);
break;
default:
throw new IllegalArgumentException(
org/opensearch/dataprepper/plugins/source/opensearch/worker/BufferAccumulator.java (new file)
@@ -0,0 +1,130 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.opensearch.worker;

import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.record.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* Accumulates {@link Record} objects before placing them into a Data Prepper
* {@link Buffer}. This class is not thread-safe and should only be used by one
* thread at a time.
*
* @param <T> Type of record to accumulate
*/
public class BufferAccumulator<T extends Record<?>> {
private static final Logger LOG = LoggerFactory.getLogger(BufferAccumulator.class);

private static final int MAX_FLUSH_RETRIES_ON_IO_EXCEPTION = Integer.MAX_VALUE;
private static final Duration INITIAL_FLUSH_RETRY_DELAY_ON_IO_EXCEPTION = Duration.ofSeconds(5);

private final Buffer<T> buffer;
private final int numberOfRecordsToAccumulate;
private final int bufferTimeoutMillis;
private int totalWritten = 0;

private final Collection<T> recordsAccumulated;

private BufferAccumulator(final Buffer<T> buffer, final int numberOfRecordsToAccumulate, final Duration bufferTimeout) {
this.buffer = Objects.requireNonNull(buffer, "buffer must be non-null.");
this.numberOfRecordsToAccumulate = numberOfRecordsToAccumulate;
Objects.requireNonNull(bufferTimeout, "bufferTimeout must be non-null.");
this.bufferTimeoutMillis = (int) bufferTimeout.toMillis();

if(numberOfRecordsToAccumulate < 1)
throw new IllegalArgumentException("numberOfRecordsToAccumulate must be greater than zero.");

recordsAccumulated = new ArrayList<>(numberOfRecordsToAccumulate);
}

public static <T extends Record<?>> BufferAccumulator<T> create(final Buffer<T> buffer, final int recordsToAccumulate, final Duration bufferTimeout) {
return new BufferAccumulator<T>(buffer, recordsToAccumulate, bufferTimeout);
}

void add(final T record) throws Exception {
recordsAccumulated.add(record);
if (recordsAccumulated.size() >= numberOfRecordsToAccumulate) {
flush();
}
}

void flush() throws Exception {
try {
flushAccumulatedToBuffer();
} catch (final TimeoutException timeoutException) {
flushWithBackoff();
}
}

private boolean flushWithBackoff() throws Exception {
final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
long nextDelay = INITIAL_FLUSH_RETRY_DELAY_ON_IO_EXCEPTION.toMillis();
boolean flushedSuccessfully;

for (int retryCount = 0; retryCount < MAX_FLUSH_RETRIES_ON_IO_EXCEPTION; retryCount++) {
final ScheduledFuture<Boolean> flushBufferFuture = scheduledExecutorService.schedule(() -> {
try {
flushAccumulatedToBuffer();
return true;
} catch (final TimeoutException e) {
return false;
}
}, nextDelay, TimeUnit.MILLISECONDS);

try {
flushedSuccessfully = flushBufferFuture.get();
if (flushedSuccessfully) {
LOG.info("Successfully flushed the buffer accumulator on retry attempt {}", retryCount + 1);
scheduledExecutorService.shutdownNow();
return true;
}
} catch (final ExecutionException e) {
LOG.warn("Retrying of flushing the buffer accumulator hit an exception: {}", e.getMessage());
scheduledExecutorService.shutdownNow();
throw e;
} catch (final InterruptedException e) {
LOG.warn("Retrying of flushing the buffer accumulator was interrupted: {}", e.getMessage());
scheduledExecutorService.shutdownNow();
throw e;
}
}

LOG.warn("Flushing the bufferAccumulator failed after {} attempts", MAX_FLUSH_RETRIES_ON_IO_EXCEPTION);
scheduledExecutorService.shutdownNow();
return false;
}

private void flushAccumulatedToBuffer() throws Exception {
final int currentRecordCountAccumulated = recordsAccumulated.size();
if (currentRecordCountAccumulated > 0) {
buffer.writeAll(recordsAccumulated, bufferTimeoutMillis);
recordsAccumulated.clear();
totalWritten += currentRecordCountAccumulated;
}
}

/**
* Gets the total number of records written to the buffer.
*
* @return the total number of records written
*/
public int getTotalWritten() {
return totalWritten;
}
}
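
To illustrate the intended call pattern, here is a minimal usage sketch. It is not part of the commit: the method name, the searchResults list, the LOG field, and the batch-size and timeout values are assumptions.

// Hypothetical caller. BufferAccumulator#add and #flush are package-private,
// so callers live in the same worker package, as PitWorker does below.
void writeSearchResults(final Buffer<Record<Event>> buffer, final List<Event> searchResults) throws Exception {
    final BufferAccumulator<Record<Event>> accumulator =
            BufferAccumulator.create(buffer, 1000, Duration.ofSeconds(30)); // assumed batch size and timeout

    for (final Event document : searchResults) {
        accumulator.add(new Record<>(document)); // auto-flushes once 1000 records have accumulated
    }

    accumulator.flush(); // drain the final partial batch; retries on TimeoutException
    LOG.info("Wrote {} documents to the buffer", accumulator.getTotalWritten());
}

Note that flushWithBackoff schedules every retry with the same 5-second delay (nextDelay is never increased), so the "backoff" is currently a fixed-delay retry.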
org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java
@@ -4,7 +4,6 @@
*/
package org.opensearch.dataprepper.plugins.source.opensearch.worker;

- import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
@@ -28,8 +27,6 @@
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
- import java.util.concurrent.TimeoutException;
- import java.util.stream.Collectors;

/**
* PitWorker polls the source cluster via Point-In-Time contexts.
@@ -38,7 +35,6 @@ public class PitWorker implements SearchWorker, Runnable {

private static final Logger LOG = LoggerFactory.getLogger(PitWorker.class);

- static final int BUFFER_TIMEOUT_MILLIS = 180_000;
private static final int STANDARD_BACKOFF_MILLIS = 30_000;
private static final Duration BACKOFF_ON_PIT_LIMIT_REACHED = Duration.ofSeconds(60);
static final String STARTING_KEEP_ALIVE = "15m";
@@ -47,18 +43,18 @@ public class PitWorker implements SearchWorker, Runnable {
private final SearchAccessor searchAccessor;
private final OpenSearchSourceConfiguration openSearchSourceConfiguration;
private final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator;
- private final Buffer<Record<Event>> buffer;
+ private final BufferAccumulator<Record<Event>> bufferAccumulator;
private final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier;

public PitWorker(final SearchAccessor searchAccessor,
final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator,
- final Buffer<Record<Event>> buffer,
+ final BufferAccumulator<Record<Event>> bufferAccumulator,
final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier) {
this.searchAccessor = searchAccessor;
this.sourceCoordinator = sourceCoordinator;
this.openSearchSourceConfiguration = openSearchSourceConfiguration;
- this.buffer = buffer;
+ this.bufferAccumulator = bufferAccumulator;
this.openSearchIndexPartitionCreationSupplier = openSearchIndexPartitionCreationSupplier;
}

@@ -138,9 +134,14 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op
.withPaginationSize(searchConfiguration.getBatchSize())
.withSearchAfter(Objects.nonNull(searchPointInTimeResponse) ? searchPointInTimeResponse.getNextSearchAfter() : null)
.build());
- buffer.writeAll(searchPointInTimeResponse.getDocuments().stream().map(Record::new).collect(Collectors.toList()), BUFFER_TIMEOUT_MILLIS);
- } catch (final TimeoutException e) {
- // todo: implement backoff and retry, can reuse buffer accumulator code from the s3 source

+ searchPointInTimeResponse.getDocuments().stream().map(Record::new).forEach(record -> {
+ try {
+ bufferAccumulator.add(record);
+ } catch (Exception e) {
+ LOG.error("Failed writing OpenSearch documents to buffer due to: {}", e.getMessage());
+ }
+ });
} catch (final Exception e) {
LOG.error("Received an exception while searching with PIT for index '{}'", indexName);
throw new RuntimeException(e);
@@ -150,6 +151,11 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op
sourceCoordinator.saveProgressStateForPartition(indexName, openSearchIndexProgressState);
} while (searchPointInTimeResponse.getDocuments().size() == searchConfiguration.getBatchSize());

+ try {
+ bufferAccumulator.flush();
+ } catch (final Exception e) {
+ LOG.error("Failed writing remaining OpenSearch documents to buffer due to: {}", e.getMessage());
+ }

// todo: This API call is failing with sigv4 enabled due to a mismatch in the signature. Tracking issue (https://github.com/opensearch-project/opensearch-java/issues/521)
searchAccessor.deletePit(DeletePointInTimeRequest.builder().withPitId(openSearchIndexProgressState.getPitId()).build());
org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java
@@ -4,7 +4,6 @@
*/
package org.opensearch.dataprepper.plugins.source.opensearch.worker;

- import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
@@ -20,18 +19,18 @@ public class ScrollWorker implements SearchWorker {
private final SearchAccessor searchAccessor;
private final OpenSearchSourceConfiguration openSearchSourceConfiguration;
private final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator;
- private final Buffer<Record<Event>> buffer;
+ private final BufferAccumulator<Record<Event>> bufferAccumulator;
private final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier;

public ScrollWorker(final SearchAccessor searchAccessor,
final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator,
- final Buffer<Record<Event>> buffer,
+ final BufferAccumulator<Record<Event>> bufferAccumulator,
final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier) {
this.searchAccessor = searchAccessor;
this.openSearchSourceConfiguration = openSearchSourceConfiguration;
this.sourceCoordinator = sourceCoordinator;
- this.buffer = buffer;
+ this.bufferAccumulator = bufferAccumulator;
this.openSearchIndexPartitionCreationSupplier = openSearchIndexPartitionCreationSupplier;
}

(SearchAccessor implementation; file path not shown in this view)
@@ -135,12 +135,8 @@ public void deletePit(final DeletePointInTimeRequest deletePointInTimeRequest) {
} else {
LOG.warn("Point in time id {} was not deleted successfully. It will expire from keep-alive", deletePointInTimeRequest.getPitId());
}
- } catch (final OpenSearchException e) {
- LOG.error("There was an error deleting the point in time with id {} for OpenSearch: ", deletePointInTimeRequest.getPitId(), e);
- throw e;
- } catch (IOException e) {
- LOG.error("There was an error deleting the point in time with id {} for OpenSearch: {}", deletePointInTimeRequest.getPitId(), e.getMessage());
- throw new RuntimeException(e);
+ } catch (final IOException | RuntimeException e) {
+ LOG.error("There was an error deleting the point in time with id {} for OpenSearch. It will expire from keep-alive: ", deletePointInTimeRequest.getPitId(), e);
}
}

org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchServiceTest.java
@@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugins.source.opensearch;

+ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
@@ -18,6 +19,8 @@
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SchedulingParameterConfiguration;
+ import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SearchConfiguration;
+ import org.opensearch.dataprepper.plugins.source.opensearch.worker.BufferAccumulator;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.OpenSearchIndexPartitionCreationSupplier;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.PitWorker;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.ScrollWorker;
@@ -40,6 +43,7 @@
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+ import static org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchService.BUFFER_TIMEOUT;

@ExtendWith(MockitoExtension.class)
public class OpenSearchServiceTest {
@@ -53,6 +57,9 @@ public class OpenSearchServiceTest {
@Mock
private Buffer<Record<Event>> buffer;

+ @Mock
+ private BufferAccumulator<Record<Event>> bufferAccumulator;

@Mock
private SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator;

@@ -65,12 +72,22 @@ public class OpenSearchServiceTest {
@Mock
private SearchWorker searchWorker;

+ @BeforeEach
+ void setup() {
+ final SearchConfiguration searchConfiguration = mock(SearchConfiguration.class);
+ when(searchConfiguration.getBatchSize()).thenReturn(1000);
+
+ when(openSearchSourceConfiguration.getSearchConfiguration()).thenReturn(searchConfiguration);
+ }

private OpenSearchService createObjectUnderTest() {
try (final MockedStatic<Executors> executorsMockedStatic = mockStatic(Executors.class);
+ final MockedStatic<BufferAccumulator> bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class);
final MockedConstruction<OpenSearchIndexPartitionCreationSupplier> mockedConstruction = mockConstruction(OpenSearchIndexPartitionCreationSupplier.class, (mock, context) -> {
openSearchIndexPartitionCreationSupplier = mock;
})) {
executorsMockedStatic.when(Executors::newSingleThreadScheduledExecutor).thenReturn(scheduledExecutorService);
+ bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, openSearchSourceConfiguration.getSearchConfiguration().getBatchSize(), BUFFER_TIMEOUT)).thenReturn(bufferAccumulator);
return OpenSearchService.createOpenSearchService(openSearchAccessor, sourceCoordinator, openSearchSourceConfiguration, buffer);
}
}
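
For completeness, a hypothetical unit-test sketch for the new retry path (not part of this commit). It assumes a mocked Buffer<Record<Event>> like the buffer field above, the usual Mockito and Hamcrest static imports, and placement in the same package as BufferAccumulator, since add and flush are package-private:

@Test
void flushRetriesAfterBufferTimeout() throws Exception {
    // First write times out; the retry scheduled 5 seconds later succeeds.
    doThrow(TimeoutException.class).doNothing()
            .when(buffer).writeAll(anyCollection(), anyInt());

    final BufferAccumulator<Record<Event>> objectUnderTest =
            BufferAccumulator.create(buffer, 2, Duration.ofSeconds(30));
    objectUnderTest.add(new Record<>(mock(Event.class)));

    objectUnderTest.flush(); // blocks for roughly one 5-second retry delay

    verify(buffer, times(2)).writeAll(anyCollection(), anyInt());
    assertThat(objectUnderTest.getTotalWritten(), equalTo(1));
}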
(The remaining changed files in this commit were not loaded in this view.)