Skip to content

Commit

Permalink
Merge branch '2.13' into backport/backport-713-to-2.13
Browse files Browse the repository at this point in the history
Signed-off-by: yuye-aws <[email protected]>
  • Loading branch information
yuye-aws authored May 1, 2024
2 parents bd32dd2 + 8552a2c commit 616684f
Show file tree
Hide file tree
Showing 12 changed files with 532 additions and 203 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Features
### Enhancements
- Allowing execution of hybrid query on index alias with filters ([#670](https://github.com/opensearch-project/neural-search/pull/670))
- Removed stream.findFirst implementation to use more native iteration implement to improve hybrid query latencies by 35% ([#706](https://github.com/opensearch-project/neural-search/pull/706))
- Removed map of subquery to subquery index in favor of storing index as part of disi wrapper to improve hybrid query latencies by 20% ([#711](https://github.com/opensearch-project/neural-search/pull/711))
### Bug Fixes
- Add support for request_cache flag in hybrid query ([#663](https://github.com/opensearch-project/neural-search/pull/663))
- Fix multi node "no such index" error in text chunking processor ([#713](https://github.com/opensearch-project/neural-search/pull/713))
- Avoid change max_chunk_limit exceed exception in text chunking processor ([#717](https://github.com/opensearch-project/neural-search/pull/717))
### Infrastructure
### Documentation
### Maintenance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.List;
import java.util.Objects;

import org.apache.commons.lang3.StringUtils;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.env.Environment;
import org.opensearch.cluster.service.ClusterService;
Expand All @@ -28,6 +29,7 @@
import static org.opensearch.neuralsearch.processor.chunker.Chunker.MAX_CHUNK_LIMIT_FIELD;
import static org.opensearch.neuralsearch.processor.chunker.Chunker.DEFAULT_MAX_CHUNK_LIMIT;
import static org.opensearch.neuralsearch.processor.chunker.Chunker.DISABLED_MAX_CHUNK_LIMIT;
import static org.opensearch.neuralsearch.processor.chunker.Chunker.CHUNK_STRING_COUNT_FIELD;
import static org.opensearch.neuralsearch.processor.chunker.ChunkerParameterParser.parseIntegerParameter;

/**
Expand Down Expand Up @@ -165,9 +167,11 @@ public IngestDocument execute(final IngestDocument ingestDocument) {
// fixed token length algorithm needs runtime parameter max_token_count for tokenization
Map<String, Object> runtimeParameters = new HashMap<>();
int maxTokenCount = getMaxTokenCount(sourceAndMetadataMap);
int chunkStringCount = getChunkStringCountFromMap(sourceAndMetadataMap, fieldMap);
runtimeParameters.put(FixedTokenLengthChunker.MAX_TOKEN_COUNT_FIELD, maxTokenCount);
runtimeParameters.put(MAX_CHUNK_LIMIT_FIELD, maxChunkLimit);
chunkMapType(sourceAndMetadataMap, fieldMap, runtimeParameters, 0);
runtimeParameters.put(CHUNK_STRING_COUNT_FIELD, chunkStringCount);
chunkMapType(sourceAndMetadataMap, fieldMap, runtimeParameters);
return ingestDocument;
}

Expand Down Expand Up @@ -225,13 +229,51 @@ private void validateListTypeValue(final String sourceKey, final Object sourceVa
}

@SuppressWarnings("unchecked")
private int chunkMapType(
private int getChunkStringCountFromMap(Map<String, Object> sourceAndMetadataMap, final Map<String, Object> fieldMap) {
int chunkStringCount = 0;
for (Map.Entry<String, Object> fieldMapEntry : fieldMap.entrySet()) {
String originalKey = fieldMapEntry.getKey();
Object targetKey = fieldMapEntry.getValue();
if (targetKey instanceof Map) {
// call this method recursively when target key is a map
Object sourceObject = sourceAndMetadataMap.get(originalKey);
if (sourceObject instanceof List) {
List<Object> sourceObjectList = (List<Object>) sourceObject;
for (Object source : sourceObjectList) {
if (source instanceof Map) {
chunkStringCount += getChunkStringCountFromMap((Map<String, Object>) source, (Map<String, Object>) targetKey);
}
}
} else if (sourceObject instanceof Map) {
chunkStringCount += getChunkStringCountFromMap((Map<String, Object>) sourceObject, (Map<String, Object>) targetKey);
}
} else {
// chunk the object when target key is of leaf type (null, string and list of string)
Object chunkObject = sourceAndMetadataMap.get(originalKey);
chunkStringCount += getChunkStringCountFromLeafType(chunkObject);
}
}
return chunkStringCount;
}

@SuppressWarnings("unchecked")
private int getChunkStringCountFromLeafType(final Object value) {
// leaf type means null, String or List<String>
// the result should be an empty list when the input is null
if (value instanceof String) {
return StringUtils.isEmpty((String) value) ? 0 : 1;
} else if (isListOfString(value)) {
return (int) ((List<String>) value).stream().filter(s -> !StringUtils.isEmpty(s)).count();
}
return 0;
}

@SuppressWarnings("unchecked")
private void chunkMapType(
Map<String, Object> sourceAndMetadataMap,
final Map<String, Object> fieldMap,
final Map<String, Object> runtimeParameters,
final int chunkCount
final Map<String, Object> runtimeParameters
) {
int updatedChunkCount = chunkCount;
for (Map.Entry<String, Object> fieldMapEntry : fieldMap.entrySet()) {
String originalKey = fieldMapEntry.getKey();
Object targetKey = fieldMapEntry.getValue();
Expand All @@ -242,21 +284,11 @@ private int chunkMapType(
List<Object> sourceObjectList = (List<Object>) sourceObject;
for (Object source : sourceObjectList) {
if (source instanceof Map) {
updatedChunkCount = chunkMapType(
(Map<String, Object>) source,
(Map<String, Object>) targetKey,
runtimeParameters,
updatedChunkCount
);
chunkMapType((Map<String, Object>) source, (Map<String, Object>) targetKey, runtimeParameters);
}
}
} else if (sourceObject instanceof Map) {
updatedChunkCount = chunkMapType(
(Map<String, Object>) sourceObject,
(Map<String, Object>) targetKey,
runtimeParameters,
updatedChunkCount
);
chunkMapType((Map<String, Object>) sourceObject, (Map<String, Object>) targetKey, runtimeParameters);
}
} else {
// chunk the object when target key is of leaf type (null, string and list of string)
Expand All @@ -265,15 +297,21 @@ private int chunkMapType(
sourceAndMetadataMap.put(String.valueOf(targetKey), chunkedResult);
}
}
return updatedChunkCount;
}

/**
* Chunk the content, update the runtime max_chunk_limit and return the result
*/
private List<String> chunkString(final String content, final Map<String, Object> runTimeParameters) {
// update runtime max_chunk_limit if not disabled
// return an empty list for empty string
if (StringUtils.isEmpty(content)) {
return List.of();
}
List<String> contentResult = chunker.chunk(content, runTimeParameters);
// update chunk_string_count for each string
int chunkStringCount = parseIntegerParameter(runTimeParameters, CHUNK_STRING_COUNT_FIELD, 1);
runTimeParameters.put(CHUNK_STRING_COUNT_FIELD, chunkStringCount - 1);
// update runtime max_chunk_limit if not disabled
int runtimeMaxChunkLimit = parseIntegerParameter(runTimeParameters, MAX_CHUNK_LIMIT_FIELD, maxChunkLimit);
if (runtimeMaxChunkLimit != DISABLED_MAX_CHUNK_LIMIT) {
runTimeParameters.put(MAX_CHUNK_LIMIT_FIELD, runtimeMaxChunkLimit - contentResult.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
public interface Chunker {

String MAX_CHUNK_LIMIT_FIELD = "max_chunk_limit";
String CHUNK_STRING_COUNT_FIELD = "chunk_string_count";
int DEFAULT_MAX_CHUNK_LIMIT = 100;
int DISABLED_MAX_CHUNK_LIMIT = -1;

Expand All @@ -33,4 +34,16 @@ public interface Chunker {
* @return chunked passages
*/
List<String> chunk(String content, Map<String, Object> runtimeParameters);

/**
* Checks whether the chunking results would exceed the max chunk limit after adding a passage
* If exceeds, then return true
*
* @param chunkResultSize the size of chunking result
* @param runtimeMaxChunkLimit runtime max_chunk_limit, used to check with chunkResultSize
* @param chunkStringCount runtime chunk_string_count, used to check with chunkResultSize
*/
static boolean checkRunTimeMaxChunkLimit(int chunkResultSize, int runtimeMaxChunkLimit, int chunkStringCount) {
return runtimeMaxChunkLimit != DISABLED_MAX_CHUNK_LIMIT && chunkResultSize + chunkStringCount >= runtimeMaxChunkLimit;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -49,25 +49,29 @@ public void parseParameters(Map<String, Object> parameters) {
* @param content input string
* @param runtimeParameters a map for runtime parameters, containing the following runtime parameters:
* 1. max_chunk_limit field level max chunk limit
* 2. chunk_string_count number of non-empty strings (including itself) which need to be chunked later
*/
@Override
public List<String> chunk(final String content, final Map<String, Object> runtimeParameters) {
int runtimeMaxChunkLimit = parseIntegerParameter(runtimeParameters, MAX_CHUNK_LIMIT_FIELD, maxChunkLimit);
int chunkStringCount = parseIntegerParameter(runtimeParameters, CHUNK_STRING_COUNT_FIELD, 1);

List<String> chunkResult = new ArrayList<>();
int start = 0, end;
int nextDelimiterPosition = content.indexOf(delimiter);

while (nextDelimiterPosition != -1) {
ChunkerUtil.checkRunTimeMaxChunkLimit(chunkResult.size(), runtimeMaxChunkLimit, maxChunkLimit);
if (Chunker.checkRunTimeMaxChunkLimit(chunkResult.size(), runtimeMaxChunkLimit, chunkStringCount)) {
break;
}
end = nextDelimiterPosition + delimiter.length();
chunkResult.add(content.substring(start, end));
start = end;
nextDelimiterPosition = content.indexOf(delimiter, start);
}

// add the rest content into the chunk result
if (start < content.length()) {
ChunkerUtil.checkRunTimeMaxChunkLimit(chunkResult.size(), runtimeMaxChunkLimit, maxChunkLimit);
chunkResult.add(content.substring(start));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,13 @@ public void parseParameters(Map<String, Object> parameters) {
* @param runtimeParameters a map for runtime parameters, containing the following runtime parameters:
* 1. max_token_count the max token limit for the tokenizer
* 2. max_chunk_limit field level max chunk limit
* 3. chunk_string_count number of non-empty strings (including itself) which need to be chunked later
*/
@Override
public List<String> chunk(final String content, final Map<String, Object> runtimeParameters) {
int maxTokenCount = parsePositiveIntegerParameter(runtimeParameters, MAX_TOKEN_COUNT_FIELD, DEFAULT_MAX_TOKEN_COUNT);
int runtimeMaxChunkLimit = parseIntegerParameter(runtimeParameters, MAX_CHUNK_LIMIT_FIELD, this.maxChunkLimit);
int chunkStringCount = parseIntegerParameter(runtimeParameters, CHUNK_STRING_COUNT_FIELD, 1);

List<AnalyzeToken> tokens = tokenize(content, tokenizer, maxTokenCount);
List<String> chunkResult = new ArrayList<>();
Expand All @@ -131,13 +133,17 @@ public List<String> chunk(final String content, final Map<String, Object> runtim
int overlapTokenNumber = (int) Math.floor(tokenLimit * overlapRate);

while (startTokenIndex < tokens.size()) {
ChunkerUtil.checkRunTimeMaxChunkLimit(chunkResult.size(), runtimeMaxChunkLimit, maxChunkLimit);
if (startTokenIndex == 0) {
// include all characters till the start if no previous passage
startContentPosition = 0;
} else {
startContentPosition = tokens.get(startTokenIndex).getStartOffset();
}
if (Chunker.checkRunTimeMaxChunkLimit(chunkResult.size(), runtimeMaxChunkLimit, chunkStringCount)) {
// include all characters till the end if exceeds max chunk limit
chunkResult.add(content.substring(startContentPosition));
break;
}
if (startTokenIndex + tokenLimit >= tokens.size()) {
// include all characters till the end if no next passage
endContentPosition = content.length();
Expand Down
Loading

0 comments on commit 616684f

Please sign in to comment.