From 5319a2817f8488c124dc9e66ede36aa51e7c832d Mon Sep 17 00:00:00 2001 From: Bryan Burkholder Date: Thu, 1 Feb 2024 13:14:23 -0700 Subject: [PATCH 1/4] Fix schema query --- .../search/KaldbDistributedQueryService.java | 114 ++++++++++-------- 1 file changed, 62 insertions(+), 52 deletions(-) diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/search/KaldbDistributedQueryService.java b/kaldb/src/main/java/com/slack/kaldb/logstore/search/KaldbDistributedQueryService.java index 720ff49179..a372e00f4b 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/search/KaldbDistributedQueryService.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/search/KaldbDistributedQueryService.java @@ -7,8 +7,6 @@ import brave.grpc.GrpcTracing; import brave.propagation.CurrentTraceContext; import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import com.linecorp.armeria.client.grpc.GrpcClients; import com.slack.kaldb.logstore.LogMessage; import com.slack.kaldb.metadata.core.KaldbMetadataStoreChangeListener; @@ -93,7 +91,6 @@ public class KaldbDistributedQueryService extends KaldbQueryServiceBase implemen // include metadata that should always be present. The Armeria timeout is used at the top request, // distributed query is used as a deadline for all nodes to return, and the local query timeout // is used for controlling lucene future timeouts. - private final Duration requestTimeout; private final Duration defaultQueryTimeout; private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); @@ -116,7 +113,6 @@ public KaldbDistributedQueryService( this.searchMetadataStore = searchMetadataStore; this.snapshotMetadataStore = snapshotMetadataStore; this.datasetMetadataStore = datasetMetadataStore; - this.requestTimeout = requestTimeout; this.defaultQueryTimeout = defaultQueryTimeout; searchMetadataTotalChangeCounter = meterRegistry.counter(SEARCH_METADATA_TOTAL_CHANGE_COUNTER); this.distributedQueryApdexSatisfied = meterRegistry.counter(DISTRIBUTED_QUERY_APDEX_SATISFIED); @@ -494,60 +490,74 @@ public KaldbSearch.SchemaResult getSchema(KaldbSearch.SchemaRequest distribSchem Map> nodesAndSnapshotsToQuery = getNodesAndSnapshotsToQuery(searchMetadataNodesMatchingQuery); - List> queryServers = new ArrayList<>(stubs.size()); + CurrentTraceContext currentTraceContext = Tracing.current().currentTraceContext(); + try { + try (var scope = new StructuredTaskScope()) { + List> searchSubtasks = + nodesAndSnapshotsToQuery.entrySet().stream() + .limit(LIMIT_SCHEMA_NODES_TO_QUERY) + .map( + (searchNode) -> + scope.fork( + currentTraceContext.wrap( + () -> { + KaldbServiceGrpc.KaldbServiceFutureStub stub = + getStub(searchNode.getKey()); - List>> limitedNodesToQuery = - nodesAndSnapshotsToQuery.entrySet().stream().limit(LIMIT_SCHEMA_NODES_TO_QUERY).toList(); - for (Map.Entry> searchNode : limitedNodesToQuery) { - KaldbServiceGrpc.KaldbServiceFutureStub stub = getStub(searchNode.getKey()); - if (stub == null) { - // TODO: insert a failed result in the results object that we return from this method - // mimicing - continue; - } + if (stub == null) { + // TODO: insert a failed result in the results object that we + // return from this method + return null; + } - KaldbSearch.SchemaRequest localSearchReq = - distribSchemaReq.toBuilder().addAllChunkIds(searchNode.getValue()).build(); - - // make sure all underlying futures finish executing (successful/cancelled/failed/other) - // and cannot be pending when the successfulAsList.get(SAME_TIMEOUT_MS) runs - ListenableFuture schemaRequest = - stub.withDeadlineAfter(defaultQueryTimeout.toMillis(), TimeUnit.MILLISECONDS) - .withInterceptors( - GrpcTracing.newBuilder(Tracing.current()).build().newClientInterceptor()) - .schema(localSearchReq); - queryServers.add(schemaRequest); - } - ListenableFuture> searchFuture = - Futures.successfulAsList(queryServers); - try { - List searchResults = - searchFuture.get(requestTimeout.toMillis(), TimeUnit.MILLISECONDS); - KaldbSearch.SchemaResult.Builder schemaBuilder = KaldbSearch.SchemaResult.newBuilder(); - searchResults.forEach( - schemaResult -> - schemaBuilder.putAllFieldDefinition(schemaResult.getFieldDefinitionMap())); - return schemaBuilder.build(); - } catch (TimeoutException e) { - // We provide a deadline to the stub of "defaultQueryTimeout" - if this is sufficiently lower - // than the request timeout, we would expect searchFuture.get(requestTimeout) to never throw - // an exception. This however doesn't necessarily hold true if the query node is CPU - // saturated, and there is not enough cpu time to fail the pending stub queries that have - // exceeded their deadline - causing the searchFuture get to fail with a timeout. - LOG.error( - "Schema failed with timeout exception. This is potentially due to CPU saturation of the query node.", - e); - span.error(e); - return KaldbSearch.SchemaResult.newBuilder().build(); + KaldbSearch.SchemaRequest localSearchReq = + distribSchemaReq.toBuilder() + .addAllChunkIds(searchNode.getValue()) + .build(); + + return stub.withDeadlineAfter( + defaultQueryTimeout.toMillis(), TimeUnit.MILLISECONDS) + .withInterceptors( + GrpcTracing.newBuilder(Tracing.current()) + .build() + .newClientInterceptor()) + .schema(localSearchReq) + .get(); + }))) + .toList(); + + try { + scope.joinUntil(Instant.now().plusSeconds(defaultQueryTimeout.toSeconds())); + } catch (TimeoutException timeoutException) { + scope.shutdown(); + scope.join(); + } + + KaldbSearch.SchemaResult.Builder schemaBuilder = KaldbSearch.SchemaResult.newBuilder(); + // List response = new ArrayList(searchSubtasks.size()); + for (StructuredTaskScope.Subtask schemaResult : searchSubtasks) { + try { + if (schemaResult.state().equals(StructuredTaskScope.Subtask.State.SUCCESS)) { + if (schemaResult.get() != null) { + schemaBuilder.putAllFieldDefinition(schemaResult.get().getFieldDefinitionMap()); + } else { + // todo - log + } + } else { + // todo - log + } + } catch (Exception e) { + LOG.error("Error fetching search result", e); + // todo + } + } + return schemaBuilder.build(); + } } catch (Exception e) { - LOG.error("Schema failed with ", e); + LOG.error("Search failed with ", e); span.error(e); return KaldbSearch.SchemaResult.newBuilder().build(); } finally { - // always request future cancellation, so that any exceptions or incomplete futures don't - // continue to consume CPU on work that will not be used - searchFuture.cancel(false); - LOG.debug("Finished distributed search for request: {}", distribSchemaReq); span.finish(); } } From 1b3f4607d69eaec7c02e463f2d9ce304b14137d8 Mon Sep 17 00:00:00 2001 From: Bryan Burkholder Date: Thu, 1 Feb 2024 13:14:32 -0700 Subject: [PATCH 2/4] Fix null aggs check --- .../com/slack/kaldb/elasticsearchApi/OpenSearchRequest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kaldb/src/main/java/com/slack/kaldb/elasticsearchApi/OpenSearchRequest.java b/kaldb/src/main/java/com/slack/kaldb/elasticsearchApi/OpenSearchRequest.java index fa2de9f7f4..c31b1e44c6 100644 --- a/kaldb/src/main/java/com/slack/kaldb/elasticsearchApi/OpenSearchRequest.java +++ b/kaldb/src/main/java/com/slack/kaldb/elasticsearchApi/OpenSearchRequest.java @@ -98,7 +98,7 @@ private static long getEndTimeEpochMs(JsonNode body) { } private static KaldbSearch.SearchRequest.SearchAggregation getAggregations(JsonNode body) { - if (body.get("aggs") == null) { + if (!body.has("aggs") || body.get("aggs") == null) { return KaldbSearch.SearchRequest.SearchAggregation.newBuilder().build(); } if (Iterators.size(body.get("aggs").fieldNames()) != 1) { From ca17444154d56d0a5d547c510da74714976f25b2 Mon Sep 17 00:00:00 2001 From: Bryan Burkholder Date: Thu, 1 Feb 2024 15:54:40 -0700 Subject: [PATCH 3/4] Add better date histogram handling --- .../kaldb/elasticsearchApi/OpenSearchRequest.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/kaldb/src/main/java/com/slack/kaldb/elasticsearchApi/OpenSearchRequest.java b/kaldb/src/main/java/com/slack/kaldb/elasticsearchApi/OpenSearchRequest.java index c31b1e44c6..5ac8346e38 100644 --- a/kaldb/src/main/java/com/slack/kaldb/elasticsearchApi/OpenSearchRequest.java +++ b/kaldb/src/main/java/com/slack/kaldb/elasticsearchApi/OpenSearchRequest.java @@ -440,7 +440,15 @@ private static List getRecursive(Js } private static String getDateHistogramInterval(JsonNode dateHistogram) { - return dateHistogram.get("interval").asText(); + if (dateHistogram.has("interval")) { + return dateHistogram.get("interval").asText(); + } + if (dateHistogram.has("fixed_interval")) { + return dateHistogram.get("fixed_interval").asText(); + } + + // else defualt + return "30s"; } private static String getHistogramInterval(JsonNode dateHistogram) { From 4508f2838d8bf018956b23bb6432ae3b9d5efc1e Mon Sep 17 00:00:00 2001 From: Bryan Burkholder Date: Mon, 5 Feb 2024 11:15:24 -0700 Subject: [PATCH 4/4] WIP query by id --- .../opensearch/OpenSearchAdapter.java | 97 ++++++++++--------- .../SchemaAwareLogDocumentBuilderImpl.java | 21 ++-- .../search/LogIndexSearcherImplTest.java | 17 ++++ 3 files changed, 84 insertions(+), 51 deletions(-) diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/OpenSearchAdapter.java b/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/OpenSearchAdapter.java index 9c5f43887f..d95f4fc7a2 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/OpenSearchAdapter.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/opensearch/OpenSearchAdapter.java @@ -57,6 +57,7 @@ import org.opensearch.index.mapper.MappedFieldType; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.query.BoolQueryBuilder; +import org.opensearch.index.query.IdsQueryBuilder; import org.opensearch.index.query.QueryShardContext; import org.opensearch.index.query.QueryStringQueryBuilder; import org.opensearch.index.query.RangeQueryBuilder; @@ -156,51 +157,57 @@ public Query buildQuery( similarityService, mapperService); try { - BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder(); - // only add a range filter if either start or end time is provided - if (startTimeMsEpoch != null || endTimeMsEpoch != null) { - RangeQueryBuilder rangeQueryBuilder = - new RangeQueryBuilder(LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName); - - // todo - consider supporting something other than GTE/LTE (ie GT/LT?) - if (startTimeMsEpoch != null) { - rangeQueryBuilder.gte(startTimeMsEpoch); - } - - if (endTimeMsEpoch != null) { - rangeQueryBuilder.lte(endTimeMsEpoch); - } - - boolQueryBuilder.filter(rangeQueryBuilder); - } - - // todo - dataset? - - // Only add the query string clause if this is not attempting to fetch all records - // Since we do analyze the wildcard this can cause unexpected behavior if only a wildcard is - // provided - if (queryStr != null - && !queryStr.isEmpty() - && !queryStr.equals("*:*") - && !queryStr.equals("*")) { - QueryStringQueryBuilder queryStringQueryBuilder = new QueryStringQueryBuilder(queryStr); - - if (queryShardContext.getMapperService().fieldType(LogMessage.SystemField.ALL.fieldName) - != null) { - queryStringQueryBuilder.defaultField(LogMessage.SystemField.ALL.fieldName); - // setting lenient=false will not throw error when the query fails to parse against - // numeric fields - queryStringQueryBuilder.lenient(false); - } else { - queryStringQueryBuilder.lenient(true); - } - - queryStringQueryBuilder.analyzeWildcard(true); - - boolQueryBuilder.filter(queryStringQueryBuilder); - } - return boolQueryBuilder.rewrite(queryShardContext).toQuery(queryShardContext); + IdsQueryBuilder idsQueryBuilder = new IdsQueryBuilder(); + idsQueryBuilder.addIds("1"); + return idsQueryBuilder.rewrite(queryShardContext).toQuery(queryShardContext); + +// BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder(); +// +// // only add a range filter if either start or end time is provided +// if (startTimeMsEpoch != null || endTimeMsEpoch != null) { +// RangeQueryBuilder rangeQueryBuilder = +// new RangeQueryBuilder(LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName); +// +// // todo - consider supporting something other than GTE/LTE (ie GT/LT?) +// if (startTimeMsEpoch != null) { +// rangeQueryBuilder.gte(startTimeMsEpoch); +// } +// +// if (endTimeMsEpoch != null) { +// rangeQueryBuilder.lte(endTimeMsEpoch); +// } +// +// boolQueryBuilder.filter(rangeQueryBuilder); +// } +// +// // todo - dataset? +// +// // Only add the query string clause if this is not attempting to fetch all records +// // Since we do analyze the wildcard this can cause unexpected behavior if only a wildcard is +// // provided +// if (queryStr != null +// && !queryStr.isEmpty() +// && !queryStr.equals("*:*") +// && !queryStr.equals("*")) { +// QueryStringQueryBuilder queryStringQueryBuilder = new QueryStringQueryBuilder(queryStr); +// +// if (queryShardContext.getMapperService().fieldType(LogMessage.SystemField.ALL.fieldName) +// != null) { +// queryStringQueryBuilder.defaultField(LogMessage.SystemField.ALL.fieldName); +// // setting lenient=false will not throw error when the query fails to parse against +// // numeric fields +// queryStringQueryBuilder.lenient(false); +// } else { +// queryStringQueryBuilder.lenient(true); +// } +// +// queryStringQueryBuilder.analyzeWildcard(true); +// +// +// boolQueryBuilder.filter(queryStringQueryBuilder); +// } +// return boolQueryBuilder.rewrite(queryShardContext).toQuery(queryShardContext); } catch (Exception e) { LOG.error("Query parse exception", e); throw new IllegalArgumentException(e); @@ -428,7 +435,7 @@ private static boolean tryRegisterField( String fieldName, CheckedConsumer buildField) { MappedFieldType fieldType = mapperService.fieldType(fieldName); - if (mapperService.isMetadataField(fieldName)) { + if (fieldName.equals("_index")) { LOG.trace("Skipping metadata field '{}'", fieldName); return false; } else if (fieldType != null) { diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/schema/SchemaAwareLogDocumentBuilderImpl.java b/kaldb/src/main/java/com/slack/kaldb/logstore/schema/SchemaAwareLogDocumentBuilderImpl.java index 16edce5ea5..1d5347fbca 100644 --- a/kaldb/src/main/java/com/slack/kaldb/logstore/schema/SchemaAwareLogDocumentBuilderImpl.java +++ b/kaldb/src/main/java/com/slack/kaldb/logstore/schema/SchemaAwareLogDocumentBuilderImpl.java @@ -14,6 +14,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.logging.log4j.util.Strings; import org.apache.lucene.document.Document; import org.slf4j.Logger; @@ -74,8 +75,15 @@ public static ImmutableMap getDefaultLuceneFieldDefiniti }; for (String fieldName : fieldsAsString) { - fieldDefBuilder.put( - fieldName, new LuceneFieldDef(fieldName, FieldType.STRING.name, false, true, true)); + if (fieldName.equals(LogMessage.SystemField.ID.fieldName)) { + fieldDefBuilder.put( + fieldName, new LuceneFieldDef(fieldName, FieldType.STRING.name, true, true, true)); + } else { + fieldDefBuilder.put( + fieldName, new LuceneFieldDef(fieldName, FieldType.STRING.name, false, true, true)); + } + + } String[] fieldsAsLong = { @@ -233,7 +241,7 @@ private void indexNewField(Document doc, String key, Object value, FieldType val defaultPropDescription.isIndexed, defaultPropDescription.storeDocValue); // add the document to this field. - totalFieldsCounter.increment(); + totalFieldsGauge.incrementAndGet(); fieldDefMap.put(key, newFieldDef); indexTypedField(doc, key, value, newFieldDef); } @@ -311,6 +319,7 @@ public static SchemaAwareLogDocumentBuilderImpl build( static final String CONVERT_FIELD_VALUE_COUNTER = "convert_field_value"; static final String CONVERT_AND_DUPLICATE_FIELD_COUNTER = "convert_and_duplicate_field"; public static final String TOTAL_FIELDS_COUNTER = "total_fields"; + public static final String TOTAL_FIELDS_GAUGE = "index_fields"; private final FieldConflictPolicy indexFieldConflictPolicy; private final boolean enableFullTextSearch; @@ -319,7 +328,7 @@ public static SchemaAwareLogDocumentBuilderImpl build( private final Counter convertErrorCounter; private final Counter convertFieldValueCounter; private final Counter convertAndDuplicateFieldCounter; - private final Counter totalFieldsCounter; + private final AtomicInteger totalFieldsGauge; SchemaAwareLogDocumentBuilderImpl( FieldConflictPolicy indexFieldConflictPolicy, @@ -333,9 +342,9 @@ public static SchemaAwareLogDocumentBuilderImpl build( convertFieldValueCounter = meterRegistry.counter(CONVERT_FIELD_VALUE_COUNTER); convertAndDuplicateFieldCounter = meterRegistry.counter(CONVERT_AND_DUPLICATE_FIELD_COUNTER); convertErrorCounter = meterRegistry.counter(CONVERT_ERROR_COUNTER); - totalFieldsCounter = meterRegistry.counter(TOTAL_FIELDS_COUNTER); + totalFieldsGauge = + meterRegistry.gauge(TOTAL_FIELDS_GAUGE, new AtomicInteger(initialFields.size())); - totalFieldsCounter.increment(initialFields.size()); fieldDefMap.putAll(initialFields); } diff --git a/kaldb/src/test/java/com/slack/kaldb/logstore/search/LogIndexSearcherImplTest.java b/kaldb/src/test/java/com/slack/kaldb/logstore/search/LogIndexSearcherImplTest.java index 575fbc2ab3..56638cb87a 100644 --- a/kaldb/src/test/java/com/slack/kaldb/logstore/search/LogIndexSearcherImplTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/logstore/search/LogIndexSearcherImplTest.java @@ -86,6 +86,23 @@ private void loadTestData(Instant time) { strictLogStore.logStore.refresh(); } + @Test + public void testIndexSearch() { + Instant time = Instant.ofEpochSecond(1593365471); + loadTestData(time); + + SearchResult index = + strictLogStore.logSearcher.search( + TEST_DATASET_NAME, + "", + time.toEpochMilli(), + time.plusSeconds(2).toEpochMilli(), + 10, + new DateHistogramAggBuilder( + "1", LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName, "1s")); + assertThat(index.hits.size()).isEqualTo(1); + } + @Test public void testTimeBoundSearch() { Instant time =