From b277b07e89e25f1abaa9de3a326fda3556dc8a77 Mon Sep 17 00:00:00 2001 From: Martin Gaievski Date: Wed, 24 Apr 2024 08:34:09 -0700 Subject: [PATCH] Replaced stream.findFirst by `for` loop for hybrid query (#706) * Change stream.findFirst to for loop Signed-off-by: Martin Gaievski Co-authored-by: Navneet Verma --- CHANGELOG.md | 1 + .../neuralsearch/query/HybridQueryScorer.java | 46 +++++++++++-------- 2 files changed, 28 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7828007a1..683fda670 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - BWC tests for text chunking processor ([#661](https://github.com/opensearch-project/neural-search/pull/661)) - Allowing execution of hybrid query on index alias with filters ([#670](https://github.com/opensearch-project/neural-search/pull/670)) - Allowing query by raw tokens in neural_sparse query ([#693](https://github.com/opensearch-project/neural-search/pull/693)) +- Removed the stream.findFirst implementation in favor of a more native iteration implementation, improving hybrid query latencies by 35% ([#706](https://github.com/opensearch-project/neural-search/pull/706)) ### Bug Fixes - Add support for request_cache flag in hybrid query ([#663](https://github.com/opensearch-project/neural-search/pull/663)) ### Infrastructure diff --git a/src/main/java/org/opensearch/neuralsearch/query/HybridQueryScorer.java b/src/main/java/org/opensearch/neuralsearch/query/HybridQueryScorer.java index f31d0abd9..1da610f53 100644 --- a/src/main/java/org/opensearch/neuralsearch/query/HybridQueryScorer.java +++ b/src/main/java/org/opensearch/neuralsearch/query/HybridQueryScorer.java @@ -14,6 +14,7 @@ import java.util.Map; import java.util.Objects; +import com.google.common.primitives.Ints; import org.apache.lucene.search.DisiPriorityQueue; import org.apache.lucene.search.DisiWrapper; import 
org.apache.lucene.search.DisjunctionDISIApproximation; @@ -42,7 +43,7 @@ public final class HybridQueryScorer extends Scorer { private final float[] subScores; - private final Map> queryToIndex; + private final Map queryToIndex; private final DocIdSetIterator approximation; private final HybridScoreBlockBoundaryPropagator disjunctionBlockPropagator; @@ -201,29 +202,33 @@ public float[] hybridScores() throws IOException { continue; } Query query = scorer.getWeight().getQuery(); - List indexes = queryToIndex.get(query); + int[] indexes = queryToIndex.get(query); // we need to find the index of first sub-query that hasn't been set yet. Such score will have initial value of "0.0" - int index = indexes.stream() - .mapToInt(idx -> idx) - .filter(idx -> Float.compare(scores[idx], 0.0f) == 0) - .findFirst() - .orElseThrow( - () -> new IllegalStateException( - String.format( - Locale.ROOT, - "cannot set score for one of hybrid search subquery [%s] and document [%d]", - query.toString(), - scorer.docID() - ) + int index = -1; + for (int idx : indexes) { + if (Float.compare(scores[idx], 0.0f) == 0) { + index = idx; + break; + } + } + if (index == -1) { + throw new IllegalStateException( + String.format( + Locale.ROOT, + "cannot set score for one of hybrid search subquery [%s] and document [%d]", + query.toString(), + scorer.docID() ) ); + } scores[index] = scorer.score(); } return scores; } - private Map> mapQueryToIndex() { - Map> queryToIndex = new HashMap<>(); + private Map mapQueryToIndex() { + // we need list as number of identical queries is unknown + Map> queryToListOfIndexes = new HashMap<>(); int idx = 0; for (Scorer scorer : subScorers) { if (scorer == null) { @@ -231,10 +236,13 @@ private Map> mapQueryToIndex() { continue; } Query query = scorer.getWeight().getQuery(); - queryToIndex.putIfAbsent(query, new ArrayList<>()); - queryToIndex.get(query).add(idx); + queryToListOfIndexes.putIfAbsent(query, new ArrayList<>()); + queryToListOfIndexes.get(query).add(idx); 
idx++; } + // convert to the int array for better performance + Map queryToIndex = new HashMap<>(); + queryToListOfIndexes.forEach((key, value) -> queryToIndex.put(key, Ints.toArray(value))); return queryToIndex; } @@ -242,7 +250,7 @@ private DisiPriorityQueue initializeSubScorersPQ() { Objects.requireNonNull(queryToIndex, "should not be null"); Objects.requireNonNull(subScorers, "should not be null"); // we need to count this way in order to include all identical sub-queries - int numOfSubQueries = queryToIndex.values().stream().map(List::size).reduce(0, Integer::sum); + int numOfSubQueries = queryToIndex.values().stream().map(array -> array.length).reduce(0, Integer::sum); DisiPriorityQueue subScorersPQ = new DisiPriorityQueue(numOfSubQueries); for (Scorer scorer : subScorers) { if (scorer == null) {