Skip to content

Commit

Permalink
支持esrally pmc、http_logs查询集相关的query dsl转成havenask sql (#435)
Browse files Browse the repository at this point in the history
  • Loading branch information
weizijun authored Jan 26, 2024
1 parent e0e5793 commit 1665891
Show file tree
Hide file tree
Showing 6 changed files with 223 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ public class Schema {
// HA3 do not allow '.' for field name
// currently only multi-field index contains '.'
/**
 * Encodes a field name by replacing every {@code '.'} and every {@code '@'}
 * with {@code FIELD_DOT_REPLACEMENT}.
 *
 * @param field the original field name; must not be {@code null}
 * @return the field name with all {@code '.'} and {@code '@'} characters replaced
 */
public static final String encodeFieldWithDot(String field) {
    // Single return at the end: an early return after the first replacement
    // would leave the '@' replacement unreachable.
    return field.replace('.', FIELD_DOT_REPLACEMENT).replace('@', FIELD_DOT_REPLACEMENT);
}

public final long floatToLongVal(double d) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,19 @@

package org.havenask.engine.index.config.generator;

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.havenask.common.io.Streams;
Expand All @@ -49,20 +62,6 @@
import org.havenask.index.mapper.MapperService;
import org.havenask.index.mapper.TextSearchInfo;

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class SchemaGenerator {
// have deprecated fields or not support now.
public static final Set<String> ExcludeFields = Set.of("_field_names", "index", "_type", "_uid", "_parent");
Expand Down Expand Up @@ -148,10 +147,7 @@ public Schema getSchema(String table, Settings indexSettings, MapperService mapp
}

// multi field index
if (fieldName.contains(".")) {
String originField = fieldName.substring(0, fieldName.lastIndexOf('.'));
schema.copyToFields.computeIfAbsent(originField, (k) -> new LinkedList<>()).add(fieldName);
// replace '.' in field name
if (fieldName.contains(".") || fieldName.contains("@")) {
fieldName = Schema.encodeFieldWithDot(fieldName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ static Map<String, String> toHaIndex(ParsedDocument parsedDocument) throws IOExc
for (IndexableField field : rootDoc.getFields()) {
String fieldName = field.name();
// multi field index
if (fieldName.contains(".")) {
if (fieldName.contains(".") || fieldName.contains("@")) {
fieldName = Schema.encodeFieldWithDot(fieldName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@

package org.havenask.engine.search;

import static org.havenask.engine.search.rest.RestHavenaskSqlAction.SQL_DATABASE;

import java.io.IOException;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.havenask.action.search.SearchRequest;
Expand All @@ -25,19 +32,17 @@
import org.havenask.engine.rpc.QrsClient;
import org.havenask.engine.rpc.QrsSqlRequest;
import org.havenask.engine.rpc.QrsSqlResponse;
import org.havenask.index.query.BoolQueryBuilder;
import org.havenask.index.query.MatchAllQueryBuilder;
import org.havenask.index.query.MatchPhraseQueryBuilder;
import org.havenask.index.query.MatchQueryBuilder;
import org.havenask.index.query.QueryBuilder;
import org.havenask.index.query.RangeQueryBuilder;
import org.havenask.index.query.TermQueryBuilder;
import org.havenask.search.builder.KnnSearchBuilder;
import org.havenask.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

import static org.havenask.engine.search.rest.RestHavenaskSqlAction.SQL_DATABASE;
import org.havenask.search.sort.FieldSortBuilder;
import org.havenask.search.sort.SortBuilder;

public class HavenaskSearchQueryProcessor {
private static final Logger logger = LogManager.getLogger(HavenaskSearchQueryProcessor.class);
Expand All @@ -55,6 +60,10 @@ public HavenaskSearchQueryProcessor(QrsClient qrsClient) {

public SqlResponse executeQuery(SearchRequest request, String tableName, Map<String, Object> indexMapping) throws IOException {
String sql = transferSearchRequest2HavenaskSql(tableName, request.source(), indexMapping);
if (logger.isTraceEnabled()) {
logger.trace("dsl: {}, sql: {}", request.source(), sql);
}

String kvpair = "format:full_json;timeout:10000;databaseName:" + SQL_DATABASE;
QrsSqlRequest qrsQueryPhaseSqlRequest = new QrsSqlRequest(sql, kvpair);
QrsSqlResponse qrsQueryPhaseSqlResponse = qrsClient.executeSql(qrsQueryPhaseSqlRequest);
Expand Down Expand Up @@ -170,10 +179,107 @@ public String transferSearchRequest2HavenaskSql(String table, SearchSourceBuilde
.append("', '")
.append(matchQueryBuilder.value())
.append("', 'default_op:OR')");

selectParams.append(", bm25_score() as _score");
orderBy.append(" order by _score desc");
} else if (queryBuilder instanceof MatchPhraseQueryBuilder) {
MatchPhraseQueryBuilder matchQueryBuilder = (MatchPhraseQueryBuilder) queryBuilder;
where.append(" where ")
.append("QUERY('', '")
.append(Schema.encodeFieldWithDot(matchQueryBuilder.fieldName()))
.append(":")
.append("\"")
.append(matchQueryBuilder.value())
.append("\"")
.append("')");
selectParams.append(", bm25_score() as _score");
orderBy.append(" order by _score desc");
} else if (queryBuilder instanceof RangeQueryBuilder) {
RangeQueryBuilder rangeQueryBuilder = (RangeQueryBuilder) queryBuilder;
where.append(" where ")
.append("QUERY('', '")
.append(Schema.encodeFieldWithDot(rangeQueryBuilder.fieldName()))
.append(":")
.append(rangeQueryBuilder.includeLower() ? "[" : "(")
.append(rangeQueryBuilder.from())
.append(",")
.append(rangeQueryBuilder.to())
.append(rangeQueryBuilder.includeUpper() ? "]" : ")")
.append("')");
} else if (queryBuilder instanceof BoolQueryBuilder) {
BoolQueryBuilder boolQueryBuilder = (BoolQueryBuilder) queryBuilder;
where.append(" where ");
boolean first = true;
for (QueryBuilder subQueryBuilder : boolQueryBuilder.must()) {
if (false == first) {
where.append(" and ");
}
if (first) {
first = false;
}
if (subQueryBuilder instanceof TermQueryBuilder) {
TermQueryBuilder termQueryBuilder = (TermQueryBuilder) subQueryBuilder;
where.append(Schema.encodeFieldWithDot(termQueryBuilder.fieldName()))
.append("='")
.append(termQueryBuilder.value())
.append("'");
} else if (subQueryBuilder instanceof MatchQueryBuilder) {
MatchQueryBuilder matchQueryBuilder = (MatchQueryBuilder) subQueryBuilder;
where.append("MATCHINDEX('")
.append(Schema.encodeFieldWithDot(matchQueryBuilder.fieldName()))
.append("', '")
.append(matchQueryBuilder.value())
.append("', 'default_op:OR')");

selectParams.append(", bm25_score() as _score");
orderBy.append(" order by _score desc");
} else if (subQueryBuilder instanceof RangeQueryBuilder) {
RangeQueryBuilder rangeQueryBuilder = (RangeQueryBuilder) subQueryBuilder;
where.append("QUERY('', '")
.append(Schema.encodeFieldWithDot(rangeQueryBuilder.fieldName()))
.append(":")
.append(rangeQueryBuilder.includeLower() ? "[" : "(")
.append(rangeQueryBuilder.from())
.append(",")
.append(rangeQueryBuilder.to())
.append(rangeQueryBuilder.includeUpper() ? "]" : ")")
.append("')");
} else {
throw new IOException("unsupported DSL(unsupported bool filter): " + dsl);
}
}
} else {
throw new IOException("unsupported DSL: " + dsl);
}
}

StringBuilder sortBuilder = new StringBuilder();
if (dsl.sorts() != null && dsl.sorts().size() > 0) {
sortBuilder.append(" order by ");
for (SortBuilder<?> sortField : dsl.sorts()) {
if (sortField instanceof FieldSortBuilder == false) {
throw new IOException("unsupported DSL(unsupported sort): " + dsl);
}

FieldSortBuilder fieldSortBuilder = (FieldSortBuilder) sortField;

sortBuilder.append("`")
.append(Schema.encodeFieldWithDot(fieldSortBuilder.getFieldName()))
.append("` ")
.append(sortField.order());

if (sortBuilder.length() > 0) {
sortBuilder.append(", ");
}
}

sortBuilder.delete(sortBuilder.length() - 2, sortBuilder.length());
}

if (orderBy.length() == 0) {
orderBy = sortBuilder;
}

sqlQuery.append("select")
.append(selectParams)
.append(" from ")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package org.havenask.engine.search.action;

import java.util.Map;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.havenask.action.ActionListener;
Expand All @@ -25,20 +27,20 @@
import org.havenask.action.support.HandledTransportAction;
import org.havenask.client.ha.SqlResponse;
import org.havenask.cluster.ClusterState;
import org.havenask.cluster.metadata.IndexAbstraction;
import org.havenask.cluster.metadata.IndexMetadata;
import org.havenask.cluster.service.ClusterService;
import org.havenask.common.inject.Inject;
import org.havenask.engine.NativeProcessControlService;
import org.havenask.engine.search.HavenaskSearchQueryProcessor;
import org.havenask.engine.rpc.QrsClient;
import org.havenask.engine.rpc.http.QrsHttpClient;
import org.havenask.engine.search.HavenaskSearchFetchProcessor;
import org.havenask.engine.search.HavenaskSearchQueryProcessor;
import org.havenask.search.internal.InternalSearchResponse;
import org.havenask.tasks.Task;
import org.havenask.threadpool.ThreadPool;
import org.havenask.transport.TransportService;

import java.util.Map;

public class TransportHavenaskSearchAction extends HandledTransportAction<SearchRequest, SearchResponse> {
private static final Logger logger = LogManager.getLogger(TransportHavenaskSearchAction.class);
private ClusterService clusterService;
Expand Down Expand Up @@ -77,13 +79,18 @@ protected void doExecute(Task task, SearchRequest request, ActionListener<Search
}
String tableName = request.indices()[0];

long startTime = System.nanoTime();

ClusterState clusterState = clusterService.state();
IndexAbstraction indexAbstraction = clusterState.metadata().getIndicesLookup().get(tableName);
if (indexAbstraction == null) {
throw new IllegalArgumentException("illegal index name! index name not exist.");
}

IndexMetadata indexMetadata = indexAbstraction.getWriteIndex();
tableName = indexMetadata.getIndex().getName();

long startTime = System.nanoTime();

Map<String, Object> indexMapping = clusterState.metadata().index(tableName).mapping() != null
? clusterState.metadata().index(tableName).mapping().getSourceAsMap()
: null;
Map<String, Object> indexMapping = indexMetadata.mapping() != null ? indexMetadata.mapping().getSourceAsMap() : null;
SqlResponse havenaskSearchQueryPhaseSqlResponse = havenaskSearchQueryProcessor.executeQuery(request, tableName, indexMapping);

InternalSearchResponse internalSearchResponse = havenaskSearchFetchProcessor.executeFetch(
Expand All @@ -100,7 +107,7 @@ protected void doExecute(Task task, SearchRequest request, ActionListener<Search
);
listener.onResponse(searchResponse);
} catch (Exception e) {
logger.error("Failed to execute havenask search, ", e);
logger.info("Failed to execute havenask search, ", e);
listener.onFailure(e);
}
}
Expand Down
Loading

0 comments on commit 1665891

Please sign in to comment.