Skip to content

Commit

Permalink
Remove redudant children of BroadcastResponse (elastic#104410)
Browse files Browse the repository at this point in the history
A couple of children of `BroadCastResponse` are completely redundant,
adding no extra fields or separate serialization.
Removed them and replaced their use by the broadcast response itself.
  • Loading branch information
original-brownbear authored Jan 16, 2024
1 parent af50962 commit 51caf17
Show file tree
Hide file tree
Showing 96 changed files with 293 additions and 655 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterGetSettingsAction;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
Expand All @@ -30,6 +29,7 @@
import org.elasticsearch.action.datastreams.lifecycle.ErrorEntry;
import org.elasticsearch.action.datastreams.lifecycle.ExplainIndexDataStreamLifecycle;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.cluster.coordination.StableMasterHealthIndicatorService;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataStream;
Expand Down Expand Up @@ -313,7 +313,7 @@ public void testAutomaticForceMerge() throws Exception {
for (int i = 0; i < randomIntBetween(10, 50); i++) {
indexDocs(dataStreamName, randomIntBetween(1, 300));
// Make sure the segments get written:
FlushResponse flushResponse = indicesAdmin().flush(new FlushRequest(toBeRolledOverIndex)).actionGet();
BroadcastResponse flushResponse = indicesAdmin().flush(new FlushRequest(toBeRolledOverIndex)).actionGet();
assertThat(flushResponse.getStatus(), equalTo(RestStatus.OK));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeAction;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockAction;
import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequest;
import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockResponse;
Expand All @@ -33,6 +32,7 @@
import org.elasticsearch.action.downsample.DownsampleAction;
import org.elasticsearch.action.downsample.DownsampleConfig;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
Expand Down Expand Up @@ -1168,7 +1168,7 @@ private void forceMergeIndex(ForceMergeRequest forceMergeRequest, ActionListener
logger.info("Data stream lifecycle is issuing a request to force merge index [{}]", targetIndex);
client.admin().indices().forceMerge(forceMergeRequest, new ActionListener<>() {
@Override
public void onResponse(ForceMergeResponse forceMergeResponse) {
public void onResponse(BroadcastResponse forceMergeResponse) {
if (forceMergeResponse.getFailedShards() > 0) {
DefaultShardOperationFailedException[] failures = forceMergeResponse.getShardFailures();
String message = Strings.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequest;
import org.elasticsearch.action.admin.indices.rollover.MaxAgeCondition;
import org.elasticsearch.action.admin.indices.rollover.RolloverConditions;
Expand All @@ -27,6 +26,7 @@
import org.elasticsearch.action.downsample.DownsampleAction;
import org.elasticsearch.action.downsample.DownsampleConfig;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
Expand Down Expand Up @@ -578,7 +578,7 @@ public void testForceMerge() throws Exception {
// We want this test method to get fake force merge responses, because this is what triggers a cluster state update
clientDelegate = (action, request, listener) -> {
if (action.name().equals("indices:admin/forcemerge")) {
listener.onResponse(new ForceMergeResponse(5, 5, 0, List.of()));
listener.onResponse(new BroadcastResponse(5, 5, 0, List.of()));
}
};
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
Expand Down Expand Up @@ -748,7 +748,7 @@ public void testForceMergeRetries() throws Exception {
clientDelegate = (action, request, listener) -> {
if (action.name().equals("indices:admin/forcemerge")) {
listener.onResponse(
new ForceMergeResponse(
new BroadcastResponse(
5,
5,
1,
Expand Down Expand Up @@ -779,7 +779,7 @@ public void testForceMergeRetries() throws Exception {
AtomicInteger forceMergeFailedCount = new AtomicInteger(0);
clientDelegate = (action, request, listener) -> {
if (action.name().equals("indices:admin/forcemerge")) {
listener.onResponse(new ForceMergeResponse(5, 4, 0, List.of()));
listener.onResponse(new BroadcastResponse(5, 4, 0, List.of()));
forceMergeFailedCount.incrementAndGet();
}
};
Expand All @@ -800,7 +800,7 @@ public void testForceMergeRetries() throws Exception {
// For the final data stream lifecycle run, we let forcemerge run normally
clientDelegate = (action, request, listener) -> {
if (action.name().equals("indices:admin/forcemerge")) {
listener.onResponse(new ForceMergeResponse(5, 5, 0, List.of()));
listener.onResponse(new BroadcastResponse(5, 5, 0, List.of()));
}
};
dataStreamLifecycleService.run(clusterService.state());
Expand Down Expand Up @@ -900,7 +900,7 @@ public void testForceMergeDedup() throws Exception {
setState(clusterService, state);
clientDelegate = (action, request, listener) -> {
if (action.name().equals("indices:admin/forcemerge")) {
listener.onResponse(new ForceMergeResponse(5, 5, 0, List.of()));
listener.onResponse(new BroadcastResponse(5, 5, 0, List.of()));
}
};
for (int i = 0; i < 100; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.flush.FlushAction;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.index.TransportIndexAction;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlocks;
Expand Down Expand Up @@ -178,28 +177,34 @@ public int read() throws IOException {
}

public void testIndexChunksNoData() throws IOException {
client.addHandler(FlushAction.INSTANCE, (FlushRequest request, ActionListener<FlushResponse> flushResponseActionListener) -> {
client.addHandler(FlushAction.INSTANCE, (FlushRequest request, ActionListener<BroadcastResponse> flushResponseActionListener) -> {
assertArrayEquals(new String[] { GeoIpDownloader.DATABASES_INDEX }, request.indices());
flushResponseActionListener.onResponse(mock(FlushResponse.class));
});
client.addHandler(RefreshAction.INSTANCE, (RefreshRequest request, ActionListener<RefreshResponse> flushResponseActionListener) -> {
assertArrayEquals(new String[] { GeoIpDownloader.DATABASES_INDEX }, request.indices());
flushResponseActionListener.onResponse(mock(RefreshResponse.class));
flushResponseActionListener.onResponse(mock(BroadcastResponse.class));
});
client.addHandler(
RefreshAction.INSTANCE,
(RefreshRequest request, ActionListener<BroadcastResponse> flushResponseActionListener) -> {
assertArrayEquals(new String[] { GeoIpDownloader.DATABASES_INDEX }, request.indices());
flushResponseActionListener.onResponse(mock(BroadcastResponse.class));
}
);

InputStream empty = new ByteArrayInputStream(new byte[0]);
assertEquals(0, geoIpDownloader.indexChunks("test", empty, 0, "d41d8cd98f00b204e9800998ecf8427e", 0));
}

public void testIndexChunksMd5Mismatch() {
client.addHandler(FlushAction.INSTANCE, (FlushRequest request, ActionListener<FlushResponse> flushResponseActionListener) -> {
assertArrayEquals(new String[] { GeoIpDownloader.DATABASES_INDEX }, request.indices());
flushResponseActionListener.onResponse(mock(FlushResponse.class));
});
client.addHandler(RefreshAction.INSTANCE, (RefreshRequest request, ActionListener<RefreshResponse> flushResponseActionListener) -> {
client.addHandler(FlushAction.INSTANCE, (FlushRequest request, ActionListener<BroadcastResponse> flushResponseActionListener) -> {
assertArrayEquals(new String[] { GeoIpDownloader.DATABASES_INDEX }, request.indices());
flushResponseActionListener.onResponse(mock(RefreshResponse.class));
flushResponseActionListener.onResponse(mock(BroadcastResponse.class));
});
client.addHandler(
RefreshAction.INSTANCE,
(RefreshRequest request, ActionListener<BroadcastResponse> flushResponseActionListener) -> {
assertArrayEquals(new String[] { GeoIpDownloader.DATABASES_INDEX }, request.indices());
flushResponseActionListener.onResponse(mock(BroadcastResponse.class));
}
);

IOException exception = expectThrows(
IOException.class,
Expand Down Expand Up @@ -232,14 +237,17 @@ public void testIndexChunks() throws IOException {
assertEquals(chunk + 15, source.get("chunk"));
listener.onResponse(mock(IndexResponse.class));
});
client.addHandler(FlushAction.INSTANCE, (FlushRequest request, ActionListener<FlushResponse> flushResponseActionListener) -> {
assertArrayEquals(new String[] { GeoIpDownloader.DATABASES_INDEX }, request.indices());
flushResponseActionListener.onResponse(mock(FlushResponse.class));
});
client.addHandler(RefreshAction.INSTANCE, (RefreshRequest request, ActionListener<RefreshResponse> flushResponseActionListener) -> {
client.addHandler(FlushAction.INSTANCE, (FlushRequest request, ActionListener<BroadcastResponse> flushResponseActionListener) -> {
assertArrayEquals(new String[] { GeoIpDownloader.DATABASES_INDEX }, request.indices());
flushResponseActionListener.onResponse(mock(RefreshResponse.class));
flushResponseActionListener.onResponse(mock(BroadcastResponse.class));
});
client.addHandler(
RefreshAction.INSTANCE,
(RefreshRequest request, ActionListener<BroadcastResponse> flushResponseActionListener) -> {
assertArrayEquals(new String[] { GeoIpDownloader.DATABASES_INDEX }, request.indices());
flushResponseActionListener.onResponse(mock(BroadcastResponse.class));
}
);

InputStream big = new ByteArrayInputStream(bigArray);
assertEquals(17, geoIpDownloader.indexChunks("test", big, 15, "a67563dfa8f3cba8b8cff61eb989a749", 0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
Expand All @@ -24,6 +23,7 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.client.internal.ParentTaskAssigningClient;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
Expand Down Expand Up @@ -554,9 +554,9 @@ void refreshAndFinish(List<Failure> indexingFailures, List<SearchFailure> search
RefreshRequest refresh = new RefreshRequest();
refresh.indices(destinationIndices.toArray(new String[destinationIndices.size()]));
logger.debug("[{}]: refreshing", task.getId());
bulkClient.admin().indices().refresh(refresh, new ActionListener<RefreshResponse>() {
bulkClient.admin().indices().refresh(refresh, new ActionListener<>() {
@Override
public void onResponse(RefreshResponse response) {
public void onResponse(BroadcastResponse response) {
finishHim(null, indexingFailures, searchFailures, timedOut);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
import com.sun.net.httpserver.HttpHandler;

import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.blobstore.BlobContainer;
Expand Down Expand Up @@ -191,7 +191,7 @@ public void testAbortRequestStats() throws Exception {
waitForDocs(nbDocs, indexer);
}
flushAndRefresh(index);
ForceMergeResponse forceMerge = client().admin().indices().prepareForceMerge(index).setFlush(true).setMaxNumSegments(1).get();
BroadcastResponse forceMerge = client().admin().indices().prepareForceMerge(index).setFlush(true).setMaxNumSegments(1).get();
assertThat(forceMerge.getSuccessfulShards(), equalTo(1));
assertHitCount(prepareSearch(index).setSize(0).setTrackTotalHits(true), nbDocs);

Expand Down Expand Up @@ -234,7 +234,7 @@ public void testMetrics() throws Exception {
waitForDocs(nbDocs, indexer);
}
flushAndRefresh(index);
ForceMergeResponse forceMerge = client().admin().indices().prepareForceMerge(index).setFlush(true).setMaxNumSegments(1).get();
BroadcastResponse forceMerge = client().admin().indices().prepareForceMerge(index).setFlush(true).setMaxNumSegments(1).get();
assertThat(forceMerge.getSuccessfulShards(), equalTo(1));
assertHitCount(prepareSearch(index).setSize(0).setTrackTotalHits(true), nbDocs);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import org.apache.lucene.search.join.ScoreMode;
import org.apache.lucene.tests.util.TimeUnits;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.aggregations.pipeline.DerivativePipelineAggregationBuilder;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
Expand Down Expand Up @@ -199,7 +199,7 @@ private void indexDocuments(String idPrefix) throws IOException, InterruptedExce

assertTrue(latch.await(30, TimeUnit.SECONDS));

RefreshResponse refreshResponse = refresh(INDEX_NAME);
BroadcastResponse refreshResponse = refresh(INDEX_NAME);
ElasticsearchAssertions.assertNoFailures(refreshResponse);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.action.admin.indices.cache.clear;

import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;

Expand All @@ -33,7 +34,7 @@ public void testClearIndicesCacheWithBlocks() {
for (String blockSetting : Arrays.asList(SETTING_BLOCKS_READ, SETTING_BLOCKS_WRITE)) {
try {
enableIndexBlock("test", blockSetting);
ClearIndicesCacheResponse clearIndicesCacheResponse = indicesAdmin().prepareClearCache("test")
BroadcastResponse clearIndicesCacheResponse = indicesAdmin().prepareClearCache("test")
.setFieldDataCache(true)
.setQueryCache(true)
.setFieldDataCache(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.action.admin.indices.flush;

import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;

Expand Down Expand Up @@ -44,7 +45,7 @@ public void testFlushWithBlocks() {
)) {
try {
enableIndexBlock("test", blockSetting);
FlushResponse response = indicesAdmin().prepareFlush("test").get();
BroadcastResponse response = indicesAdmin().prepareFlush("test").get();
assertNoFailures(response);
assertThat(response.getSuccessfulShards(), equalTo(numShards.totalNumShards));
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.action.admin.indices.forcemerge;

import org.elasticsearch.action.support.broadcast.BaseBroadcastResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
Expand Down Expand Up @@ -50,7 +51,7 @@ public void testForceMergeWithBlocks() {
for (String blockSetting : Arrays.asList(SETTING_BLOCKS_READ, SETTING_BLOCKS_WRITE, SETTING_READ_ONLY_ALLOW_DELETE)) {
try {
enableIndexBlock("test", blockSetting);
ForceMergeResponse response = indicesAdmin().prepareForceMerge("test").get();
BaseBroadcastResponse response = indicesAdmin().prepareForceMerge("test").get();
assertNoFailures(response);
assertThat(response.getSuccessfulShards(), equalTo(numShards.totalNumShards));
} finally {
Expand All @@ -70,7 +71,7 @@ public void testForceMergeWithBlocks() {

// Merging all indices is blocked when the cluster is read-only
try {
ForceMergeResponse response = indicesAdmin().prepareForceMerge().get();
BaseBroadcastResponse response = indicesAdmin().prepareForceMerge().get();
assertNoFailures(response);
assertThat(response.getSuccessfulShards(), equalTo(numShards.totalNumShards));

Expand Down
Loading

0 comments on commit 51caf17

Please sign in to comment.