Skip to content

Commit

Permalink
Rewrite flaky tests
Browse files Browse the repository at this point in the history
  • Loading branch information
afoucret committed Oct 19, 2023
1 parent db985e4 commit 81ac77f
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.core.ClientHelper.ENT_SEARCH_ORIGIN;
Expand All @@ -31,16 +32,22 @@ public class BulkProcessorFactory {

private final AnalyticsEventIngestConfig config;

private final Client client;
private final Supplier<BulkProcessor2.Builder> builderSupplier;

@Inject
public BulkProcessorFactory(Client client, AnalyticsEventIngestConfig config) {
this.client = new OriginSettingClient(client, ENT_SEARCH_ORIGIN);
Client originClient = new OriginSettingClient(client, ENT_SEARCH_ORIGIN);
this.builderSupplier = () -> BulkProcessor2.builder(originClient::bulk, new BulkProcessorListener(), originClient.threadPool());
this.config = config;
}

protected BulkProcessorFactory(AnalyticsEventIngestConfig config, Supplier<BulkProcessor2.Builder> builderSupplier) {
this.builderSupplier = builderSupplier;
this.config = config;
}

public BulkProcessor2 create() {
return BulkProcessor2.builder(client::bulk, new BulkProcessorListener(), client.threadPool())
return builderSupplier.get()
.setMaxNumberOfRetries(config.maxNumberOfRetries())
.setBulkActions(config.maxNumberOfEventsPerBulk())
.setFlushInterval(config.flushDelay())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,23 @@

package org.elasticsearch.xpack.application.analytics.ingest;

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkProcessor2;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.mockito.InOrder;
import org.mockito.Mockito;

import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.equalTo;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.doAnswer;
import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

public class BulkProcessorFactoryTests extends ESTestCase {
Expand All @@ -49,83 +39,31 @@ public static void afterClass() {
ThreadPool.terminate(testThreadPool, 30, TimeUnit.SECONDS);
}

public void testFlushDelay() throws Exception {
AnalyticsEventIngestConfig config = mock(AnalyticsEventIngestConfig.class);
doReturn(ByteSizeValue.ofMb(10)).when(config).maxBytesInFlight();
doReturn(TimeValue.timeValueSeconds(1)).when(config).flushDelay();
doReturn(10).when(config).maxNumberOfEventsPerBulk();

Client client = mock(Client.class);

doReturn(testThreadPool).when(client).threadPool();
BulkProcessor2 bulkProcessor = new BulkProcessorFactory(client, config).create();
IndexRequest indexRequest = mock(IndexRequest.class);
bulkProcessor.add(indexRequest);

assertBusy(() -> verify(client).execute(any(BulkAction.class), argThat((BulkRequest bulkRequest) -> {
assertThat(bulkRequest.numberOfActions(), equalTo(1));
assertThat(bulkRequest.requests().stream().findFirst().get(), equalTo(indexRequest));
return true;
}), any()), 1, TimeUnit.SECONDS);
public void testDefaultConstructor() throws Exception {
BulkProcessorFactory factory = new BulkProcessorFactory(mock(Client.class), mock(AnalyticsEventIngestConfig.class));
assertThat(factory.create(), instanceOf(BulkProcessor2.class));
}

public void testMaxBulkActions() throws InterruptedException {
public void testConfigValueAreUsed() throws Exception {
TimeValue flushDelay = TimeValue.parseTimeValue(randomTimeValue(), "random time value");
int maxBulkActions = randomIntBetween(1, 10);
int totalEvents = randomIntBetween(1, 5) * maxBulkActions + randomIntBetween(1, maxBulkActions);
int numberOfRetries = between(0, 5);
ByteSizeValue maxBytesInFlight = randomByteSizeValue();

AnalyticsEventIngestConfig config = mock(AnalyticsEventIngestConfig.class);
doReturn(flushDelay).when(config).flushDelay();
doReturn(maxBulkActions).when(config).maxNumberOfEventsPerBulk();
doReturn(ByteSizeValue.ofMb(10)).when(config).maxBytesInFlight();

Client client = mock(Client.class);
InOrder inOrder = Mockito.inOrder(client);

doReturn(testThreadPool).when(client).threadPool();
BulkProcessor2 bulkProcessor = new BulkProcessorFactory(client, config).create();

for (int i = 0; i < totalEvents; i++) {
bulkProcessor.add(mock(IndexRequest.class));
}

inOrder.verify(client, times(totalEvents / maxBulkActions)).execute(any(BulkAction.class), argThat((BulkRequest bulkRequest) -> {
// Verify a bulk is executed immediately with maxNumberOfEventsPerBulk is reached.
assertThat(bulkRequest.numberOfActions(), equalTo(maxBulkActions));
return true;
}), any());

bulkProcessor.awaitClose(1, TimeUnit.SECONDS);

if (totalEvents % maxBulkActions > 0) {
inOrder.verify(client).execute(any(BulkAction.class), argThat((BulkRequest bulkRequest) -> {
// Verify another bulk with only 1 event (the remaining) is executed when closing the processor.
assertThat(bulkRequest.numberOfActions(), equalTo(totalEvents % maxBulkActions));
return true;
}), any());
}
}

public void testMaxRetries() {
int numberOfRetries = between(0, 5);
AnalyticsEventIngestConfig config = mock(AnalyticsEventIngestConfig.class);
doReturn(1).when(config).maxNumberOfEventsPerBulk();
doReturn(maxBytesInFlight).when(config).maxBytesInFlight();
doReturn(numberOfRetries).when(config).maxNumberOfRetries();
doReturn(ByteSizeValue.ofMb(10)).when(config).maxBytesInFlight();

Client client = mock(Client.class);
doAnswer(i -> {
i.getArgument(2, ActionListener.class).onFailure(new ElasticsearchStatusException("", RestStatus.TOO_MANY_REQUESTS));
return null;
}).when(client).execute(any(), any(), any());
doReturn(testThreadPool).when(client).threadPool();
BulkProcessor2 bulkProcessor = new BulkProcessorFactory(client, config).create();
BulkProcessor2.Builder baseBuilder = spy(BulkProcessor2.builder(mock(), new BulkProcessorFactory.BulkProcessorListener(), mock()));
BulkProcessorFactory factory = new BulkProcessorFactory(config, () -> baseBuilder);

IndexRequest indexRequest = mock(IndexRequest.class);
bulkProcessor.add(indexRequest);
assertThat(factory.create(), instanceOf(BulkProcessor2.class));

verify(client, times(numberOfRetries + 1)).execute(any(BulkAction.class), argThat((BulkRequest bulkRequest) -> {
assertThat(bulkRequest.numberOfActions(), equalTo(1));
assertThat(bulkRequest.requests().stream().findFirst().get(), equalTo(indexRequest));
return true;
}), any());
verify(baseBuilder).setFlushInterval(eq(flushDelay));
verify(baseBuilder).setBulkActions(eq(maxBulkActions));
verify(baseBuilder).setMaxNumberOfRetries(numberOfRetries);
verify(baseBuilder).setMaxBytesInFlight(maxBytesInFlight);
}
}

0 comments on commit 81ac77f

Please sign in to comment.