From ebcfacb7edd40afdd87060c6f2a4855824165ede Mon Sep 17 00:00:00 2001 From: yuye-aws Date: Wed, 1 May 2024 09:27:07 +0800 Subject: [PATCH] Fix: text chunking processor ingestion bug on multi-node cluster (#713) * fix multi node text chunking processor index bug Signed-off-by: yuye-aws * add change log Signed-off-by: yuye-aws * bug fix: no max token count setting in index Signed-off-by: yuye-aws * make program faster without creating index settings object Signed-off-by: yuye-aws * add comment Signed-off-by: yuye-aws * fix comment Signed-off-by: yuye-aws * resolve code review Signed-off-by: yuye-aws * simplify the code given toInt in NumberUtils Signed-off-by: yuye-aws * resolve code review comments Signed-off-by: yuye-aws --------- Signed-off-by: yuye-aws (cherry picked from commit 2d42408c70e01b95825744bea0182ff361090a4e) --- CHANGELOG.md | 1 + .../bwc/TextChunkingProcessorIT.java | 2 +- .../neuralsearch/plugin/NeuralSearch.java | 7 +----- .../processor/TextChunkingProcessor.java | 11 +++------- .../factory/TextChunkingProcessorFactory.java | 22 ++----------------- .../processor/TextChunkingProcessorTests.java | 4 +--- .../TextChunkingProcessorFactoryTests.java | 9 +------- 7 files changed, 10 insertions(+), 46 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c1a88ca4e..36c6be493 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Features ### Enhancements ### Bug Fixes +- Fix multi node "no such index" error in text chunking processor ([#713](https://github.com/opensearch-project/neural-search/pull/713)) ### Infrastructure ### Documentation ### Maintenance diff --git a/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/TextChunkingProcessorIT.java b/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/TextChunkingProcessorIT.java index bab2d78d5..ca314300c 100644 --- a/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/TextChunkingProcessorIT.java +++ b/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/TextChunkingProcessorIT.java @@ -28,7 +28,7 @@ public class TextChunkingProcessorIT extends AbstractRestartUpgradeRestTestCase "standard tokenizer in OpenSearch." ); - // Test rolling-upgrade text chunking processor + // Test restart-upgrade text chunking processor // Create Text Chunking Processor, Ingestion Pipeline and add document // Validate process, pipeline and document count in restart-upgrade scenario public void testTextChunkingProcessor_E2EFlow() throws Exception { diff --git a/src/main/java/org/opensearch/neuralsearch/plugin/NeuralSearch.java b/src/main/java/org/opensearch/neuralsearch/plugin/NeuralSearch.java index d54c644c4..5e2b1fd61 100644 --- a/src/main/java/org/opensearch/neuralsearch/plugin/NeuralSearch.java +++ b/src/main/java/org/opensearch/neuralsearch/plugin/NeuralSearch.java @@ -118,12 +118,7 @@ public Map getProcessors(Processor.Parameters paramet TextImageEmbeddingProcessor.TYPE, new TextImageEmbeddingProcessorFactory(clientAccessor, parameters.env, parameters.ingestService.getClusterService()), TextChunkingProcessor.TYPE, - new TextChunkingProcessorFactory( - parameters.env, - parameters.ingestService.getClusterService(), - parameters.indicesService, - parameters.analysisRegistry - ) + new TextChunkingProcessorFactory(parameters.env, parameters.ingestService.getClusterService(), parameters.analysisRegistry) ); } diff --git a/src/main/java/org/opensearch/neuralsearch/processor/TextChunkingProcessor.java b/src/main/java/org/opensearch/neuralsearch/processor/TextChunkingProcessor.java index 555310627..c59478de6 100644 --- a/src/main/java/org/opensearch/neuralsearch/processor/TextChunkingProcessor.java +++ b/src/main/java/org/opensearch/neuralsearch/processor/TextChunkingProcessor.java @@ -15,11 +15,9 @@ import org.apache.commons.lang3.StringUtils; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.env.Environment; -import org.opensearch.index.IndexService; import org.opensearch.cluster.service.ClusterService; import org.opensearch.index.analysis.AnalysisRegistry; import org.opensearch.index.mapper.MapperService; -import org.opensearch.indices.IndicesService; import org.opensearch.index.IndexSettings; import org.opensearch.ingest.AbstractProcessor; import org.opensearch.ingest.IngestDocument; @@ -52,7 +50,6 @@ public final class TextChunkingProcessor extends AbstractProcessor { private Chunker chunker; private final Map fieldMap; private final ClusterService clusterService; - private final IndicesService indicesService; private final AnalysisRegistry analysisRegistry; private final Environment environment; @@ -63,14 +60,12 @@ public TextChunkingProcessor( final Map algorithmMap, final Environment environment, final ClusterService clusterService, - final IndicesService indicesService, final AnalysisRegistry analysisRegistry ) { super(tag, description); this.fieldMap = fieldMap; this.environment = environment; this.clusterService = clusterService; - this.indicesService = indicesService; this.analysisRegistry = analysisRegistry; parseAlgorithmMap(algorithmMap); } @@ -151,14 +146,14 @@ private boolean isListOfString(final Object value) { } private int getMaxTokenCount(final Map sourceAndMetadataMap) { + int defaultMaxTokenCount = IndexSettings.MAX_TOKEN_COUNT_SETTING.get(environment.settings()); String indexName = sourceAndMetadataMap.get(IndexFieldMapper.NAME).toString(); IndexMetadata indexMetadata = clusterService.state().metadata().index(indexName); if (Objects.isNull(indexMetadata)) { - return IndexSettings.MAX_TOKEN_COUNT_SETTING.get(environment.settings()); + return defaultMaxTokenCount; } // if the index is specified in the metadata, read maxTokenCount from the index setting - IndexService indexService = indicesService.indexServiceSafe(indexMetadata.getIndex()); - return indexService.getIndexSettings().getMaxTokenCount(); + return IndexSettings.MAX_TOKEN_COUNT_SETTING.get(indexMetadata.getSettings()); } /** diff --git a/src/main/java/org/opensearch/neuralsearch/processor/factory/TextChunkingProcessorFactory.java b/src/main/java/org/opensearch/neuralsearch/processor/factory/TextChunkingProcessorFactory.java index efffcc908..91b9ac5c1 100644 --- a/src/main/java/org/opensearch/neuralsearch/processor/factory/TextChunkingProcessorFactory.java +++ b/src/main/java/org/opensearch/neuralsearch/processor/factory/TextChunkingProcessorFactory.java @@ -9,7 +9,6 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.env.Environment; import org.opensearch.index.analysis.AnalysisRegistry; -import org.opensearch.indices.IndicesService; import org.opensearch.ingest.Processor; import org.opensearch.neuralsearch.processor.TextChunkingProcessor; import static org.opensearch.neuralsearch.processor.TextChunkingProcessor.TYPE; @@ -29,19 +28,11 @@ public class TextChunkingProcessorFactory implements Processor.Factory { private final ClusterService clusterService; - private final IndicesService indicesService; - private final AnalysisRegistry analysisRegistry; - public TextChunkingProcessorFactory( - Environment environment, - ClusterService clusterService, - IndicesService indicesService, - AnalysisRegistry analysisRegistry - ) { + public TextChunkingProcessorFactory(Environment environment, ClusterService clusterService, AnalysisRegistry analysisRegistry) { this.environment = environment; this.clusterService = clusterService; - this.indicesService = indicesService; this.analysisRegistry = analysisRegistry; } @@ -54,15 +45,6 @@ public TextChunkingProcessor create( ) throws Exception { Map fieldMap = readMap(TYPE, processorTag, config, FIELD_MAP_FIELD); Map algorithmMap = readMap(TYPE, processorTag, config, ALGORITHM_FIELD); - return new TextChunkingProcessor( - processorTag, - description, - fieldMap, - algorithmMap, - environment, - clusterService, - indicesService, - analysisRegistry - ); + return new TextChunkingProcessor(processorTag, description, fieldMap, algorithmMap, environment, clusterService, analysisRegistry); } } diff --git a/src/test/java/org/opensearch/neuralsearch/processor/TextChunkingProcessorTests.java b/src/test/java/org/opensearch/neuralsearch/processor/TextChunkingProcessorTests.java index 7109dcb41..efc9745ed 100644 --- a/src/test/java/org/opensearch/neuralsearch/processor/TextChunkingProcessorTests.java +++ b/src/test/java/org/opensearch/neuralsearch/processor/TextChunkingProcessorTests.java @@ -32,7 +32,6 @@ import org.opensearch.index.analysis.AnalysisRegistry; import org.opensearch.index.analysis.TokenizerFactory; import org.opensearch.index.mapper.IndexFieldMapper; -import org.opensearch.indices.IndicesService; import org.opensearch.indices.analysis.AnalysisModule; import org.opensearch.ingest.IngestDocument; import org.opensearch.ingest.Processor; @@ -85,11 +84,10 @@ public void setup() { when(environment.settings()).thenReturn(settings); ClusterState clusterState = mock(ClusterState.class); ClusterService clusterService = mock(ClusterService.class); - IndicesService indicesService = mock(IndicesService.class); when(metadata.index(anyString())).thenReturn(null); when(clusterState.metadata()).thenReturn(metadata); when(clusterService.state()).thenReturn(clusterState); - textChunkingProcessorFactory = new TextChunkingProcessorFactory(environment, clusterService, indicesService, getAnalysisRegistry()); + textChunkingProcessorFactory = new TextChunkingProcessorFactory(environment, clusterService, getAnalysisRegistry()); } private Map createFixedTokenLengthParameters() { diff --git a/src/test/java/org/opensearch/neuralsearch/processor/factory/TextChunkingProcessorFactoryTests.java b/src/test/java/org/opensearch/neuralsearch/processor/factory/TextChunkingProcessorFactoryTests.java index 3d9993e7b..15786e891 100644 --- a/src/test/java/org/opensearch/neuralsearch/processor/factory/TextChunkingProcessorFactoryTests.java +++ b/src/test/java/org/opensearch/neuralsearch/processor/factory/TextChunkingProcessorFactoryTests.java @@ -19,7 +19,6 @@ import org.opensearch.env.TestEnvironment; import org.opensearch.index.analysis.AnalysisRegistry; import org.opensearch.index.analysis.TokenizerFactory; -import org.opensearch.indices.IndicesService; import org.opensearch.indices.analysis.AnalysisModule; import org.opensearch.ingest.Processor; import org.opensearch.neuralsearch.processor.TextChunkingProcessor; @@ -62,13 +61,7 @@ public Map> getTokeniz public void setup() { Environment environment = mock(Environment.class); ClusterService clusterService = mock(ClusterService.class); - IndicesService indicesService = mock(IndicesService.class); - this.textChunkingProcessorFactory = new TextChunkingProcessorFactory( - environment, - clusterService, - indicesService, - getAnalysisRegistry() - ); + this.textChunkingProcessorFactory = new TextChunkingProcessorFactory(environment, clusterService, getAnalysisRegistry()); } @SneakyThrows