Skip to content

Commit

Permalink
Rework query parsing to use the OpenSearch query parser, enabling us …
Browse files Browse the repository at this point in the history
…to handle a wider variety

of queries
  • Loading branch information
kyle-sammons committed Jul 12, 2024
1 parent 30a66a0 commit c888a75
Show file tree
Hide file tree
Showing 28 changed files with 793 additions and 171 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,8 @@ public SearchResult<T> query(SearchQuery query) {
searchStartTime,
searchEndTime,
query.howMany,
query.aggBuilder);
query.aggBuilder,
query.queryBuilder);
} else {
return (SearchResult<T>) SearchResult.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,8 @@ public SearchResult<T> query(SearchQuery query) {
query.startTimeEpochMs,
query.endTimeEpochMs,
query.howMany,
query.aggBuilder);
query.aggBuilder,
query.queryBuilder);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,19 @@ public List<AstraSearch.SearchRequest> parseHttpPostBody(String postBody)
.setStartTimeEpochMs(getStartTimeEpochMs(body))
.setEndTimeEpochMs(getEndTimeEpochMs(body))
.setAggregations(getAggregations(body))
.setQuery(getQuery(body))
.build());
}
return searchRequests;
}

/**
 * Extracts the raw {@code query} object from a search request body as a JSON string.
 *
 * @param body the parsed request body
 * @return the serialized JSON of the {@code query} node, or an empty string when the node is
 *     missing, is JSON null, or is empty
 */
private static String getQuery(JsonNode body) {
  // JsonNode#get returns Java null (not a NullNode) when the field is absent, so that case has
  // to be checked before calling any method on the node.
  JsonNode queryNode = body.get("query");
  if (queryNode == null || queryNode.isNull() || queryNode.isEmpty()) {
    // Return "" rather than null: the result is passed directly to the protobuf builder's
    // setQuery, and generated protobuf setters throw NullPointerException on null. An empty
    // query string is what the leaf side already treats as "no query supplied".
    return "";
  }
  return queryNode.toString();
}

/**
 * Reads the dataset name from the {@code index} field of a search request header.
 *
 * @param header the parsed request header
 * @return the text value of the {@code index} field
 */
private static String getDataset(JsonNode header) {
  final JsonNode indexNode = header.get("index");
  return indexNode.asText();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static org.opensearch.common.settings.IndexScopedSettings.BUILT_IN_INDEX_SETTINGS;

import com.slack.astra.logstore.LogMessage;
import com.slack.astra.logstore.search.aggregations.AggBuilder;
Expand Down Expand Up @@ -30,8 +31,10 @@
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ObjectUtils;
Expand All @@ -44,6 +47,8 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.CheckedConsumer;
import org.opensearch.common.compress.CompressedXContent;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.BigArrays;
import org.opensearch.common.xcontent.XContentFactory;
Expand All @@ -60,6 +65,7 @@
import org.opensearch.index.mapper.MappedFieldType;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryShardContext;
import org.opensearch.index.query.QueryStringQueryBuilder;
import org.opensearch.index.query.RangeQueryBuilder;
Expand Down Expand Up @@ -127,16 +133,22 @@ public class OpenSearchAdapter {
// set this to a high number for now
private static final int TOTAL_FIELDS_LIMIT = 2500;

// When true, queries are built with the OpenSearch query parser rather than going down the
// QueryString parsing path we have been using. Disabled by default.
private boolean useOpenSearchQueryParsing = false;

/**
 * Creates an adapter for the given chunk schema, building the index settings, similarity
 * service and mapper service it needs.
 *
 * <p>OpenSearch query parsing is opt-in: it is enabled only when the
 * {@code astra.query.useOpenSearchParsing} system property is set to {@code true}.
 *
 * @param chunkSchema the field definitions of the chunk, keyed by field name
 */
public OpenSearchAdapter(Map<String, LuceneFieldDef> chunkSchema) {
  this.indexSettings = buildIndexSettings();
  this.similarityService = new SimilarityService(indexSettings, null, emptyMap());
  this.mapperService = buildMapperService(indexSettings, similarityService);
  this.chunkSchema = chunkSchema;
  // Use a dedicated property for this flag. It was previously read from
  // "astra.bulkIngest.useKafkaTransactions", an unrelated bulk-ingest setting, which meant
  // toggling Kafka transactions silently switched the query parsing path as well.
  this.useOpenSearchQueryParsing =
      Boolean.parseBoolean(System.getProperty("astra.query.useOpenSearchParsing", "false"));
}

/**
* Builds a Lucene query using the provided arguments, and the currently loaded schema. Uses the
* Opensearch Query String builder. TODO - use the dataset param in building query
* Builds a Lucene query using the provided arguments, and the currently loaded schema. Uses
* OpenSearch QueryBuilders. TODO - use the dataset param in building query
*
* @see <a href="https://opensearch.org/docs/latest/query-dsl/full-text/query-string/">Query
* parsing OpenSearch docs</a>
Expand All @@ -149,7 +161,8 @@ public Query buildQuery(
String queryStr,
Long startTimeMsEpoch,
Long endTimeMsEpoch,
IndexSearcher indexSearcher)
IndexSearcher indexSearcher,
QueryBuilder queryBuilder)
throws IOException {
LOG.trace("Query raw input string: '{}'", queryStr);
QueryShardContext queryShardContext =
Expand All @@ -159,6 +172,11 @@ public Query buildQuery(
indexSearcher,
similarityService,
mapperService);

if (queryBuilder != null && this.useOpenSearchQueryParsing) {
return queryBuilder.rewrite(queryShardContext).toQuery(queryShardContext);
}

try {
BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();

Expand Down Expand Up @@ -192,11 +210,14 @@ public Query buildQuery(

if (queryShardContext.getMapperService().fieldType(LogMessage.SystemField.ALL.fieldName)
!= null) {
queryStringQueryBuilder.defaultField(LogMessage.SystemField.ALL.fieldName);
// setting lenient=false will not throw error when the query fails to parse against
// numeric fields
queryStringQueryBuilder.lenient(false);
} else {
// The _all field is the default field for all queries. If we explicitly don't want
// to search that field, or that field isn't mapped, then we need to set the default to be
// *
queryStringQueryBuilder.defaultField("*");
queryStringQueryBuilder.lenient(true);
}

Expand Down Expand Up @@ -393,9 +414,21 @@ protected static IndexSettings buildIndexSettings() {
// index sort info to be present as a setting here.
.put("index.sort.field", LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName)
.put("index.sort.order", "desc")
.put("index.query.default_field", LogMessage.SystemField.ALL.fieldName)
.put("index.query_string.lenient", false)
.build();

Settings nodeSetings =
Settings.builder().put("indices.query.query_string.analyze_wildcard", true).build();

Set<Setting<?>> built = new HashSet<>(BUILT_IN_INDEX_SETTINGS);

IndexScopedSettings indexScopedSettings = new IndexScopedSettings(settings, built);

return new IndexSettings(
IndexMetadata.builder("index").settings(settings).build(), Settings.EMPTY);
IndexMetadata.builder("index").settings(settings).build(),
nodeSetings,
indexScopedSettings);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,15 @@

import com.slack.astra.logstore.search.aggregations.AggBuilder;
import java.io.Closeable;
import org.opensearch.index.query.QueryBuilder;

/**
 * A closeable searcher over a log index that returns results of type {@code T}.
 *
 * @param <T> the type of the hits carried by the returned {@link SearchResult}
 */
public interface LogIndexSearcher<T> extends Closeable {
/**
 * Runs a search and returns the matching hits and/or aggregation results.
 *
 * @param dataset the dataset to search
 * @param query the query string
 * @param minTime lower bound of the searched time range (presumably epoch milliseconds, to be
 *     confirmed against implementations)
 * @param maxTime upper bound of the searched time range (presumably epoch milliseconds, to be
 *     confirmed against implementations)
 * @param howMany the number of hits requested
 * @param aggBuilder the aggregation to compute
 * @param queryBuilder a parsed OpenSearch query; presumably {@code null} when the request
 *     carried no structured query (to be confirmed against callers)
 * @return the search result
 */
SearchResult<T> search(
String dataset, String query, Long minTime, Long maxTime, int howMany, AggBuilder aggBuilder);
String dataset,
String query,
Long minTime,
Long maxTime,
int howMany,
AggBuilder aggBuilder,
QueryBuilder queryBuilder);
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.lucene.search.TopFieldCollector;
import org.apache.lucene.search.TopFieldDocs;
import org.apache.lucene.store.MMapDirectory;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.search.aggregations.InternalAggregation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -87,7 +88,8 @@ public SearchResult<LogMessage> search(
Long startTimeMsEpoch,
Long endTimeMsEpoch,
int howMany,
AggBuilder aggBuilder) {
AggBuilder aggBuilder,
QueryBuilder queryBuilder) {

ensureNonEmptyString(dataset, "dataset should be a non-empty string");
ensureNonNullString(queryStr, "query should be a non-empty string");
Expand All @@ -111,12 +113,13 @@ public SearchResult<LogMessage> search(
// Acquire an index searcher from searcher manager.
// This is a useful optimization for indexes that are static.
IndexSearcher searcher = searcherManager.acquire();

try {
List<LogMessage> results;
InternalAggregation internalAggregation = null;
Query query =
openSearchAdapter.buildQuery(
dataset, queryStr, startTimeMsEpoch, endTimeMsEpoch, searcher);
dataset, queryStr, startTimeMsEpoch, endTimeMsEpoch, searcher, queryBuilder);

if (howMany > 0) {
CollectorManager<TopFieldCollector, TopFieldDocs> topFieldCollector =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

import com.slack.astra.logstore.search.aggregations.AggBuilder;
import java.util.List;
import org.opensearch.index.query.QueryBuilder;

/** A class that represents a search query internally to LogStore. */
public class SearchQuery {
// TODO: Remove the dataset field from this class since it is not a lucene level concept.
@Deprecated public final String dataset;

public final String queryStr;
public final QueryBuilder queryBuilder;
public final long startTimeEpochMs;
public final long endTimeEpochMs;
public final int howMany;
Expand All @@ -22,14 +24,16 @@ public SearchQuery(
long endTimeEpochMs,
int howMany,
AggBuilder aggBuilder,
List<String> chunkIds) {
List<String> chunkIds,
QueryBuilder queryBuilder) {
this.dataset = dataset;
this.queryStr = queryStr;
this.startTimeEpochMs = startTimeEpochMs;
this.endTimeEpochMs = endTimeEpochMs;
this.howMany = howMany;
this.aggBuilder = aggBuilder;
this.chunkIds = chunkIds;
this.queryBuilder = queryBuilder;
}

@Override
Expand All @@ -51,6 +55,8 @@ public String toString() {
+ chunkIds
+ ", aggBuilder="
+ aggBuilder
+ ", queryBuilder="
+ queryBuilder
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import brave.ScopedSpan;
import brave.Tracing;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.ByteString;
import com.slack.astra.logstore.LogMessage;
import com.slack.astra.logstore.LogWireMessage;
Expand Down Expand Up @@ -34,6 +35,13 @@
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.NotImplementedException;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.json.JsonXContentParser;
import org.opensearch.core.xcontent.DeprecationHandler;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.index.query.AbstractQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.search.SearchModule;

public class SearchResultUtils {
public static Map<String, Object> fromValueStruct(AstraSearch.Struct struct) {
Expand Down Expand Up @@ -657,14 +665,33 @@ public static AstraSearch.SearchRequest.SearchAggregation.FiltersAggregation toF
}

public static SearchQuery fromSearchRequest(AstraSearch.SearchRequest searchRequest) {
QueryBuilder queryBuilder = null;
if (!searchRequest.getQuery().isEmpty()) {
SearchModule searchModule = new SearchModule(Settings.EMPTY, List.of());
try {
ObjectMapper objectMapper = new ObjectMapper();
NamedXContentRegistry namedXContentRegistry =
new NamedXContentRegistry(searchModule.getNamedXContents());
JsonXContentParser jsonXContentParser =
new JsonXContentParser(
namedXContentRegistry,
DeprecationHandler.IGNORE_DEPRECATIONS,
objectMapper.createParser(searchRequest.getQuery()));
queryBuilder = AbstractQueryBuilder.parseInnerQueryBuilder(jsonXContentParser);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

return new SearchQuery(
searchRequest.getDataset(),
searchRequest.getQueryString(),
searchRequest.getStartTimeEpochMs(),
searchRequest.getEndTimeEpochMs(),
searchRequest.getHowMany(),
fromSearchAggregations(searchRequest.getAggregations()),
searchRequest.getChunkIdsList());
searchRequest.getChunkIdsList(),
queryBuilder);
}

public static SearchResult<LogMessage> fromSearchResultProtoOrEmpty(
Expand Down
3 changes: 3 additions & 0 deletions astra/src/main/proto/astra_search.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ message SearchRequest {
// Only a single top-level aggregation is currently supported
SearchAggregation aggregations = 7;

// The full query object, to be used in the leaf nodes and parsed by OpenSearch
string query = 8;

message SearchAggregation {
// The type of aggregation (ie, avg, date_histogram, etc)
string type = 1;
Expand Down
Loading

0 comments on commit c888a75

Please sign in to comment.