Skip to content

Commit

Permalink
请求fed接口替代直接访问qrs
Browse files Browse the repository at this point in the history
  • Loading branch information
weizijun committed Dec 11, 2023
1 parent ec313bc commit 22953e1
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ public class HavenaskEnginePlugin extends Plugin
private final SetOnce<NativeProcessControlService> nativeProcessControlServiceSetOnce = new SetOnce<>();
private final SetOnce<HavenaskClient> searcherClientSetOnce = new SetOnce<>();
private final SetOnce<QrsClient> qrsClientSetOnce = new SetOnce<>();
private final SetOnce<Client> clientSetOnce = new SetOnce<>();
private final SetOnce<SearcherClient> searcherArpcClientSetOnce = new SetOnce<>();
private final SetOnce<MetaDataSyncer> metaDataSyncerSetOnce = new SetOnce<>();
private final Settings settings;
Expand Down Expand Up @@ -177,7 +178,7 @@ public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) {
engineConfig -> new HavenaskEngine(
engineConfig,
searcherClientSetOnce.get(),
qrsClientSetOnce.get(),
clientSetOnce.get(),
searcherArpcClientSetOnce.get(),
havenaskEngineEnvironmentSetOnce.get(),
nativeProcessControlServiceSetOnce.get(),
Expand Down Expand Up @@ -226,6 +227,7 @@ public Collection<Object> createComponents(
qrsClientSetOnce.get()
);
metaDataSyncerSetOnce.set(metaDataSyncer);
clientSetOnce.set(client);

return Arrays.asList(
nativeProcessControlServiceSetOnce.get(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,26 @@

package org.havenask.engine.index.engine;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

import javax.management.MBeanTrustPermission;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.KafkaAdminClient;
Expand All @@ -37,6 +55,7 @@
import org.apache.lucene.util.BytesRef;
import org.havenask.HavenaskException;
import org.havenask.action.bulk.BackoffPolicy;
import org.havenask.client.Client;
import org.havenask.common.Nullable;
import org.havenask.common.bytes.BytesArray;
import org.havenask.common.bytes.BytesReference;
Expand All @@ -51,13 +70,13 @@
import org.havenask.engine.NativeProcessControlService;
import org.havenask.engine.index.mapper.VectorField;
import org.havenask.engine.rpc.HavenaskClient;
import org.havenask.engine.rpc.QrsClient;
import org.havenask.engine.rpc.QrsSqlRequest;
import org.havenask.engine.rpc.QrsSqlResponse;
import org.havenask.engine.rpc.SearcherClient;
import org.havenask.engine.rpc.TargetInfo;
import org.havenask.engine.rpc.WriteRequest;
import org.havenask.engine.rpc.WriteResponse;
import org.havenask.engine.search.action.HavenaskSqlAction;
import org.havenask.engine.search.action.HavenaskSqlRequest;
import org.havenask.engine.search.action.HavenaskSqlResponse;
import org.havenask.engine.util.JsonPrettyFormatter;
import org.havenask.engine.util.RangeUtil;
import org.havenask.engine.util.Utils;
Expand All @@ -79,33 +98,16 @@
import org.havenask.index.translog.TranslogDeletionPolicy;
import org.havenask.search.DefaultSearchContext;
import org.havenask.search.internal.ContextIndexSearcher;
import suez.service.proto.ErrorCode;

import javax.management.MBeanTrustPermission;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

import static org.havenask.engine.search.rest.RestHavenaskSqlAction.SQL_DATABASE;
import suez.service.proto.ErrorCode;

public class HavenaskEngine extends InternalEngine {

private final HavenaskClient searcherHttpClient;
private final QrsClient qrsHttpClient;
private final Client client;
private final SearcherClient searcherClient;
private final HavenaskEngineEnvironment env;
private final NativeProcessControlService nativeProcessControlService;
Expand All @@ -123,7 +125,7 @@ public class HavenaskEngine extends InternalEngine {
public HavenaskEngine(
EngineConfig engineConfig,
HavenaskClient searcherHttpClient,
QrsClient qrsHttpClient,
Client client,
SearcherClient searcherClient,
HavenaskEngineEnvironment env,
NativeProcessControlService nativeProcessControlService,
Expand All @@ -132,7 +134,7 @@ public HavenaskEngine(
super(engineConfig);

this.searcherHttpClient = searcherHttpClient;
this.qrsHttpClient = qrsHttpClient;
this.client = client;
this.searcherClient = searcherClient;
this.env = env;
this.nativeProcessControlService = nativeProcessControlService;
Expand Down Expand Up @@ -552,9 +554,8 @@ public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> search
tableName,
get.id()
);
String kvpair = "format:full_json;timeout:10000;databaseName:" + SQL_DATABASE;
QrsSqlRequest request = new QrsSqlRequest(sql, kvpair);
QrsSqlResponse response = qrsHttpClient.executeSql(request);

HavenaskSqlResponse response = client.execute(HavenaskSqlAction.INSTANCE, new HavenaskSqlRequest(sql, null)).actionGet();
JSONObject jsonObject = JsonPrettyFormatter.fromString(response.getResult());
JSONObject sqlResult = jsonObject.getJSONObject("sql_result");
JSONArray datas = sqlResult.getJSONArray("data");
Expand Down Expand Up @@ -815,9 +816,7 @@ long getDocCount() {
long docCount = 0;
try {
String sql = String.format(Locale.ROOT, "select /*+ SCAN_ATTR(partitionIds='%d')*/ count(*) from %s", shardId.id(), tableName);
String kvpair = "format:full_json;timeout:10000;databaseName:" + SQL_DATABASE;
QrsSqlRequest request = new QrsSqlRequest(sql, kvpair);
QrsSqlResponse response = qrsHttpClient.executeSql(request);
HavenaskSqlResponse response = client.execute(HavenaskSqlAction.INSTANCE, new HavenaskSqlRequest(sql, null)).actionGet();
JSONObject jsonObject = JsonPrettyFormatter.fromString(response.getResult());
JSONObject sqlResult = jsonObject.getJSONObject("sql_result");
JSONArray datas = sqlResult.getJSONArray("data");
Expand All @@ -839,7 +838,7 @@ public Searcher acquireSearcher(String source, SearcherScope scope, Function<Sea
ReferenceManager<HavenaskDirectoryReader> referenceManager = getReferenceManager(scope);
HavenaskDirectoryReader acquire = referenceManager.acquire();
return new HavenaskSearcher(
qrsHttpClient,
client,
shardId,
source,
acquire,
Expand Down Expand Up @@ -867,7 +866,7 @@ public SearcherSupplier acquireSearcherSupplier(Function<Searcher, Searcher> wra
@Override
public Searcher acquireSearcherInternal(String source) {
return new HavenaskSearcher(
qrsHttpClient,
client,
shardId,
"search",
acquire,
Expand Down Expand Up @@ -899,11 +898,11 @@ public void doClose() {
}

public static class HavenaskSearcher extends Engine.Searcher {
private final QrsClient qrsHttpClient;
private final Client client;
private final ShardId shardId;

public HavenaskSearcher(
QrsClient qrsHttpClient,
Client client,
ShardId shardId,
String source,
IndexReader reader,
Expand All @@ -913,15 +912,15 @@ public HavenaskSearcher(
Closeable onClose
) {
super(source, reader, similarity, queryCache, queryCachingPolicy, onClose);
this.qrsHttpClient = qrsHttpClient;
this.client = client;
this.shardId = shardId;
}

@Override
public ContextIndexSearcher createContextIndexSearcher(DefaultSearchContext searchContext, boolean lowLevelCancellation)
throws IOException {
return new HavenaskIndexSearcher(
qrsHttpClient,
client,
shardId,
searchContext,
getIndexReader(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@

package org.havenask.engine.index.engine;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.IndexReader;
Expand All @@ -26,12 +30,13 @@
import org.apache.lucene.search.TotalHits;
import org.apache.lucene.search.TotalHits.Relation;
import org.apache.lucene.search.similarities.Similarity;
import org.havenask.client.Client;
import org.havenask.client.ha.SqlResponse;
import org.havenask.common.Strings;
import org.havenask.common.lucene.search.TopDocsAndMaxScore;
import org.havenask.engine.rpc.QrsClient;
import org.havenask.engine.rpc.QrsSqlRequest;
import org.havenask.engine.rpc.QrsSqlResponse;
import org.havenask.engine.search.action.HavenaskSqlAction;
import org.havenask.engine.search.action.HavenaskSqlRequest;
import org.havenask.engine.search.action.HavenaskSqlResponse;
import org.havenask.engine.util.Utils;
import org.havenask.index.shard.ShardId;
import org.havenask.search.DefaultSearchContext;
Expand All @@ -40,24 +45,18 @@
import org.havenask.search.internal.ReaderContext;
import org.havenask.search.query.QuerySearchResult;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static org.havenask.engine.search.rest.RestHavenaskSqlAction.SQL_DATABASE;

public class HavenaskIndexSearcher extends ContextIndexSearcher {
private static final Logger logger = LogManager.getLogger(HavenaskIndexSearcher.class);
public static final String IDS_CONTEXT = "havenask_ids";
private static final int ID_POS = 0;
private static final int SCORE_POS = 1;
private final QrsClient qrsHttpClient;
private final Client client;
private final ShardId shardId;
private final String tableName;
private final DefaultSearchContext searchContext;

public HavenaskIndexSearcher(
QrsClient qrsHttpClient,
Client client,
ShardId shardId,
DefaultSearchContext searchContext,
IndexReader reader,
Expand All @@ -67,7 +66,7 @@ public HavenaskIndexSearcher(
boolean wrapWithExitableDirectoryReader
) throws IOException {
super(reader, similarity, queryCache, queryCachingPolicy, wrapWithExitableDirectoryReader);
this.qrsHttpClient = qrsHttpClient;
this.client = client;
this.shardId = shardId;
this.tableName = Utils.getHavenaskTableName(shardId);
this.searchContext = searchContext;
Expand All @@ -76,9 +75,7 @@ public HavenaskIndexSearcher(
@Override
public void search(Query query, Collector collector) throws IOException {
String sql = QueryTransformer.toSql(tableName, searchContext.request().source(), searchContext.indexShard().mapperService());
String kvpair = "format:full_json;timeout:10000;databaseName:" + SQL_DATABASE;
QrsSqlRequest request = new QrsSqlRequest(sql, kvpair);
QrsSqlResponse response = qrsHttpClient.executeSql(request);
HavenaskSqlResponse response = client.execute(HavenaskSqlAction.INSTANCE, new HavenaskSqlRequest(sql, null)).actionGet();
if (false == Strings.isNullOrEmpty(response.getResult())) {
SqlResponse sqlResponse = SqlResponse.parse(response.getResult());
if (logger.isDebugEnabled()) {
Expand Down

0 comments on commit 22953e1

Please sign in to comment.