
[Backport 2.13] Fix: change max chunk limit exception #720

Merged 1 commit on May 1, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -20,6 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Removed map of subquery to subquery index in favor of storing index as part of disi wrapper to improve hybrid query latencies by 20% ([#711](https://github.com/opensearch-project/neural-search/pull/711))
### Bug Fixes
- Add support for request_cache flag in hybrid query ([#663](https://github.com/opensearch-project/neural-search/pull/663))
- Avoid the max_chunk_limit exceeded exception in the text chunking processor ([#717](https://github.com/opensearch-project/neural-search/pull/717))
### Infrastructure
### Documentation
### Maintenance
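For context on the bug-fix entry above: the chunkers previously called a utility check that appears to have thrown once the number of produced chunks reached max_chunk_limit (that utility file is deleted further down in this diff). After this change the limit check returns a boolean instead, the processor pre-counts the strings to be chunked as chunk_string_count, and one output slot stays reserved for every string that still has to be chunked. A simplified, hedged simulation of that budgeting, using hypothetical values that do not appear in the diff:

public class ChunkBudgetSketch {
    public static void main(String[] args) {
        int maxChunkLimit = 5;                          // field-level max_chunk_limit
        int[] piecesPerString = { 6, 3 };               // delimiter pieces each string would yield
        int chunkStringCount = piecesPerString.length;  // chunk_string_count from the pre-count pass
        int totalChunks = 0;
        for (int pieces : piecesPerString) {
            int emitted = 0;
            // emit ordinary chunks only while the budget allows (mirrors checkRunTimeMaxChunkLimit)
            while (emitted < pieces - 1 && totalChunks + chunkStringCount < maxChunkLimit) {
                emitted++;
                totalChunks++;
            }
            totalChunks++;       // the rest of the string is always appended as one final chunk
            chunkStringCount--;  // one fewer string left to chunk
        }
        System.out.println(totalChunks); // 5: capped at max_chunk_limit, no exception thrown
    }
}

The reserved slots mean each remaining string still emits at least one chunk; truncation happens by merging the tail of a string into its final chunk rather than by throwing.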
@@ -12,6 +12,7 @@
import java.util.List;
import java.util.Objects;

import org.apache.commons.lang3.StringUtils;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.env.Environment;
import org.opensearch.index.IndexService;
@@ -30,6 +31,7 @@
import static org.opensearch.neuralsearch.processor.chunker.Chunker.MAX_CHUNK_LIMIT_FIELD;
import static org.opensearch.neuralsearch.processor.chunker.Chunker.DEFAULT_MAX_CHUNK_LIMIT;
import static org.opensearch.neuralsearch.processor.chunker.Chunker.DISABLED_MAX_CHUNK_LIMIT;
import static org.opensearch.neuralsearch.processor.chunker.Chunker.CHUNK_STRING_COUNT_FIELD;
import static org.opensearch.neuralsearch.processor.chunker.ChunkerParameterParser.parseIntegerParameter;

/**
@@ -170,9 +172,11 @@ public IngestDocument execute(final IngestDocument ingestDocument) {
// fixed token length algorithm needs runtime parameter max_token_count for tokenization
Map<String, Object> runtimeParameters = new HashMap<>();
int maxTokenCount = getMaxTokenCount(sourceAndMetadataMap);
int chunkStringCount = getChunkStringCountFromMap(sourceAndMetadataMap, fieldMap);
runtimeParameters.put(FixedTokenLengthChunker.MAX_TOKEN_COUNT_FIELD, maxTokenCount);
runtimeParameters.put(MAX_CHUNK_LIMIT_FIELD, maxChunkLimit);
chunkMapType(sourceAndMetadataMap, fieldMap, runtimeParameters, 0);
runtimeParameters.put(CHUNK_STRING_COUNT_FIELD, chunkStringCount);
chunkMapType(sourceAndMetadataMap, fieldMap, runtimeParameters);
return ingestDocument;
}

@@ -230,13 +234,51 @@ private void validateListTypeValue(final String sourceKey, final Object sourceVa
}

@SuppressWarnings("unchecked")
private int chunkMapType(
private int getChunkStringCountFromMap(Map<String, Object> sourceAndMetadataMap, final Map<String, Object> fieldMap) {
int chunkStringCount = 0;
for (Map.Entry<String, Object> fieldMapEntry : fieldMap.entrySet()) {
String originalKey = fieldMapEntry.getKey();
Object targetKey = fieldMapEntry.getValue();
if (targetKey instanceof Map) {
// call this method recursively when target key is a map
Object sourceObject = sourceAndMetadataMap.get(originalKey);
if (sourceObject instanceof List) {
List<Object> sourceObjectList = (List<Object>) sourceObject;
for (Object source : sourceObjectList) {
if (source instanceof Map) {
chunkStringCount += getChunkStringCountFromMap((Map<String, Object>) source, (Map<String, Object>) targetKey);
}
}
} else if (sourceObject instanceof Map) {
chunkStringCount += getChunkStringCountFromMap((Map<String, Object>) sourceObject, (Map<String, Object>) targetKey);
}
} else {
// count the strings when the target key is of leaf type (null, string or list of string)
Object chunkObject = sourceAndMetadataMap.get(originalKey);
chunkStringCount += getChunkStringCountFromLeafType(chunkObject);
}
}
return chunkStringCount;
}

@SuppressWarnings("unchecked")
private int getChunkStringCountFromLeafType(final Object value) {
// leaf type means null, String or List<String>
// the count is 0 when the input is null
if (value instanceof String) {
return StringUtils.isEmpty((String) value) ? 0 : 1;
} else if (isListOfString(value)) {
return (int) ((List<String>) value).stream().filter(s -> !StringUtils.isEmpty(s)).count();
}
return 0;
}

@SuppressWarnings("unchecked")
private void chunkMapType(
Map<String, Object> sourceAndMetadataMap,
final Map<String, Object> fieldMap,
final Map<String, Object> runtimeParameters,
final int chunkCount
final Map<String, Object> runtimeParameters
) {
int updatedChunkCount = chunkCount;
for (Map.Entry<String, Object> fieldMapEntry : fieldMap.entrySet()) {
String originalKey = fieldMapEntry.getKey();
Object targetKey = fieldMapEntry.getValue();
@@ -247,21 +289,11 @@ private int chunkMapType(
List<Object> sourceObjectList = (List<Object>) sourceObject;
for (Object source : sourceObjectList) {
if (source instanceof Map) {
updatedChunkCount = chunkMapType(
(Map<String, Object>) source,
(Map<String, Object>) targetKey,
runtimeParameters,
updatedChunkCount
);
chunkMapType((Map<String, Object>) source, (Map<String, Object>) targetKey, runtimeParameters);
}
}
} else if (sourceObject instanceof Map) {
updatedChunkCount = chunkMapType(
(Map<String, Object>) sourceObject,
(Map<String, Object>) targetKey,
runtimeParameters,
updatedChunkCount
);
chunkMapType((Map<String, Object>) sourceObject, (Map<String, Object>) targetKey, runtimeParameters);
}
} else {
// chunk the object when target key is of leaf type (null, string and list of string)
@@ -270,15 +302,21 @@
sourceAndMetadataMap.put(String.valueOf(targetKey), chunkedResult);
}
}
return updatedChunkCount;
}

/**
* Chunk the content, update the runtime max_chunk_limit and chunk_string_count, and return the result
*/
private List<String> chunkString(final String content, final Map<String, Object> runTimeParameters) {
// update runtime max_chunk_limit if not disabled
// return an empty list for empty string
if (StringUtils.isEmpty(content)) {
return List.of();
}
List<String> contentResult = chunker.chunk(content, runTimeParameters);
// update chunk_string_count for each string
int chunkStringCount = parseIntegerParameter(runTimeParameters, CHUNK_STRING_COUNT_FIELD, 1);
runTimeParameters.put(CHUNK_STRING_COUNT_FIELD, chunkStringCount - 1);
// update runtime max_chunk_limit if not disabled
int runtimeMaxChunkLimit = parseIntegerParameter(runTimeParameters, MAX_CHUNK_LIMIT_FIELD, maxChunkLimit);
if (runtimeMaxChunkLimit != DISABLED_MAX_CHUNK_LIMIT) {
runTimeParameters.put(MAX_CHUNK_LIMIT_FIELD, runtimeMaxChunkLimit - contentResult.size());
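The pre-count pass added above mirrors chunkMapType's recursive traversal. As a self-contained illustration of the counting rules (a simplified re-implementation, not the repository's code; the real getChunkStringCountFromMap also recurses into lists whose elements are maps), the following counts the non-empty leaf strings selected by a field map:

import java.util.List;
import java.util.Map;

// Simplified, illustrative re-implementation of the pre-count pass.
final class ChunkStringCounter {
    @SuppressWarnings("unchecked")
    static int count(Map<String, Object> source, Map<String, Object> fieldMap) {
        int total = 0;
        for (Map.Entry<String, Object> entry : fieldMap.entrySet()) {
            Object value = source.get(entry.getKey());
            if (entry.getValue() instanceof Map && value instanceof Map) {
                // nested object: recurse with the nested field map
                total += count((Map<String, Object>) value, (Map<String, Object>) entry.getValue());
            } else if (value instanceof String && !((String) value).isEmpty()) {
                total += 1; // one non-empty string to chunk
            } else if (value instanceof List) {
                for (Object item : (List<Object>) value) {
                    if (item instanceof String && !((String) item).isEmpty()) {
                        total += 1; // each non-empty list element counts once
                    }
                }
            }
        }
        return total;
    }
}

For example, count(Map.of("body", List.of("a", "", "b")), Map.of("body", "body_chunk")) returns 2: empty strings are skipped, matching getChunkStringCountFromLeafType above.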
@@ -14,6 +14,7 @@
public interface Chunker {

String MAX_CHUNK_LIMIT_FIELD = "max_chunk_limit";
String CHUNK_STRING_COUNT_FIELD = "chunk_string_count";
int DEFAULT_MAX_CHUNK_LIMIT = 100;
int DISABLED_MAX_CHUNK_LIMIT = -1;

@@ -33,4 +34,16 @@ public interface Chunker {
* @return chunked passages
*/
List<String> chunk(String content, Map<String, Object> runtimeParameters);

/**
* Checks whether adding a passage to the chunking result would reach or exceed the runtime max chunk limit.
*
* @param chunkResultSize the current size of the chunking result
* @param runtimeMaxChunkLimit runtime max_chunk_limit, checked against chunkResultSize
* @param chunkStringCount runtime chunk_string_count, the number of strings (including the current one) left to chunk
* @return true if the limit is enabled and would be reached or exceeded
*/
static boolean checkRunTimeMaxChunkLimit(int chunkResultSize, int runtimeMaxChunkLimit, int chunkStringCount) {
return runtimeMaxChunkLimit != DISABLED_MAX_CHUNK_LIMIT && chunkResultSize + chunkStringCount >= runtimeMaxChunkLimit;
}
}
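Note the short-circuit on DISABLED_MAX_CHUNK_LIMIT: with max_chunk_limit set to -1 the check never trips. Sample evaluations of the helper as defined above (the values are illustrative):

Chunker.checkRunTimeMaxChunkLimit(10, -1, 3); // false: limit disabled, never truncate
Chunker.checkRunTimeMaxChunkLimit(3, 5, 1);   // false: 3 + 1 < 5, room for one more chunk
Chunker.checkRunTimeMaxChunkLimit(4, 5, 1);   // true:  4 + 1 >= 5, the current string must stop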

This file was deleted.

@@ -49,25 +49,29 @@ public void parseParameters(Map<String, Object> parameters) {
* @param content input string
* @param runtimeParameters a map for runtime parameters, containing the following runtime parameters:
* 1. max_chunk_limit field level max chunk limit
* 2. chunk_string_count the number of non-empty strings (including the current one) that still need to be chunked
*/
@Override
public List<String> chunk(final String content, final Map<String, Object> runtimeParameters) {
int runtimeMaxChunkLimit = parseIntegerParameter(runtimeParameters, MAX_CHUNK_LIMIT_FIELD, maxChunkLimit);
int chunkStringCount = parseIntegerParameter(runtimeParameters, CHUNK_STRING_COUNT_FIELD, 1);

List<String> chunkResult = new ArrayList<>();
int start = 0, end;
int nextDelimiterPosition = content.indexOf(delimiter);

while (nextDelimiterPosition != -1) {
ChunkerUtil.checkRunTimeMaxChunkLimit(chunkResult.size(), runtimeMaxChunkLimit, maxChunkLimit);
if (Chunker.checkRunTimeMaxChunkLimit(chunkResult.size(), runtimeMaxChunkLimit, chunkStringCount)) {
break;
}
end = nextDelimiterPosition + delimiter.length();
chunkResult.add(content.substring(start, end));
start = end;
nextDelimiterPosition = content.indexOf(delimiter, start);
}

// add the remaining content to the chunk result
if (start < content.length()) {
ChunkerUtil.checkRunTimeMaxChunkLimit(chunkResult.size(), runtimeMaxChunkLimit, maxChunkLimit);
chunkResult.add(content.substring(start));
}

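A hedged usage sketch of the loop above: assuming DelimiterChunker exposes a constructor that accepts its parameter map (the constructor and parameter keys are not shown in this diff and are assumed here), a budget of three chunks truncates mid-string but still flushes the tail:

// Illustrative only; constructor signature and parameter keys are assumed.
Chunker chunker = new DelimiterChunker(Map.of("delimiter", ".", "max_chunk_limit", 3));
Map<String, Object> runtime = new HashMap<>();
runtime.put("max_chunk_limit", 3);    // runtime budget for this document
runtime.put("chunk_string_count", 1); // this is the only string left to chunk
List<String> chunks = chunker.chunk("a. b. c. d. e.", runtime);
// Expected shape: ["a.", " b.", " c. d. e."] - two ordinary chunks, then the
// remainder is flushed as the final chunk instead of an exception being thrown.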
@@ -117,11 +117,13 @@ public void parseParameters(Map<String, Object> parameters) {
* @param runtimeParameters a map for runtime parameters, containing the following runtime parameters:
* 1. max_token_count the max token limit for the tokenizer
* 2. max_chunk_limit field level max chunk limit
* 3. chunk_string_count the number of non-empty strings (including the current one) that still need to be chunked
*/
@Override
public List<String> chunk(final String content, final Map<String, Object> runtimeParameters) {
int maxTokenCount = parsePositiveIntegerParameter(runtimeParameters, MAX_TOKEN_COUNT_FIELD, DEFAULT_MAX_TOKEN_COUNT);
int runtimeMaxChunkLimit = parseIntegerParameter(runtimeParameters, MAX_CHUNK_LIMIT_FIELD, this.maxChunkLimit);
int chunkStringCount = parseIntegerParameter(runtimeParameters, CHUNK_STRING_COUNT_FIELD, 1);

List<AnalyzeToken> tokens = tokenize(content, tokenizer, maxTokenCount);
List<String> chunkResult = new ArrayList<>();
@@ -131,13 +133,17 @@ public List<String> chunk(final String content, final Map<String, Object> runtim
int overlapTokenNumber = (int) Math.floor(tokenLimit * overlapRate);

while (startTokenIndex < tokens.size()) {
ChunkerUtil.checkRunTimeMaxChunkLimit(chunkResult.size(), runtimeMaxChunkLimit, maxChunkLimit);
if (startTokenIndex == 0) {
// include all characters from the start if there is no previous passage
startContentPosition = 0;
} else {
startContentPosition = tokens.get(startTokenIndex).getStartOffset();
}
if (Chunker.checkRunTimeMaxChunkLimit(chunkResult.size(), runtimeMaxChunkLimit, chunkStringCount)) {
// include all remaining characters up to the end if the max chunk limit is exceeded
chunkResult.add(content.substring(startContentPosition));
break;
}
if (startTokenIndex + tokenLimit >= tokens.size()) {
// include all characters to the end if there is no next passage
endContentPosition = content.length();
Expand Down
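The fixed token length chunker truncates a little differently from the delimiter chunker: the limit check runs after the passage's start offset is computed, so when the budget runs out the final passage stretches from the current token to the end of the content rather than stopping at token_limit. A worked example with illustrative numbers (not taken from the diff):

public class FixedTokenTruncationSketch {
    public static void main(String[] args) {
        int tokenLimit = 10;          // tokens per ordinary passage, overlap_rate = 0
        int totalTokens = 45;         // tokens produced for the input string
        int runtimeMaxChunkLimit = 3; // runtime max_chunk_limit
        int chunkStringCount = 1;     // this is the last string to chunk

        int ordinaryPassages = 0;
        // emit full-length passages while the budget allows (mirrors the check above)
        while ((ordinaryPassages + 1) * tokenLimit < totalTokens
            && ordinaryPassages + chunkStringCount < runtimeMaxChunkLimit) {
            ordinaryPassages++;
        }
        // tokens 20..44 become one final, longer passage, so the string yields
        // 3 chunks in total instead of the 5 it would produce with no limit
        System.out.println(ordinaryPassages + 1); // 3
    }
}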