feat: implement text chunking processor with fixed token length and delimiter algorithm (opensearch-project#607)

* implement chunking processor and fixed token length

Signed-off-by: yuye-aws <[email protected]>

* initialize node client for document chunking processor

Signed-off-by: yuye-aws <[email protected]>

* initialize document chunking processor with analysis registry

Signed-off-by: yuye-aws <[email protected]>

* chunker factory create with analysis registry

Signed-off-by: yuye-aws <[email protected]>

* implement tokenizer in fixed token length algorithm with analysis registry

Signed-off-by: yuye-aws <[email protected]>

* add max token count parsing logic

Signed-off-by: yuye-aws <[email protected]>

* bug fix for non-existing index

Signed-off-by: yuye-aws <[email protected]>

* change error log

Signed-off-by: yuye-aws <[email protected]>

* implement evenly chunk

Signed-off-by: yuye-aws <[email protected]>

* unit tests for chunker factory

Signed-off-by: yuye-aws <[email protected]>

* unit tests for chunker factory

Signed-off-by: yuye-aws <[email protected]>

* add error message for chunker factory tests

Signed-off-by: yuye-aws <[email protected]>

* resolve comments

Signed-off-by: yuye-aws <[email protected]>

* Revert "implement evenly chunk"

This reverts commit 93dd2f4.

Signed-off-by: yuye-aws <[email protected]>

* add default value logic back

Signed-off-by: yuye-aws <[email protected]>

* implement unit test for fixed token length chunker

Signed-off-by: yuye-aws <[email protected]>

* add test cases in unit test for fixed token length chunker

Signed-off-by: yuye-aws <[email protected]>

* support map type as an input

Signed-off-by: yuye-aws <[email protected]>

* support map type as an input

Signed-off-by: yuye-aws <[email protected]>

* bug fix for map type

Signed-off-by: yuye-aws <[email protected]>

* bug fix for map type

Signed-off-by: yuye-aws <[email protected]>

* bug fix for map type in document chunking processor

Signed-off-by: yuye-aws <[email protected]>

* remove system out println

Signed-off-by: yuye-aws <[email protected]>

* add delimiter chunker

Signed-off-by: xinyual <[email protected]>
Signed-off-by: yuye-aws <[email protected]>

* add UT for delimiter chunker

Signed-off-by: xinyual <[email protected]>
Signed-off-by: yuye-aws <[email protected]>

* add delimiter chunker processor

Signed-off-by: xinyual <[email protected]>
Signed-off-by: yuye-aws <[email protected]>

* add more UTs

Signed-off-by: xinyual <[email protected]>
Signed-off-by: yuye-aws <[email protected]>

* add more UTs

Signed-off-by: xinyual <[email protected]>
Signed-off-by: yuye-aws <[email protected]>

* basic unit tests for document chunking processor

Signed-off-by: yuye-aws <[email protected]>

* fix tests for getProcessors in neural search

Signed-off-by: yuye-aws <[email protected]>

* add unit tests with string, map and nested map type for document chunking processor

Signed-off-by: yuye-aws <[email protected]>

* add unit tests for parameter validation in document chunking processor

Signed-off-by: yuye-aws <[email protected]>

* add back deleted xml file

Signed-off-by: yuye-aws <[email protected]>

* restore xml file

Signed-off-by: yuye-aws <[email protected]>

* integration tests for document chunking processor

Signed-off-by: yuye-aws <[email protected]>

* add back Run_Neural_Search.xml

Signed-off-by: yuye-aws <[email protected]>

* restore Run_Neural_Search.xml

Signed-off-by: yuye-aws <[email protected]>

* add changelog

Signed-off-by: yuye-aws <[email protected]>

* update integration test for cascade processor

Signed-off-by: yuye-aws <[email protected]>

* add max chunk limit

Signed-off-by: xinyual <[email protected]>
Signed-off-by: yuye-aws <[email protected]>

* remove useless and apply spotless

Signed-off-by: xinyual <[email protected]>
Signed-off-by: yuye-aws <[email protected]>

* update error message

Signed-off-by: yuye-aws <[email protected]>

* change field UT

Signed-off-by: xinyual <[email protected]>
Signed-off-by: yuye-aws <[email protected]>

* remove useless and apply spotless

Signed-off-by: xinyual <[email protected]>
Signed-off-by: yuye-aws <[email protected]>

* change logic of max chunk number

Signed-off-by: xinyual <[email protected]>
Signed-off-by: yuye-aws <[email protected]>

* add max chunk limit into fixed token length algorithm

Signed-off-by: yuye-aws <[email protected]>

* Support list<list<string>> type in embedding and extract validation logic to common class

Signed-off-by: zane-neo <[email protected]>
Signed-off-by: yuye-aws <[email protected]>

* fix unit tests for inference processor

Signed-off-by: yuye-aws <[email protected]>

* implement unit tests with max_chunk_limit in fixed token length

Signed-off-by: yuye-aws <[email protected]>

* constructor for inference processor

Signed-off-by: yuye-aws <[email protected]>

* use inference processor

Signed-off-by: xinyual <[email protected]>
Signed-off-by: yuye-aws <[email protected]>

* draft code for extending inference processor with document chunking processor

Signed-off-by: yuye-aws <[email protected]>

* api refactor for document chunking processor

Signed-off-by: yuye-aws <[email protected]>

* remove nested list key for chunking processor

Signed-off-by: yuye-aws <[email protected]>

* remove unused function

Signed-off-by: yuye-aws <[email protected]>

* remove processor validator

Signed-off-by: yuye-aws <[email protected]>

* remove processor validator

Signed-off-by: yuye-aws <[email protected]>

* Revert InferenceProcessor.java

Signed-off-by: Yuye Zhu <[email protected]>
Signed-off-by: yuye-aws <[email protected]>

* revert changes in text embedding and sparse encoding processor

Signed-off-by: yuye-aws <[email protected]>

* implement chunk with map in document chunking processor

Signed-off-by: yuye-aws <[email protected]>

* add default delimiter value

Signed-off-by: Lu <[email protected]>
Signed-off-by: yuye-aws <[email protected]>

* implement max chunk logic in document chunking processor

Signed-off-by: yuye-aws <[email protected]>

* add initial value for max chunk limit in document chunking processor

Signed-off-by: yuye-aws <[email protected]>

* bug fix in chunking processor: allow 0 max_chunk_limit

Signed-off-by: yuye-aws <[email protected]>

* implement overlap rate with big decimal

Signed-off-by: yuye-aws <[email protected]>

* update max chunk limit in delimiter

Signed-off-by: yuye-aws <[email protected]>

* update parameter setting for fixed token length algorithm

Signed-off-by: yuye-aws <[email protected]>

* update max chunk limit implementation in chunking processor

Signed-off-by: yuye-aws <[email protected]>

* fix unit tests for fixed token length algorithm

Signed-off-by: yuye-aws <[email protected]>

* spotless apply for document chunking processor

Signed-off-by: yuye-aws <[email protected]>

* initialize current chunk count

Signed-off-by: yuye-aws <[email protected]>

* parameter validation for max chunk limit

Signed-off-by: yuye-aws <[email protected]>

* fix integration tests

Signed-off-by: yuye-aws <[email protected]>

* fix current UT

Signed-off-by: xinyual <[email protected]>
Signed-off-by: yuye-aws <[email protected]>

* change delimiter UT

Signed-off-by: xinyual <[email protected]>
Signed-off-by: yuye-aws <[email protected]>

* remove delimiter useless code

Signed-off-by: xinyual <[email protected]>
Signed-off-by: yuye-aws <[email protected]>

* add more UT

Signed-off-by: xinyual <[email protected]>
Signed-off-by: yuye-aws <[email protected]>

* add UT for list inside map

Signed-off-by: xinyual <[email protected]>
Signed-off-by: yuye-aws <[email protected]>

* add UT for list inside map

Signed-off-by: xinyual <[email protected]>
Signed-off-by: yuye-aws <[email protected]>

* update unit tests for chunking processor

Signed-off-by: yuye-aws <[email protected]>

* add more unit tests for chunking processor

Signed-off-by: yuye-aws <[email protected]>

* resolve code review comments

Signed-off-by: yuye-aws <[email protected]>

* add java doc

Signed-off-by: yuye-aws <[email protected]>

* update java doc

Signed-off-by: yuye-aws <[email protected]>

* update java doc

Signed-off-by: yuye-aws <[email protected]>

* fix import order

Signed-off-by: yuye-aws <[email protected]>

* update java doc

Signed-off-by: yuye-aws <[email protected]>

* fix java doc error

Signed-off-by: yuye-aws <[email protected]>

* fix update ut for fixed token length chunker

Signed-off-by: yuye-aws <[email protected]>

* resolve code review comments

Signed-off-by: yuye-aws <[email protected]>

* resolve code review comments

Signed-off-by: yuye-aws <[email protected]>

* resolve code review comments

Signed-off-by: yuye-aws <[email protected]>

* resolve code review comments

Signed-off-by: yuye-aws <[email protected]>

* implement chunk count wrapper for max chunk limit

Signed-off-by: yuye-aws <[email protected]>

* rename variable end to nextDelimiterPosition

Signed-off-by: yuye-aws <[email protected]>

* adjust method place

Signed-off-by: yuye-aws <[email protected]>

* update java doc for fixed token length algorithm

Signed-off-by: yuye-aws <[email protected]>

* rename interface name and fixed token length algorithm name

Signed-off-by: yuye-aws <[email protected]>

* update fixed token length algorithm configuration for integration tests

Signed-off-by: yuye-aws <[email protected]>

* make delimiter member variables static

Signed-off-by: yuye-aws <[email protected]>

* remove redundant set field value in execute method

Signed-off-by: yuye-aws <[email protected]>

* resolve code review comments

Signed-off-by: yuye-aws <[email protected]>

* add integration tests with more tokenizers

Signed-off-by: yuye-aws <[email protected]>

* bug fix: unit test failure due to invalid tokenizer

Signed-off-by: yuye-aws <[email protected]>

* bug fix: token concatenation in fixed token length algorithm

Signed-off-by: yuye-aws <[email protected]>

* update chunker interface

Signed-off-by: yuye-aws <[email protected]>

* track chunkCount within function

Signed-off-by: yuye-aws <[email protected]>

* bug fix: allow white space as the delimiter

Signed-off-by: yuye-aws <[email protected]>

* fix fixed length chunker

Signed-off-by: xinyual <[email protected]>

* fix delimiter chunker

Signed-off-by: xinyual <[email protected]>

* fix chunker factory

Signed-off-by: xinyual <[email protected]>

* fix UTs

Signed-off-by: xinyual <[email protected]>

* fix UT and chunker factory

Signed-off-by: xinyual <[email protected]>

* move analysis_registry to non-runtime parameters

Signed-off-by: xinyual <[email protected]>

* fix UTs

Signed-off-by: xinyual <[email protected]>

* avoid java doc change

Signed-off-by: xinyual <[email protected]>

* move validate to commonUtlis

Signed-off-by: xinyual <[email protected]>

* remove useless function

Signed-off-by: xinyual <[email protected]>

* change java doc

Signed-off-by: xinyual <[email protected]>

* fix Document process ut

Signed-off-by: xinyual <[email protected]>

* fixed token length: re-implement with start and end offset

Signed-off-by: yuye-aws <[email protected]>

* update exception message

Signed-off-by: yuye-aws <[email protected]>

* fix document chunking processor IT

Signed-off-by: yuye-aws <[email protected]>

* bug fix: adjust start, end content position in fixed token length algorithm

Signed-off-by: yuye-aws <[email protected]>

* update changelog for 2.x release

Signed-off-by: yuye-aws <[email protected]>

* rename processor

Signed-off-by: yuye-aws <[email protected]>

* update default delimiter to be \n\n

Signed-off-by: yuye-aws <[email protected]>

* remove change log in 3.0 unreleased

Signed-off-by: yuye-aws <[email protected]>

* fix IT failure due to chunking processor rename

Signed-off-by: yuye-aws <[email protected]>

* update javadoc for text chunking processor factory

Signed-off-by: yuye-aws <[email protected]>

* adjust functions in chunker interface

Signed-off-by: yuye-aws <[email protected]>

* move algorithm name definition to concrete chunker class

Signed-off-by: yuye-aws <[email protected]>

* update string formatted message for text chunking processor

Signed-off-by: yuye-aws <[email protected]>

* update string formatted message for chunker factory

Signed-off-by: yuye-aws <[email protected]>

* update string formatted message for chunker parameter validator

Signed-off-by: yuye-aws <[email protected]>

* update java doc for delimiter algorithm

Signed-off-by: yuye-aws <[email protected]>

* support range double in chunker parameter validator

Signed-off-by: yuye-aws <[email protected]>

* update string formatted message for fixed token length algorithm

Signed-off-by: yuye-aws <[email protected]>

* update sneaky throw with text chunking processor it

Signed-off-by: yuye-aws <[email protected]>

* add word tokenizer restriction for fixed token length algorithm

Signed-off-by: yuye-aws <[email protected]>

* update error message for multiple algorithms in text chunking processor

Signed-off-by: yuye-aws <[email protected]>

* add comment in text chunking processor

Signed-off-by: yuye-aws <[email protected]>

* validate max chunk limit with util parameter class

Signed-off-by: yuye-aws <[email protected]>

* update comments

Signed-off-by: yuye-aws <[email protected]>

* update comments

Signed-off-by: yuye-aws <[email protected]>

* update java doc

Signed-off-by: yuye-aws <[email protected]>

* update java doc

Signed-off-by: yuye-aws <[email protected]>

* make parameter final

Signed-off-by: yuye-aws <[email protected]>

* implement a map from chunker name to constructor function in chunker factory

Signed-off-by: yuye-aws <[email protected]>

* bug fix in chunker factory

Signed-off-by: yuye-aws <[email protected]>

* remove get all chunkers in chunker factory

Signed-off-by: yuye-aws <[email protected]>

* remove type check for parameter check for max token count

Signed-off-by: yuye-aws <[email protected]>

* remove type check for parameter check for analysis registry

Signed-off-by: yuye-aws <[email protected]>

* implement parser and validator

Signed-off-by: yuye-aws <[email protected]>

* update comment

Signed-off-by: yuye-aws <[email protected]>

* provide fixed token length as the default algorithm

Signed-off-by: yuye-aws <[email protected]>

* adjust exception message

Signed-off-by: yuye-aws <[email protected]>

* adjust exception message

Signed-off-by: yuye-aws <[email protected]>

* use object nonnull and require nonnull

Signed-off-by: yuye-aws <[email protected]>

* apply final to ingest document and chunk count

Signed-off-by: yuye-aws <[email protected]>

* merge parameter validator into the parser

Signed-off-by: yuye-aws <[email protected]>

* assign positive default value for max chunk limit

Signed-off-by: yuye-aws <[email protected]>

* validate supported chunker algorithm in text chunking processor

Signed-off-by: yuye-aws <[email protected]>

* update parameter setting of max chunk limit

Signed-off-by: yuye-aws <[email protected]>

* add unit test with non list of string

Signed-off-by: yuye-aws <[email protected]>

* add unit test with null input

Signed-off-by: yuye-aws <[email protected]>

* add unit test for tokenization exception in fixed token length algorithm

Signed-off-by: yuye-aws <[email protected]>

* tune method name in text chunking processor unit test

Signed-off-by: yuye-aws <[email protected]>

* tune method name in delimiter algorithm unit test

Signed-off-by: yuye-aws <[email protected]>

* add unit test for overlap rate too small in fixed token length algorithm

Signed-off-by: yuye-aws <[email protected]>

* tune method modifier for all classes

Signed-off-by: yuye-aws <[email protected]>

* tune code

Signed-off-by: yuye-aws <[email protected]>

* tune code

Signed-off-by: yuye-aws <[email protected]>

* tune exception type in parameter parser

Signed-off-by: yuye-aws <[email protected]>

* tune comment

Signed-off-by: yuye-aws <[email protected]>

* tune comment

Signed-off-by: yuye-aws <[email protected]>

* include max chunk limit in both algorithms

Signed-off-by: yuye-aws <[email protected]>

* tune comment

Signed-off-by: yuye-aws <[email protected]>

* allow 0 for max chunk limit

Signed-off-by: yuye-aws <[email protected]>

* update runtime max chunk limit in text chunking processor

Signed-off-by: yuye-aws <[email protected]>

* tune code for chunker

Signed-off-by: yuye-aws <[email protected]>

* implement test for multiple field max chunk limit exceed

Signed-off-by: yuye-aws <[email protected]>

* tune method names in text chunking processor unit tests

Signed-off-by: yuye-aws <[email protected]>

* add unit tests for both algorithms with max chunk limit

Signed-off-by: yuye-aws <[email protected]>

* optimize code

Signed-off-by: yuye-aws <[email protected]>

* extract max chunk limit check to util class

Signed-off-by: yuye-aws <[email protected]>

* resolve code review comments

Signed-off-by: yuye-aws <[email protected]>

* fix unit tests

Signed-off-by: yuye-aws <[email protected]>

* bug fix: only update runtime max chunk limit when enabled

Signed-off-by: yuye-aws <[email protected]>

---------

Signed-off-by: yuye-aws <[email protected]>
Signed-off-by: xinyual <[email protected]>
Signed-off-by: zane-neo <[email protected]>
Signed-off-by: Yuye Zhu <[email protected]>
Signed-off-by: Lu <[email protected]>
Co-authored-by: xinyual <[email protected]>
Co-authored-by: zane-neo <[email protected]>
Co-authored-by: Lu <[email protected]>
4 people authored Mar 18, 2024
1 parent 28f1019 commit eea53aa
Showing 25 changed files with 2,455 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x](https://github.com/opensearch-project/neural-search/compare/2.12...2.x)
### Features
- Implement document chunking processor with fixed token length and delimiter algorithm ([#607](https://github.com/opensearch-project/neural-search/pull/607/))
- Enabled support for applying default modelId in neural sparse query ([#614](https://github.com/opensearch-project/neural-search/pull/614)
### Enhancements
- Adding aggregations in hybrid query ([#630](https://github.com/opensearch-project/neural-search/pull/630))
13 changes: 11 additions & 2 deletions src/main/java/org/opensearch/neuralsearch/plugin/NeuralSearch.java
@@ -31,9 +31,11 @@
import org.opensearch.neuralsearch.processor.NormalizationProcessorWorkflow;
import org.opensearch.neuralsearch.processor.SparseEncodingProcessor;
import org.opensearch.neuralsearch.processor.TextEmbeddingProcessor;
import org.opensearch.neuralsearch.processor.TextChunkingProcessor;
import org.opensearch.neuralsearch.processor.TextImageEmbeddingProcessor;
import org.opensearch.neuralsearch.processor.combination.ScoreCombinationFactory;
import org.opensearch.neuralsearch.processor.combination.ScoreCombiner;
import org.opensearch.neuralsearch.processor.factory.TextChunkingProcessorFactory;
import org.opensearch.neuralsearch.processor.factory.NormalizationProcessorFactory;
import org.opensearch.neuralsearch.processor.factory.RerankProcessorFactory;
import org.opensearch.neuralsearch.processor.factory.SparseEncodingProcessorFactory;
@@ -114,14 +116,21 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
SparseEncodingProcessor.TYPE,
new SparseEncodingProcessorFactory(clientAccessor, parameters.env),
TextImageEmbeddingProcessor.TYPE,
new TextImageEmbeddingProcessorFactory(clientAccessor, parameters.env, parameters.ingestService.getClusterService())
new TextImageEmbeddingProcessorFactory(clientAccessor, parameters.env, parameters.ingestService.getClusterService()),
TextChunkingProcessor.TYPE,
new TextChunkingProcessorFactory(
parameters.env,
parameters.ingestService.getClusterService(),
parameters.indicesService,
parameters.analysisRegistry
)
);
}

@Override
public Optional<QueryPhaseSearcher> getQueryPhaseSearcher() {
// we're using "is_disabled" flag as there are no proper implementation of FeatureFlags.isDisabled(). Both
// cases when flag is not set or it is "false" are interpretted in the same way. In such case core is reading
// cases when flag is not set, or it is "false" are interpreted in the same way. In such case core is reading
// the actual value from settings.
if (FeatureFlags.isEnabled(NEURAL_SEARCH_HYBRID_SEARCH_DISABLED.getKey())) {
log.info(
@@ -0,0 +1,310 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.neuralsearch.processor;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Locale;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.env.Environment;
import org.opensearch.index.IndexService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.index.analysis.AnalysisRegistry;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.indices.IndicesService;
import org.opensearch.index.IndexSettings;
import org.opensearch.ingest.AbstractProcessor;
import org.opensearch.ingest.IngestDocument;
import org.opensearch.neuralsearch.processor.chunker.Chunker;
import org.opensearch.index.mapper.IndexFieldMapper;
import org.opensearch.neuralsearch.processor.chunker.ChunkerFactory;
import org.opensearch.neuralsearch.processor.chunker.FixedTokenLengthChunker;

import static org.opensearch.neuralsearch.processor.chunker.Chunker.MAX_CHUNK_LIMIT_FIELD;
import static org.opensearch.neuralsearch.processor.chunker.Chunker.DEFAULT_MAX_CHUNK_LIMIT;
import static org.opensearch.neuralsearch.processor.chunker.Chunker.DISABLED_MAX_CHUNK_LIMIT;
import static org.opensearch.neuralsearch.processor.chunker.ChunkerParameterParser.parseIntegerParameter;

/**
* This processor is used for text chunking.
* The chunking results can be fed to a downstream embedding processor.
* The processor needs two fields: algorithm and field_map,
* where algorithm defines the chunking algorithm and its parameters,
* and field_map specifies which fields need chunking and the corresponding keys for the chunking results.
*/
public final class TextChunkingProcessor extends AbstractProcessor {

public static final String TYPE = "text_chunking";
public static final String FIELD_MAP_FIELD = "field_map";
public static final String ALGORITHM_FIELD = "algorithm";
private static final String DEFAULT_ALGORITHM = FixedTokenLengthChunker.ALGORITHM_NAME;

private int maxChunkLimit;
private Chunker chunker;
private final Map<String, Object> fieldMap;
private final ClusterService clusterService;
private final IndicesService indicesService;
private final AnalysisRegistry analysisRegistry;
private final Environment environment;

public TextChunkingProcessor(
final String tag,
final String description,
final Map<String, Object> fieldMap,
final Map<String, Object> algorithmMap,
final Environment environment,
final ClusterService clusterService,
final IndicesService indicesService,
final AnalysisRegistry analysisRegistry
) {
super(tag, description);
this.fieldMap = fieldMap;
this.environment = environment;
this.clusterService = clusterService;
this.indicesService = indicesService;
this.analysisRegistry = analysisRegistry;
parseAlgorithmMap(algorithmMap);
}

public String getType() {
return TYPE;
}

@SuppressWarnings("unchecked")
private void parseAlgorithmMap(final Map<String, Object> algorithmMap) {
if (algorithmMap.size() > 1) {
throw new IllegalArgumentException(
String.format(Locale.ROOT, "Unable to create %s processor as [%s] contains multiple algorithms", TYPE, ALGORITHM_FIELD)
);
}

String algorithmKey;
Object algorithmValue;
if (algorithmMap.isEmpty()) {
algorithmKey = DEFAULT_ALGORITHM;
algorithmValue = new HashMap<>();
} else {
Entry<String, Object> algorithmEntry = algorithmMap.entrySet().iterator().next();
algorithmKey = algorithmEntry.getKey();
algorithmValue = algorithmEntry.getValue();
if (!(algorithmValue instanceof Map)) {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"Unable to create %s processor as parameters for [%s] algorithm must be an object",
TYPE,
algorithmKey
)
);
}
}

if (!ChunkerFactory.CHUNKER_ALGORITHMS.contains(algorithmKey)) {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"Chunking algorithm [%s] is not supported. Supported chunking algorithms are %s",
algorithmKey,
ChunkerFactory.CHUNKER_ALGORITHMS
)
);
}
Map<String, Object> chunkerParameters = (Map<String, Object>) algorithmValue;
// parse processor level max chunk limit
this.maxChunkLimit = parseIntegerParameter(chunkerParameters, MAX_CHUNK_LIMIT_FIELD, DEFAULT_MAX_CHUNK_LIMIT);
if (maxChunkLimit < 0 && maxChunkLimit != DISABLED_MAX_CHUNK_LIMIT) {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"Parameter [%s] must be positive or %s to disable this parameter",
MAX_CHUNK_LIMIT_FIELD,
DISABLED_MAX_CHUNK_LIMIT
)
);
}
// fixed token length algorithm needs analysis registry for tokenization
chunkerParameters.put(FixedTokenLengthChunker.ANALYSIS_REGISTRY_FIELD, analysisRegistry);
this.chunker = ChunkerFactory.create(algorithmKey, chunkerParameters);
}

@SuppressWarnings("unchecked")
private boolean isListOfString(final Object value) {
// an empty list is also List<String>
if (!(value instanceof List)) {
return false;
}
for (Object element : (List<Object>) value) {
if (!(element instanceof String)) {
return false;
}
}
return true;
}

private int getMaxTokenCount(final Map<String, Object> sourceAndMetadataMap) {
String indexName = sourceAndMetadataMap.get(IndexFieldMapper.NAME).toString();
IndexMetadata indexMetadata = clusterService.state().metadata().index(indexName);
if (Objects.isNull(indexMetadata)) {
return IndexSettings.MAX_TOKEN_COUNT_SETTING.get(environment.settings());
}
// if the index is specified in the metadata, read maxTokenCount from the index setting
IndexService indexService = indicesService.indexServiceSafe(indexMetadata.getIndex());
return indexService.getIndexSettings().getMaxTokenCount();
}

/**
* This method will be invoked by PipelineService to perform chunking and then write back chunking results to the document.
* @param ingestDocument {@link IngestDocument} which is the document passed to processor.
*/
@Override
public IngestDocument execute(final IngestDocument ingestDocument) {
Map<String, Object> sourceAndMetadataMap = ingestDocument.getSourceAndMetadata();
validateFieldsValue(sourceAndMetadataMap);
// fixed token length algorithm needs runtime parameter max_token_count for tokenization
Map<String, Object> runtimeParameters = new HashMap<>();
int maxTokenCount = getMaxTokenCount(sourceAndMetadataMap);
runtimeParameters.put(FixedTokenLengthChunker.MAX_TOKEN_COUNT_FIELD, maxTokenCount);
runtimeParameters.put(MAX_CHUNK_LIMIT_FIELD, maxChunkLimit);
chunkMapType(sourceAndMetadataMap, fieldMap, runtimeParameters, 0);
return ingestDocument;
}

private void validateFieldsValue(final Map<String, Object> sourceAndMetadataMap) {
for (Map.Entry<String, Object> embeddingFieldsEntry : fieldMap.entrySet()) {
Object sourceValue = sourceAndMetadataMap.get(embeddingFieldsEntry.getKey());
if (Objects.nonNull(sourceValue)) {
String sourceKey = embeddingFieldsEntry.getKey();
if (sourceValue instanceof List || sourceValue instanceof Map) {
validateNestedTypeValue(sourceKey, sourceValue, 1);
} else if (!(sourceValue instanceof String)) {
throw new IllegalArgumentException(
String.format(Locale.ROOT, "field [%s] is neither string nor nested type, cannot process it", sourceKey)
);
}
}
}
}

@SuppressWarnings({ "rawtypes", "unchecked" })
private void validateNestedTypeValue(final String sourceKey, final Object sourceValue, final int maxDepth) {
if (maxDepth > MapperService.INDEX_MAPPING_DEPTH_LIMIT_SETTING.get(environment.settings())) {
throw new IllegalArgumentException(
String.format(Locale.ROOT, "map type field [%s] reached max depth limit, cannot process it", sourceKey)
);
} else if (sourceValue instanceof List) {
validateListTypeValue(sourceKey, sourceValue, maxDepth);
} else if (sourceValue instanceof Map) {
((Map) sourceValue).values()
.stream()
.filter(Objects::nonNull)
.forEach(x -> validateNestedTypeValue(sourceKey, x, maxDepth + 1));
} else if (!(sourceValue instanceof String)) {
throw new IllegalArgumentException(
String.format(Locale.ROOT, "map type field [%s] has non-string type, cannot process it", sourceKey)
);
}
}

@SuppressWarnings({ "rawtypes" })
private void validateListTypeValue(final String sourceKey, final Object sourceValue, final int maxDepth) {
for (Object value : (List) sourceValue) {
if (value instanceof Map) {
validateNestedTypeValue(sourceKey, value, maxDepth + 1);
} else if (value == null) {
throw new IllegalArgumentException(
String.format(Locale.ROOT, "list type field [%s] has null, cannot process it", sourceKey)
);
} else if (!(value instanceof String)) {
throw new IllegalArgumentException(
String.format(Locale.ROOT, "list type field [%s] has non-string value, cannot process it", sourceKey)
);
}
}
}

@SuppressWarnings("unchecked")
private int chunkMapType(
Map<String, Object> sourceAndMetadataMap,
final Map<String, Object> fieldMap,
final Map<String, Object> runtimeParameters,
final int chunkCount
) {
int updatedChunkCount = chunkCount;
for (Map.Entry<String, Object> fieldMapEntry : fieldMap.entrySet()) {
String originalKey = fieldMapEntry.getKey();
Object targetKey = fieldMapEntry.getValue();
if (targetKey instanceof Map) {
// call this method recursively when target key is a map
Object sourceObject = sourceAndMetadataMap.get(originalKey);
if (sourceObject instanceof List) {
List<Object> sourceObjectList = (List<Object>) sourceObject;
for (Object source : sourceObjectList) {
if (source instanceof Map) {
updatedChunkCount = chunkMapType(
(Map<String, Object>) source,
(Map<String, Object>) targetKey,
runtimeParameters,
updatedChunkCount
);
}
}
} else if (sourceObject instanceof Map) {
updatedChunkCount = chunkMapType(
(Map<String, Object>) sourceObject,
(Map<String, Object>) targetKey,
runtimeParameters,
updatedChunkCount
);
}
} else {
// chunk the object when target key is of leaf type (null, string and list of string)
Object chunkObject = sourceAndMetadataMap.get(originalKey);
List<String> chunkedResult = chunkLeafType(chunkObject, runtimeParameters);
sourceAndMetadataMap.put(String.valueOf(targetKey), chunkedResult);
}
}
return updatedChunkCount;
}

/**
* Chunk the content, update the runtime max_chunk_limit and return the result
*/
private List<String> chunkString(final String content, final Map<String, Object> runTimeParameters) {
// update runtime max_chunk_limit if not disabled
List<String> contentResult = chunker.chunk(content, runTimeParameters);
int runtimeMaxChunkLimit = parseIntegerParameter(runTimeParameters, MAX_CHUNK_LIMIT_FIELD, maxChunkLimit);
if (runtimeMaxChunkLimit != DISABLED_MAX_CHUNK_LIMIT) {
runTimeParameters.put(MAX_CHUNK_LIMIT_FIELD, runtimeMaxChunkLimit - contentResult.size());
}
return contentResult;
}

private List<String> chunkList(final List<String> contentList, final Map<String, Object> runTimeParameters) {
// flatten original output format from List<List<String>> to List<String>
List<String> result = new ArrayList<>();
for (String content : contentList) {
result.addAll(chunkString(content, runTimeParameters));
}
return result;
}

@SuppressWarnings("unchecked")
private List<String> chunkLeafType(final Object value, final Map<String, Object> runTimeParameters) {
// leaf type means null, String or List<String>
// the result should be an empty list when the input is null
List<String> result = new ArrayList<>();
if (value instanceof String) {
result = chunkString(value.toString(), runTimeParameters);
} else if (isListOfString(value)) {
result = chunkList((List<String>) value, runTimeParameters);
}
return result;
}
}
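The `chunkString` method above treats `max_chunk_limit` as a budget shared by every field of one document: each call chunks its input, then subtracts the number of chunks produced from the value stored in the runtime parameters. The following is a minimal, self-contained sketch of that bookkeeping only. The class name `ChunkBudgetSketch`, the word-splitting stand-in for `Chunker.chunk`, and the choice to throw once the budget is exhausted are all assumptions made for illustration; how the real chunkers react to an exhausted budget is not shown in this excerpt.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ChunkBudgetSketch {
    static final String MAX_CHUNK_LIMIT_FIELD = "max_chunk_limit";
    static final int DISABLED_MAX_CHUNK_LIMIT = -1;

    // Hypothetical stand-in for Chunker.chunk(): one chunk per space-separated word.
    // Throwing when the budget is exhausted is an assumption made for this sketch.
    static List<String> chunk(String content, Map<String, Object> runtimeParameters) {
        int limit = (Integer) runtimeParameters.get(MAX_CHUNK_LIMIT_FIELD);
        List<String> chunks = new ArrayList<>();
        for (String word : content.split(" ")) {
            if (limit != DISABLED_MAX_CHUNK_LIMIT && chunks.size() >= limit) {
                throw new IllegalStateException("chunk budget exhausted");
            }
            chunks.add(word);
        }
        return chunks;
    }

    // Same bookkeeping as chunkString: chunk first, then shrink the shared budget.
    static List<String> chunkString(String content, Map<String, Object> runtimeParameters) {
        List<String> result = chunk(content, runtimeParameters);
        int remaining = (Integer) runtimeParameters.get(MAX_CHUNK_LIMIT_FIELD);
        if (remaining != DISABLED_MAX_CHUNK_LIMIT) {
            runtimeParameters.put(MAX_CHUNK_LIMIT_FIELD, remaining - result.size());
        }
        return result;
    }

    // Chunks every field value against one shared budget and returns what is left of it.
    static int remainingBudget(int maxChunkLimit, List<String> fieldValues) {
        Map<String, Object> runtimeParameters = new HashMap<>();
        runtimeParameters.put(MAX_CHUNK_LIMIT_FIELD, maxChunkLimit);
        for (String value : fieldValues) {
            chunkString(value, runtimeParameters);
        }
        return (Integer) runtimeParameters.get(MAX_CHUNK_LIMIT_FIELD);
    }

    public static void main(String[] args) {
        // Two fields producing two chunks each, against a budget of five.
        System.out.println(remainingBudget(5, List.of("a b", "c d")));
        // With the limit disabled the stored value is never decremented.
        System.out.println(remainingBudget(DISABLED_MAX_CHUNK_LIMIT, List.of("a b", "c d")));
    }
}
```

Because the budget lives in the mutable runtime-parameter map rather than in a return value, every field chunked later in the same `execute` call sees the reduced limit without any extra plumbing.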
@@ -0,0 +1,36 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.neuralsearch.processor.chunker;

import java.util.Map;
import java.util.List;

/**
* The interface for all chunking algorithms.
* All algorithms need to parse parameters and chunk the content.
*/
public interface Chunker {

String MAX_CHUNK_LIMIT_FIELD = "max_chunk_limit";
int DEFAULT_MAX_CHUNK_LIMIT = 100;
int DISABLED_MAX_CHUNK_LIMIT = -1;

/**
* Parse the parameters for chunking algorithm.
* Throw IllegalArgumentException when parameters are invalid.
*
* @param parameters a map containing non-runtime parameters for chunking algorithms
*/
void parseParameters(Map<String, Object> parameters);

/**
* Chunk the input string according to parameters and return chunked passages
*
* @param content input string
* @param runtimeParameters a map containing runtime parameters for chunking algorithms
* @return chunked passages
*/
List<String> chunk(String content, Map<String, Object> runtimeParameters);
}
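To illustrate the two-method contract of the `Chunker` interface, here is a small hypothetical implementation that splits on a delimiter. It is a sketch under stated assumptions, not the delimiter chunker added by this commit: the interface is re-declared locally so the example compiles on its own, the class name `SimpleDelimiterChunker` and the parameter key `delimiter` are invented for illustration, and keeping the delimiter attached to the preceding chunk is a design choice of this sketch. The `\n\n` default follows the commit message entry "update default delimiter to be \n\n".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Local copy of the two-method contract so the sketch compiles on its own.
interface Chunker {
    void parseParameters(Map<String, Object> parameters);

    List<String> chunk(String content, Map<String, Object> runtimeParameters);
}

// Hypothetical implementation for illustration only.
final class SimpleDelimiterChunker implements Chunker {
    static final String DELIMITER_FIELD = "delimiter"; // assumed parameter name
    private String delimiter = "\n\n";

    SimpleDelimiterChunker(Map<String, Object> parameters) {
        parseParameters(parameters);
    }

    // Non-runtime parameters are validated once, when the chunker is created.
    @Override
    public void parseParameters(Map<String, Object> parameters) {
        Object value = parameters.get(DELIMITER_FIELD);
        if (value == null) {
            return; // keep the default
        }
        if (!(value instanceof String) || ((String) value).isEmpty()) {
            throw new IllegalArgumentException("Parameter [delimiter] must be a non-empty string");
        }
        delimiter = (String) value;
    }

    // Runtime parameters are accepted to satisfy the interface but unused in this sketch.
    @Override
    public List<String> chunk(String content, Map<String, Object> runtimeParameters) {
        List<String> chunks = new ArrayList<>();
        int start = 0;
        int next = content.indexOf(delimiter);
        while (next != -1) {
            int end = next + delimiter.length();
            chunks.add(content.substring(start, end)); // delimiter stays with its chunk
            start = end;
            next = content.indexOf(delimiter, start);
        }
        if (start < content.length()) {
            chunks.add(content.substring(start)); // trailing text without a delimiter
        }
        return chunks;
    }

    public static void main(String[] args) {
        Map<String, Object> parameters = Map.of(DELIMITER_FIELD, ".");
        Chunker chunker = new SimpleDelimiterChunker(parameters);
        System.out.println(chunker.chunk("a.b.c", Map.of()));
    }
}
```

Separating `parseParameters` from `chunk` means configuration errors surface when the pipeline is created, while per-document values such as the remaining chunk budget travel through `runtimeParameters`.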