From 22953e101bfbf31d3e88da9d95303cf855d3d038 Mon Sep 17 00:00:00 2001 From: weizijun Date: Mon, 11 Dec 2023 11:16:07 +0800 Subject: [PATCH] =?UTF-8?q?=E8=AF=B7=E6=B1=82fed=E6=8E=A5=E5=8F=A3?= =?UTF-8?q?=E6=9B=BF=E4=BB=A3=E7=9B=B4=E6=8E=A5=E8=AE=BF=E9=97=AEqrs?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../havenask/engine/HavenaskEnginePlugin.java | 4 +- .../engine/index/engine/HavenaskEngine.java | 79 +++++++++---------- .../index/engine/HavenaskIndexSearcher.java | 27 +++---- 3 files changed, 54 insertions(+), 56 deletions(-) diff --git a/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/HavenaskEnginePlugin.java b/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/HavenaskEnginePlugin.java index d70d1dbf..2900b54e 100644 --- a/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/HavenaskEnginePlugin.java +++ b/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/HavenaskEnginePlugin.java @@ -123,6 +123,7 @@ public class HavenaskEnginePlugin extends Plugin private final SetOnce nativeProcessControlServiceSetOnce = new SetOnce<>(); private final SetOnce searcherClientSetOnce = new SetOnce<>(); private final SetOnce qrsClientSetOnce = new SetOnce<>(); + private final SetOnce clientSetOnce = new SetOnce<>(); private final SetOnce searcherArpcClientSetOnce = new SetOnce<>(); private final SetOnce metaDataSyncerSetOnce = new SetOnce<>(); private final Settings settings; @@ -177,7 +178,7 @@ public Optional getEngineFactory(IndexSettings indexSettings) { engineConfig -> new HavenaskEngine( engineConfig, searcherClientSetOnce.get(), - qrsClientSetOnce.get(), + clientSetOnce.get(), searcherArpcClientSetOnce.get(), havenaskEngineEnvironmentSetOnce.get(), nativeProcessControlServiceSetOnce.get(), @@ -226,6 +227,7 @@ public Collection createComponents( qrsClientSetOnce.get() ); metaDataSyncerSetOnce.set(metaDataSyncer); + clientSetOnce.set(client); return Arrays.asList( nativeProcessControlServiceSetOnce.get(), diff --git a/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/index/engine/HavenaskEngine.java b/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/index/engine/HavenaskEngine.java index 93dcd6cb..f12826d8 100644 --- a/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/index/engine/HavenaskEngine.java +++ b/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/index/engine/HavenaskEngine.java @@ -14,8 +14,26 @@ package org.havenask.engine.index.engine; -import com.alibaba.fastjson.JSONArray; -import com.alibaba.fastjson.JSONObject; +import java.io.Closeable; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.charset.StandardCharsets; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.function.LongConsumer; +import java.util.function.LongSupplier; + +import javax.management.MBeanTrustPermission; + import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.DescribeTopicsResult; import org.apache.kafka.clients.admin.KafkaAdminClient; @@ -37,6 +55,7 @@ import org.apache.lucene.util.BytesRef; import org.havenask.HavenaskException; import org.havenask.action.bulk.BackoffPolicy; +import org.havenask.client.Client; import org.havenask.common.Nullable; import org.havenask.common.bytes.BytesArray; import org.havenask.common.bytes.BytesReference; @@ -51,13 +70,13 @@ import org.havenask.engine.NativeProcessControlService; import org.havenask.engine.index.mapper.VectorField; import org.havenask.engine.rpc.HavenaskClient; -import org.havenask.engine.rpc.QrsClient; -import org.havenask.engine.rpc.QrsSqlRequest; -import org.havenask.engine.rpc.QrsSqlResponse; import org.havenask.engine.rpc.SearcherClient; import org.havenask.engine.rpc.TargetInfo; import org.havenask.engine.rpc.WriteRequest; import org.havenask.engine.rpc.WriteResponse; +import org.havenask.engine.search.action.HavenaskSqlAction; +import org.havenask.engine.search.action.HavenaskSqlRequest; +import org.havenask.engine.search.action.HavenaskSqlResponse; import org.havenask.engine.util.JsonPrettyFormatter; import org.havenask.engine.util.RangeUtil; import org.havenask.engine.util.Utils; @@ -79,33 +98,16 @@ import org.havenask.index.translog.TranslogDeletionPolicy; import org.havenask.search.DefaultSearchContext; import org.havenask.search.internal.ContextIndexSearcher; -import suez.service.proto.ErrorCode; -import javax.management.MBeanTrustPermission; -import java.io.Closeable; -import java.io.IOException; -import java.io.UncheckedIOException; -import java.nio.charset.StandardCharsets; -import java.security.AccessController; -import java.security.PrivilegedAction; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Locale; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.function.BiFunction; -import java.util.function.Function; -import java.util.function.LongConsumer; -import java.util.function.LongSupplier; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; -import static org.havenask.engine.search.rest.RestHavenaskSqlAction.SQL_DATABASE; +import suez.service.proto.ErrorCode; public class HavenaskEngine extends InternalEngine { private final HavenaskClient searcherHttpClient; - private final QrsClient qrsHttpClient; + private final Client client; private final SearcherClient searcherClient; private final HavenaskEngineEnvironment env; private final NativeProcessControlService nativeProcessControlService; @@ -123,7 +125,7 @@ public class HavenaskEngine extends InternalEngine { public HavenaskEngine( EngineConfig engineConfig, HavenaskClient searcherHttpClient, - QrsClient qrsHttpClient, + Client client, SearcherClient searcherClient, HavenaskEngineEnvironment env, NativeProcessControlService nativeProcessControlService, @@ -132,7 +134,7 @@ public HavenaskEngine( super(engineConfig); this.searcherHttpClient = searcherHttpClient; - this.qrsHttpClient = qrsHttpClient; + this.client = client; this.searcherClient = searcherClient; this.env = env; this.nativeProcessControlService = nativeProcessControlService; @@ -552,9 +554,8 @@ public GetResult get(Get get, BiFunction search tableName, get.id() ); - String kvpair = "format:full_json;timeout:10000;databaseName:" + SQL_DATABASE; - QrsSqlRequest request = new QrsSqlRequest(sql, kvpair); - QrsSqlResponse response = qrsHttpClient.executeSql(request); + + HavenaskSqlResponse response = client.execute(HavenaskSqlAction.INSTANCE, new HavenaskSqlRequest(sql, null)).actionGet(); JSONObject jsonObject = JsonPrettyFormatter.fromString(response.getResult()); JSONObject sqlResult = jsonObject.getJSONObject("sql_result"); JSONArray datas = sqlResult.getJSONArray("data"); @@ -815,9 +816,7 @@ long getDocCount() { long docCount = 0; try { String sql = String.format(Locale.ROOT, "select /*+ SCAN_ATTR(partitionIds='%d')*/ count(*) from %s", shardId.id(), tableName); - String kvpair = "format:full_json;timeout:10000;databaseName:" + SQL_DATABASE; - QrsSqlRequest request = new QrsSqlRequest(sql, kvpair); - QrsSqlResponse response = qrsHttpClient.executeSql(request); + HavenaskSqlResponse response = client.execute(HavenaskSqlAction.INSTANCE, new HavenaskSqlRequest(sql, null)).actionGet(); JSONObject jsonObject = JsonPrettyFormatter.fromString(response.getResult()); JSONObject sqlResult = jsonObject.getJSONObject("sql_result"); JSONArray datas = sqlResult.getJSONArray("data"); @@ -839,7 +838,7 @@ public Searcher acquireSearcher(String source, SearcherScope scope, Function referenceManager = getReferenceManager(scope); HavenaskDirectoryReader acquire = referenceManager.acquire(); return new HavenaskSearcher( - qrsHttpClient, + client, shardId, source, acquire, @@ -867,7 +866,7 @@ public SearcherSupplier acquireSearcherSupplier(Function wra @Override public Searcher acquireSearcherInternal(String source) { return new HavenaskSearcher( - qrsHttpClient, + client, shardId, "search", acquire, @@ -899,11 +898,11 @@ public void doClose() { } public static class HavenaskSearcher extends Engine.Searcher { - private final QrsClient qrsHttpClient; + private final Client client; private final ShardId shardId; public HavenaskSearcher( - QrsClient qrsHttpClient, + Client client, ShardId shardId, String source, IndexReader reader, @@ -913,7 +912,7 @@ public HavenaskSearcher( Closeable onClose ) { super(source, reader, similarity, queryCache, queryCachingPolicy, onClose); - this.qrsHttpClient = qrsHttpClient; + this.client = client; this.shardId = shardId; } @@ -921,7 +920,7 @@ public HavenaskSearcher( public ContextIndexSearcher createContextIndexSearcher(DefaultSearchContext searchContext, boolean lowLevelCancellation) throws IOException { return new HavenaskIndexSearcher( - qrsHttpClient, + client, shardId, searchContext, getIndexReader(), diff --git a/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/index/engine/HavenaskIndexSearcher.java b/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/index/engine/HavenaskIndexSearcher.java index a5a88320..8fd615fa 100644 --- a/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/index/engine/HavenaskIndexSearcher.java +++ b/elastic-fed/modules/havenask-engine/src/main/java/org/havenask/engine/index/engine/HavenaskIndexSearcher.java @@ -14,6 +14,10 @@ package org.havenask.engine.index.engine; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.index.IndexReader; @@ -26,12 +30,13 @@ import org.apache.lucene.search.TotalHits; import org.apache.lucene.search.TotalHits.Relation; import org.apache.lucene.search.similarities.Similarity; +import org.havenask.client.Client; import org.havenask.client.ha.SqlResponse; import org.havenask.common.Strings; import org.havenask.common.lucene.search.TopDocsAndMaxScore; -import org.havenask.engine.rpc.QrsClient; -import org.havenask.engine.rpc.QrsSqlRequest; -import org.havenask.engine.rpc.QrsSqlResponse; +import org.havenask.engine.search.action.HavenaskSqlAction; +import org.havenask.engine.search.action.HavenaskSqlRequest; +import org.havenask.engine.search.action.HavenaskSqlResponse; import org.havenask.engine.util.Utils; import org.havenask.index.shard.ShardId; import org.havenask.search.DefaultSearchContext; @@ -40,24 +45,18 @@ import org.havenask.search.internal.ReaderContext; import org.havenask.search.query.QuerySearchResult; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import static org.havenask.engine.search.rest.RestHavenaskSqlAction.SQL_DATABASE; - public class HavenaskIndexSearcher extends ContextIndexSearcher { private static final Logger logger = LogManager.getLogger(HavenaskIndexSearcher.class); public static final String IDS_CONTEXT = "havenask_ids"; private static final int ID_POS = 0; private static final int SCORE_POS = 1; - private final QrsClient qrsHttpClient; + private final Client client; private final ShardId shardId; private final String tableName; private final DefaultSearchContext searchContext; public HavenaskIndexSearcher( - QrsClient qrsHttpClient, + Client client, ShardId shardId, DefaultSearchContext searchContext, IndexReader reader, @@ -67,7 +66,7 @@ public HavenaskIndexSearcher( boolean wrapWithExitableDirectoryReader ) throws IOException { super(reader, similarity, queryCache, queryCachingPolicy, wrapWithExitableDirectoryReader); - this.qrsHttpClient = qrsHttpClient; + this.client = client; this.shardId = shardId; this.tableName = Utils.getHavenaskTableName(shardId); this.searchContext = searchContext; @@ -76,9 +75,7 @@ public HavenaskIndexSearcher( @Override public void search(Query query, Collector collector) throws IOException { String sql = QueryTransformer.toSql(tableName, searchContext.request().source(), searchContext.indexShard().mapperService()); - String kvpair = "format:full_json;timeout:10000;databaseName:" + SQL_DATABASE; - QrsSqlRequest request = new QrsSqlRequest(sql, kvpair); - QrsSqlResponse response = qrsHttpClient.executeSql(request); + HavenaskSqlResponse response = client.execute(HavenaskSqlAction.INSTANCE, new HavenaskSqlRequest(sql, null)).actionGet(); if (false == Strings.isNullOrEmpty(response.getResult())) { SqlResponse sqlResponse = SqlResponse.parse(response.getResult()); if (logger.isDebugEnabled()) {