Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

支持esrally pmc、http_logs查询集相关的query dsl转成havenask sql #435

Merged
merged 8 commits into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ public class Schema {
// HA3 do not allow '.' for field name
// currently only multi-field index contains '.'
/**
 * Sanitizes a field name for HA3, which does not permit '.' or '@' in
 * field names (currently only multi-field indices contain '.').
 * Both characters are substituted with {@code FIELD_DOT_REPLACEMENT}.
 *
 * @param field the original field name
 * @return the field name with every '.' and '@' replaced
 */
public static final String encodeFieldWithDot(String field) {
    return field.replace('.', FIELD_DOT_REPLACEMENT).replace('@', FIELD_DOT_REPLACEMENT);
}

public final long floatToLongVal(double d) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,19 @@

package org.havenask.engine.index.config.generator;

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.havenask.common.io.Streams;
Expand All @@ -49,20 +62,6 @@
import org.havenask.index.mapper.MapperService;
import org.havenask.index.mapper.TextSearchInfo;

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class SchemaGenerator {
// Fields that are deprecated or not supported yet; presumably excluded
// from schema generation — confirm against the usages in getSchema.
public static final Set<String> ExcludeFields = Set.of("_field_names", "index", "_type", "_uid", "_parent");
Expand Down Expand Up @@ -148,10 +147,7 @@ public Schema getSchema(String table, Settings indexSettings, MapperService mapp
}

// multi field index
if (fieldName.contains(".")) {
String originField = fieldName.substring(0, fieldName.lastIndexOf('.'));
schema.copyToFields.computeIfAbsent(originField, (k) -> new LinkedList<>()).add(fieldName);
// replace '.' in field name
if (fieldName.contains(".") || fieldName.contains("@")) {
fieldName = Schema.encodeFieldWithDot(fieldName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ static Map<String, String> toHaIndex(ParsedDocument parsedDocument) throws IOExc
for (IndexableField field : rootDoc.getFields()) {
String fieldName = field.name();
// multi field index
if (fieldName.contains(".")) {
if (fieldName.contains(".") || fieldName.contains("@")) {
fieldName = Schema.encodeFieldWithDot(fieldName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@

package org.havenask.engine.search;

import static org.havenask.engine.search.rest.RestHavenaskSqlAction.SQL_DATABASE;

import java.io.IOException;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.havenask.action.search.SearchRequest;
Expand All @@ -25,19 +32,17 @@
import org.havenask.engine.rpc.QrsClient;
import org.havenask.engine.rpc.QrsSqlRequest;
import org.havenask.engine.rpc.QrsSqlResponse;
import org.havenask.index.query.BoolQueryBuilder;
import org.havenask.index.query.MatchAllQueryBuilder;
import org.havenask.index.query.MatchPhraseQueryBuilder;
import org.havenask.index.query.MatchQueryBuilder;
import org.havenask.index.query.QueryBuilder;
import org.havenask.index.query.RangeQueryBuilder;
import org.havenask.index.query.TermQueryBuilder;
import org.havenask.search.builder.KnnSearchBuilder;
import org.havenask.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

import static org.havenask.engine.search.rest.RestHavenaskSqlAction.SQL_DATABASE;
import org.havenask.search.sort.FieldSortBuilder;
import org.havenask.search.sort.SortBuilder;

public class HavenaskSearchQueryProcessor {
private static final Logger logger = LogManager.getLogger(HavenaskSearchQueryProcessor.class);
Expand All @@ -55,6 +60,10 @@ public HavenaskSearchQueryProcessor(QrsClient qrsClient) {

public SqlResponse executeQuery(SearchRequest request, String tableName, Map<String, Object> indexMapping) throws IOException {
String sql = transferSearchRequest2HavenaskSql(tableName, request.source(), indexMapping);
if (logger.isTraceEnabled()) {
logger.trace("dsl: {}, sql: {}", request.source(), sql);
}

String kvpair = "format:full_json;timeout:10000;databaseName:" + SQL_DATABASE;
QrsSqlRequest qrsQueryPhaseSqlRequest = new QrsSqlRequest(sql, kvpair);
QrsSqlResponse qrsQueryPhaseSqlResponse = qrsClient.executeSql(qrsQueryPhaseSqlRequest);
Expand Down Expand Up @@ -170,10 +179,107 @@ public String transferSearchRequest2HavenaskSql(String table, SearchSourceBuilde
.append("', '")
.append(matchQueryBuilder.value())
.append("', 'default_op:OR')");

selectParams.append(", bm25_score() as _score");
orderBy.append(" order by _score desc");
} else if (queryBuilder instanceof MatchPhraseQueryBuilder) {
MatchPhraseQueryBuilder matchQueryBuilder = (MatchPhraseQueryBuilder) queryBuilder;
where.append(" where ")
.append("QUERY('', '")
.append(Schema.encodeFieldWithDot(matchQueryBuilder.fieldName()))
.append(":")
.append("\"")
.append(matchQueryBuilder.value())
.append("\"")
.append("')");
selectParams.append(", bm25_score() as _score");
orderBy.append(" order by _score desc");
} else if (queryBuilder instanceof RangeQueryBuilder) {
RangeQueryBuilder rangeQueryBuilder = (RangeQueryBuilder) queryBuilder;
where.append(" where ")
.append("QUERY('', '")
.append(Schema.encodeFieldWithDot(rangeQueryBuilder.fieldName()))
.append(":")
.append(rangeQueryBuilder.includeLower() ? "[" : "(")
.append(rangeQueryBuilder.from())
.append(",")
.append(rangeQueryBuilder.to())
.append(rangeQueryBuilder.includeUpper() ? "]" : ")")
.append("')");
} else if (queryBuilder instanceof BoolQueryBuilder) {
BoolQueryBuilder boolQueryBuilder = (BoolQueryBuilder) queryBuilder;
where.append(" where ");
boolean first = true;
for (QueryBuilder subQueryBuilder : boolQueryBuilder.must()) {
if (false == first) {
where.append(" and ");
}
if (first) {
first = false;
}
if (subQueryBuilder instanceof TermQueryBuilder) {
TermQueryBuilder termQueryBuilder = (TermQueryBuilder) subQueryBuilder;
where.append(Schema.encodeFieldWithDot(termQueryBuilder.fieldName()))
.append("='")
.append(termQueryBuilder.value())
.append("'");
} else if (subQueryBuilder instanceof MatchQueryBuilder) {
MatchQueryBuilder matchQueryBuilder = (MatchQueryBuilder) subQueryBuilder;
where.append("MATCHINDEX('")
.append(Schema.encodeFieldWithDot(matchQueryBuilder.fieldName()))
.append("', '")
.append(matchQueryBuilder.value())
.append("', 'default_op:OR')");

selectParams.append(", bm25_score() as _score");
orderBy.append(" order by _score desc");
} else if (subQueryBuilder instanceof RangeQueryBuilder) {
RangeQueryBuilder rangeQueryBuilder = (RangeQueryBuilder) subQueryBuilder;
where.append("QUERY('', '")
.append(Schema.encodeFieldWithDot(rangeQueryBuilder.fieldName()))
.append(":")
.append(rangeQueryBuilder.includeLower() ? "[" : "(")
.append(rangeQueryBuilder.from())
.append(",")
.append(rangeQueryBuilder.to())
.append(rangeQueryBuilder.includeUpper() ? "]" : ")")
.append("')");
} else {
throw new IOException("unsupported DSL(unsupported bool filter): " + dsl);
}
}
} else {
throw new IOException("unsupported DSL: " + dsl);
}
}

StringBuilder sortBuilder = new StringBuilder();
if (dsl.sorts() != null && dsl.sorts().size() > 0) {
sortBuilder.append(" order by ");
for (SortBuilder<?> sortField : dsl.sorts()) {
if (sortField instanceof FieldSortBuilder == false) {
throw new IOException("unsupported DSL(unsupported sort): " + dsl);
}

FieldSortBuilder fieldSortBuilder = (FieldSortBuilder) sortField;

sortBuilder.append("`")
.append(Schema.encodeFieldWithDot(fieldSortBuilder.getFieldName()))
.append("` ")
.append(sortField.order());

if (sortBuilder.length() > 0) {
sortBuilder.append(", ");
}
}

sortBuilder.delete(sortBuilder.length() - 2, sortBuilder.length());
}

if (orderBy.length() == 0) {
orderBy = sortBuilder;
}

sqlQuery.append("select")
.append(selectParams)
.append(" from ")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package org.havenask.engine.search.action;

import java.util.Map;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.havenask.action.ActionListener;
Expand All @@ -25,20 +27,20 @@
import org.havenask.action.support.HandledTransportAction;
import org.havenask.client.ha.SqlResponse;
import org.havenask.cluster.ClusterState;
import org.havenask.cluster.metadata.IndexAbstraction;
import org.havenask.cluster.metadata.IndexMetadata;
import org.havenask.cluster.service.ClusterService;
import org.havenask.common.inject.Inject;
import org.havenask.engine.NativeProcessControlService;
import org.havenask.engine.search.HavenaskSearchQueryProcessor;
import org.havenask.engine.rpc.QrsClient;
import org.havenask.engine.rpc.http.QrsHttpClient;
import org.havenask.engine.search.HavenaskSearchFetchProcessor;
import org.havenask.engine.search.HavenaskSearchQueryProcessor;
import org.havenask.search.internal.InternalSearchResponse;
import org.havenask.tasks.Task;
import org.havenask.threadpool.ThreadPool;
import org.havenask.transport.TransportService;

import java.util.Map;

public class TransportHavenaskSearchAction extends HandledTransportAction<SearchRequest, SearchResponse> {
private static final Logger logger = LogManager.getLogger(TransportHavenaskSearchAction.class);
private ClusterService clusterService;
Expand Down Expand Up @@ -77,13 +79,18 @@ protected void doExecute(Task task, SearchRequest request, ActionListener<Search
}
String tableName = request.indices()[0];

long startTime = System.nanoTime();

ClusterState clusterState = clusterService.state();
IndexAbstraction indexAbstraction = clusterState.metadata().getIndicesLookup().get(tableName);
if (indexAbstraction == null) {
throw new IllegalArgumentException("illegal index name! index name not exist.");
}

IndexMetadata indexMetadata = indexAbstraction.getWriteIndex();
tableName = indexMetadata.getIndex().getName();

long startTime = System.nanoTime();

Map<String, Object> indexMapping = clusterState.metadata().index(tableName).mapping() != null
? clusterState.metadata().index(tableName).mapping().getSourceAsMap()
: null;
Map<String, Object> indexMapping = indexMetadata.mapping() != null ? indexMetadata.mapping().getSourceAsMap() : null;
SqlResponse havenaskSearchQueryPhaseSqlResponse = havenaskSearchQueryProcessor.executeQuery(request, tableName, indexMapping);

InternalSearchResponse internalSearchResponse = havenaskSearchFetchProcessor.executeFetch(
Expand All @@ -100,7 +107,7 @@ protected void doExecute(Task task, SearchRequest request, ActionListener<Search
);
listener.onResponse(searchResponse);
} catch (Exception e) {
logger.error("Failed to execute havenask search, ", e);
logger.info("Failed to execute havenask search, ", e);
listener.onFailure(e);
}
}
Expand Down
Loading
Loading