Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
Signed-off-by: zane-neo <[email protected]>
  • Loading branch information
zane-neo committed Apr 15, 2024
1 parent f18868f commit 73596ed
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.apache.commons.lang3.StringUtils;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.env.Environment;
import org.opensearch.index.mapper.IndexFieldMapper;
import org.opensearch.ingest.AbstractProcessor;
import org.opensearch.ingest.IngestDocument;
import org.opensearch.neuralsearch.ml.MLCommonsClientAccessor;
Expand Down Expand Up @@ -208,12 +209,15 @@ private void buildMapWithProcessorKeyAndOriginalValueForMapType(

private void validateEmbeddingFieldsValue(IngestDocument ingestDocument) {
Map<String, Object> sourceAndMetadataMap = ingestDocument.getSourceAndMetadata();
String indexName = sourceAndMetadataMap.get(IndexFieldMapper.NAME).toString();
ProcessorDocumentUtils.validateMapTypeValue(
"field_map",
sourceAndMetadataMap,
fieldMap,
1,
ProcessorDocumentUtils.getMaxDepth(sourceAndMetadataMap, clusterService, environment),
indexName,
clusterService,
environment,
false
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,15 @@ private int getMaxTokenCount(final Map<String, Object> sourceAndMetadataMap) {
@Override
public IngestDocument execute(final IngestDocument ingestDocument) {
Map<String, Object> sourceAndMetadataMap = ingestDocument.getSourceAndMetadata();
String indexName = sourceAndMetadataMap.get(IndexFieldMapper.NAME).toString();
ProcessorDocumentUtils.validateMapTypeValue(
"field_map",
sourceAndMetadataMap,
fieldMap,
1,
ProcessorDocumentUtils.getMaxDepth(sourceAndMetadataMap, clusterService, environment),
indexName,
clusterService,
environment,
true
);
// fixed token length algorithm needs runtime parameter max_token_count for tokenization
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.core.action.ActionListener;
import org.opensearch.env.Environment;
import org.opensearch.index.mapper.IndexFieldMapper;
import org.opensearch.ingest.AbstractProcessor;
import org.opensearch.ingest.IngestDocument;
import org.opensearch.neuralsearch.ml.MLCommonsClientAccessor;
Expand Down Expand Up @@ -171,12 +172,15 @@ Map<String, Object> buildTextEmbeddingResult(final String knnKey, List<Float> mo

private void validateEmbeddingFieldsValue(final IngestDocument ingestDocument) {
Map<String, Object> sourceAndMetadataMap = ingestDocument.getSourceAndMetadata();
String indexName = sourceAndMetadataMap.get(IndexFieldMapper.NAME).toString();
ProcessorDocumentUtils.validateMapTypeValue(
"field_map",
sourceAndMetadataMap,
fieldMap,
1,
ProcessorDocumentUtils.getMaxDepth(sourceAndMetadataMap, clusterService, environment),
indexName,
clusterService,
environment,
false
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,37 +10,19 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.util.CollectionUtils;
import org.opensearch.env.Environment;
import org.opensearch.index.mapper.IndexFieldMapper;
import org.opensearch.index.mapper.MapperService;

import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

/**
* This class is used to accommodate the common code pieces of parsing, validating and processing the document for multiple
* pipeline processors.
*/
public class ProcessorDocumentUtils {

/**
* This method is used to get the max depth of the index or from system settings.
*
* @param sourceAndMetadataMap _source and metadata info in document.
* @param clusterService cluster service passed from OpenSearch core.
* @param environment environment passed from OpenSearch core.
* @return max depth of the index or from system settings.
*/
public static long getMaxDepth(Map<String, Object> sourceAndMetadataMap, ClusterService clusterService, Environment environment) {
String indexName = sourceAndMetadataMap.get(IndexFieldMapper.NAME).toString();
IndexMetadata indexMetadata = clusterService.state().metadata().index(indexName);
if (indexMetadata != null) {
Settings settings = indexMetadata.getSettings();
return MapperService.INDEX_MAPPING_DEPTH_LIMIT_SETTING.get(settings);
}
return MapperService.INDEX_MAPPING_DEPTH_LIMIT_SETTING.get(environment.settings());
}

/**
* Validates a map type value recursively up to a specified depth. Supports Map type, List type and String type.
* If current sourceValue is Map or List type, recursively validates its values, otherwise validates its value.
Expand All @@ -49,7 +31,9 @@ public static long getMaxDepth(Map<String, Object> sourceAndMetadataMap, Cluster
* @param sourceValue the source map being validated, the first level is always the sourceAndMetadataMap.
* @param fieldMap the configuration map for validation, the first level is always the value of "field_map" in the processor configuration.
* @param depth the current depth of recursion
* @param maxDepth the maximum allowed depth for recursion
* @param clusterService cluster service passed from OpenSearch core.
* @param environment environment passed from OpenSearch core.
* @param indexName the maximum allowed depth for recursion
* @param allowEmpty flag to allow empty values in map type validation.
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
Expand All @@ -58,11 +42,13 @@ public static void validateMapTypeValue(
final Map<String, Object> sourceValue,
final Object fieldMap,
final long depth,
final long maxDepth,
final String indexName,
final ClusterService clusterService,
final Environment environment,
final boolean allowEmpty
) {
if (sourceValue == null) return; // allow map type value to be null.
validateDepth(sourceKey, depth, maxDepth);
validateDepth(sourceKey, depth, indexName, clusterService, environment);
if (!(fieldMap instanceof Map)) { // source value is map type means configuration has to be map type
throw new IllegalArgumentException(
String.format(
Expand All @@ -79,9 +65,27 @@ public static void validateMapTypeValue(
Object nextSourceValue = sourceValue.get(key);
if (nextSourceValue != null) {
if (nextSourceValue instanceof List) {
validateListTypeValue(key, (List) nextSourceValue, fieldMap, depth + 1, maxDepth, allowEmpty);
validateListTypeValue(
key,
(List) nextSourceValue,
fieldMap,
depth + 1,
indexName,
clusterService,
environment,
allowEmpty
);
} else if (nextSourceValue instanceof Map) {
validateMapTypeValue(key, (Map<String, Object>) nextSourceValue, nextFieldMap, depth + 1, maxDepth, allowEmpty);
validateMapTypeValue(
key,
(Map<String, Object>) nextSourceValue,
nextFieldMap,
depth + 1,
indexName,
clusterService,
environment,
allowEmpty
);
} else if (!(nextSourceValue instanceof String)) {
throw new IllegalArgumentException("map type field [" + key + "] is neither string nor nested type, cannot process it");
} else if (!allowEmpty && StringUtils.isBlank((String) nextSourceValue)) {
Expand All @@ -93,14 +97,16 @@ public static void validateMapTypeValue(

@SuppressWarnings({ "rawtypes", "unchecked" })
private static void validateListTypeValue(
String sourceKey,
List sourceValue,
Object fieldMap,
long depth,
long maxDepth,
boolean allowEmpty
final String sourceKey,
final List sourceValue,
final Object fieldMap,
final long depth,
final String indexName,
final ClusterService clusterService,
final Environment environment,
final boolean allowEmpty
) {
validateDepth(sourceKey, depth, maxDepth);
validateDepth(sourceKey, depth, indexName, clusterService, environment);
if (CollectionUtils.isEmpty(sourceValue)) return;
for (Object element : sourceValue) {
if (element == null) {
Expand All @@ -114,7 +120,9 @@ private static void validateListTypeValue(
(Map<String, Object>) element,
((Map) fieldMap).get(sourceKey),
depth + 1,
maxDepth,
indexName,
clusterService,
environment,
allowEmpty
);
} else if (!(element instanceof String)) {
Expand All @@ -125,7 +133,17 @@ private static void validateListTypeValue(
}
}

private static void validateDepth(String sourceKey, long depth, long maxDepth) {
private static void validateDepth(
String sourceKey,
long depth,
String indexName,
ClusterService clusterService,
Environment environment
) {
Settings settings = Optional.ofNullable(clusterService.state().metadata().index(indexName))
.map(IndexMetadata::getSettings)
.orElse(environment.settings());
long maxDepth = MapperService.INDEX_MAPPING_DEPTH_LIMIT_SETTING.get(settings);
if (depth > maxDepth) {
throw new IllegalArgumentException("map type field [" + sourceKey + "] reaches max depth limit, cannot process it");
}
Expand Down

0 comments on commit 73596ed

Please sign in to comment.