Skip to content

Commit

Permalink
Fix: text chunking processor ingestion bug on multi-node cluster (#713)
Browse files Browse the repository at this point in the history
* fix multi node text chunking processor index bug

Signed-off-by: yuye-aws <[email protected]>

* add change log

Signed-off-by: yuye-aws <[email protected]>

* bug fix: no max token count setting in index

Signed-off-by: yuye-aws <[email protected]>

* make the program faster by avoiding creation of an index settings object

Signed-off-by: yuye-aws <[email protected]>

* add comment

Signed-off-by: yuye-aws <[email protected]>

* fix comment

Signed-off-by: yuye-aws <[email protected]>

* resolve code review

Signed-off-by: yuye-aws <[email protected]>

* simplify the code by using NumberUtils.toInt

Signed-off-by: yuye-aws <[email protected]>

* resolve code review comments

Signed-off-by: yuye-aws <[email protected]>

---------

Signed-off-by: yuye-aws <[email protected]>
(cherry picked from commit 2d42408)
  • Loading branch information
yuye-aws authored and github-actions[bot] committed May 1, 2024
1 parent dad9d59 commit ebcfacb
Show file tree
Hide file tree
Showing 7 changed files with 10 additions and 46 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Features
### Enhancements
### Bug Fixes
- Fix multi-node "no such index" error in text chunking processor ([#713](https://github.com/opensearch-project/neural-search/pull/713))
### Infrastructure
### Documentation
### Maintenance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class TextChunkingProcessorIT extends AbstractRestartUpgradeRestTestCase
"standard tokenizer in OpenSearch."
);

// Test rolling-upgrade text chunking processor
// Test restart-upgrade text chunking processor
// Create Text Chunking Processor, Ingestion Pipeline and add document
// Validate process, pipeline and document count in restart-upgrade scenario
public void testTextChunkingProcessor_E2EFlow() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
TextImageEmbeddingProcessor.TYPE,
new TextImageEmbeddingProcessorFactory(clientAccessor, parameters.env, parameters.ingestService.getClusterService()),
TextChunkingProcessor.TYPE,
new TextChunkingProcessorFactory(
parameters.env,
parameters.ingestService.getClusterService(),
parameters.indicesService,
parameters.analysisRegistry
)
new TextChunkingProcessorFactory(parameters.env, parameters.ingestService.getClusterService(), parameters.analysisRegistry)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@
import org.apache.commons.lang3.StringUtils;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.env.Environment;
import org.opensearch.index.IndexService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.index.analysis.AnalysisRegistry;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.indices.IndicesService;
import org.opensearch.index.IndexSettings;
import org.opensearch.ingest.AbstractProcessor;
import org.opensearch.ingest.IngestDocument;
Expand Down Expand Up @@ -52,7 +50,6 @@ public final class TextChunkingProcessor extends AbstractProcessor {
private Chunker chunker;
private final Map<String, Object> fieldMap;
private final ClusterService clusterService;
private final IndicesService indicesService;
private final AnalysisRegistry analysisRegistry;
private final Environment environment;

Expand All @@ -63,14 +60,12 @@ public TextChunkingProcessor(
final Map<String, Object> algorithmMap,
final Environment environment,
final ClusterService clusterService,
final IndicesService indicesService,
final AnalysisRegistry analysisRegistry
) {
super(tag, description);
this.fieldMap = fieldMap;
this.environment = environment;
this.clusterService = clusterService;
this.indicesService = indicesService;
this.analysisRegistry = analysisRegistry;
parseAlgorithmMap(algorithmMap);
}
Expand Down Expand Up @@ -151,14 +146,14 @@ private boolean isListOfString(final Object value) {
}

private int getMaxTokenCount(final Map<String, Object> sourceAndMetadataMap) {
int defaultMaxTokenCount = IndexSettings.MAX_TOKEN_COUNT_SETTING.get(environment.settings());
String indexName = sourceAndMetadataMap.get(IndexFieldMapper.NAME).toString();
IndexMetadata indexMetadata = clusterService.state().metadata().index(indexName);
if (Objects.isNull(indexMetadata)) {
return IndexSettings.MAX_TOKEN_COUNT_SETTING.get(environment.settings());
return defaultMaxTokenCount;
}
// if the index is specified in the metadata, read maxTokenCount from the index setting
IndexService indexService = indicesService.indexServiceSafe(indexMetadata.getIndex());
return indexService.getIndexSettings().getMaxTokenCount();
return IndexSettings.MAX_TOKEN_COUNT_SETTING.get(indexMetadata.getSettings());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.env.Environment;
import org.opensearch.index.analysis.AnalysisRegistry;
import org.opensearch.indices.IndicesService;
import org.opensearch.ingest.Processor;
import org.opensearch.neuralsearch.processor.TextChunkingProcessor;
import static org.opensearch.neuralsearch.processor.TextChunkingProcessor.TYPE;
Expand All @@ -29,19 +28,11 @@ public class TextChunkingProcessorFactory implements Processor.Factory {

private final ClusterService clusterService;

private final IndicesService indicesService;

private final AnalysisRegistry analysisRegistry;

public TextChunkingProcessorFactory(
Environment environment,
ClusterService clusterService,
IndicesService indicesService,
AnalysisRegistry analysisRegistry
) {
public TextChunkingProcessorFactory(Environment environment, ClusterService clusterService, AnalysisRegistry analysisRegistry) {
this.environment = environment;
this.clusterService = clusterService;
this.indicesService = indicesService;
this.analysisRegistry = analysisRegistry;
}

Expand All @@ -54,15 +45,6 @@ public TextChunkingProcessor create(
) throws Exception {
Map<String, Object> fieldMap = readMap(TYPE, processorTag, config, FIELD_MAP_FIELD);
Map<String, Object> algorithmMap = readMap(TYPE, processorTag, config, ALGORITHM_FIELD);
return new TextChunkingProcessor(
processorTag,
description,
fieldMap,
algorithmMap,
environment,
clusterService,
indicesService,
analysisRegistry
);
return new TextChunkingProcessor(processorTag, description, fieldMap, algorithmMap, environment, clusterService, analysisRegistry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.opensearch.index.analysis.AnalysisRegistry;
import org.opensearch.index.analysis.TokenizerFactory;
import org.opensearch.index.mapper.IndexFieldMapper;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.analysis.AnalysisModule;
import org.opensearch.ingest.IngestDocument;
import org.opensearch.ingest.Processor;
Expand Down Expand Up @@ -85,11 +84,10 @@ public void setup() {
when(environment.settings()).thenReturn(settings);
ClusterState clusterState = mock(ClusterState.class);
ClusterService clusterService = mock(ClusterService.class);
IndicesService indicesService = mock(IndicesService.class);
when(metadata.index(anyString())).thenReturn(null);
when(clusterState.metadata()).thenReturn(metadata);
when(clusterService.state()).thenReturn(clusterState);
textChunkingProcessorFactory = new TextChunkingProcessorFactory(environment, clusterService, indicesService, getAnalysisRegistry());
textChunkingProcessorFactory = new TextChunkingProcessorFactory(environment, clusterService, getAnalysisRegistry());
}

private Map<String, Object> createFixedTokenLengthParameters() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.opensearch.env.TestEnvironment;
import org.opensearch.index.analysis.AnalysisRegistry;
import org.opensearch.index.analysis.TokenizerFactory;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.analysis.AnalysisModule;
import org.opensearch.ingest.Processor;
import org.opensearch.neuralsearch.processor.TextChunkingProcessor;
Expand Down Expand Up @@ -62,13 +61,7 @@ public Map<String, AnalysisModule.AnalysisProvider<TokenizerFactory>> getTokeniz
public void setup() {
Environment environment = mock(Environment.class);
ClusterService clusterService = mock(ClusterService.class);
IndicesService indicesService = mock(IndicesService.class);
this.textChunkingProcessorFactory = new TextChunkingProcessorFactory(
environment,
clusterService,
indicesService,
getAnalysisRegistry()
);
this.textChunkingProcessorFactory = new TextChunkingProcessorFactory(environment, clusterService, getAnalysisRegistry());
}

@SneakyThrows
Expand Down

0 comments on commit ebcfacb

Please sign in to comment.