From 73596ed63a8be7c53eaab5ceaa1f1b6831f5f3b1 Mon Sep 17 00:00:00 2001 From: zane-neo Date: Mon, 15 Apr 2024 13:59:17 +0800 Subject: [PATCH] Address comments Signed-off-by: zane-neo --- .../processor/InferenceProcessor.java | 6 +- .../processor/TextChunkingProcessor.java | 5 +- .../TextImageEmbeddingProcessor.java | 6 +- .../util/ProcessorDocumentUtils.java | 84 +++++++++++-------- 4 files changed, 65 insertions(+), 36 deletions(-) diff --git a/src/main/java/org/opensearch/neuralsearch/processor/InferenceProcessor.java b/src/main/java/org/opensearch/neuralsearch/processor/InferenceProcessor.java index 98da1c6c2..6869fd5bc 100644 --- a/src/main/java/org/opensearch/neuralsearch/processor/InferenceProcessor.java +++ b/src/main/java/org/opensearch/neuralsearch/processor/InferenceProcessor.java @@ -16,6 +16,7 @@ import org.apache.commons.lang3.StringUtils; import org.opensearch.cluster.service.ClusterService; import org.opensearch.env.Environment; +import org.opensearch.index.mapper.IndexFieldMapper; import org.opensearch.ingest.AbstractProcessor; import org.opensearch.ingest.IngestDocument; import org.opensearch.neuralsearch.ml.MLCommonsClientAccessor; @@ -208,12 +209,15 @@ private void buildMapWithProcessorKeyAndOriginalValueForMapType( private void validateEmbeddingFieldsValue(IngestDocument ingestDocument) { Map sourceAndMetadataMap = ingestDocument.getSourceAndMetadata(); + String indexName = sourceAndMetadataMap.get(IndexFieldMapper.NAME).toString(); ProcessorDocumentUtils.validateMapTypeValue( "field_map", sourceAndMetadataMap, fieldMap, 1, - ProcessorDocumentUtils.getMaxDepth(sourceAndMetadataMap, clusterService, environment), + indexName, + clusterService, + environment, false ); } diff --git a/src/main/java/org/opensearch/neuralsearch/processor/TextChunkingProcessor.java b/src/main/java/org/opensearch/neuralsearch/processor/TextChunkingProcessor.java index 989092d26..efed35a3c 100644 --- 
a/src/main/java/org/opensearch/neuralsearch/processor/TextChunkingProcessor.java +++ b/src/main/java/org/opensearch/neuralsearch/processor/TextChunkingProcessor.java @@ -167,12 +167,15 @@ private int getMaxTokenCount(final Map sourceAndMetadataMap) { @Override public IngestDocument execute(final IngestDocument ingestDocument) { Map sourceAndMetadataMap = ingestDocument.getSourceAndMetadata(); + String indexName = sourceAndMetadataMap.get(IndexFieldMapper.NAME).toString(); ProcessorDocumentUtils.validateMapTypeValue( "field_map", sourceAndMetadataMap, fieldMap, 1, - ProcessorDocumentUtils.getMaxDepth(sourceAndMetadataMap, clusterService, environment), + indexName, + clusterService, + environment, true ); // fixed token length algorithm needs runtime parameter max_token_count for tokenization diff --git a/src/main/java/org/opensearch/neuralsearch/processor/TextImageEmbeddingProcessor.java b/src/main/java/org/opensearch/neuralsearch/processor/TextImageEmbeddingProcessor.java index 723aadd90..e24bd5855 100644 --- a/src/main/java/org/opensearch/neuralsearch/processor/TextImageEmbeddingProcessor.java +++ b/src/main/java/org/opensearch/neuralsearch/processor/TextImageEmbeddingProcessor.java @@ -17,6 +17,7 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.core.action.ActionListener; import org.opensearch.env.Environment; +import org.opensearch.index.mapper.IndexFieldMapper; import org.opensearch.ingest.AbstractProcessor; import org.opensearch.ingest.IngestDocument; import org.opensearch.neuralsearch.ml.MLCommonsClientAccessor; @@ -171,12 +172,15 @@ Map buildTextEmbeddingResult(final String knnKey, List mo private void validateEmbeddingFieldsValue(final IngestDocument ingestDocument) { Map sourceAndMetadataMap = ingestDocument.getSourceAndMetadata(); + String indexName = sourceAndMetadataMap.get(IndexFieldMapper.NAME).toString(); ProcessorDocumentUtils.validateMapTypeValue( "field_map", sourceAndMetadataMap, fieldMap, 1, - 
ProcessorDocumentUtils.getMaxDepth(sourceAndMetadataMap, clusterService, environment), + indexName, + clusterService, + environment, false ); } diff --git a/src/main/java/org/opensearch/neuralsearch/util/ProcessorDocumentUtils.java b/src/main/java/org/opensearch/neuralsearch/util/ProcessorDocumentUtils.java index 27ffcbade..41c219e52 100644 --- a/src/main/java/org/opensearch/neuralsearch/util/ProcessorDocumentUtils.java +++ b/src/main/java/org/opensearch/neuralsearch/util/ProcessorDocumentUtils.java @@ -10,12 +10,12 @@ import org.opensearch.common.settings.Settings; import org.opensearch.core.common.util.CollectionUtils; import org.opensearch.env.Environment; -import org.opensearch.index.mapper.IndexFieldMapper; import org.opensearch.index.mapper.MapperService; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Optional; /** * This class is used to accommodate the common code pieces of parsing, validating and processing the document for multiple @@ -23,24 +23,6 @@ */ public class ProcessorDocumentUtils { - /** - * This method is used to get the max depth of the index or from system settings. - * - * @param sourceAndMetadataMap _source and metadata info in document. - * @param clusterService cluster service passed from OpenSearch core. - * @param environment environment passed from OpenSearch core. - * @return max depth of the index or from system settings. 
- */ - public static long getMaxDepth(Map sourceAndMetadataMap, ClusterService clusterService, Environment environment) { - String indexName = sourceAndMetadataMap.get(IndexFieldMapper.NAME).toString(); - IndexMetadata indexMetadata = clusterService.state().metadata().index(indexName); - if (indexMetadata != null) { - Settings settings = indexMetadata.getSettings(); - return MapperService.INDEX_MAPPING_DEPTH_LIMIT_SETTING.get(settings); - } - return MapperService.INDEX_MAPPING_DEPTH_LIMIT_SETTING.get(environment.settings()); - } - /** * Validates a map type value recursively up to a specified depth. Supports Map type, List type and String type. * If current sourceValue is Map or List type, recursively validates its values, otherwise validates its value. @@ -49,7 +31,9 @@ public static long getMaxDepth(Map sourceAndMetadataMap, Cluster * @param sourceValue the source map being validated, the first level is always the sourceAndMetadataMap. * @param fieldMap the configuration map for validation, the first level is always the value of "field_map" in the processor configuration. * @param depth the current depth of recursion - * @param maxDepth the maximum allowed depth for recursion + * @param clusterService cluster service passed from OpenSearch core. + * @param environment environment passed from OpenSearch core. + * @param indexName the name of the index, used to look up the index-level mapping depth limit (environment settings are used when the index is not found). * @param allowEmpty flag to allow empty values in map type validation. */ @SuppressWarnings({ "rawtypes", "unchecked" }) @@ -58,11 +42,13 @@ public static void validateMapTypeValue( final Map sourceValue, final Object fieldMap, final long depth, - final long maxDepth, + final String indexName, + final ClusterService clusterService, + final Environment environment, final boolean allowEmpty ) { if (sourceValue == null) return; // allow map type value to be null. 
- validateDepth(sourceKey, depth, maxDepth); + validateDepth(sourceKey, depth, indexName, clusterService, environment); if (!(fieldMap instanceof Map)) { // source value is map type means configuration has to be map type throw new IllegalArgumentException( String.format( @@ -79,9 +65,27 @@ public static void validateMapTypeValue( Object nextSourceValue = sourceValue.get(key); if (nextSourceValue != null) { if (nextSourceValue instanceof List) { - validateListTypeValue(key, (List) nextSourceValue, fieldMap, depth + 1, maxDepth, allowEmpty); + validateListTypeValue( + key, + (List) nextSourceValue, + fieldMap, + depth + 1, + indexName, + clusterService, + environment, + allowEmpty + ); } else if (nextSourceValue instanceof Map) { - validateMapTypeValue(key, (Map) nextSourceValue, nextFieldMap, depth + 1, maxDepth, allowEmpty); + validateMapTypeValue( + key, + (Map) nextSourceValue, + nextFieldMap, + depth + 1, + indexName, + clusterService, + environment, + allowEmpty + ); } else if (!(nextSourceValue instanceof String)) { throw new IllegalArgumentException("map type field [" + key + "] is neither string nor nested type, cannot process it"); } else if (!allowEmpty && StringUtils.isBlank((String) nextSourceValue)) { @@ -93,14 +97,16 @@ public static void validateMapTypeValue( @SuppressWarnings({ "rawtypes", "unchecked" }) private static void validateListTypeValue( - String sourceKey, - List sourceValue, - Object fieldMap, - long depth, - long maxDepth, - boolean allowEmpty + final String sourceKey, + final List sourceValue, + final Object fieldMap, + final long depth, + final String indexName, + final ClusterService clusterService, + final Environment environment, + final boolean allowEmpty ) { - validateDepth(sourceKey, depth, maxDepth); + validateDepth(sourceKey, depth, indexName, clusterService, environment); if (CollectionUtils.isEmpty(sourceValue)) return; for (Object element : sourceValue) { if (element == null) { @@ -114,7 +120,9 @@ private static void 
validateListTypeValue( (Map) element, ((Map) fieldMap).get(sourceKey), depth + 1, - maxDepth, + indexName, + clusterService, + environment, allowEmpty ); } else if (!(element instanceof String)) { @@ -125,7 +133,17 @@ private static void validateListTypeValue( } } - private static void validateDepth(String sourceKey, long depth, long maxDepth) { + private static void validateDepth( + String sourceKey, + long depth, + String indexName, + ClusterService clusterService, + Environment environment + ) { + Settings settings = Optional.ofNullable(clusterService.state().metadata().index(indexName)) + .map(IndexMetadata::getSettings) + .orElse(environment.settings()); + long maxDepth = MapperService.INDEX_MAPPING_DEPTH_LIMIT_SETTING.get(settings); if (depth > maxDepth) { throw new IllegalArgumentException("map type field [" + sourceKey + "] reaches max depth limit, cannot process it"); }