diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/EnrichPolicy.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/EnrichPolicy.java index 326323cd35bda..ade5ff69eae4e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/EnrichPolicy.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/EnrichPolicy.java @@ -38,9 +38,11 @@ public final class EnrichPolicy implements Writeable, ToXContentFragment { public static final String MATCH_TYPE = "match"; public static final String GEO_MATCH_TYPE = "geo_match"; + public static final String RANGE_TYPE = "range"; public static final String[] SUPPORTED_POLICY_TYPES = new String[]{ MATCH_TYPE, - GEO_MATCH_TYPE + GEO_MATCH_TYPE, + RANGE_TYPE }; private static final ParseField QUERY = new ParseField("query"); diff --git a/x-pack/plugin/enrich/qa/common/src/main/java/org/elasticsearch/test/enrich/CommonEnrichRestTestCase.java b/x-pack/plugin/enrich/qa/common/src/main/java/org/elasticsearch/test/enrich/CommonEnrichRestTestCase.java index be206eff07cce..2d9d5be12cd65 100644 --- a/x-pack/plugin/enrich/qa/common/src/main/java/org/elasticsearch/test/enrich/CommonEnrichRestTestCase.java +++ b/x-pack/plugin/enrich/qa/common/src/main/java/org/elasticsearch/test/enrich/CommonEnrichRestTestCase.java @@ -28,19 +28,22 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.notNullValue; public abstract class CommonEnrichRestTestCase extends ESRestTestCase { @After public void deletePolicies() throws Exception { Map responseMap = toMap(adminClient().performRequest(new Request("GET", "/_enrich/policy"))); - @SuppressWarnings("unchecked") - List> policies = (List>) responseMap.get("policies"); + List> policies = unsafeGetProperty(responseMap, "policies"); for (Map entry : policies) { - client().performRequest(new Request("DELETE", "/_enrich/policy/" + XContentMapValues.extractValue("config.match.name", entry))); + Map> config = unsafeGetProperty(entry, "config"); + Map policy = config.values().iterator().next(); + String endpoint = "/_enrich/policy/" + policy.get("name"); + assertOK(client().performRequest(new Request("DELETE", endpoint))); - List sourceIndices = (List) XContentMapValues.extractValue("config.match.indices", entry); + List sourceIndices = (List) policy.get("indices"); for (Object sourceIndex : sourceIndices) { try { client().performRequest(new Request("DELETE", "/" + sourceIndex)); @@ -51,17 +54,49 @@ public void deletePolicies() throws Exception { } } - private void setupGenericLifecycleTest(boolean deletePipeilne) throws Exception { + @SuppressWarnings("unchecked") + private Property unsafeGetProperty(Map map, String key) { + return (Property) map.get(key); + } + + private void setupGenericLifecycleTest(boolean deletePipeilne, String field, String type, String value) throws Exception { // Create source index: createSourceIndex("my-source-index"); + // Create the policy: Request putPolicyRequest = new Request("PUT", "/_enrich/policy/my_policy"); - putPolicyRequest.setJsonEntity(generatePolicySource("my-source-index")); + putPolicyRequest.setJsonEntity(generatePolicySource("my-source-index", field, type)); assertOK(client().performRequest(putPolicyRequest)); // Add entry to source index and then refresh: Request indexRequest = new Request("PUT", "/my-source-index/_doc/elastic.co"); - indexRequest.setJsonEntity("{\"host\": \"elastic.co\",\"globalRank\": 25,\"tldRank\": 7,\"tld\": \"co\"}"); + indexRequest.setJsonEntity("{" + + "\"host\": \"elastic.co\"," + + "\"globalRank\": 25," + + "\"tldRank\": 7," + + "\"tld\": \"co\", " + + "\"date\": {" + + "\"gte\" : \"2021-09-05\"," + + "\"lt\" : \"2021-09-07\"" + + "}, " + + "\"integer\": {" + + "\"gte\" : 40," + + "\"lt\" : 42" + + "}, " + + "\"long\": {" + + "\"gte\" : 8000000," + + "\"lt\" : 9000000" + + "}, " + + "\"double\": {" + + "\"gte\" : 10.10," + + "\"lt\" : 20.20" + + "}, " + + "\"float\": {" + + "\"gte\" : 10000.5," + + "\"lt\" : 10000.7" + + "}, " + + "\"ip\": \"100.0.0.0/4\"" + + "}"); assertOK(client().performRequest(indexRequest)); Request refreshRequest = new Request("POST", "/my-source-index/_refresh"); assertOK(client().performRequest(refreshRequest)); @@ -73,14 +108,14 @@ private void setupGenericLifecycleTest(boolean deletePipeilne) throws Exception // Create pipeline Request putPipelineRequest = new Request("PUT", "/_ingest/pipeline/my_pipeline"); putPipelineRequest.setJsonEntity( - "{\"processors\":[" + "{\"enrich\":{\"policy_name\":\"my_policy\",\"field\":\"host\",\"target_field\":\"entry\"}}" + "]}" + "{\"processors\":[" + "{\"enrich\":{\"policy_name\":\"my_policy\",\"field\":\""+field+"\",\"target_field\":\"entry\"}}" + "]}" ); assertOK(client().performRequest(putPipelineRequest)); // Index document using pipeline with enrich processor: indexRequest = new Request("PUT", "/my-index/_doc/1"); indexRequest.addParameter("pipeline", "my_pipeline"); - indexRequest.setJsonEntity("{\"host\": \"elastic.co\"}"); + indexRequest.setJsonEntity("{\""+field+"\": \""+value+"\"}"); assertOK(client().performRequest(indexRequest)); // Check if document has been enriched @@ -88,10 +123,12 @@ private void setupGenericLifecycleTest(boolean deletePipeilne) throws Exception Map response = toMap(client().performRequest(getRequest)); Map entry = (Map) ((Map) response.get("_source")).get("entry"); assertThat(entry.size(), equalTo(4)); - assertThat(entry.get("host"), equalTo("elastic.co")); + assertThat(entry.get(field), notNullValue()); assertThat(entry.get("tld"), equalTo("co")); assertThat(entry.get("globalRank"), equalTo(25)); assertThat(entry.get("tldRank"), equalTo(7)); + Object originalMatchValue = ((Map) response.get("_source")).get(field); + assertThat(originalMatchValue, equalTo(value)); if (deletePipeilne) { // delete the pipeline so the policies can be deleted @@ -99,8 +136,38 @@ private void setupGenericLifecycleTest(boolean deletePipeilne) throws Exception } } - public void testBasicFlow() throws Exception { - setupGenericLifecycleTest(true); + public void testBasicFlowKeyword() throws Exception { + setupGenericLifecycleTest(true, "host", "match", "elastic.co"); + assertBusy(CommonEnrichRestTestCase::verifyEnrichMonitoring, 3, TimeUnit.MINUTES); + } + + public void testBasicFlowDate() throws Exception { + setupGenericLifecycleTest(true, "date", "range", "2021-09-06"); + assertBusy(CommonEnrichRestTestCase::verifyEnrichMonitoring, 3, TimeUnit.MINUTES); + } + + public void testBasicFlowInteger() throws Exception { + setupGenericLifecycleTest(true, "integer", "range", "41"); + assertBusy(CommonEnrichRestTestCase::verifyEnrichMonitoring, 3, TimeUnit.MINUTES); + } + + public void testBasicFlowLong() throws Exception { + setupGenericLifecycleTest(true, "long", "range", "8411017"); + assertBusy(CommonEnrichRestTestCase::verifyEnrichMonitoring, 3, TimeUnit.MINUTES); + } + + public void testBasicFlowDouble() throws Exception { + setupGenericLifecycleTest(true, "double", "range", "15.15"); + assertBusy(CommonEnrichRestTestCase::verifyEnrichMonitoring, 3, TimeUnit.MINUTES); + } + + public void testBasicFlowFloat() throws Exception { + setupGenericLifecycleTest(true, "float", "range", "10000.66666"); + assertBusy(CommonEnrichRestTestCase::verifyEnrichMonitoring, 3, TimeUnit.MINUTES); + } + + public void testBasicFlowIp() throws Exception { + setupGenericLifecycleTest(true, "ip", "range", "100.120.140.160"); assertBusy(CommonEnrichRestTestCase::verifyEnrichMonitoring, 3, TimeUnit.MINUTES); } @@ -129,7 +196,7 @@ public void testDeleteIsCaseSensitive() throws Exception { public void testDeleteExistingPipeline() throws Exception { // lets not delete the pipeline at first, to test the failure - setupGenericLifecycleTest(false); + setupGenericLifecycleTest(false, "host", "match", "elastic.co"); Request putPipelineRequest = new Request("PUT", "/_ingest/pipeline/another_pipeline"); putPipelineRequest.setJsonEntity( @@ -156,14 +223,18 @@ public void testDeleteExistingPipeline() throws Exception { } public static String generatePolicySource(String index) throws IOException { - XContentBuilder source = jsonBuilder().startObject().startObject("match"); + return generatePolicySource(index, "host", "match"); + } + + public static String generatePolicySource(String index, String field, String type) throws IOException { + XContentBuilder source = jsonBuilder().startObject().startObject(type); { source.field("indices", index); if (randomBoolean()) { source.field("query", QueryBuilders.matchAllQuery()); } - source.field("match_field", "host"); - source.field("enrich_fields", new String[] { "globalRank", "tldRank", "tld" }); + source.field("match_field", field); + source.field("enrich_fields", new String[]{"globalRank", "tldRank", "tld"}); } source.endObject().endObject(); return Strings.toString(source); @@ -179,7 +250,13 @@ public static String createSourceIndexMapping() { + "{\"host\": {\"type\":\"keyword\"}," + "\"globalRank\":{\"type\":\"keyword\"}," + "\"tldRank\":{\"type\":\"keyword\"}," - + "\"tld\":{\"type\":\"keyword\"}" + + "\"tld\":{\"type\":\"keyword\"}," + + "\"date\":{\"type\":\"date_range\"" + (randomBoolean() ? "" : ", \"format\": \"yyyy-MM-dd\"") + "}," + + "\"integer\":{\"type\":\"integer_range\"}," + + "\"long\":{\"type\":\"long_range\"}," + + "\"double\":{\"type\":\"double_range\"}," + + "\"float\":{\"type\":\"float_range\"}," + + "\"ip\":{\"type\":\"ip_range\"}" + "}"; } @@ -207,8 +284,8 @@ private static void verifyEnrichMonitoring() throws IOException { List hits = (List) XContentMapValues.extractValue("hits.hits", response); assertThat(hits.size(), greaterThanOrEqualTo(1)); - for (int i = 0; i < hits.size(); i++) { - Map hit = (Map) hits.get(i); + for (Object o : hits) { + Map hit = (Map) o; int foundRemoteRequestsTotal = (int) XContentMapValues.extractValue( "_source.enrich_coordinator_stats.remote_requests_total", diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java index db7f5bbd026cd..39fac55628c31 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java @@ -37,6 +37,7 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.ObjectPath; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.json.JsonXContent; @@ -56,8 +57,11 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.function.LongSupplier; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import static org.elasticsearch.xpack.core.ClientHelper.ENRICH_ORIGIN; @@ -123,10 +127,17 @@ public void run() { l.onFailure(e); return; } - prepareAndCreateEnrichIndex(); + prepareAndCreateEnrichIndex(toMappings(getIndexResponse)); })); } + private List> toMappings(GetIndexResponse response) { + return StreamSupport.stream(response.mappings().values().spliterator(), false) + .map(cursor -> cursor.value) + .map(MappingMetadata::getSourceAsMap) + .collect(Collectors.toList()); + } + private Map getMappings(final GetIndexResponse getIndexResponse, final String sourceIndexName) { ImmutableOpenMap mappings = getIndexResponse.mappings(); MappingMetadata indexMapping = mappings.get(sourceIndexName); @@ -232,19 +243,84 @@ private static void validateField(Map properties, String fieldName, boolea } } - private XContentBuilder resolveEnrichMapping(final EnrichPolicy policy) { - // Currently the only supported policy type is EnrichPolicy.MATCH_TYPE, which is a keyword type - final String keyType; - final CheckedFunction matchFieldMapping; + private XContentBuilder resolveEnrichMapping(final EnrichPolicy policy, final List> mappings) { if (EnrichPolicy.MATCH_TYPE.equals(policy.getType())) { - matchFieldMapping = (builder) -> builder.field("type", "keyword").field("doc_values", false); - // No need to also configure index_options, because keyword type defaults to 'docs'. + return createEnrichMappingBuilder((builder) -> builder.field("type", "keyword").field("doc_values", false)); + } else if (EnrichPolicy.RANGE_TYPE.equals(policy.getType())) { + return createRangeEnrichMappingBuilder(policy, mappings); } else if (EnrichPolicy.GEO_MATCH_TYPE.equals(policy.getType())) { - matchFieldMapping = (builder) -> builder.field("type", "geo_shape"); + return createEnrichMappingBuilder((builder) -> builder.field("type", "geo_shape")); } else { throw new ElasticsearchException("Unrecognized enrich policy type [{}]", policy.getType()); } + } + + private XContentBuilder createRangeEnrichMappingBuilder(EnrichPolicy policy, List> mappings) { + String matchFieldPath = "properties." + policy.getMatchField().replace(".", ".properties."); + List> matchFieldMappings = mappings.stream() + .map(map -> ObjectPath.>eval(matchFieldPath, map)) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + + Set types = matchFieldMappings.stream().map(map -> map.get("type")).collect(Collectors.toSet()); + if (types.size() == 1) { + String type = types.iterator().next(); + switch (type) { + case "integer_range": + case "float_range": + case "long_range": + case "double_range": + case "ip_range": + return createEnrichMappingBuilder((builder) -> builder.field("type", type).field("doc_values", false)); + + // date_range types mappings allow for the format to be specified, should be preserved in the created index + case "date_range": + Set formatEntries = matchFieldMappings.stream().map(map -> map.get("format")).collect(Collectors.toSet()); + if (formatEntries.size() == 1) { + return createEnrichMappingBuilder((builder) -> { + builder.field("type", type).field("doc_values", false); + String format = formatEntries.iterator().next(); + if (format != null) { + builder.field("format", format); + } + return builder; + }); + } + if (formatEntries.isEmpty()) { + // no format specify rely on default + return createEnrichMappingBuilder((builder) -> builder.field("type", type).field("doc_values", false)); + } + throw new ElasticsearchException( + "Multiple distinct date format specified for match field '{}' - indices({}) format entries({})", + policy.getMatchField(), + Strings.collectionToCommaDelimitedString(policy.getIndices()), + (formatEntries.contains(null) ? "(DEFAULT), " : "") + Strings.collectionToCommaDelimitedString(formatEntries) + ); + + default: + throw new ElasticsearchException( + "Field '{}' has type [{}] which doesn't appear to be a range type", + policy.getMatchField(), + type + ); + } + } + if (types.isEmpty()) { + throw new ElasticsearchException( + "No mapping type found for match field '{}' - indices({})", + policy.getMatchField(), + Strings.collectionToCommaDelimitedString(policy.getIndices()) + ); + } + throw new ElasticsearchException( + "Multiple distinct mapping types for match field '{}' - indices({}) types({})", + policy.getMatchField(), + Strings.collectionToCommaDelimitedString(policy.getIndices()), + Strings.collectionToCommaDelimitedString(types) + ); + } + private XContentBuilder createEnrichMappingBuilder(CheckedFunction matchFieldMapping) { // Enable _source on enrich index. Explicitly mark key mapping type. try { XContentBuilder builder = JsonXContent.contentBuilder(); @@ -283,7 +359,7 @@ private XContentBuilder resolveEnrichMapping(final EnrichPolicy policy) { } } - private void prepareAndCreateEnrichIndex() { + private void prepareAndCreateEnrichIndex(List> mappings) { long nowTimestamp = nowSupplier.getAsLong(); String enrichIndexName = EnrichPolicy.getBaseName(policyName) + "-" + nowTimestamp; Settings enrichIndexSettings = Settings.builder() @@ -295,7 +371,7 @@ private void prepareAndCreateEnrichIndex() { .put("index.warmer.enabled", false) .build(); CreateIndexRequest createEnrichIndexRequest = new CreateIndexRequest(enrichIndexName, enrichIndexSettings); - createEnrichIndexRequest.mapping(resolveEnrichMapping(policy)); + createEnrichIndexRequest.mapping(resolveEnrichMapping(policy, mappings)); logger.debug("Policy [{}]: Creating new enrich index [{}]", policyName, enrichIndexName); enrichOriginClient().admin() .indices() diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java index 61f4413d48054..2b7a426f74ce9 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java @@ -81,6 +81,7 @@ public Processor create(Map processorFactories, Strin BiConsumer> searchRunner = createSearchRunner(client, enrichCache); switch (policyType) { case EnrichPolicy.MATCH_TYPE: + case EnrichPolicy.RANGE_TYPE: return new MatchProcessor( tag, description, diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java index 3e71108541361..1683d579b21f5 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java @@ -140,12 +140,7 @@ public void testRunner() throws Exception { // Validate Index definition String createdEnrichIndex = ".enrich-test1-" + createTime; - GetIndexResponse enrichIndex = client().admin().indices().getIndex(new GetIndexRequest().indices(".enrich-test1")).actionGet(); - assertThat(enrichIndex.getIndices().length, equalTo(1)); - assertThat(enrichIndex.getIndices()[0], equalTo(createdEnrichIndex)); - Settings settings = enrichIndex.getSettings().get(createdEnrichIndex); - assertNotNull(settings); - assertThat(settings.get("index.auto_expand_replicas"), is(equalTo("0-all"))); + GetIndexResponse enrichIndex = getGetIndexResponseAndCheck(createdEnrichIndex); // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); @@ -217,12 +212,7 @@ public void testRunnerGeoMatchType() throws Exception { // Validate Index definition String createdEnrichIndex = ".enrich-test1-" + createTime; - GetIndexResponse enrichIndex = client().admin().indices().getIndex(new GetIndexRequest().indices(".enrich-test1")).actionGet(); - assertThat(enrichIndex.getIndices().length, equalTo(1)); - assertThat(enrichIndex.getIndices()[0], equalTo(createdEnrichIndex)); - Settings settings = enrichIndex.getSettings().get(createdEnrichIndex); - assertNotNull(settings); - assertThat(settings.get("index.auto_expand_replicas"), is(equalTo("0-all"))); + GetIndexResponse enrichIndex = getGetIndexResponseAndCheck(createdEnrichIndex); // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); @@ -255,6 +245,186 @@ public void testRunnerGeoMatchType() throws Exception { ensureEnrichIndexIsReadOnly(createdEnrichIndex); } + public void testRunnerIntegerRangeMatchType() throws Exception { + testNumberRangeMatchType("integer"); + } + + public void testRunnerLongRangeMatchType() throws Exception { + testNumberRangeMatchType("long"); + } + + public void testRunnerFloatRangeMatchType() throws Exception { + testNumberRangeMatchType("float"); + } + + public void testRunnerDoubleRangeMatchType() throws Exception { + testNumberRangeMatchType("double"); + } + + private void testNumberRangeMatchType(String rangeType) throws Exception { + final String sourceIndex = "source-index"; + createIndex(sourceIndex, Settings.EMPTY, "_doc", "range", "type=" + rangeType + "_range"); + IndexResponse indexRequest = client().index( + new IndexRequest().index(sourceIndex) + .id("id") + .source("{" + "\"range\":" + "{" + "\"gt\":1," + "\"lt\":10" + "}," + "\"zipcode\":90210" + "}", XContentType.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + ).actionGet(); + assertEquals(RestStatus.CREATED, indexRequest.status()); + + SearchResponse sourceSearchResponse = client().search( + new SearchRequest(sourceIndex).source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery())) + ).actionGet(); + assertThat(sourceSearchResponse.getHits().getTotalHits().value, equalTo(1L)); + Map sourceDocMap = sourceSearchResponse.getHits().getAt(0).getSourceAsMap(); + assertNotNull(sourceDocMap); + assertThat(sourceDocMap.get("range"), is(equalTo(Map.of("lt", 10, "gt", 1)))); + assertThat(sourceDocMap.get("zipcode"), is(equalTo(90210))); + + List enrichFields = List.of("zipcode"); + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.RANGE_TYPE, null, List.of(sourceIndex), "range", enrichFields, null); + String policyName = "test1"; + + final long createTime = randomNonNegativeLong(); + final AtomicReference exception = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(1); + ActionListener listener = createTestListener(latch, exception::set); + EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime); + + logger.info("Starting policy run"); + enrichPolicyRunner.run(); + latch.await(); + if (exception.get() != null) { + throw exception.get(); + } + + // Validate Index definition + String createdEnrichIndex = ".enrich-test1-" + createTime; + GetIndexResponse enrichIndex = getGetIndexResponseAndCheck(createdEnrichIndex); + + // Validate Mapping + Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); + validateMappingMetadata(mapping, policyName, policy); + assertThat(mapping.get("dynamic"), is("false")); + Map properties = (Map) mapping.get("properties"); + assertNotNull(properties); + assertThat(properties.size(), is(equalTo(1))); + Map field1 = (Map) properties.get("range"); + assertNotNull(field1); + assertThat(field1.get("type"), is(equalTo(rangeType + "_range"))); + assertEquals(Boolean.FALSE, field1.get("doc_values")); + + // Validate document structure + SearchResponse enrichSearchResponse = client().search( + new SearchRequest(".enrich-test1").source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery())) + ).actionGet(); + + assertThat(enrichSearchResponse.getHits().getTotalHits().value, equalTo(1L)); + Map enrichDocument = enrichSearchResponse.getHits().iterator().next().getSourceAsMap(); + assertNotNull(enrichDocument); + assertThat(enrichDocument.size(), is(equalTo(2))); + assertThat(enrichDocument.get("range"), is(equalTo(Map.of("lt", 10, "gt", 1)))); + assertThat(enrichDocument.get("zipcode"), is(equalTo(90210))); + + // Validate segments + validateSegments(createdEnrichIndex, 1); + + // Validate Index is read only + ensureEnrichIndexIsReadOnly(createdEnrichIndex); + } + + private GetIndexResponse getGetIndexResponseAndCheck(String createdEnrichIndex) { + GetIndexResponse enrichIndex = client().admin().indices().getIndex(new GetIndexRequest().indices(".enrich-test1")).actionGet(); + assertThat(enrichIndex.getIndices().length, equalTo(1)); + assertThat(enrichIndex.getIndices()[0], equalTo(createdEnrichIndex)); + Settings settings = enrichIndex.getSettings().get(createdEnrichIndex); + assertNotNull(settings); + assertThat(settings.get("index.auto_expand_replicas"), is(equalTo("0-all"))); + return enrichIndex; + } + + public void testRunnerRangeTypeWithIpRange() throws Exception { + final String sourceIndexName = "source-index"; + createIndex(sourceIndexName, Settings.EMPTY, "_doc", "subnet", "type=ip_range"); + IndexResponse indexRequest = client().index( + new IndexRequest().index(sourceIndexName) + .id("id") + .source("{" + "\"subnet\":" + "\"10.0.0.0/8\"," + "\"department\":\"research\"" + "}", XContentType.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + ).actionGet(); + assertEquals(RestStatus.CREATED, indexRequest.status()); + + GetIndexResponse sourceIndex = client().admin().indices().getIndex(new GetIndexRequest().indices(sourceIndexName)).actionGet(); + // Validate Mapping + Map sourceIndexMapping = sourceIndex.getMappings().get(sourceIndexName).sourceAsMap(); + Map sourceIndexProperties = (Map) sourceIndexMapping.get("properties"); + Map subnetField = (Map) sourceIndexProperties.get("subnet"); + assertNotNull(subnetField); + assertThat(subnetField.get("type"), is(equalTo("ip_range"))); + + SearchResponse sourceSearchResponse = client().search( + new SearchRequest(sourceIndexName).source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery())) + ).actionGet(); + assertThat(sourceSearchResponse.getHits().getTotalHits().value, equalTo(1L)); + Map sourceDocMap = sourceSearchResponse.getHits().getAt(0).getSourceAsMap(); + assertNotNull(sourceDocMap); + assertThat(sourceDocMap.get("subnet"), is(equalTo("10.0.0.0/8"))); + assertThat(sourceDocMap.get("department"), is(equalTo("research"))); + + List enrichFields = List.of("department"); + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.RANGE_TYPE, null, List.of(sourceIndexName), "subnet", enrichFields); + String policyName = "test1"; + + final long createTime = randomNonNegativeLong(); + final AtomicReference exception = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(1); + ActionListener listener = createTestListener(latch, exception::set); + EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime); + + logger.info("Starting policy run"); + enrichPolicyRunner.run(); + latch.await(); + if (exception.get() != null) { + throw exception.get(); + } + + // Validate Index definition + String createdEnrichIndex = ".enrich-test1-" + createTime; + GetIndexResponse enrichIndex = getGetIndexResponseAndCheck(createdEnrichIndex); + + // Validate Mapping + Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); + validateMappingMetadata(mapping, policyName, policy); + assertThat(mapping.get("dynamic"), is("false")); + Map properties = (Map) mapping.get("properties"); + assertNotNull(properties); + assertThat(properties.size(), is(equalTo(1))); + Map field1 = (Map) properties.get("subnet"); + assertNotNull(field1); + assertThat(field1.get("type"), is(equalTo("ip_range"))); + assertThat(field1.get("doc_values"), is(false)); + + // Validate document structure and lookup of element in range + SearchResponse enrichSearchResponse = client().search( + new SearchRequest(".enrich-test1").source( + SearchSourceBuilder.searchSource().query(QueryBuilders.matchQuery("subnet", "10.0.0.1")) + ) + ).actionGet(); + + assertThat(enrichSearchResponse.getHits().getTotalHits().value, equalTo(1L)); + Map enrichDocument = enrichSearchResponse.getHits().iterator().next().getSourceAsMap(); + assertNotNull(enrichDocument); + assertThat(enrichDocument.size(), is(equalTo(2))); + assertThat(enrichDocument.get("subnet"), is(equalTo("10.0.0.0/8"))); + assertThat(enrichDocument.get("department"), is(equalTo("research"))); + + // Validate segments + validateSegments(createdEnrichIndex, 1); + + // Validate Index is read only + ensureEnrichIndexIsReadOnly(createdEnrichIndex); + } + public void testRunnerMultiSource() throws Exception { String baseSourceName = "source-index-"; int numberOfSourceIndices = 3; @@ -319,12 +489,7 @@ public void testRunnerMultiSource() throws Exception { // Validate Index definition String createdEnrichIndex = ".enrich-test1-" + createTime; - GetIndexResponse enrichIndex = client().admin().indices().getIndex(new GetIndexRequest().indices(".enrich-test1")).actionGet(); - assertThat(enrichIndex.getIndices().length, equalTo(1)); - assertThat(enrichIndex.getIndices()[0], equalTo(createdEnrichIndex)); - Settings settings = enrichIndex.getSettings().get(createdEnrichIndex); - assertNotNull(settings); - assertThat(settings.get("index.auto_expand_replicas"), is(equalTo("0-all"))); + GetIndexResponse enrichIndex = getGetIndexResponseAndCheck(createdEnrichIndex); // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); @@ -431,12 +596,7 @@ public void testRunnerMultiSourceDocIdCollisions() throws Exception { // Validate Index definition String createdEnrichIndex = ".enrich-test1-" + createTime; - GetIndexResponse enrichIndex = client().admin().indices().getIndex(new GetIndexRequest().indices(".enrich-test1")).actionGet(); - assertThat(enrichIndex.getIndices().length, equalTo(1)); - assertThat(enrichIndex.getIndices()[0], equalTo(createdEnrichIndex)); - Settings settings = enrichIndex.getSettings().get(createdEnrichIndex); - assertNotNull(settings); - assertThat(settings.get("index.auto_expand_replicas"), is(equalTo("0-all"))); + GetIndexResponse enrichIndex = getGetIndexResponseAndCheck(createdEnrichIndex); // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); @@ -541,12 +701,7 @@ public void testRunnerMultiSourceEnrichKeyCollisions() throws Exception { // Validate Index definition String createdEnrichIndex = ".enrich-test1-" + createTime; - GetIndexResponse enrichIndex = client().admin().indices().getIndex(new GetIndexRequest().indices(".enrich-test1")).actionGet(); - assertThat(enrichIndex.getIndices().length, equalTo(1)); - assertThat(enrichIndex.getIndices()[0], equalTo(createdEnrichIndex)); - Settings settings = enrichIndex.getSettings().get(createdEnrichIndex); - assertNotNull(settings); - assertThat(settings.get("index.auto_expand_replicas"), is(equalTo("0-all"))); + GetIndexResponse enrichIndex = getGetIndexResponseAndCheck(createdEnrichIndex); // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); @@ -839,12 +994,7 @@ public void testRunnerObjectSourceMapping() throws Exception { // Validate Index definition String createdEnrichIndex = ".enrich-test1-" + createTime; - GetIndexResponse enrichIndex = client().admin().indices().getIndex(new GetIndexRequest().indices(".enrich-test1")).actionGet(); - assertThat(enrichIndex.getIndices().length, equalTo(1)); - assertThat(enrichIndex.getIndices()[0], equalTo(createdEnrichIndex)); - Settings settings = enrichIndex.getSettings().get(createdEnrichIndex); - assertNotNull(settings); - assertThat(settings.get("index.auto_expand_replicas"), is(equalTo("0-all"))); + GetIndexResponse enrichIndex = getGetIndexResponseAndCheck(createdEnrichIndex); // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); @@ -957,12 +1107,7 @@ public void testRunnerExplicitObjectSourceMapping() throws Exception { // Validate Index definition String createdEnrichIndex = ".enrich-test1-" + createTime; - GetIndexResponse enrichIndex = client().admin().indices().getIndex(new GetIndexRequest().indices(".enrich-test1")).actionGet(); - assertThat(enrichIndex.getIndices().length, equalTo(1)); - assertThat(enrichIndex.getIndices()[0], equalTo(createdEnrichIndex)); - Settings settings = enrichIndex.getSettings().get(createdEnrichIndex); - assertNotNull(settings); - assertThat(settings.get("index.auto_expand_replicas"), is(equalTo("0-all"))); + GetIndexResponse enrichIndex = getGetIndexResponseAndCheck(createdEnrichIndex); // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); @@ -1004,21 +1149,20 @@ public void testRunnerExplicitObjectSourceMapping() throws Exception { ensureEnrichIndexIsReadOnly(createdEnrichIndex); } - public void testRunnerTwoObjectLevelsSourceMapping() throws Exception { + public void testRunnerExplicitObjectSourceMappingRangePolicy() throws Exception { final String sourceIndex = "source-index"; XContentBuilder mappingBuilder = JsonXContent.contentBuilder(); mappingBuilder.startObject() .startObject(MapperService.SINGLE_MAPPING_NAME) .startObject("properties") .startObject("data") + .field("type", "object") .startObject("properties") - .startObject("fields") - .startObject("properties") - .startObject("field1") - .field("type", "keyword") + .startObject("subnet") + .field("type", "ip_range") .endObject() - .startObject("field2") - .field("type", "integer") + .startObject("department") + .field("type", "keyword") .endObject() .startObject("field3") .field("type", "keyword") @@ -1027,8 +1171,6 @@ public void testRunnerTwoObjectLevelsSourceMapping() throws Exception { .endObject() .endObject() .endObject() - .endObject() - .endObject() .endObject(); CreateIndexResponse createResponse = client().admin() .indices() @@ -1042,12 +1184,10 @@ public void testRunnerTwoObjectLevelsSourceMapping() throws Exception { .source( "{" + "\"data\":{" - + "\"fields\":{" - + "\"field1\":\"value1\"," - + "\"field2\":2," + + "\"subnet\":\"10.0.0.0/8\"," + + "\"department\":\"research\"," + "\"field3\":\"ignored\"" + "}" - + "}" + "}", XContentType.JSON ) @@ -1063,15 +1203,13 @@ public void testRunnerTwoObjectLevelsSourceMapping() throws Exception { assertNotNull(sourceDocMap); Map dataField = ((Map) sourceDocMap.get("data")); assertNotNull(dataField); - Map fieldsField = ((Map) dataField.get("fields")); - assertNotNull(fieldsField); - assertThat(fieldsField.get("field1"), is(equalTo("value1"))); - assertThat(fieldsField.get("field2"), is(equalTo(2))); - assertThat(fieldsField.get("field3"), is(equalTo("ignored"))); + assertThat(dataField.get("subnet"), is(equalTo("10.0.0.0/8"))); + assertThat(dataField.get("department"), is(equalTo("research"))); + assertThat(dataField.get("field3"), is(equalTo("ignored"))); String policyName = "test1"; - List enrichFields = List.of("data.fields.field2", "missingField"); - EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "data.fields.field1", enrichFields); + List enrichFields = List.of("data.department", "missingField"); + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.RANGE_TYPE, null, List.of(sourceIndex), "data.subnet", enrichFields); final long createTime = randomNonNegativeLong(); final AtomicReference exception = new AtomicReference<>(); @@ -1088,12 +1226,7 @@ public void testRunnerTwoObjectLevelsSourceMapping() throws Exception { // Validate Index definition String createdEnrichIndex = ".enrich-test1-" + createTime; - GetIndexResponse enrichIndex = client().admin().indices().getIndex(new GetIndexRequest().indices(".enrich-test1")).actionGet(); - assertThat(enrichIndex.getIndices().length, equalTo(1)); - assertThat(enrichIndex.getIndices()[0], equalTo(createdEnrichIndex)); - Settings settings = enrichIndex.getSettings().get(createdEnrichIndex); - assertNotNull(settings); - assertThat(settings.get("index.auto_expand_replicas"), is(equalTo("0-all"))); + GetIndexResponse enrichIndex = getGetIndexResponseAndCheck(createdEnrichIndex); // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); @@ -1108,19 +1241,15 @@ public void testRunnerTwoObjectLevelsSourceMapping() throws Exception { Map dataProperties = (Map) data.get("properties"); assertNotNull(dataProperties); assertThat(dataProperties.size(), is(equalTo(1))); - Map fields = (Map) dataProperties.get("fields"); - assertNotNull(fields); - assertThat(fields.size(), is(equalTo(1))); - Map fieldsProperties = (Map) fields.get("properties"); - assertNotNull(fieldsProperties); - assertThat(fieldsProperties.size(), is(equalTo(1))); - Map field1 = (Map) fieldsProperties.get("field1"); + Map field1 = (Map) dataProperties.get("subnet"); assertNotNull(field1); - assertThat(field1.get("type"), is(equalTo("keyword"))); + assertThat(field1.get("type"), is(equalTo("ip_range"))); assertThat(field1.get("doc_values"), is(false)); SearchResponse enrichSearchResponse = client().search( - new SearchRequest(".enrich-test1").source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery())) + new SearchRequest(".enrich-test1").source( + SearchSourceBuilder.searchSource().query(QueryBuilders.matchQuery("data.subnet", "10.0.0.1")) + ) ).actionGet(); assertThat(enrichSearchResponse.getHits().getTotalHits().value, equalTo(1L)); @@ -1129,12 +1258,10 @@ public void testRunnerTwoObjectLevelsSourceMapping() throws Exception { assertThat(enrichDocument.size(), is(equalTo(1))); Map resultDataField = ((Map) enrichDocument.get("data")); assertNotNull(resultDataField); - Map resultFieldsField = ((Map) resultDataField.get("fields")); - assertNotNull(resultFieldsField); - assertThat(resultFieldsField.size(), is(equalTo(2))); - assertThat(resultFieldsField.get("field1"), is(equalTo("value1"))); - assertThat(resultFieldsField.get("field2"), is(equalTo(2))); - assertNull(resultFieldsField.get("field3")); + assertThat(resultDataField.size(), is(equalTo(2))); + assertThat(resultDataField.get("subnet"), is(equalTo("10.0.0.0/8"))); + assertThat(resultDataField.get("department"), is(equalTo("research"))); + assertNull(resultDataField.get("field3")); // Validate segments validateSegments(createdEnrichIndex, 1); @@ -1143,23 +1270,31 @@ public void testRunnerTwoObjectLevelsSourceMapping() throws Exception { ensureEnrichIndexIsReadOnly(createdEnrichIndex); } - public void testRunnerDottedKeyNameSourceMapping() throws Exception { + public void testRunnerTwoObjectLevelsSourceMapping() throws Exception { final String sourceIndex = "source-index"; XContentBuilder mappingBuilder = JsonXContent.contentBuilder(); mappingBuilder.startObject() .startObject(MapperService.SINGLE_MAPPING_NAME) .startObject("properties") - .startObject("data.field1") + .startObject("data") + .startObject("properties") + .startObject("fields") + .startObject("properties") + .startObject("field1") .field("type", "keyword") .endObject() - .startObject("data.field2") + .startObject("field2") .field("type", "integer") .endObject() - .startObject("data.field3") + .startObject("field3") .field("type", "keyword") .endObject() .endObject() .endObject() + .endObject() + .endObject() + .endObject() + .endObject() .endObject(); CreateIndexResponse createResponse = client().admin() .indices() @@ -1170,7 +1305,18 @@ public void testRunnerDottedKeyNameSourceMapping() throws Exception { IndexResponse indexRequest = client().index( new IndexRequest().index(sourceIndex) .id("id") - .source("{" + "\"data.field1\":\"value1\"," + "\"data.field2\":2," + "\"data.field3\":\"ignored\"" + "}", XContentType.JSON) + .source( + "{" + + "\"data\":{" + + "\"fields\":{" + + "\"field1\":\"value1\"," + + "\"field2\":2," + + "\"field3\":\"ignored\"" + + "}" + + "}" + + "}", + XContentType.JSON + ) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) ).actionGet(); assertEquals(RestStatus.CREATED, indexRequest.status()); @@ -1181,10 +1327,420 @@ public void testRunnerDottedKeyNameSourceMapping() throws Exception { assertThat(sourceSearchResponse.getHits().getTotalHits().value, equalTo(1L)); Map sourceDocMap = sourceSearchResponse.getHits().getAt(0).getSourceAsMap(); assertNotNull(sourceDocMap); - assertThat(sourceDocMap.get("data.field1"), is(equalTo("value1"))); - assertThat(sourceDocMap.get("data.field2"), is(equalTo(2))); - assertThat(sourceDocMap.get("data.field3"), is(equalTo("ignored"))); - + Map dataField = ((Map) sourceDocMap.get("data")); + assertNotNull(dataField); + Map fieldsField = ((Map) dataField.get("fields")); + assertNotNull(fieldsField); + assertThat(fieldsField.get("field1"), is(equalTo("value1"))); + assertThat(fieldsField.get("field2"), is(equalTo(2))); + assertThat(fieldsField.get("field3"), is(equalTo("ignored"))); + + String policyName = "test1"; + List enrichFields = List.of("data.fields.field2", "missingField"); + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "data.fields.field1", enrichFields); + + final long createTime = randomNonNegativeLong(); + final AtomicReference exception = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(1); + ActionListener listener = createTestListener(latch, exception::set); + EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime); + + logger.info("Starting policy run"); + enrichPolicyRunner.run(); + latch.await(); + if (exception.get() != null) { + throw exception.get(); + } + + // Validate Index definition + String createdEnrichIndex = ".enrich-test1-" + createTime; + GetIndexResponse enrichIndex = getGetIndexResponseAndCheck(createdEnrichIndex); + + // Validate Mapping + Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); + validateMappingMetadata(mapping, policyName, policy); + assertThat(mapping.get("dynamic"), is("false")); + Map properties = (Map) mapping.get("properties"); + assertNotNull(properties); + assertThat(properties.size(), is(equalTo(1))); + Map data = (Map) properties.get("data"); + assertNotNull(data); + assertThat(data.size(), is(equalTo(1))); + Map dataProperties = (Map) data.get("properties"); + assertNotNull(dataProperties); + assertThat(dataProperties.size(), is(equalTo(1))); + Map fields = (Map) dataProperties.get("fields"); + assertNotNull(fields); + assertThat(fields.size(), is(equalTo(1))); + Map fieldsProperties = (Map) fields.get("properties"); + assertNotNull(fieldsProperties); + assertThat(fieldsProperties.size(), is(equalTo(1))); + Map field1 = (Map) fieldsProperties.get("field1"); + assertNotNull(field1); + assertThat(field1.get("type"), is(equalTo("keyword"))); + assertThat(field1.get("doc_values"), is(false)); + + SearchResponse enrichSearchResponse = client().search( + new SearchRequest(".enrich-test1").source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery())) + ).actionGet(); + + assertThat(enrichSearchResponse.getHits().getTotalHits().value, equalTo(1L)); + Map enrichDocument = enrichSearchResponse.getHits().iterator().next().getSourceAsMap(); + assertNotNull(enrichDocument); + assertThat(enrichDocument.size(), is(equalTo(1))); + Map resultDataField = ((Map) enrichDocument.get("data")); + assertNotNull(resultDataField); + Map resultFieldsField = ((Map) resultDataField.get("fields")); + assertNotNull(resultFieldsField); + assertThat(resultFieldsField.size(), is(equalTo(2))); + assertThat(resultFieldsField.get("field1"), is(equalTo("value1"))); + assertThat(resultFieldsField.get("field2"), is(equalTo(2))); + assertNull(resultFieldsField.get("field3")); + + // Validate segments + validateSegments(createdEnrichIndex, 1); + + // Validate Index is read only + ensureEnrichIndexIsReadOnly(createdEnrichIndex); + } + + public void testRunnerTwoObjectLevelsSourceMappingRangePolicy() throws Exception { + final String sourceIndex = "source-index"; + XContentBuilder mappingBuilder = JsonXContent.contentBuilder(); + mappingBuilder.startObject() + .startObject(MapperService.SINGLE_MAPPING_NAME) + .startObject("properties") + .startObject("data") + .startObject("properties") + .startObject("fields") + .startObject("properties") + .startObject("subnet") + .field("type", "ip_range") + .endObject() + .startObject("department") + .field("type", "keyword") + .endObject() + .startObject("field3") + .field("type", "keyword") + .endObject() + .endObject() + .endObject() + .endObject() + .endObject() + .endObject() + .endObject() + .endObject(); + CreateIndexResponse createResponse = client().admin() + .indices() + .create(new CreateIndexRequest(sourceIndex).mapping(mappingBuilder)) + .actionGet(); + assertTrue(createResponse.isAcknowledged()); + + IndexResponse indexRequest = client().index( + new IndexRequest().index(sourceIndex) + .id("id") + .source( + "{" + + "\"data\":{" + + "\"fields\":{" + + "\"subnet\":\"10.0.0.0/8\"," + + "\"department\":\"research\"," + + "\"field3\":\"ignored\"" + + "}" + + "}" + + "}", + XContentType.JSON + ) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + ).actionGet(); + assertEquals(RestStatus.CREATED, indexRequest.status()); + + SearchResponse sourceSearchResponse = client().search( + new SearchRequest(sourceIndex).source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery())) + ).actionGet(); + assertThat(sourceSearchResponse.getHits().getTotalHits().value, equalTo(1L)); + Map sourceDocMap = sourceSearchResponse.getHits().getAt(0).getSourceAsMap(); + assertNotNull(sourceDocMap); + Map dataField = ((Map) sourceDocMap.get("data")); + assertNotNull(dataField); + Map fieldsField = ((Map) dataField.get("fields")); + assertNotNull(fieldsField); + assertThat(fieldsField.get("subnet"), is(equalTo("10.0.0.0/8"))); + assertThat(fieldsField.get("department"), is(equalTo("research"))); + assertThat(fieldsField.get("field3"), is(equalTo("ignored"))); + + String policyName = "test1"; + List enrichFields = List.of("data.fields.department", "missingField"); + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.RANGE_TYPE, null, List.of(sourceIndex), "data.fields.subnet", enrichFields); + + final long createTime = randomNonNegativeLong(); + final AtomicReference exception = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(1); + ActionListener listener = createTestListener(latch, exception::set); + EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime); + + logger.info("Starting policy run"); + enrichPolicyRunner.run(); + latch.await(); + if (exception.get() != null) { + throw exception.get(); + } + + // Validate Index definition + String createdEnrichIndex = ".enrich-test1-" + createTime; + GetIndexResponse enrichIndex = getGetIndexResponseAndCheck(createdEnrichIndex); + + // Validate Mapping + Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); + validateMappingMetadata(mapping, policyName, policy); + assertThat(mapping.get("dynamic"), is("false")); + Map properties = (Map) mapping.get("properties"); + assertNotNull(properties); + assertThat(properties.size(), is(equalTo(1))); + Map data = (Map) properties.get("data"); + assertNotNull(data); + assertThat(data.size(), is(equalTo(1))); + Map dataProperties = (Map) data.get("properties"); + assertNotNull(dataProperties); + assertThat(dataProperties.size(), is(equalTo(1))); + Map fields = (Map) dataProperties.get("fields"); + assertNotNull(fields); + assertThat(fields.size(), is(equalTo(1))); + Map fieldsProperties = (Map) fields.get("properties"); + assertNotNull(fieldsProperties); + assertThat(fieldsProperties.size(), is(equalTo(1))); + Map field1 = (Map) fieldsProperties.get("subnet"); + assertNotNull(field1); + assertThat(field1.get("type"), is(equalTo("ip_range"))); + assertThat(field1.get("doc_values"), is(false)); + + SearchResponse enrichSearchResponse = client().search( + new SearchRequest(".enrich-test1").source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery())) + ).actionGet(); + + assertThat(enrichSearchResponse.getHits().getTotalHits().value, equalTo(1L)); + Map enrichDocument = enrichSearchResponse.getHits().iterator().next().getSourceAsMap(); + assertNotNull(enrichDocument); + assertThat(enrichDocument.size(), is(equalTo(1))); + Map resultDataField = ((Map) enrichDocument.get("data")); + assertNotNull(resultDataField); + Map resultFieldsField = ((Map) resultDataField.get("fields")); + assertNotNull(resultFieldsField); + assertThat(resultFieldsField.size(), is(equalTo(2))); + assertThat(resultFieldsField.get("subnet"), is(equalTo("10.0.0.0/8"))); + assertThat(resultFieldsField.get("department"), is(equalTo("research"))); + assertNull(resultFieldsField.get("field3")); + + // Validate segments + validateSegments(createdEnrichIndex, 1); + + // Validate Index is read only + ensureEnrichIndexIsReadOnly(createdEnrichIndex); + } + + public void testRunnerTwoObjectLevelsSourceMappingDateRangeWithFormat() throws Exception { + final String sourceIndex = "source-index"; + XContentBuilder mappingBuilder = JsonXContent.contentBuilder(); + mappingBuilder.startObject() + .startObject(MapperService.SINGLE_MAPPING_NAME) + .startObject("properties") + .startObject("data") + .startObject("properties") + .startObject("fields") + .startObject("properties") + .startObject("period") + .field("type", "date_range") + .field("format", "yyyy'/'MM'/'dd' at 'HH':'mm||strict_date_time") + .endObject() + .startObject("status") + .field("type", "keyword") + .endObject() + .startObject("field3") + .field("type", "keyword") + .endObject() + .endObject() + .endObject() + .endObject() + .endObject() + .endObject() + .endObject() + .endObject(); + CreateIndexResponse createResponse = client().admin() + .indices() + .create(new CreateIndexRequest(sourceIndex).mapping(mappingBuilder)) + .actionGet(); + assertTrue(createResponse.isAcknowledged()); + + IndexResponse indexRequest = client().index( + new IndexRequest().index(sourceIndex) + .id("id") + .source( + "{" + + "\"data\":{" + + "\"fields\":{" + + "\"period\": {" + + " \"gte\" : \"2021/08/20 at 12:00\"," + + " \"lte\" : \"2021/08/28 at 23:00\"" + + "}," + + "\"status\":\"enrolled\"," + + "\"field3\":\"ignored\"" + + "}" + + "}" + + "}", + XContentType.JSON + ) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + ).actionGet(); + assertEquals(RestStatus.CREATED, indexRequest.status()); + + SearchResponse sourceSearchResponse = client().search( + new SearchRequest(sourceIndex).source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery())) + ).actionGet(); + assertThat(sourceSearchResponse.getHits().getTotalHits().value, equalTo(1L)); + Map sourceDocMap = sourceSearchResponse.getHits().getAt(0).getSourceAsMap(); + assertNotNull(sourceDocMap); + Map dataField = ((Map) sourceDocMap.get("data")); + assertNotNull(dataField); + Map fieldsField = ((Map) dataField.get("fields")); + assertNotNull(fieldsField); + Map periodField = ((Map) fieldsField.get("period")); + assertNotNull(periodField); + assertThat(periodField.get("gte"), is(equalTo("2021/08/20 at 12:00"))); + assertThat(periodField.get("lte"), is(equalTo("2021/08/28 at 23:00"))); + assertThat(fieldsField.get("status"), is(equalTo("enrolled"))); + assertThat(fieldsField.get("field3"), is(equalTo("ignored"))); + + String policyName = "test1"; + List enrichFields = List.of("data.fields.status", "missingField"); + EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.RANGE_TYPE, null, List.of(sourceIndex), "data.fields.period", enrichFields); + + final long createTime = randomNonNegativeLong(); + final AtomicReference exception = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(1); + ActionListener listener = createTestListener(latch, exception::set); + EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(policyName, policy, listener, createTime); + + logger.info("Starting policy run"); + enrichPolicyRunner.run(); + latch.await(); + if (exception.get() != null) { + throw exception.get(); + } + + // Validate Index definition + String createdEnrichIndex = ".enrich-test1-" + createTime; + GetIndexResponse enrichIndex = getGetIndexResponseAndCheck(createdEnrichIndex); + + // Validate Mapping + Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); + validateMappingMetadata(mapping, policyName, policy); + assertThat(mapping.get("dynamic"), is("false")); + Map properties = (Map) mapping.get("properties"); + assertNotNull(properties); + assertThat(properties.size(), is(equalTo(1))); + Map data = (Map) properties.get("data"); + assertNotNull(data); + assertThat(data.size(), is(equalTo(1))); + Map dataProperties = (Map) data.get("properties"); + assertNotNull(dataProperties); + assertThat(dataProperties.size(), is(equalTo(1))); + Map fields = (Map) dataProperties.get("fields"); + assertNotNull(fields); + assertThat(fields.size(), is(equalTo(1))); + Map fieldsProperties = (Map) fields.get("properties"); + assertNotNull(fieldsProperties); + assertThat(fieldsProperties.size(), is(equalTo(1))); + Map field1 = (Map) fieldsProperties.get("period"); + assertNotNull(field1); + assertThat(field1.get("type"), is(equalTo("date_range"))); + assertThat(field1.get("doc_values"), is(false)); + + SearchResponse enrichSearchResponse = client().search( + new SearchRequest(".enrich-test1").source( + SearchSourceBuilder.searchSource().query(QueryBuilders.matchQuery("data.fields.period", "2021-08-19T14:00:00Z")) + ) + ).actionGet(); + + assertThat(enrichSearchResponse.getHits().getTotalHits().value, equalTo(0L)); + + enrichSearchResponse = client().search( + new SearchRequest(".enrich-test1").source( + SearchSourceBuilder.searchSource().query(QueryBuilders.matchQuery("data.fields.period", "2021-08-20T14:00:00Z")) + ) + ).actionGet(); + + assertThat(enrichSearchResponse.getHits().getTotalHits().value, equalTo(1L)); + Map enrichDocument = enrichSearchResponse.getHits().iterator().next().getSourceAsMap(); + assertNotNull(enrichDocument); + assertThat(enrichDocument.size(), is(equalTo(1))); + Map resultDataField = ((Map) enrichDocument.get("data")); + assertNotNull(resultDataField); + Map resultFieldsField = ((Map) resultDataField.get("fields")); + assertNotNull(resultFieldsField); + assertThat(resultFieldsField.size(), is(equalTo(2))); + Map resultsPeriodField = ((Map) resultFieldsField.get("period")); + assertNotNull(periodField); + assertThat(resultsPeriodField.get("gte"), is(equalTo("2021/08/20 at 12:00"))); + assertThat(resultsPeriodField.get("lte"), is(equalTo("2021/08/28 at 23:00"))); + assertThat(resultFieldsField.get("status"), is(equalTo("enrolled"))); + assertNull(resultFieldsField.get("field3")); + + enrichSearchResponse = client().search( + new SearchRequest(".enrich-test1").source( + SearchSourceBuilder.searchSource().query(QueryBuilders.matchQuery("data.fields.period", "2021/08/20 at 14:00")) + ) + ).actionGet(); + assertThat(enrichSearchResponse.getHits().getTotalHits().value, equalTo(1L)); + + // Validate segments + validateSegments(createdEnrichIndex, 1); + + // Validate Index is read only + ensureEnrichIndexIsReadOnly(createdEnrichIndex); + } + + public void testRunnerDottedKeyNameSourceMapping() throws Exception { + final String sourceIndex = "source-index"; + XContentBuilder mappingBuilder = JsonXContent.contentBuilder(); + mappingBuilder.startObject() + .startObject(MapperService.SINGLE_MAPPING_NAME) + .startObject("properties") + .startObject("data.field1") + .field("type", "keyword") + .endObject() + .startObject("data.field2") + .field("type", "integer") + .endObject() + .startObject("data.field3") + .field("type", "keyword") + .endObject() + .endObject() + .endObject() + .endObject(); + CreateIndexResponse createResponse = client().admin() + .indices() + .create(new CreateIndexRequest(sourceIndex).mapping(mappingBuilder)) + .actionGet(); + assertTrue(createResponse.isAcknowledged()); + + IndexResponse indexRequest = client().index( + new IndexRequest().index(sourceIndex) + .id("id") + .source("{" + "\"data.field1\":\"value1\"," + "\"data.field2\":2," + "\"data.field3\":\"ignored\"" + "}", XContentType.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + ).actionGet(); + assertEquals(RestStatus.CREATED, indexRequest.status()); + + SearchResponse sourceSearchResponse = client().search( + new SearchRequest(sourceIndex).source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery())) + ).actionGet(); + assertThat(sourceSearchResponse.getHits().getTotalHits().value, equalTo(1L)); + Map sourceDocMap = sourceSearchResponse.getHits().getAt(0).getSourceAsMap(); + assertNotNull(sourceDocMap); + assertThat(sourceDocMap.get("data.field1"), is(equalTo("value1"))); + assertThat(sourceDocMap.get("data.field2"), is(equalTo(2))); + assertThat(sourceDocMap.get("data.field3"), is(equalTo("ignored"))); + String policyName = "test1"; List enrichFields = List.of("data.field2", "missingField"); EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "data.field1", enrichFields); @@ -1204,12 +1760,7 @@ public void testRunnerDottedKeyNameSourceMapping() throws Exception { // Validate Index definition String createdEnrichIndex = ".enrich-test1-" + createTime; - GetIndexResponse enrichIndex = client().admin().indices().getIndex(new GetIndexRequest().indices(".enrich-test1")).actionGet(); - assertThat(enrichIndex.getIndices().length, equalTo(1)); - assertThat(enrichIndex.getIndices()[0], equalTo(createdEnrichIndex)); - Settings settings = enrichIndex.getSettings().get(createdEnrichIndex); - assertNotNull(settings); - assertThat(settings.get("index.auto_expand_replicas"), is(equalTo("0-all"))); + GetIndexResponse enrichIndex = getGetIndexResponseAndCheck(createdEnrichIndex); // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); @@ -1373,12 +1924,7 @@ protected void ensureSingleSegment(String destinationIndexName, int attempt) { assertThat(forceMergeAttempts.get(), equalTo(2)); // Validate Index definition - GetIndexResponse enrichIndex = client().admin().indices().getIndex(new GetIndexRequest().indices(".enrich-test1")).actionGet(); - assertThat(enrichIndex.getIndices().length, equalTo(1)); - assertThat(enrichIndex.getIndices()[0], equalTo(createdEnrichIndex)); - Settings settings = enrichIndex.getSettings().get(createdEnrichIndex); - assertNotNull(settings); - assertThat(settings.get("index.auto_expand_replicas"), is(equalTo("0-all"))); + GetIndexResponse enrichIndex = getGetIndexResponseAndCheck(createdEnrichIndex); // Validate Mapping Map mapping = enrichIndex.getMappings().get(createdEnrichIndex).sourceAsMap(); diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichStoreCrudTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichStoreCrudTests.java index 88d6c172ab16e..3af9f45263d10 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichStoreCrudTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichStoreCrudTests.java @@ -107,7 +107,10 @@ public void testPutValidation() throws Exception { IllegalArgumentException.class, () -> saveEnrichPolicy("name", invalidPolicy, clusterService) ); - assertThat(error.getMessage(), equalTo("unsupported policy type [unsupported_type], supported types are [match, geo_match]")); + assertThat( + error.getMessage(), + equalTo("unsupported policy type [unsupported_type], supported types are [match, geo_match, range]") + ); } }