Skip to content

Commit

Permalink
Add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
smalyshev committed Sep 6, 2024
1 parent 7399f51 commit 506107a
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.admin.cluster.stats;

import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.AbstractMultiClustersTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.InternalTestCluster;
import org.junit.Assert;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.equalToIgnoringCase;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.oneOf;

@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
public class ClusterStatsRemoteIT extends AbstractMultiClustersTestCase {
private static final String REMOTE1 = "cluster-a";
private static final String REMOTE2 = "cluster-b";

private static final String INDEX_NAME = "demo";

@Override
protected boolean reuseClusters() {
return false;
}

@Override
protected Collection<String> remoteClusterAlias() {
return List.of(REMOTE1, REMOTE2);
}

@Override
protected Map<String, Boolean> skipUnavailableForRemoteClusters() {
return Map.of(REMOTE1, false, REMOTE2, true);
}

public void testRemoteClusterStats() throws ExecutionException, InterruptedException {
setupClusters();
final Client client = client(LOCAL_CLUSTER);
SearchRequest searchRequest = new SearchRequest("*", "*:*");
searchRequest.allowPartialSearchResults(false);
searchRequest.setCcsMinimizeRoundtrips(randomBoolean());
searchRequest.source(new SearchSourceBuilder().query(new MatchAllQueryBuilder()).size(10));

// do a search
assertResponse(cluster(LOCAL_CLUSTER).client().search(searchRequest), Assert::assertNotNull);
// collect stats without remotes
ClusterStatsResponse response = client.admin().cluster().prepareClusterStats().get();
assertNotNull(response.getCcsMetrics());
var remotesUsage = response.getCcsMetrics().getByRemoteCluster();
assertThat(remotesUsage.size(), equalTo(3));
assertNull(response.getRemoteClustersStats());
// collect stats with remotes
response = client.admin().cluster().prepareClusterStatsWithRemotes().get();
assertNotNull(response.getCcsMetrics());
remotesUsage = response.getCcsMetrics().getByRemoteCluster();
assertThat(remotesUsage.size(), equalTo(3));
assertNotNull(response.getRemoteClustersStats());
var remoteStats = response.getRemoteClustersStats();
assertThat(remoteStats.size(), equalTo(2));
for (String clusterAlias : remoteClusterAlias()) {
assertThat(remoteStats, hasKey(clusterAlias));
assertThat(remotesUsage, hasKey(clusterAlias));
assertThat(remoteStats.get(clusterAlias).getStatus(), equalToIgnoringCase(ClusterHealthStatus.GREEN.name()));
assertThat(remoteStats.get(clusterAlias).getIndicesCount(), greaterThan(0L));
assertThat(remoteStats.get(clusterAlias).getNodesCount(), greaterThan(0L));
assertThat(remoteStats.get(clusterAlias).getShardsCount(), greaterThan(0L));
assertThat(remoteStats.get(clusterAlias).getHeapBytes(), greaterThan(0L));
assertThat(remoteStats.get(clusterAlias).getMemBytes(), greaterThan(0L));
assertThat(remoteStats.get(clusterAlias).getIndicesBytes(), greaterThan(0L));
assertThat(remoteStats.get(clusterAlias).getVersions(), hasItem(Version.CURRENT.toString()));
assertThat(remoteStats.get(clusterAlias).getClusterUUID(), not(equalTo("")));
assertThat(remoteStats.get(clusterAlias).getMode(), oneOf("sniff", "proxy"));
}
assertFalse(remoteStats.get(REMOTE1).isSkipUnavailable());
assertTrue(remoteStats.get(REMOTE2).isSkipUnavailable());
}

private void setupClusters() {
int numShardsLocal = randomIntBetween(2, 10);
Settings localSettings = indexSettings(numShardsLocal, randomIntBetween(0, 1)).build();
assertAcked(
client(LOCAL_CLUSTER).admin()
.indices()
.prepareCreate(INDEX_NAME)
.setSettings(localSettings)
.setMapping("@timestamp", "type=date", "f", "type=text")
);
indexDocs(client(LOCAL_CLUSTER));

int numShardsRemote = randomIntBetween(2, 10);
for (String clusterAlias : remoteClusterAlias()) {
final InternalTestCluster remoteCluster = cluster(clusterAlias);
remoteCluster.ensureAtLeastNumDataNodes(randomIntBetween(1, 3));
assertAcked(
client(clusterAlias).admin()
.indices()
.prepareCreate(INDEX_NAME)
.setSettings(indexSettings(numShardsRemote, randomIntBetween(0, 1)))
.setMapping("@timestamp", "type=date", "f", "type=text")
);
assertFalse(
client(clusterAlias).admin()
.cluster()
.prepareHealth(INDEX_NAME)
.setWaitForGreenStatus()
.setTimeout(TimeValue.timeValueSeconds(10))
.get()
.isTimedOut()
);
indexDocs(client(clusterAlias));
}

}

private void indexDocs(Client client) {
int numDocs = between(5, 20);
for (int i = 0; i < numDocs; i++) {
client.prepareIndex(INDEX_NAME).setSource("f", "v", "@timestamp", randomNonNegativeLong()).get();
}
client.admin().indices().prepareRefresh(INDEX_NAME).get();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,7 @@ public class ClusterStatsRequestBuilder extends NodesOperationRequestBuilder<
public ClusterStatsRequestBuilder(ElasticsearchClient client) {
super(client, TransportClusterStatsAction.TYPE, new ClusterStatsRequest());
}
public ClusterStatsRequestBuilder(ElasticsearchClient client, boolean doRemotes) {
super(client, TransportClusterStatsAction.TYPE, new ClusterStatsRequest(doRemotes));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ public class ClusterStatsResponse extends BaseNodesResponse<ClusterStatsNodeResp
final ClusterHealthStatus status;
final ClusterSnapshotStats clusterSnapshotStats;
final RepositoryUsageStats repositoryUsageStats;

final CCSTelemetrySnapshot ccsMetrics;
final long timestamp;
final String clusterUUID;
Expand Down Expand Up @@ -105,6 +104,10 @@ public CCSTelemetrySnapshot getCcsMetrics() {
return ccsMetrics;
}

public Map<String, RemoteClusterStats> getRemoteClustersStats() {
return remoteClustersStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
TransportAction.localOnly();
Expand Down Expand Up @@ -202,6 +205,54 @@ public RemoteClusterStats(
this.memBytes = 0;
}
}
public String getClusterUUID() {
return clusterUUID;
}

public String getMode() {
return mode;
}

public boolean isSkipUnavailable() {
return skipUnavailable;
}

public String getTransportCompress() {
return transportCompress;
}

public Set<String> getVersions() {
return versions;
}

public String getStatus() {
return status;
}

public long getNodesCount() {
return nodesCount;
}

public long getShardsCount() {
return shardsCount;
}

public long getIndicesCount() {
return indicesCount;
}

public long getIndicesBytes() {
return indicesBytes;
}

public long getHeapBytes() {
return heapBytes;
}

public long getMemBytes() {
return memBytes;
}


@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ private ActionFuture<Map<String, RemoteClusterStatsResponse>> getStatsFromRemote
}

// TODO: make correct pool
final var remoteClientResponseExecutor = transportService.getThreadPool().executor(ThreadPool.Names.MANAGEMENT);
final var remoteClientResponseExecutor = transportService.getThreadPool().executor(ThreadPool.Names.SEARCH_COORDINATION);
var remotes = remoteClusterService.getRegisteredRemoteClusterNames();

var remotesFuture = new PlainActionFuture<Map<String, RemoteClusterStatsResponse>>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,10 @@ public ClusterStatsRequestBuilder prepareClusterStats() {
return new ClusterStatsRequestBuilder(this);
}

public ClusterStatsRequestBuilder prepareClusterStatsWithRemotes() {
return new ClusterStatsRequestBuilder(this, true);
}

public ActionFuture<NodesStatsResponse> nodesStats(final NodesStatsRequest request) {
return execute(TransportNodesStatsAction.TYPE, request);
}
Expand Down

0 comments on commit 506107a

Please sign in to comment.