
Commit

Merge branch 'main' into feature/update-qrs-query
* main:
  Update the havenask image with distributed-deployment support to version 0.3 (alibaba#351)
  Add merge parameters to support merging existing segments (alibaba#350)
  Adapt the search API for distributed deployments (alibaba#348)
  Improve the logic for deciding when to update the qrs on shard-level changes (alibaba#349)
weizijun committed Dec 11, 2023
2 parents 22953e1 + 4fd833b commit 69f91da
Showing 17 changed files with 1,300 additions and 49 deletions.
elastic-fed/buildSrc/version.properties (2 changes: 1 addition & 1 deletion)
@@ -1,6 +1,6 @@
havenask = 1.0.0
lucene = 8.7.0
-runtime_image = 0.2
+runtime_image = 0.3

bundled_jdk_vendor = openjdk
bundled_jdk = 11.0.2+9
SqlResponse.java
@@ -26,6 +26,7 @@
public class SqlResponse {
private final double totalTime;
private final boolean hasSoftFailure;
+private final double coveredPercent;
private final int rowCount;
private final SqlResult sqlResult;
private final ErrorInfo errorInfo;
@@ -70,9 +71,16 @@ public int GetErrorCode() {
}
}

-public SqlResponse(double totalTime, boolean hasSoftFailure, int rowCount, SqlResult sqlResult, ErrorInfo errorInfo) {
+public SqlResponse(
+    double totalTime,
+    boolean hasSoftFailure,
+    double coveredPercent,
+    int rowCount,
+    SqlResult sqlResult,
+    ErrorInfo errorInfo) {
this.totalTime = totalTime;
this.hasSoftFailure = hasSoftFailure;
+this.coveredPercent = coveredPercent;
this.rowCount = rowCount;
this.sqlResult = sqlResult;
this.errorInfo = errorInfo;
@@ -86,6 +94,10 @@ public boolean isHasSoftFailure() {
return hasSoftFailure;
}

+public double getCoveredPercent() {
+    return coveredPercent;
+}

public int getRowCount() {
return rowCount;
}
@@ -102,6 +114,7 @@ public static SqlResponse fromXContent(XContentParser parser) throws IOException
XContentParser.Token token;
double totalTime = 0;
boolean hasSoftFailure = false;
+double coveredPercent = 0;
int rowCount = 0;
SqlResult sqlResult = null;
ErrorInfo errorInfo = null;
@@ -116,6 +129,9 @@ public static SqlResponse fromXContent(XContentParser parser) throws IOException
case "has_soft_failure":
hasSoftFailure = parser.booleanValue();
break;
case "covered_percent":
coveredPercent = parser.doubleValue();
break;
case "row_count":
rowCount = parser.intValue();
break;
@@ -200,7 +216,7 @@ public static SqlResponse fromXContent(XContentParser parser) throws IOException
}
}
}
-return new SqlResponse(totalTime, hasSoftFailure, rowCount, sqlResult, errorInfo);
+return new SqlResponse(totalTime, hasSoftFailure, coveredPercent, rowCount, sqlResult, errorInfo);
}

public static SqlResponse parse(String strResponse) throws IOException {
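For context, a minimal usage sketch of the extended response type (a sketch, not part of this commit; the JSON keys mirror the switch cases in fromXContent above, and the values are illustrative):

// Illustrative only: parse a QRS SQL response carrying the new covered_percent field.
String body = "{\"total_time\": 1.5, \"has_soft_failure\": false, \"covered_percent\": 1.0, \"row_count\": 0}";
SqlResponse response = SqlResponse.parse(body);
double covered = response.getCoveredPercent(); // presumably 1.0 when all shards answered (assumed semantics)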
AbstractHavenaskRestTestCase.java
@@ -44,6 +44,8 @@
import org.junit.Before;

public abstract class AbstractHavenaskRestTestCase extends HavenaskRestTestCase {
+public static final String NUMBER_OF_SHARDS = "number_of_shards";
+public static final String NUMBER_OF_REPLICAS = "number_of_replicas";
private static RestHighLevelClient restHighLevelClient;

@Before
SearchIT.java
@@ -15,6 +15,8 @@
package org.havenask.engine;

import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.LogManager;
@@ -47,10 +49,11 @@
public class SearchIT extends AbstractHavenaskRestTestCase {
// static logger
private static final Logger logger = LogManager.getLogger(SearchIT.class);
-private static final String[] SearchITIndices = { "single_shard_test", "multi_shard_test", "multi_vector_test" };
-private static final int TEST_SINGLE_SHARD_KNN_INDEX_POS = 0;
-private static final int TEST_MULTI_SHARD_KNN_INDEX_POS = 1;
-private static final int TEST_MULTI_KNN_QUERY_INDEX_POS = 2;
+private static final String[] SearchITIndices = { "search_test", "single_shard_test", "multi_shard_test", "multi_vector_test" };
+private static final int TEST_SEARCH_INDEX_POS = 0;
+private static final int TEST_SINGLE_SHARD_KNN_INDEX_POS = 1;
+private static final int TEST_MULTI_SHARD_KNN_INDEX_POS = 2;
+private static final int TEST_MULTI_KNN_QUERY_INDEX_POS = 3;

@AfterClass
public static void cleanIndices() {
@@ -66,6 +69,60 @@ public static void cleanIndices() {
}
}

+public void testSearch() throws Exception {
+    String index = SearchITIndices[TEST_SEARCH_INDEX_POS];
+    int dataNum = 3;
+    double delta = 0.00001;
+    // create index
+    Settings settings = Settings.builder()
+        .put(EngineSettings.ENGINE_TYPE_SETTING.getKey(), EngineSettings.ENGINE_HAVENASK)
+        .put(NUMBER_OF_SHARDS, 2)
+        .put(NUMBER_OF_REPLICAS, 0)
+        .build();
+
+    java.util.Map<String, ?> map = Map.of(
+        "properties",
+        Map.of("seq", Map.of("type", "integer"), "content", Map.of("type", "keyword"), "time", Map.of("type", "date"))
+    );
+    assertTrue(createTestIndex(index, settings, map));
+
+    waitIndexGreen(index);
+
+    // PUT docs
+    String[] idList = { "1", "2", "3" };
+    java.util.List<java.util.Map<String, ?>> sourceList = new ArrayList<>();
+    sourceList.add(Map.of("seq", 1, "content", "欢迎使用1", "time", "20230718"));
+    sourceList.add(Map.of("seq", 2, "content", "欢迎使用2", "time", "20230717"));
+    sourceList.add(Map.of("seq", 3, "content", "欢迎使用3", "time", "20230716"));
+    for (int i = 0; i < idList.length; i++) {
+        putDoc(index, idList[i], sourceList.get(i));
+    }
+
+    // get data with _search API
+    SearchRequest searchRequest = new SearchRequest(index);
+
+    assertBusy(() -> {
+        SearchResponse searchResponse = highLevelClient().search(searchRequest, RequestOptions.DEFAULT);
+        assertEquals(dataNum, searchResponse.getHits().getTotalHits().value);
+    }, 10, TimeUnit.SECONDS);
+    SearchResponse searchResponse = highLevelClient().search(searchRequest, RequestOptions.DEFAULT);
+    assertEquals(dataNum, searchResponse.getHits().getTotalHits().value);
+
+    Set<Integer> expectedSeq = Set.of(1, 2, 3);
+    Set<String> expectedContent = Set.of("欢迎使用1", "欢迎使用2", "欢迎使用3");
+    Set<String> expectedTime = Set.of("20230718", "20230717", "20230716");
+    for (int i = 0; i < dataNum; i++) {
+        assertEquals(index, searchResponse.getHits().getHits()[i].getIndex());
+        assertEquals(1.0, searchResponse.getHits().getHits()[i].getScore(), delta);
+        assertTrue(expectedSeq.contains(searchResponse.getHits().getHits()[i].getSourceAsMap().get("seq")));
+        assertTrue(expectedContent.contains(searchResponse.getHits().getHits()[i].getSourceAsMap().get("content")));
+        assertTrue(expectedTime.contains(searchResponse.getHits().getHits()[i].getSourceAsMap().get("time")));
+    }
+
+    // delete index and HEAD index
+    deleteAndHeadIndex(index);
+}
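The new testSearch exercises a match_all query over a two-shard havenask index. A hypothetical follow-on query against the keyword field, under the same setup (a sketch, not part of the commit; QueryBuilders and SearchSourceBuilder are the stock havenask-federation query-building classes):

// Sketch only: term query on the keyword field defined in the mapping above.
SearchRequest termRequest = new SearchRequest(index);
termRequest.source(new SearchSourceBuilder().query(QueryBuilders.termQuery("content", "欢迎使用1")));
SearchResponse termResponse = highLevelClient().search(termRequest, RequestOptions.DEFAULT);
assertEquals(1, termResponse.getHits().getTotalHits().value); // only the seq == 1 document matches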

public void testSingleShardKnn() throws Exception {
String index = SearchITIndices[TEST_SINGLE_SHARD_KNN_INDEX_POS];
String fieldName = "image";
MetaDataSyncer.java
@@ -24,7 +24,6 @@
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -40,13 +39,15 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

+import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.havenask.cluster.ClusterChangedEvent;
import org.havenask.cluster.ClusterState;
import org.havenask.cluster.ClusterStateApplier;
import org.havenask.cluster.metadata.IndexMetadata;
import org.havenask.cluster.node.DiscoveryNode;
+import org.havenask.cluster.routing.IndexRoutingTable;
import org.havenask.cluster.routing.RoutingNode;
import org.havenask.cluster.routing.ShardRouting;
import org.havenask.cluster.service.ClusterService;
@@ -66,6 +67,7 @@
import org.havenask.engine.rpc.UpdateHeartbeatTargetRequest;
import org.havenask.engine.util.RangeUtil;
import org.havenask.engine.util.Utils;
+import org.havenask.index.Index;
import org.havenask.threadpool.ThreadPool;

public class MetaDataSyncer extends AbstractLifecycleComponent implements ClusterStateApplier {
@@ -317,20 +319,24 @@ protected String getThreadPool() {

@Override
public void applyClusterState(ClusterChangedEvent event) {
-    if (isIngestNode && shouldUpdateQrs(event.previousState(), event.state())) {
-        // update qrs target
-        setQrsPendingSync();
+    try {
+        if (isIngestNode && shouldUpdateQrs(event)) {
+            // update qrs target
+            setQrsPendingSync();
+        }
+    } catch (Throwable e) {
+        LOGGER.error("error when update qrs target: ", e);
+    }
}

-private boolean shouldUpdateQrs(ClusterState prevClusterState, ClusterState curClusterState) {
+private boolean shouldUpdateQrs(ClusterChangedEvent event) {
// check for index-level creations and deletions
-if (isHavenaskIndexChanged(prevClusterState, curClusterState)) {
+if (isHavenaskIndexChanged(event)) {
return true;
}

-// TODO: check whether shard relocation requires a qrs update
-if (isHavenaskShardChanged(prevClusterState, curClusterState)) {
+// check for shard-level changes
+if (isHavenaskShardChanged(event.previousState(), event.state())) {
return true;
}

@@ -611,45 +617,38 @@ private synchronized void generateDefaultBizConfig(List<String> indexList) throws
);
}

-private boolean isHavenaskIndexChanged(ClusterState prevClusterState, ClusterState curClusterState) {
-    Set<String> prevIndexNamesSet = new HashSet<>(Arrays.asList(prevClusterState.metadata().indices().keys().toArray(String.class)));
-    Set<String> currentIndexNamesSet = new HashSet<>(Arrays.asList(curClusterState.metadata().indices().keys().toArray(String.class)));
-    Set<String> prevDiff = new HashSet<>(prevIndexNamesSet);
-    Set<String> curDiff = new HashSet<>(currentIndexNamesSet);
-    prevDiff.removeAll(currentIndexNamesSet);
-    curDiff.removeAll(prevIndexNamesSet);
-
-    for (String indexName : prevDiff) {
-        IndexMetadata indexMetadata = prevClusterState.metadata().index(indexName);
+private boolean isHavenaskIndexChanged(ClusterChangedEvent event) {
+    List<Index> indicesDeleted = event.indicesDeleted();
+    List<String> indicesCreated = event.indicesCreated();
+    for (Index index : indicesDeleted) {
+        IndexMetadata indexMetadata = event.previousState().getMetadata().index(index);
if (EngineSettings.isHavenaskEngine(indexMetadata.getSettings())) {
return true;
}
}

-    for (String indexName : curDiff) {
-        IndexMetadata indexMetadata = curClusterState.metadata().index(indexName);
+    for (String index : indicesCreated) {
+        IndexMetadata indexMetadata = event.state().metadata().index(index);
if (EngineSettings.isHavenaskEngine(indexMetadata.getSettings())) {
return true;
}
}

return false;
}

private boolean isHavenaskShardChanged(ClusterState prevClusterState, ClusterState curClusterState) {
-    // TODO: identify shard relocation cases
-    for (RoutingNode routingNode : prevClusterState.getRoutingNodes()) {
-        for (ShardRouting shardRouting : routingNode) {
-            IndexMetadata indexMetadata = prevClusterState.metadata().index(shardRouting.index().getName());
-            if (false == EngineSettings.isHavenaskEngine(indexMetadata.getSettings())) {
-                continue;
-            }
-            ShardRouting curShardRouting = curClusterState.getRoutingNodes()
-                .node(routingNode.nodeId())
-                .getByShardId(shardRouting.shardId());
-            if (curShardRouting == null || curShardRouting.getTargetRelocatingShard() != null) {
-                return true;
-            }
+    for (ObjectCursor<String> indexNameCursor : prevClusterState.routingTable().indicesRouting().keys()) {
+        String indexName = indexNameCursor.value;
+        IndexMetadata indexMetadata = prevClusterState.metadata().index(indexName);
+        if (false == EngineSettings.isHavenaskEngine(indexMetadata.getSettings())) {
+            continue;
+        }
+        IndexRoutingTable prevIndexRoutingTable = prevClusterState.routingTable().indicesRouting().get(indexName);
+        IndexRoutingTable curIndexRoutingTable = curClusterState.routingTable().indicesRouting().get(indexName);
+
+        // TODO: the shard-level change check currently uses IndexRoutingTable.equals,
+        // comparing the index and its shards for equality; consider optimizing later
+        if (false == prevIndexRoutingTable.equals(curIndexRoutingTable)) {
+            return true;
+        }
}
return false;
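For intuition on the rewritten shard-level check: a relocation always leaves a trace in the routing table, so the wholesale equals comparison is sufficient, if coarse. The scenario it catches, sketched in comments (illustrative, not from the repository):

// Previous state: shard 0 of a havenask index is STARTED on node A.
// Current state:  the same shard is RELOCATING from node A to node B.
// The shard entries of the two IndexRoutingTable snapshots differ, so
// prevIndexRoutingTable.equals(curIndexRoutingTable) returns false,
// shouldUpdateQrs returns true, and setQrsPendingSync() schedules a qrs target refresh.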
@@ -69,6 +69,14 @@ public SortConfig(String sort_field, String sort_pattern) {

public static class MergeConfig {
public String merge_strategy = "combined";
+public MergeStrategyConfig merge_strategy_params = new MergeStrategyConfig();
}

+public static class MergeStrategyConfig {
+    public String input_limits = "max-segment-size=5120";
+    public String strategy_conditions = "priority-feature=valid-doc-count#asc;conflict-segment-count=10;conflict-delete-percent=30";
+    public String output_limits = "max-merged-segment-size=13312;max-total-merged-size=15360;"
+        + "max-small-segment-count=10;merge-size-upperbound=256;merge-size-lowerbound=64";
+}

public static class BuildConfig {
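The new MergeStrategyConfig packs the knobs of the combined merge strategy into semicolon-separated strings. A minimal sketch of overriding one default (illustrative, not part of the commit; the size values look like megabytes, but the unit is an assumption this diff does not confirm):

MergeConfig mergeConfig = new MergeConfig(); // merge_strategy defaults to "combined"
// Raise the per-input-segment size cap; the other parameters keep the defaults declared above.
mergeConfig.merge_strategy_params.input_limits = "max-segment-size=10240"; // hypothetical value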