diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/enrich/NamedPolicy.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/enrich/NamedPolicy.java index 759efdf0ccc14..201fe33ef680c 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/enrich/NamedPolicy.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/enrich/NamedPolicy.java @@ -24,6 +24,7 @@ public final class NamedPolicy { static final ParseField INDICES_FIELD = new ParseField("indices"); static final ParseField MATCH_FIELD_FIELD = new ParseField("match_field"); static final ParseField ENRICH_FIELDS_FIELD = new ParseField("enrich_fields"); + static final ParseField FORMAT_FIELD = new ParseField("format"); @SuppressWarnings("unchecked") private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( @@ -35,7 +36,8 @@ public final class NamedPolicy { (BytesReference) args[1], (List) args[2], (String) args[3], - (List) args[4] + (List) args[4], + (String) args[5] ) ); @@ -53,6 +55,8 @@ private static void declareParserOptions(ConstructingObjectParser parser) parser.declareStringArray(ConstructingObjectParser.constructorArg(), INDICES_FIELD); parser.declareString(ConstructingObjectParser.constructorArg(), MATCH_FIELD_FIELD); parser.declareStringArray(ConstructingObjectParser.constructorArg(), ENRICH_FIELDS_FIELD); + parser.declareString(ConstructingObjectParser.optionalConstructorArg(), FORMAT_FIELD); + } public static NamedPolicy fromXContent(XContentParser parser) throws IOException { @@ -82,14 +86,17 @@ public static NamedPolicy fromXContent(XContentParser parser) throws IOException private final List indices; private final String matchField; private final List enrichFields; + private final String format; - NamedPolicy(String type, String name, BytesReference query, List indices, String matchField, List enrichFields) { + NamedPolicy(String type, String name, BytesReference query, List indices, String matchField, List enrichFields, + String format) { this.type = type; this.name = name; this.query = query; this.indices = indices; this.matchField = matchField; this.enrichFields = enrichFields; + this.format = format; } public String getType() { @@ -100,6 +107,10 @@ public String getName() { return name; } + public String getFormat() { + return format; + } + public BytesReference getQuery() { return query; } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/enrich/PutPolicyRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/enrich/PutPolicyRequest.java index 19e816b5bed75..afc7da6febbc1 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/enrich/PutPolicyRequest.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/enrich/PutPolicyRequest.java @@ -30,8 +30,13 @@ public final class PutPolicyRequest implements Validatable, ToXContentObject { private final List indices; private final String matchField; private final List enrichFields; + private final String format; public PutPolicyRequest(String name, String type, List indices, String matchField, List enrichFields) { + this(name, type, indices, matchField, enrichFields, null); + } + + public PutPolicyRequest(String name, String type, List indices, String matchField, List enrichFields, String format) { if (Strings.hasLength(name) == false) { throw new IllegalArgumentException("name must be a non-null and non-empty string"); } @@ -53,6 +58,7 @@ public PutPolicyRequest(String name, String type, List indices, String m this.indices = indices; this.matchField = matchField; this.enrichFields = enrichFields; + this.format = format; } public String getName() { @@ -63,6 +69,10 @@ public String getType() { return type; } + public String getFormat() { + return format; + } + public BytesReference getQuery() { return query; } @@ -102,6 +112,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } builder.field(NamedPolicy.MATCH_FIELD_FIELD.getPreferredName(), matchField); builder.field(NamedPolicy.ENRICH_FIELDS_FIELD.getPreferredName(), enrichFields); + + if (format != null) { + builder.field(NamedPolicy.FORMAT_FIELD.getPreferredName(), format); + } } builder.endObject(); } @@ -119,12 +133,13 @@ public boolean equals(Object o) { Objects.equals(query, that.query) && Objects.equals(indices, that.indices) && Objects.equals(matchField, that.matchField) && - Objects.equals(enrichFields, that.enrichFields); + Objects.equals(enrichFields, that.enrichFields) && + Objects.equals(format, that.format); } @Override public int hashCode() { - return Objects.hash(name, type, query, indices, matchField, enrichFields); + return Objects.hash(name, type, query, indices, matchField, enrichFields, format); } private static BytesReference xContentToBytes(ToXContentObject object) throws IOException { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/enrich/GetPolicyResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/enrich/GetPolicyResponseTests.java index 6d6bed496d70a..5801b6cd060ca 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/enrich/GetPolicyResponseTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/enrich/GetPolicyResponseTests.java @@ -74,7 +74,8 @@ private static EnrichPolicy createRandomEnrichPolicy(XContentType xContentType){ randomBoolean() ? new EnrichPolicy.QuerySource(querySource, xContentType) : null, Arrays.asList(generateRandomStringArray(8, 4, false, false)), randomAlphaOfLength(4), - Arrays.asList(generateRandomStringArray(8, 4, false, false)) + Arrays.asList(generateRandomStringArray(8, 4, false, false)), + null ); } catch (IOException e) { throw new UncheckedIOException(e); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/EnrichPolicy.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/EnrichPolicy.java index 4cce4323243f3..958f9c84ce9da 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/EnrichPolicy.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/EnrichPolicy.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.util.ArrayUtils; import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ObjectParser.ValueType; import org.elasticsearch.common.xcontent.ToXContentFragment; @@ -22,31 +23,39 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser.Token; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.mapper.RangeType; import java.io.IOException; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; /** * Represents an enrich policy including its configuration. */ public final class EnrichPolicy implements Writeable, ToXContentFragment { + public static final Version FORMAT_VERSION = Version.V_8_0_0; + public static final String ENRICH_INDEX_NAME_BASE = ".enrich-"; public static final String ENRICH_INDEX_PATTERN = ENRICH_INDEX_NAME_BASE + "*"; public static final String MATCH_TYPE = "match"; + public static final List RANGE_MATCH_TYPES = Arrays.stream(RangeType.values()) + .map(t -> t.typeName() + "_match").collect(Collectors.toList()); public static final String GEO_MATCH_TYPE = "geo_match"; - public static final String[] SUPPORTED_POLICY_TYPES = new String[]{ + public static final String[] SUPPORTED_POLICY_TYPES = ArrayUtils.concat(new String[]{ MATCH_TYPE, GEO_MATCH_TYPE - }; + }, RANGE_MATCH_TYPES.toArray(new String[0])); private static final ParseField QUERY = new ParseField("query"); private static final ParseField INDICES = new ParseField("indices"); private static final ParseField MATCH_FIELD = new ParseField("match_field"); private static final ParseField ENRICH_FIELDS = new ParseField("enrich_fields"); + private static final ParseField FORMAT = new ParseField("format"); private static final ParseField ELASTICSEARCH_VERSION = new ParseField("elasticsearch_version"); @SuppressWarnings("unchecked") @@ -59,8 +68,8 @@ public final class EnrichPolicy implements Writeable, ToXContentFragment { (List) args[1], (String) args[2], (List) args[3], - (Version) args[4] - ) + (Version) args[4], + (String) args[5]) ); static { @@ -78,6 +87,7 @@ private static void declareCommonConstructorParsingOptions(ConstructingObjec parser.declareStringArray(ConstructingObjectParser.constructorArg(), ENRICH_FIELDS); parser.declareField(ConstructingObjectParser.optionalConstructorArg(), ((p, c) -> Version.fromString(p.text())), ELASTICSEARCH_VERSION, ValueType.STRING); + parser.declareString(ConstructingObjectParser.optionalConstructorArg(), FORMAT); } public static EnrichPolicy fromXContent(XContentParser parser) throws IOException { @@ -107,6 +117,7 @@ public static EnrichPolicy fromXContent(XContentParser parser) throws IOExceptio private final String matchField; private final List enrichFields; private final Version elasticsearchVersion; + private final String format; public EnrichPolicy(StreamInput in) throws IOException { this( @@ -115,16 +126,16 @@ public EnrichPolicy(StreamInput in) throws IOException { in.readStringList(), in.readString(), in.readStringList(), - Version.readVersion(in) - ); + Version.readVersion(in), + in.getVersion().onOrAfter(FORMAT_VERSION) ? in.readOptionalString() : null); } public EnrichPolicy(String type, QuerySource query, List indices, String matchField, - List enrichFields) { - this(type, query, indices, matchField, enrichFields, Version.CURRENT); + List enrichFields, String format) { + this(type, query, indices, matchField, enrichFields, Version.CURRENT, format); } public EnrichPolicy(String type, @@ -132,13 +143,14 @@ public EnrichPolicy(String type, List indices, String matchField, List enrichFields, - Version elasticsearchVersion) { + Version elasticsearchVersion, String format) { this.type = type; this.query = query; this.indices = indices; this.matchField = matchField; this.enrichFields = enrichFields; this.elasticsearchVersion = elasticsearchVersion != null ? elasticsearchVersion : Version.CURRENT; + this.format = format; } public String getType() { @@ -161,6 +173,10 @@ public List getEnrichFields() { return enrichFields; } + public String getFormat() { + return format; + } + public Version getElasticsearchVersion() { return elasticsearchVersion; } @@ -177,6 +193,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(matchField); out.writeStringCollection(enrichFields); Version.writeVersion(elasticsearchVersion, out); + if (out.getVersion().onOrAfter(FORMAT_VERSION)) { + out.writeOptionalString(format); + } } @Override @@ -199,6 +218,9 @@ private void toInnerXContent(XContentBuilder builder, Params params) throws IOEx if (params.paramAsBoolean("include_version", false) && elasticsearchVersion != null) { builder.field(ELASTICSEARCH_VERSION.getPreferredName(), elasticsearchVersion.toString()); } + if (format != null) { + builder.field(FORMAT.getPreferredName(), format); + } } @Override @@ -211,7 +233,8 @@ public boolean equals(Object o) { indices.equals(policy.indices) && matchField.equals(policy.matchField) && enrichFields.equals(policy.enrichFields) && - elasticsearchVersion.equals(policy.elasticsearchVersion); + elasticsearchVersion.equals(policy.elasticsearchVersion) && + Objects.equals(format, policy.format); } @Override @@ -222,7 +245,8 @@ public int hashCode() { indices, matchField, enrichFields, - elasticsearchVersion + elasticsearchVersion, + format ); } @@ -291,7 +315,8 @@ public static class NamedPolicy implements Writeable, ToXContentFragment { (List) args[2], (String) args[3], (List) args[4], - (Version) args[5]) + (Version) args[5], + (String) args[6]) ) ); diff --git a/x-pack/plugin/enrich/qa/common/src/main/java/org/elasticsearch/test/enrich/CommonEnrichRestTestCase.java b/x-pack/plugin/enrich/qa/common/src/main/java/org/elasticsearch/test/enrich/CommonEnrichRestTestCase.java index be206eff07cce..702f52336c3b3 100644 --- a/x-pack/plugin/enrich/qa/common/src/main/java/org/elasticsearch/test/enrich/CommonEnrichRestTestCase.java +++ b/x-pack/plugin/enrich/qa/common/src/main/java/org/elasticsearch/test/enrich/CommonEnrichRestTestCase.java @@ -28,19 +28,22 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.notNullValue; public abstract class CommonEnrichRestTestCase extends ESRestTestCase { @After + @SuppressWarnings("unchecked") public void deletePolicies() throws Exception { Map responseMap = toMap(adminClient().performRequest(new Request("GET", "/_enrich/policy"))); - @SuppressWarnings("unchecked") List> policies = (List>) responseMap.get("policies"); for (Map entry : policies) { - client().performRequest(new Request("DELETE", "/_enrich/policy/" + XContentMapValues.extractValue("config.match.name", entry))); + Map config = ((Map>) entry.get("config")).values().iterator().next(); + String endpoint = "/_enrich/policy/" + config.get("name"); + assertOK(client().performRequest(new Request("DELETE", endpoint))); - List sourceIndices = (List) XContentMapValues.extractValue("config.match.indices", entry); + List sourceIndices = (List) config.get("indices"); for (Object sourceIndex : sourceIndices) { try { client().performRequest(new Request("DELETE", "/" + sourceIndex)); @@ -51,21 +54,22 @@ public void deletePolicies() throws Exception { } } - private void setupGenericLifecycleTest(boolean deletePipeilne) throws Exception { + private void setupGenericLifecycleTest(boolean deletePipeilne, String field, String type) throws Exception { // Create source index: createSourceIndex("my-source-index"); // Create the policy: Request putPolicyRequest = new Request("PUT", "/_enrich/policy/my_policy"); - putPolicyRequest.setJsonEntity(generatePolicySource("my-source-index")); + putPolicyRequest.setJsonEntity(generatePolicySource("my-source-index", field, type)); assertOK(client().performRequest(putPolicyRequest)); // Add entry to source index and then refresh: Request indexRequest = new Request("PUT", "/my-source-index/_doc/elastic.co"); - indexRequest.setJsonEntity("{\"host\": \"elastic.co\",\"globalRank\": 25,\"tldRank\": 7,\"tld\": \"co\"}"); + indexRequest.setJsonEntity("{\"host\": \"elastic.co\",\"globalRank\": 25,\"tldRank\": 7,\"tld\": \"co\"," + + "\"ip\":\"8.8.8.0/24\",\"date\":{\"gte\":1,\"lte\": 5},\"integer\":{\"gte\":1,\"lte\":5},\"long\":{\"gte\":1,\"lte\":5}," + + "\"double\":{\"gte\":1,\"lte\":5},\"float\":{\"gte\":1,\"lte\":5}}"); assertOK(client().performRequest(indexRequest)); Request refreshRequest = new Request("POST", "/my-source-index/_refresh"); assertOK(client().performRequest(refreshRequest)); - // Execute the policy: Request executePolicyRequest = new Request("POST", "/_enrich/policy/my_policy/_execute"); assertOK(client().performRequest(executePolicyRequest)); @@ -73,14 +77,15 @@ private void setupGenericLifecycleTest(boolean deletePipeilne) throws Exception // Create pipeline Request putPipelineRequest = new Request("PUT", "/_ingest/pipeline/my_pipeline"); putPipelineRequest.setJsonEntity( - "{\"processors\":[" + "{\"enrich\":{\"policy_name\":\"my_policy\",\"field\":\"host\",\"target_field\":\"entry\"}}" + "]}" + "{\"processors\":[" + "{\"enrich\":{\"policy_name\":\"my_policy\",\"field\":\"" + field + "\",\"target_field\":\"entry\"}}]}" ); assertOK(client().performRequest(putPipelineRequest)); // Index document using pipeline with enrich processor: indexRequest = new Request("PUT", "/my-index/_doc/1"); indexRequest.addParameter("pipeline", "my_pipeline"); - indexRequest.setJsonEntity("{\"host\": \"elastic.co\"}"); + indexRequest.setJsonEntity("{\"host\": \"elastic.co\",\"ip\":\"8.8.8.8\",\"date\":3,\"integer\":3,\"long\":3,\"double\":3," + + "\"float\":3}"); assertOK(client().performRequest(indexRequest)); // Check if document has been enriched @@ -88,7 +93,7 @@ private void setupGenericLifecycleTest(boolean deletePipeilne) throws Exception Map response = toMap(client().performRequest(getRequest)); Map entry = (Map) ((Map) response.get("_source")).get("entry"); assertThat(entry.size(), equalTo(4)); - assertThat(entry.get("host"), equalTo("elastic.co")); + assertThat(entry.get(field), notNullValue()); assertThat(entry.get("tld"), equalTo("co")); assertThat(entry.get("globalRank"), equalTo(25)); assertThat(entry.get("tldRank"), equalTo(7)); @@ -99,8 +104,38 @@ private void setupGenericLifecycleTest(boolean deletePipeilne) throws Exception } } - public void testBasicFlow() throws Exception { - setupGenericLifecycleTest(true); + public void testBasicFlowKeyword() throws Exception { + setupGenericLifecycleTest(true, "host", "match"); + assertBusy(CommonEnrichRestTestCase::verifyEnrichMonitoring, 3, TimeUnit.MINUTES); + } + + public void testBasicFlowDate() throws Exception { + setupGenericLifecycleTest(true, "date", "date_range_match"); + assertBusy(CommonEnrichRestTestCase::verifyEnrichMonitoring, 3, TimeUnit.MINUTES); + } + + public void testBasicFlowInteger() throws Exception { + setupGenericLifecycleTest(true, "integer", "integer_range_match"); + assertBusy(CommonEnrichRestTestCase::verifyEnrichMonitoring, 3, TimeUnit.MINUTES); + } + + public void testBasicFlowLong() throws Exception { + setupGenericLifecycleTest(true, "long", "long_range_match"); + assertBusy(CommonEnrichRestTestCase::verifyEnrichMonitoring, 3, TimeUnit.MINUTES); + } + + public void testBasicFlowDouble() throws Exception { + setupGenericLifecycleTest(true, "double", "double_range_match"); + assertBusy(CommonEnrichRestTestCase::verifyEnrichMonitoring, 3, TimeUnit.MINUTES); + } + + public void testBasicFlowFloat() throws Exception { + setupGenericLifecycleTest(true, "float", "float_range_match"); + assertBusy(CommonEnrichRestTestCase::verifyEnrichMonitoring, 3, TimeUnit.MINUTES); + } + + public void testBasicFlowIp() throws Exception { + setupGenericLifecycleTest(true, "ip", "ip_range_match"); assertBusy(CommonEnrichRestTestCase::verifyEnrichMonitoring, 3, TimeUnit.MINUTES); } @@ -129,7 +164,7 @@ public void testDeleteIsCaseSensitive() throws Exception { public void testDeleteExistingPipeline() throws Exception { // lets not delete the pipeline at first, to test the failure - setupGenericLifecycleTest(false); + setupGenericLifecycleTest(false, "host", "match"); Request putPipelineRequest = new Request("PUT", "/_ingest/pipeline/another_pipeline"); putPipelineRequest.setJsonEntity( @@ -156,14 +191,18 @@ public void testDeleteExistingPipeline() throws Exception { } public static String generatePolicySource(String index) throws IOException { - XContentBuilder source = jsonBuilder().startObject().startObject("match"); + return generatePolicySource(index, "host", "match"); + } + + public static String generatePolicySource(String index, String field, String type) throws IOException { + XContentBuilder source = jsonBuilder().startObject().startObject(type); { source.field("indices", index); if (randomBoolean()) { source.field("query", QueryBuilders.matchAllQuery()); } - source.field("match_field", "host"); - source.field("enrich_fields", new String[] { "globalRank", "tldRank", "tld" }); + source.field("match_field", field); + source.field("enrich_fields", new String[]{"globalRank", "tldRank", "tld"}); } source.endObject().endObject(); return Strings.toString(source); @@ -179,7 +218,13 @@ public static String createSourceIndexMapping() { + "{\"host\": {\"type\":\"keyword\"}," + "\"globalRank\":{\"type\":\"keyword\"}," + "\"tldRank\":{\"type\":\"keyword\"}," - + "\"tld\":{\"type\":\"keyword\"}" + + "\"tld\":{\"type\":\"keyword\"}," + + "\"date\":{\"type\":\"date_range\"}," + + "\"integer\":{\"type\":\"integer_range\"}," + + "\"long\":{\"type\":\"long_range\"}," + + "\"double\":{\"type\":\"double_range\"}," + + "\"float\":{\"type\":\"float_range\"}," + + "\"ip\":{\"type\":\"ip_range\"}" + "}"; } diff --git a/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java b/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java index e8fda24229a39..9e9f53da23a41 100644 --- a/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java +++ b/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichMultiNodeIT.java @@ -77,7 +77,8 @@ public void testEnrichAPIs() { null, List.of(SOURCE_INDEX_NAME), MATCH_FIELD, - List.of(DECORATE_FIELDS) + List.of(DECORATE_FIELDS), + null ); PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy); client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet(); @@ -208,7 +209,8 @@ private static void createAndExecutePolicy() { null, List.of(SOURCE_INDEX_NAME), MATCH_FIELD, - List.of(DECORATE_FIELDS) + List.of(DECORATE_FIELDS), + null ); PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(POLICY_NAME, enrichPolicy); client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet(); diff --git a/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichRestartIT.java b/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichRestartIT.java index 1c7ac94ca6ec2..2fa4ba05bebf2 100644 --- a/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichRestartIT.java +++ b/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichRestartIT.java @@ -42,7 +42,8 @@ public void testRestart() throws Exception { null, List.of(SOURCE_INDEX_NAME), MATCH_FIELD, - List.of(DECORATE_FIELDS) + List.of(DECORATE_FIELDS), + null ); createSourceIndices(client(), enrichPolicy); for (int i = 0; i < numPolicies; i++) { diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java index d2d44e571fa6b..08d7a91b5f42b 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java @@ -233,12 +233,17 @@ private static void validateField(Map properties, String fieldName, boolea } private XContentBuilder resolveEnrichMapping(final EnrichPolicy policy) { - // Currently the only supported policy type is EnrichPolicy.MATCH_TYPE, which is a keyword type - final String keyType; final CheckedFunction matchFieldMapping; if (EnrichPolicy.MATCH_TYPE.equals(policy.getType())) { matchFieldMapping = (builder) -> builder.field("type", "keyword").field("doc_values", false); // No need to also configure index_options, because keyword type defaults to 'docs'. + } else if (EnrichPolicy.RANGE_MATCH_TYPES.contains(policy.getType())) { + matchFieldMapping = (builder) -> { + if (policy.getFormat() != null && policy.getType().startsWith("date")) { + builder.field("format", policy.getFormat()); + } + return builder.field("type", policy.getType().replace("_match", "")); + }; } else if (EnrichPolicy.GEO_MATCH_TYPE.equals(policy.getType())) { matchFieldMapping = (builder) -> builder.field("type", "geo_shape"); } else { diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java index a528a6461bd58..005b4c3cb9b56 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java @@ -68,6 +68,11 @@ public Processor create(Map processorFactories, Strin throw ConfigurationUtils.newConfigurationException(TYPE, tag, "max_matches", "should be between 1 and 128"); } + if (EnrichPolicy.RANGE_MATCH_TYPES.contains(policyType)) { + // range match uses the same processor as simple match + policyType = EnrichPolicy.MATCH_TYPE; + } + switch (policyType) { case EnrichPolicy.MATCH_TYPE: return new MatchProcessor( diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichStore.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichStore.java index a1ea44b9063fb..fd7b9b2c1b17f 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichStore.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichStore.java @@ -86,7 +86,8 @@ public static void putPolicy( policy.getIndices(), policy.getMatchField(), policy.getEnrichFields(), - Version.CURRENT + Version.CURRENT, + policy.getFormat() ); } else { finalPolicy = policy; diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java index c61c050f7b95c..33399e2194388 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/BasicEnrichTests.java @@ -73,7 +73,8 @@ public void testIngestDataWithMatchProcessor() { null, List.of(SOURCE_INDEX_NAME), MATCH_FIELD, - List.of(DECORATE_FIELDS) + List.of(DECORATE_FIELDS), + null ); PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy); client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet(); @@ -131,36 +132,63 @@ public void testIngestDataWithMatchProcessor() { assertThat(statsResponse.getCoordinatorStats().get(0).getExecutedSearchesTotal(), equalTo((long) numDocs)); } + public void testIngestDataWithIpRangeMatchProcessor() { + testIngestData("ip_range_match", "8.8.8.0/24", "8.8.8.8", "9.9.9.9"); + } + + public void testIngestDataWithIntegerRangeMatchProcessor() { + testIngestData("integer_range_match", Map.of("gt", 1, "lt", 10), 5, 15); + } + + public void testIngestDataWithLongRangeMatchProcessor() { + testIngestData("long_range_match", Map.of("gt", 1, "lt", 10), 5, 15); + } + + public void testIngestDataWithFloatRangeMatchProcessor() { + testIngestData("float_range_match", Map.of("gt", 1, "lt", 10), 5, 15); + } + + public void testIngestDataWithDoubleRangeMatchProcessor() { + testIngestData("double_range_match", Map.of("gt", 1, "lt", 10), 5, 15); + } + + public void testIngestDataWithDateRangeMatchProcessor() { + testIngestData("date_range_match", Map.of("gt", 1, "lt", 10), 5, 15); + } + public void testIngestDataWithGeoMatchProcessor() { - String matchField = "location"; + testIngestData( + EnrichPolicy.GEO_MATCH_TYPE, + "POLYGON((" + + "-122.08592534065245 37.38501746624134," + + "-122.08193421363829 37.38501746624134," + + "-122.08193421363829 37.3879329075567," + + "-122.08592534065245 37.3879329075567," + + "-122.08592534065245 37.38501746624134))", + "37.386444, -122.083863", + "0.0, -122.083863" + ); + } + + public void testIngestDataWithDateRangeMatchProcessorWithFormat() { + String matchField = "matched_field"; String enrichField = "zipcode"; // create enrich index { IndexRequest indexRequest = new IndexRequest(SOURCE_INDEX_NAME); - indexRequest.source( - Map.of( - matchField, - "POLYGON((" - + "-122.08592534065245 37.38501746624134," - + "-122.08193421363829 37.38501746624134," - + "-122.08193421363829 37.3879329075567," - + "-122.08592534065245 37.3879329075567," - + "-122.08592534065245 37.38501746624134))", - "zipcode", - "94040" - ) - ); + indexRequest.source(Map.of(matchField, Map.of("gt", "20200101", "lt", "20201231"), "zipcode", "94040")); client().index(indexRequest).actionGet(); client().admin().indices().refresh(new RefreshRequest(SOURCE_INDEX_NAME)).actionGet(); } String policyName = "my-policy"; EnrichPolicy enrichPolicy = new EnrichPolicy( - EnrichPolicy.GEO_MATCH_TYPE, + "date_range_match", null, List.of(SOURCE_INDEX_NAME), matchField, - List.of(enrichField) + List.of(enrichField), + "basic_date" ); PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy); client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet(); @@ -179,7 +207,7 @@ public void testIngestDataWithGeoMatchProcessor() { IndexRequest indexRequest = new IndexRequest(); indexRequest.id("_id"); indexRequest.setPipeline(pipelineName); - indexRequest.source(Map.of(matchField, "37.386444, -122.083863")); // point within match boundary + indexRequest.source(Map.of(matchField, "20200601")); bulkRequest.add(indexRequest); BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet(); assertThat("Expected no failure, but " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures(), is(false)); @@ -213,7 +241,14 @@ public void testMultiplePolicies() { client().index(indexRequest).actionGet(); client().admin().indices().refresh(new RefreshRequest("source-" + i)).actionGet(); - EnrichPolicy enrichPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("source-" + i), "key", List.of("value")); + EnrichPolicy enrichPolicy = new EnrichPolicy( + EnrichPolicy.MATCH_TYPE, + null, + List.of("source-" + i), + "key", + List.of("value"), + null + ); PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy); client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet(); client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName)).actionGet(); @@ -256,7 +291,14 @@ public void testAsyncTaskExecute() throws Exception { client().admin().indices().refresh(new RefreshRequest(sourceIndexName)).actionGet(); } - EnrichPolicy enrichPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndexName), "key", List.of("value")); + EnrichPolicy enrichPolicy = new EnrichPolicy( + EnrichPolicy.MATCH_TYPE, + null, + List.of(sourceIndexName), + "key", + List.of("value"), + null + ); PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy); client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet(); ExecuteEnrichPolicyAction.Response executeResponse = client().execute( @@ -310,7 +352,8 @@ public void testTemplating() throws Exception { null, List.of(SOURCE_INDEX_NAME), MATCH_FIELD, - List.of(DECORATE_FIELDS) + List.of(DECORATE_FIELDS), + null ); PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy); client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet(); @@ -358,4 +401,68 @@ private List createSourceMatchIndex(int numKeys, int numDocsPerKey) { client().admin().indices().refresh(new RefreshRequest(SOURCE_INDEX_NAME)).actionGet(); return List.copyOf(keys); } + + private void testIngestData(String type, Object enrichValue, Object matchedValue, Object mismatchedValue) { + String matchField = "matched_field"; + String enrichField = "zipcode"; + // create enrich index + { + IndexRequest indexRequest = new IndexRequest(SOURCE_INDEX_NAME); + indexRequest.source(Map.of(matchField, enrichValue, "zipcode", "94040")); + client().index(indexRequest).actionGet(); + client().admin().indices().refresh(new RefreshRequest(SOURCE_INDEX_NAME)).actionGet(); + } + + String policyName = "my-policy"; + EnrichPolicy enrichPolicy = new EnrichPolicy(type, null, List.of(SOURCE_INDEX_NAME), matchField, List.of(enrichField), null); + PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy); + client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet(); + client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName)).actionGet(); + + String pipelineName = "my-pipeline"; + String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\"" + + policyName + + "\", \"field\": \"" + + matchField + + "\", \"target_field\": \"enriched\", \"max_matches\": 1 }}]}"; + PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineName, new BytesArray(pipelineBody), XContentType.JSON); + client().admin().cluster().putPipeline(putPipelineRequest).actionGet(); + + BulkRequest bulkRequest = new BulkRequest("my-index"); + IndexRequest indexRequest = new IndexRequest(); + indexRequest.id("_id"); + indexRequest.setPipeline(pipelineName); + indexRequest.source(Map.of(matchField, matchedValue)); + bulkRequest.add(indexRequest); + indexRequest = new IndexRequest(); + indexRequest.id("_id2"); + indexRequest.setPipeline(pipelineName); + indexRequest.source(Map.of(matchField, mismatchedValue)); + bulkRequest.add(indexRequest); + BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet(); + assertThat("Expected no failure, but " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures(), is(false)); + assertThat(bulkResponse.getItems().length, equalTo(2)); + assertThat(bulkResponse.getItems()[0].getId(), equalTo("_id")); + assertThat(bulkResponse.getItems()[1].getId(), equalTo("_id2")); + + GetResponse getResponse = client().get(new GetRequest("my-index", "_id")).actionGet(); + Map source = getResponse.getSourceAsMap(); + Map entries = (Map) source.get("enriched"); + assertThat(entries, notNullValue()); + assertThat(entries.size(), equalTo(2)); + assertThat(entries.containsKey(matchField), is(true)); + assertThat(entries.get(enrichField), equalTo("94040")); + + getResponse = client().get(new GetRequest("my-index", "_id2")).actionGet(); + source = getResponse.getSourceAsMap(); + assertThat(source.containsKey("enriched"), is(false)); + + EnrichStatsAction.Response statsResponse = client().execute(EnrichStatsAction.INSTANCE, new EnrichStatsAction.Request()) + .actionGet(); + assertThat(statsResponse.getCoordinatorStats().size(), equalTo(1)); + String localNodeId = getInstanceFromNode(ClusterService.class).localNode().getId(); + assertThat(statsResponse.getCoordinatorStats().get(0).getNodeId(), equalTo(localNodeId)); + assertThat(statsResponse.getCoordinatorStats().get(0).getRemoteRequestsTotal(), greaterThanOrEqualTo(1L)); + assertThat(statsResponse.getCoordinatorStats().get(0).getExecutedSearchesTotal(), equalTo(2L)); + } } diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java index eb049943e86b4..7079cf764a42f 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java @@ -136,7 +136,14 @@ protected Runnable createPolicyRunner( public void testNonConcurrentPolicyExecution() throws InterruptedException { String testPolicyName = "test_policy"; - EnrichPolicy testPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("some_index"), "keyfield", List.of("valuefield")); + EnrichPolicy testPolicy = new EnrichPolicy( + EnrichPolicy.MATCH_TYPE, + null, + List.of("some_index"), + "keyfield", + List.of("valuefield"), + null + ); final EnrichPolicyTestExecutor testExecutor = new EnrichPolicyTestExecutor( Settings.EMPTY, null, @@ -193,7 +200,14 @@ public void testNonConcurrentPolicyExecution() throws InterruptedException { public void testMaximumPolicyExecutionLimit() throws InterruptedException { String testPolicyBaseName = "test_policy_"; Settings testSettings = Settings.builder().put(EnrichPlugin.ENRICH_MAX_CONCURRENT_POLICY_EXECUTIONS.getKey(), 2).build(); - EnrichPolicy testPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("some_index"), "keyfield", List.of("valuefield")); + EnrichPolicy testPolicy = new EnrichPolicy( + EnrichPolicy.MATCH_TYPE, + null, + List.of("some_index"), + "keyfield", + List.of("valuefield"), + null + ); final EnrichPolicyTestExecutor testExecutor = new EnrichPolicyTestExecutor( testSettings, null, diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyMaintenanceServiceTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyMaintenanceServiceTests.java index a800ec5fc0569..894c9d57abaa9 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyMaintenanceServiceTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyMaintenanceServiceTests.java @@ -120,7 +120,7 @@ private EnrichPolicy randomPolicy() { enrichKeys.add(randomAlphaOfLength(10)); } String sourceIndex = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); - return new EnrichPolicy(MATCH_TYPE, null, List.of(sourceIndex), randomAlphaOfLength(10), enrichKeys); + return new EnrichPolicy(MATCH_TYPE, null, List.of(sourceIndex), randomAlphaOfLength(10), enrichKeys, null); } private void addPolicy(String policyName, EnrichPolicy policy) throws InterruptedException { diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java index 2009e65eae89a..4e8a29c93a388 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java @@ -18,7 +18,6 @@ import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse; import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest; import org.elasticsearch.action.admin.indices.segments.ShardSegments; -import org.elasticsearch.geo.GeoPlugin; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchRequest; @@ -31,6 +30,7 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.smile.SmileXContent; +import org.elasticsearch.geo.GeoPlugin; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.engine.Segment; import org.elasticsearch.index.mapper.MapperService; @@ -122,7 +122,7 @@ public void testRunner() throws Exception { assertThat(sourceDocMap.get("field5"), is(equalTo("value5"))); List enrichFields = List.of("field2", "field5"); - EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "field1", enrichFields); + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "field1", enrichFields, null); String policyName = "test1"; final long createTime = randomNonNegativeLong(); @@ -179,6 +179,178 @@ public void testRunner() throws Exception { ensureEnrichIndexIsReadOnly(createdEnrichIndex); } + public void testRunnerIpRangeMatchType() throws Exception { + final String sourceIndex = "source-index"; + IndexResponse indexRequest = client().index( + new IndexRequest().index(sourceIndex) + .id("id") + .source("{" + "\"network\":" + "\"8.8.8.0/24\"," + "\"zipcode\":90210" + "}", XContentType.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + ).actionGet(); + assertEquals(RestStatus.CREATED, indexRequest.status()); + + SearchResponse sourceSearchResponse = client().search( + new SearchRequest(sourceIndex).source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery())) + ).actionGet(); + assertThat(sourceSearchResponse.getHits().getTotalHits().value, equalTo(1L)); + Map sourceDocMap = sourceSearchResponse.getHits().getAt(0).getSourceAsMap(); + assertNotNull(sourceDocMap); + assertThat(sourceDocMap.get("network"), is(equalTo("8.8.8.0/24"))); + assertThat(sourceDocMap.get("zipcode"), is(equalTo(90210))); + + List enrichFields = List.of("zipcode"); + EnrichPolicy policy = new EnrichPolicy("ip_range_match", null, List.of(sourceIndex), "network", enrichFields, null); + String policyName = "test1"; + + final long createTime = randomNonNegativeLong(); + final AtomicReference exception = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(1); + ActionListener listener = createTestListener(latch, exception::set); + EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime); + + logger.info("Starting policy run"); + enrichPolicyRunner.run(); + latch.await(); + if (exception.get() != null) { + throw exception.get(); + } + + // Validate Index definition + String createdEnrichIndex = ".enrich-test1-" + createTime; + GetIndexResponse enrichIndex = client().admin().indices().getIndex(new GetIndexRequest().indices(".enrich-test1")).actionGet(); + assertThat(enrichIndex.getIndices().length, equalTo(1)); + assertThat(enrichIndex.getIndices()[0], equalTo(createdEnrichIndex)); + Settings settings = enrichIndex.getSettings().get(createdEnrichIndex); + assertNotNull(settings); + assertThat(settings.get("index.auto_expand_replicas"), is(equalTo("0-all"))); + + // Validate Mapping + Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); + validateMappingMetadata(mapping, policyName, policy); + assertThat(mapping.get("dynamic"), is("false")); + Map properties = (Map) mapping.get("properties"); + assertNotNull(properties); + assertThat(properties.size(), is(equalTo(1))); + Map field1 = (Map) properties.get("network"); + assertNotNull(field1); + assertThat(field1.get("type"), is(equalTo("ip_range"))); + assertNull(field1.get("doc_values")); + + // Validate document structure + SearchResponse enrichSearchResponse = client().search( + new SearchRequest(".enrich-test1").source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery())) + ).actionGet(); + + assertThat(enrichSearchResponse.getHits().getTotalHits().value, equalTo(1L)); + Map enrichDocument = enrichSearchResponse.getHits().iterator().next().getSourceAsMap(); + assertNotNull(enrichDocument); + assertThat(enrichDocument.size(), is(equalTo(2))); + assertThat(enrichDocument.get("network"), is(equalTo("8.8.8.0/24"))); + assertThat(enrichDocument.get("zipcode"), is(equalTo(90210))); + + // Validate segments + validateSegments(createdEnrichIndex, 1); + + // Validate Index is read only + ensureEnrichIndexIsReadOnly(createdEnrichIndex); + } + + public void testRunnerIntegerRangeMatchType() throws Exception { + testNumberRangeMatchType("integer"); + } + + public void testRunnerLongRangeMatchType() throws Exception { + testNumberRangeMatchType("long"); + } + + public void testRunnerFloatRangeMatchType() throws Exception { + testNumberRangeMatchType("float"); + } + + public void testRunnerDoubleRangeMatchType() throws Exception { + testNumberRangeMatchType("double"); + } + + public void testRunnerDateRangeMatchType() throws Exception { + testNumberRangeMatchType("date"); + } + + private void testNumberRangeMatchType(String rangeType) throws Exception { + final String sourceIndex = "source-index"; + IndexResponse indexRequest = client().index( + new IndexRequest().index(sourceIndex) + .id("id") + .source("{" + "\"range\":" + "{" + "\"gt\":1," + "\"lt\":10" + "}," + "\"zipcode\":90210" + "}", XContentType.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + ).actionGet(); + assertEquals(RestStatus.CREATED, indexRequest.status()); + + SearchResponse sourceSearchResponse = client().search( + new SearchRequest(sourceIndex).source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery())) + ).actionGet(); + assertThat(sourceSearchResponse.getHits().getTotalHits().value, equalTo(1L)); + Map sourceDocMap = sourceSearchResponse.getHits().getAt(0).getSourceAsMap(); + assertNotNull(sourceDocMap); + assertThat(sourceDocMap.get("range"), is(equalTo(Map.of("lt", 10, "gt", 1)))); + assertThat(sourceDocMap.get("zipcode"), is(equalTo(90210))); + + List enrichFields = List.of("zipcode"); + EnrichPolicy policy = new EnrichPolicy(rangeType + "_range_match", null, List.of(sourceIndex), "range", enrichFields, null); + String policyName = "test1"; + + final long createTime = randomNonNegativeLong(); + final AtomicReference exception = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(1); + ActionListener listener = createTestListener(latch, exception::set); + EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime); + + logger.info("Starting policy run"); + enrichPolicyRunner.run(); + latch.await(); + if (exception.get() != null) { + throw exception.get(); + } + + // Validate Index definition + String createdEnrichIndex = ".enrich-test1-" + createTime; + GetIndexResponse enrichIndex = client().admin().indices().getIndex(new GetIndexRequest().indices(".enrich-test1")).actionGet(); + assertThat(enrichIndex.getIndices().length, equalTo(1)); + assertThat(enrichIndex.getIndices()[0], equalTo(createdEnrichIndex)); + Settings settings = enrichIndex.getSettings().get(createdEnrichIndex); + assertNotNull(settings); + assertThat(settings.get("index.auto_expand_replicas"), is(equalTo("0-all"))); + + // Validate Mapping + Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); + validateMappingMetadata(mapping, policyName, policy); + assertThat(mapping.get("dynamic"), is("false")); + Map properties = (Map) mapping.get("properties"); + assertNotNull(properties); + assertThat(properties.size(), is(equalTo(1))); + Map field1 = (Map) properties.get("range"); + assertNotNull(field1); + assertThat(field1.get("type"), is(equalTo(rangeType + "_range"))); + assertNull(field1.get("doc_values")); + + // Validate document structure + SearchResponse enrichSearchResponse = client().search( + new SearchRequest(".enrich-test1").source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery())) + ).actionGet(); + + assertThat(enrichSearchResponse.getHits().getTotalHits().value, equalTo(1L)); + Map enrichDocument = enrichSearchResponse.getHits().iterator().next().getSourceAsMap(); + assertNotNull(enrichDocument); + assertThat(enrichDocument.size(), is(equalTo(2))); + assertThat(enrichDocument.get("range"), is(equalTo(Map.of("lt", 10, "gt", 1)))); + assertThat(enrichDocument.get("zipcode"), is(equalTo(90210))); + + // Validate segments + validateSegments(createdEnrichIndex, 1); + + // Validate Index is read only + ensureEnrichIndexIsReadOnly(createdEnrichIndex); + } + public void testRunnerGeoMatchType() throws Exception { final String sourceIndex = "source-index"; IndexResponse indexRequest = client().index( @@ -199,7 +371,7 @@ public void testRunnerGeoMatchType() throws Exception { assertThat(sourceDocMap.get("zipcode"), is(equalTo(90210))); List enrichFields = List.of("zipcode"); - EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.GEO_MATCH_TYPE, null, List.of(sourceIndex), "location", enrichFields); + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.GEO_MATCH_TYPE, null, List.of(sourceIndex), "location", enrichFields, null); String policyName = "test1"; final long createTime = randomNonNegativeLong(); @@ -301,7 +473,7 @@ public void testRunnerMultiSource() throws Exception { String sourceIndexPattern = baseSourceName + "*"; List enrichFields = List.of("idx", "field1", "field2", "field5"); - EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndexPattern), "key", enrichFields); + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndexPattern), "key", enrichFields, null); String policyName = "test1"; final long createTime = randomNonNegativeLong(); @@ -413,7 +585,7 @@ public void testRunnerMultiSourceDocIdCollisions() throws Exception { String sourceIndexPattern = baseSourceName + "*"; List enrichFields = List.of("idx", "field1", "field2", "field5"); - EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndexPattern), "key", enrichFields); + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndexPattern), "key", enrichFields, null); String policyName = "test1"; final long createTime = randomNonNegativeLong(); @@ -523,7 +695,7 @@ public void testRunnerMultiSourceEnrichKeyCollisions() throws Exception { String sourceIndexPattern = baseSourceName + "*"; List enrichFields = List.of("idx", "field1", "field2", "field5"); - EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndexPattern), "key", enrichFields); + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndexPattern), "key", enrichFields, null); String policyName = "test1"; final long createTime = randomNonNegativeLong(); @@ -583,7 +755,7 @@ public void testRunnerNoSourceIndex() throws Exception { final String sourceIndex = "source-index"; List enrichFields = List.of("field2", "field5"); - EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "field1", enrichFields); + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "field1", enrichFields, null); String policyName = "test1"; final long createTime = randomNonNegativeLong(); @@ -610,7 +782,7 @@ public void testRunnerNoSourceMapping() throws Exception { assertTrue(createResponse.isAcknowledged()); List enrichFields = List.of("field2", "field5"); - EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "field1", enrichFields); + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "field1", enrichFields, null); String policyName = "test1"; final long createTime = randomNonNegativeLong(); @@ -670,7 +842,7 @@ public void testRunnerKeyNestedSourceMapping() throws Exception { String policyName = "test1"; List enrichFields = List.of("field2"); - EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "nesting.key", enrichFields); + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "nesting.key", enrichFields, null); final long createTime = randomNonNegativeLong(); final AtomicReference exception = new AtomicReference<>(); @@ -733,7 +905,7 @@ public void testRunnerValueNestedSourceMapping() throws Exception { String policyName = "test1"; List enrichFields = List.of("nesting.field2", "missingField"); - EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "key", enrichFields); + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "key", enrichFields, null); final long createTime = randomNonNegativeLong(); final AtomicReference exception = new AtomicReference<>(); @@ -822,7 +994,7 @@ public void testRunnerObjectSourceMapping() throws Exception { String policyName = "test1"; List enrichFields = List.of("data.field2", "missingField"); - EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "data.field1", enrichFields); + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "data.field1", enrichFields, null); final long createTime = randomNonNegativeLong(); final AtomicReference exception = new AtomicReference<>(); @@ -940,7 +1112,7 @@ public void testRunnerExplicitObjectSourceMapping() throws Exception { String policyName = "test1"; List enrichFields = List.of("data.field2", "missingField"); - EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "data.field1", enrichFields); + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "data.field1", enrichFields, null); final long createTime = randomNonNegativeLong(); final AtomicReference exception = new AtomicReference<>(); @@ -1071,7 +1243,14 @@ public void testRunnerTwoObjectLevelsSourceMapping() throws Exception { String policyName = "test1"; List enrichFields = List.of("data.fields.field2", "missingField"); - EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "data.fields.field1", enrichFields); + EnrichPolicy policy = new EnrichPolicy( + EnrichPolicy.MATCH_TYPE, + null, + List.of(sourceIndex), + "data.fields.field1", + enrichFields, + null + ); final long createTime = randomNonNegativeLong(); final AtomicReference exception = new AtomicReference<>(); @@ -1187,7 +1366,7 @@ public void testRunnerDottedKeyNameSourceMapping() throws Exception { String policyName = "test1"; List enrichFields = List.of("data.field2", "missingField"); - EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "data.field1", enrichFields); + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "data.field1", enrichFields, null); final long createTime = randomNonNegativeLong(); final AtomicReference exception = new AtomicReference<>(); @@ -1280,7 +1459,7 @@ public void testRunnerWithForceMergeRetry() throws Exception { assertThat(sourceDocMap.get("field5"), is(equalTo("value5"))); List enrichFields = List.of("field2", "field5"); - EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "field1", enrichFields); + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "field1", enrichFields, null); String policyName = "test1"; final long createTime = randomNonNegativeLong(); diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyTests.java index 967fb05a1fc94..c50b4b797445c 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyTests.java @@ -66,7 +66,8 @@ public static EnrichPolicy randomEnrichPolicy(XContentType xContentType) { .map(s -> s.toLowerCase(Locale.ROOT)) .collect(Collectors.toList()), randomAlphaOfLength(4), - Arrays.asList(generateRandomStringArray(8, 4, false, false)) + Arrays.asList(generateRandomStringArray(8, 4, false, false)), + null ); } catch (IOException e) { throw new UncheckedIOException(e); diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java index 87f4dc5198ac6..ea77141ecebdf 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyUpdateTests.java @@ -40,7 +40,7 @@ public void testUpdatePolicyOnly() { IngestService ingestService = getInstanceFromNode(IngestService.class); createIndex("index", Settings.EMPTY, "_doc", "key1", "type=keyword", "field1", "type=keyword"); - EnrichPolicy instance1 = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("index"), "key1", List.of("field1")); + EnrichPolicy instance1 = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("index"), "key1", List.of("field1"), null); createSourceIndices(client(), instance1); PutEnrichPolicyAction.Request putPolicyRequest = new PutEnrichPolicyAction.Request("my_policy", instance1); assertAcked(client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet()); @@ -60,7 +60,7 @@ public void testUpdatePolicyOnly() { Pipeline pipelineInstance1 = ingestService.getPipeline("1"); assertThat(pipelineInstance1.getProcessors().get(0), instanceOf(MatchProcessor.class)); - EnrichPolicy instance2 = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("index2"), "key2", List.of("field2")); + EnrichPolicy instance2 = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("index2"), "key2", List.of("field2"), null); createSourceIndices(client(), instance2); ResourceAlreadyExistsException exc = expectThrows( ResourceAlreadyExistsException.class, diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactoryTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactoryTests.java index dec702e6008d5..b151b13cda262 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactoryTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactoryTests.java @@ -43,7 +43,7 @@ public void initializeScriptService() { public void testCreateProcessorInstance() throws Exception { List enrichValues = List.of("globalRank", "tldRank", "tld"); - EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("source_index"), "my_key", enrichValues); + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("source_index"), "my_key", enrichValues, null); try (Client client = new NoOpClient(this.getClass().getSimpleName() + "TestClient")) { EnrichProcessorFactory factory = new EnrichProcessorFactory(client, scriptService); factory.metadata = createMetadata("majestic", policy); @@ -152,7 +152,7 @@ public void testPolicyNameMissing() { public void testUnsupportedPolicy() throws Exception { List enrichValues = List.of("globalRank", "tldRank", "tld"); - EnrichPolicy policy = new EnrichPolicy("unsupported", null, List.of("source_index"), "my_key", enrichValues); + EnrichPolicy policy = new EnrichPolicy("unsupported", null, List.of("source_index"), "my_key", enrichValues, null); EnrichProcessorFactory factory = new EnrichProcessorFactory(null, scriptService); factory.metadata = createMetadata("majestic", policy); @@ -171,7 +171,7 @@ public void testUnsupportedPolicy() throws Exception { public void testCompactEnrichValuesFormat() throws Exception { List enrichValues = List.of("globalRank", "tldRank", "tld"); - EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("source_index"), "host", enrichValues); + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("source_index"), "host", enrichValues, null); try (Client client = new NoOpClient(this.getClass().getSimpleName() + "TestClient")) { EnrichProcessorFactory factory = new EnrichProcessorFactory(client, scriptService); factory.metadata = createMetadata("majestic", policy); @@ -191,7 +191,7 @@ public void testCompactEnrichValuesFormat() throws Exception { public void testNoTargetField() throws Exception { List enrichValues = List.of("globalRank", "tldRank", "tld"); - EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("source_index"), "host", enrichValues); + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("source_index"), "host", enrichValues, null); EnrichProcessorFactory factory = new EnrichProcessorFactory(null, scriptService); factory.metadata = createMetadata("majestic", policy); @@ -205,7 +205,7 @@ public void testNoTargetField() throws Exception { public void testIllegalMaxMatches() throws Exception { List enrichValues = List.of("globalRank", "tldRank", "tld"); - EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("source_index"), "my_key", enrichValues); + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("source_index"), "my_key", enrichValues, null); EnrichProcessorFactory factory = new EnrichProcessorFactory(null, scriptService); factory.metadata = createMetadata("majestic", policy); diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichResiliencyTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichResiliencyTests.java index 21fef30f6b11a..466a145b20948 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichResiliencyTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichResiliencyTests.java @@ -75,7 +75,7 @@ public void testWriteThreadLivenessBackToBack() throws Exception { PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request( enrichPolicyName, - new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(enrichIndexName), "my_key", List.of("my_value")) + new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(enrichIndexName), "my_key", List.of("my_value"), null) ) ).actionGet(); @@ -174,7 +174,7 @@ public void testWriteThreadLivenessWithPipeline() throws Exception { PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request( enrichPolicyName, - new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(enrichIndexName), "my_key", List.of("my_value")) + new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(enrichIndexName), "my_key", List.of("my_value"), null) ) ).actionGet(); diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichStoreCrudTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichStoreCrudTests.java index 88d6c172ab16e..ddac56e0493e3 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichStoreCrudTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichStoreCrudTests.java @@ -102,12 +102,18 @@ public void testPutValidation() throws Exception { assertThat(error.getMessage(), equalTo("Invalid policy name [myPolicy], must be lowercase")); } { - EnrichPolicy invalidPolicy = new EnrichPolicy("unsupported_type", null, List.of("index"), "field", List.of("field")); + EnrichPolicy invalidPolicy = new EnrichPolicy("unsupported_type", null, List.of("index"), "field", List.of("field"), null); IllegalArgumentException error = expectThrows( IllegalArgumentException.class, () -> saveEnrichPolicy("name", invalidPolicy, clusterService) ); - assertThat(error.getMessage(), equalTo("unsupported policy type [unsupported_type], supported types are [match, geo_match]")); + assertThat( + error.getMessage(), + equalTo( + "unsupported policy type [unsupported_type], supported types are [match, geo_match, " + + "ip_range_match, date_range_match, float_range_match, double_range_match, integer_range_match, long_range_match]" + ) + ); } }