Skip to content

Commit

Permalink
Merge branch 'main' into feature/support-unpublish_part_ids
Browse files Browse the repository at this point in the history
* main:
  适配之前分布式架构在多节点情况下的search接口 (alibaba#413)
  优化search接口对于object对象fieldname的解析 (alibaba#411)
  增加分片路由相关的javaRestTest (alibaba#410)
  适配分布式相关的javaRestTest (alibaba#409)
  修改main接口,适配esrally压测 (alibaba#408)
  查询doc count时如果返回空行,则返回docCount为0 (alibaba#407)
  • Loading branch information
weizijun committed Jan 5, 2024
2 parents 6c3379a + ca087ee commit 5b3a7e0
Show file tree
Hide file tree
Showing 18 changed files with 1,090 additions and 365 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,13 @@ private HighLevelClient(RestClient restClient) {
public boolean clusterIsSingleNode() throws IOException {
ClusterHealthResponse clusterHealthResponse = highLevelClient().cluster()
.health(new ClusterHealthRequest(), RequestOptions.DEFAULT);
return clusterHealthResponse.getNumberOfNodes() == 1;
return clusterHealthResponse.getNumberOfDataNodes() == 1;
}

public boolean clusterIsMultiNodes() throws IOException {
ClusterHealthResponse clusterHealthResponse = highLevelClient().cluster()
.health(new ClusterHealthRequest(), RequestOptions.DEFAULT);
return clusterHealthResponse.getNumberOfNodes() >= 2;
return clusterHealthResponse.getNumberOfDataNodes() >= 2;
}

protected void waitIndexGreen(String index) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.LogManager;
Expand All @@ -33,7 +35,6 @@
import org.havenask.client.indices.CreateIndexRequest;
import org.havenask.client.indices.GetIndexRequest;
import org.havenask.client.indices.GetIndexResponse;
import org.havenask.cluster.health.ClusterHealthStatus;
import org.havenask.cluster.metadata.MappingMetadata;
import org.havenask.common.collect.Map;
import org.havenask.common.compress.CompressedXContent;
Expand All @@ -49,20 +50,12 @@ public class BasicIT extends AbstractHavenaskRestTestCase {

// static logger
private static final Logger logger = LogManager.getLogger(BasicIT.class);
private static final String[] BasicITIndices = {
"index_crud",
"index_index_method",
"create_and_delete_same_index_test",
"create_and_delete_diff_index_test" };
private static final int TEST_CRUD_INDEX_POS = 0;
private static final int TEST_INDEX_METHOD_INDEX_POS = 1;
private static final int TEST_CREATE_AND_DELETE_SAME_INDEX_INDEX_POS = 2;
private static final int TEST_CREATE_AND_DELETE_DIFF_INDEX_INDEX_POS = 3;
private static Set<String> basicITIndices = new HashSet<>();

@AfterClass
public static void cleanIndices() {
try {
for (String index : BasicITIndices) {
for (String index : basicITIndices) {
if (highLevelClient().indices().exists(new GetIndexRequest(index), RequestOptions.DEFAULT)) {
highLevelClient().indices().delete(new DeleteIndexRequest(index), RequestOptions.DEFAULT);
logger.info("clean index {}", index);
Expand All @@ -73,61 +66,26 @@ public static void cleanIndices() {
}
}

public void testCRUD() throws Exception {
assumeTrue("number_of_nodes more than 1, Skip func: testCRUD()", clusterIsSingleNode());

String index = BasicITIndices[TEST_CRUD_INDEX_POS];
assertTrue(
highLevelClient().indices()
.create(
new CreateIndexRequest(index).settings(
Settings.builder()
.put(EngineSettings.ENGINE_TYPE_SETTING.getKey(), EngineSettings.ENGINE_HAVENASK)
.put("number_of_replicas", 0)
.build()
),
RequestOptions.DEFAULT
)
.isAcknowledged()
);

GetIndexResponse getIndexResponse = highLevelClient().indices().get(new GetIndexRequest(index), RequestOptions.DEFAULT);
assertEquals(getIndexResponse.getIndices().length, 1);
assertEquals(getIndexResponse.getSetting(index, EngineSettings.ENGINE_TYPE_SETTING.getKey()), EngineSettings.ENGINE_HAVENASK);
assertEquals(getIndexResponse.getSetting(index, "index.number_of_replicas"), "0");
assertEquals(getIndexResponse.getMappings().get(index), new MappingMetadata("_doc", Map.of("dynamic", "false")));

assertBusy(() -> {
ClusterHealthResponse clusterHealthResponse = highLevelClient().cluster()
.health(new ClusterHealthRequest(index), RequestOptions.DEFAULT);
assertEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.GREEN);
}, 2, TimeUnit.MINUTES);

SqlClientInfoResponse sqlClientInfoResponse = highLevelClient().havenask()
.sqlClientInfo(new SqlClientInfoRequest(), RequestOptions.DEFAULT);
assertEquals(sqlClientInfoResponse.getErrorCode(), 0);
assertEquals(sqlClientInfoResponse.getErrorMessage(), "");

@SuppressWarnings("unchecked")
java.util.Map<String, Object> tables = (java.util.Map<String, Object>) ((java.util.Map<String, Object>) (((java.util.Map<
String,
Object>) (sqlClientInfoResponse.getResult().get("default"))).get("general"))).get("tables");
assertTrue(tables.containsKey(index));

assertTrue(highLevelClient().indices().delete(new DeleteIndexRequest(index), RequestOptions.DEFAULT).isAcknowledged());
}

// create index, get index, delete index, HEAD index and set mapping
public void testIndexMethod() throws Exception {
String index = BasicITIndices[TEST_INDEX_METHOD_INDEX_POS];
String index = "index_method_test";
basicITIndices.add(index);

ClusterHealthResponse clusterHealthResponse = highLevelClient().cluster()
.health(new ClusterHealthRequest(), RequestOptions.DEFAULT);
int numberOfDataNodes = clusterHealthResponse.getNumberOfDataNodes();

int shardsNum = randomIntBetween(1, 6);
int replicasNum = randomIntBetween(0, numberOfDataNodes - 1);
// create index
assertTrue(
highLevelClient().indices()
.create(
new CreateIndexRequest(index).settings(
Settings.builder()
.put(EngineSettings.ENGINE_TYPE_SETTING.getKey(), EngineSettings.ENGINE_HAVENASK)
.put("number_of_replicas", 0)
.put("index.number_of_shards", shardsNum)
.put("number_of_replicas", replicasNum)
.build()
)
.mapping(
Expand All @@ -147,17 +105,15 @@ public void testIndexMethod() throws Exception {
)
.isAcknowledged()
);
assertBusy(() -> {
ClusterHealthResponse clusterHealthResponse = highLevelClient().cluster()
.health(new ClusterHealthRequest(index), RequestOptions.DEFAULT);
assertEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.GREEN);
}, 2, TimeUnit.MINUTES);

waitIndexGreen(index);

// get index
GetIndexResponse getIndexResponse = highLevelClient().indices().get(new GetIndexRequest(index), RequestOptions.DEFAULT);
assertEquals(getIndexResponse.getIndices().length, 1);
assertEquals(getIndexResponse.getSetting(index, EngineSettings.ENGINE_TYPE_SETTING.getKey()), EngineSettings.ENGINE_HAVENASK);
assertEquals(getIndexResponse.getSetting(index, "index.number_of_replicas"), "0");
assertEquals(getIndexResponse.getSetting(index, "index.number_of_shards"), String.valueOf(shardsNum));
assertEquals(getIndexResponse.getSetting(index, "index.number_of_replicas"), String.valueOf(replicasNum));

MappingMetadata expectedMappingMetaData = new MappingMetadata(
"_doc",
Expand All @@ -173,36 +129,51 @@ public void testIndexMethod() throws Exception {
assertEquals(expectedMappingMetaData.routing(), resMappingMetaData.routing());
assertTrue(mappingsEquals(expectedMappingMetaData.source(), resMappingMetaData.source()));

assertBusy(() -> {
SqlClientInfoResponse sqlClientInfoResponse = highLevelClient().havenask()
.sqlClientInfo(new SqlClientInfoRequest(), RequestOptions.DEFAULT);
assertEquals(sqlClientInfoResponse.getErrorCode(), 0);
assertEquals(sqlClientInfoResponse.getErrorMessage(), "");
@SuppressWarnings("unchecked")
java.util.Map<String, Object> tables = (java.util.Map<String, Object>) ((java.util.Map<String, Object>) (((java.util.Map<
String,
Object>) (sqlClientInfoResponse.getResult().get("default"))).get("general"))).get("tables");
assertTrue(tables.containsKey(index));
}, 10, TimeUnit.SECONDS);

// delete index and HEAD index
deleteAndHeadIndex(index);
}

public void testCreateAndDeleteSameIndex() throws Exception {
int randomTimes = randomIntBetween(2, 6);
for (int i = 0; i < randomTimes; i++) {
String index = "create_and_delete_same_index_test";
basicITIndices.add(index);

ClusterHealthResponse clusterHealthResponse = highLevelClient().cluster()
.health(new ClusterHealthRequest(), RequestOptions.DEFAULT);
int numberOfDataNodes = clusterHealthResponse.getNumberOfDataNodes();

int shardsNum = randomIntBetween(1, 6);
String index = BasicITIndices[TEST_CREATE_AND_DELETE_SAME_INDEX_INDEX_POS];
int replicasNum = randomIntBetween(0, numberOfDataNodes - 1);
// create index
assertTrue(
highLevelClient().indices()
.create(
new CreateIndexRequest(index).settings(
Settings.builder()
.put(EngineSettings.ENGINE_TYPE_SETTING.getKey(), EngineSettings.ENGINE_HAVENASK)
// TODO 暂时只支持单shard
// .put("index.number_of_shards", shardsNum)
.put("number_of_replicas", 0)
.put("index.number_of_shards", shardsNum)
.put("number_of_replicas", replicasNum)
.build()
).mapping(Map.of("properties", Map.of("content" + i, Map.of("type", "keyword")))),
RequestOptions.DEFAULT
)
.isAcknowledged()
);
assertBusy(() -> {
ClusterHealthResponse clusterHealthResponse = highLevelClient().cluster()
.health(new ClusterHealthRequest(index), RequestOptions.DEFAULT);
assertEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.GREEN);
}, 2, TimeUnit.MINUTES);

waitIndexGreen(index);

// GET index
assertEquals(true, highLevelClient().indices().exists(new GetIndexRequest(index), RequestOptions.DEFAULT));
Expand Down Expand Up @@ -235,31 +206,35 @@ public void testCreateAndDeleteSameIndex() throws Exception {
}

// delete index
assertTrue(highLevelClient().indices().delete(new DeleteIndexRequest(index), RequestOptions.DEFAULT).isAcknowledged());
assertEquals(false, highLevelClient().indices().exists(new GetIndexRequest(index), RequestOptions.DEFAULT));
deleteAndHeadIndex(index);
}
}

public void testCreateAndDeleteDiffIndex() throws Exception {
int randomNum = randomIntBetween(2, 6);
String baseName = BasicITIndices[TEST_CREATE_AND_DELETE_DIFF_INDEX_INDEX_POS];
int randomIndicesNum = randomIntBetween(3, 6);
String baseName = "create_and_delete_diff_index_test";
List<String> indices = new ArrayList<>();
for (int i = 0; i < randomNum; i++) {
for (int i = 0; i < randomIndicesNum; i++) {
indices.add(baseName + i);
basicITIndices.add(baseName + i);
}

ClusterHealthResponse clusterHealthResponse = highLevelClient().cluster()
.health(new ClusterHealthRequest(), RequestOptions.DEFAULT);
int numberOfDataNodes = clusterHealthResponse.getNumberOfDataNodes();

// create indexs
for (int i = 0; i < randomNum; i++) {
for (int i = 0; i < randomIndicesNum; i++) {
int shardsNum = randomIntBetween(1, 6);
int replicasNum = randomIntBetween(0, numberOfDataNodes - 1);
assertTrue(
highLevelClient().indices()
.create(
new CreateIndexRequest(indices.get(i)).settings(
Settings.builder()
.put(EngineSettings.ENGINE_TYPE_SETTING.getKey(), EngineSettings.ENGINE_HAVENASK)
// TODO 暂时只支持单shard
// .put("index.number_of_shards", shardsNum)
.put("number_of_replicas", 0)
.put("index.number_of_shards", shardsNum)
.put("number_of_replicas", replicasNum)
.build()
).mapping(Map.of("properties", Map.of("content" + i, Map.of("type", "keyword")))),
RequestOptions.DEFAULT
Expand All @@ -268,17 +243,13 @@ public void testCreateAndDeleteDiffIndex() throws Exception {
);
}

for (int i = 0; i < randomNum; i++) {
for (int i = 0; i < randomIndicesNum; i++) {
String curIndex = indices.get(i);
assertBusy(() -> {
ClusterHealthResponse clusterHealthResponse = highLevelClient().cluster()
.health(new ClusterHealthRequest(curIndex), RequestOptions.DEFAULT);
assertEquals(clusterHealthResponse.getStatus(), ClusterHealthStatus.GREEN);
}, 2, TimeUnit.MINUTES);
waitIndexGreen(curIndex);
}

// get index
for (int i = 0; i < randomNum; i++) {
for (int i = 0; i < randomIndicesNum; i++) {
assertEquals(true, highLevelClient().indices().exists(new GetIndexRequest(indices.get(i)), RequestOptions.DEFAULT));
GetIndexResponse getIndexResponse = highLevelClient().indices()
.get(new GetIndexRequest(indices.get(i)), RequestOptions.DEFAULT);
Expand All @@ -290,7 +261,7 @@ public void testCreateAndDeleteDiffIndex() throws Exception {
}

// put and get doc
for (int i = 0; i < randomNum; i++) {
for (int i = 0; i < randomIndicesNum; i++) {
int randomDocNum = randomIntBetween(1, 4);
for (int j = 0; j < randomDocNum; j++) {
String curId = String.valueOf(i) + String.valueOf(j);
Expand All @@ -308,15 +279,13 @@ public void testCreateAndDeleteDiffIndex() throws Exception {
assertEquals(true, getResponse.isExists());
}, 10, TimeUnit.SECONDS);
GetResponse getResponse = highLevelClient().get(new GetRequest(curIndex, curId), RequestOptions.DEFAULT);
assertEquals(true, getResponse.isExists());
assertEquals("欢迎使用" + j, getResponse.getSourceAsMap().get("content" + i));
}
}

// delete index
for (int i = 0; i < randomNum; i++) {
assertTrue(highLevelClient().indices().delete(new DeleteIndexRequest(indices.get(i)), RequestOptions.DEFAULT).isAcknowledged());
assertEquals(false, highLevelClient().indices().exists(new GetIndexRequest(indices.get(i)), RequestOptions.DEFAULT));
for (int i = 0; i < randomIndicesNum; i++) {
deleteAndHeadIndex(indices.get(i));
}
}

Expand Down
Loading

0 comments on commit 5b3a7e0

Please sign in to comment.