Skip to content

Commit

Permalink
Allow range types to be used for enrich matching (#76110)
Browse files Browse the repository at this point in the history
  • Loading branch information
mjmbischoff authored Sep 7, 2021
1 parent f170f6c commit c6d4dd8
Show file tree
Hide file tree
Showing 6 changed files with 840 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@ public final class EnrichPolicy implements Writeable, ToXContentFragment {

public static final String MATCH_TYPE = "match";
public static final String GEO_MATCH_TYPE = "geo_match";
public static final String RANGE_TYPE = "range";
public static final String[] SUPPORTED_POLICY_TYPES = new String[]{
MATCH_TYPE,
GEO_MATCH_TYPE
GEO_MATCH_TYPE,
RANGE_TYPE
};

private static final ParseField QUERY = new ParseField("query");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,22 @@
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;

public abstract class CommonEnrichRestTestCase extends ESRestTestCase {

@After
public void deletePolicies() throws Exception {
Map<String, Object> responseMap = toMap(adminClient().performRequest(new Request("GET", "/_enrich/policy")));
@SuppressWarnings("unchecked")
List<Map<?, ?>> policies = (List<Map<?, ?>>) responseMap.get("policies");
List<Map<?, ?>> policies = unsafeGetProperty(responseMap, "policies");

for (Map<?, ?> entry : policies) {
client().performRequest(new Request("DELETE", "/_enrich/policy/" + XContentMapValues.extractValue("config.match.name", entry)));
Map<?, Map<String, ?>> config = unsafeGetProperty(entry, "config");
Map<String, ?> policy = config.values().iterator().next();
String endpoint = "/_enrich/policy/" + policy.get("name");
assertOK(client().performRequest(new Request("DELETE", endpoint)));

List<?> sourceIndices = (List<?>) XContentMapValues.extractValue("config.match.indices", entry);
List<?> sourceIndices = (List<?>) policy.get("indices");
for (Object sourceIndex : sourceIndices) {
try {
client().performRequest(new Request("DELETE", "/" + sourceIndex));
Expand All @@ -51,17 +54,49 @@ public void deletePolicies() throws Exception {
}
}

private void setupGenericLifecycleTest(boolean deletePipeilne) throws Exception {
/**
 * Convenience helper that looks up {@code key} in {@code map} and casts the value to the
 * caller-inferred type (typically a nested {@code Map} or {@code List} from a parsed JSON
 * response). The cast is unchecked by design; a mismatch surfaces as a
 * {@code ClassCastException} at the use site — hence the "unsafe" in the name.
 *
 * @param map the parsed response map to read from
 * @param key the property name to extract
 * @return the value stored under {@code key}, cast to the expected type (may be null)
 */
@SuppressWarnings("unchecked")
private <T> T unsafeGetProperty(Map<?, ?> map, String key) {
    // The single unchecked cast is isolated here so call sites stay free of @SuppressWarnings.
    return (T) map.get(key);
}

private void setupGenericLifecycleTest(boolean deletePipeilne, String field, String type, String value) throws Exception {
// Create source index:
createSourceIndex("my-source-index");

// Create the policy:
Request putPolicyRequest = new Request("PUT", "/_enrich/policy/my_policy");
putPolicyRequest.setJsonEntity(generatePolicySource("my-source-index"));
putPolicyRequest.setJsonEntity(generatePolicySource("my-source-index", field, type));
assertOK(client().performRequest(putPolicyRequest));

// Add entry to source index and then refresh:
Request indexRequest = new Request("PUT", "/my-source-index/_doc/elastic.co");
indexRequest.setJsonEntity("{\"host\": \"elastic.co\",\"globalRank\": 25,\"tldRank\": 7,\"tld\": \"co\"}");
indexRequest.setJsonEntity("{" +
"\"host\": \"elastic.co\"," +
"\"globalRank\": 25," +
"\"tldRank\": 7," +
"\"tld\": \"co\", " +
"\"date\": {" +
"\"gte\" : \"2021-09-05\"," +
"\"lt\" : \"2021-09-07\"" +
"}, " +
"\"integer\": {" +
"\"gte\" : 40," +
"\"lt\" : 42" +
"}, " +
"\"long\": {" +
"\"gte\" : 8000000," +
"\"lt\" : 9000000" +
"}, " +
"\"double\": {" +
"\"gte\" : 10.10," +
"\"lt\" : 20.20" +
"}, " +
"\"float\": {" +
"\"gte\" : 10000.5," +
"\"lt\" : 10000.7" +
"}, " +
"\"ip\": \"100.0.0.0/4\"" +
"}");
assertOK(client().performRequest(indexRequest));
Request refreshRequest = new Request("POST", "/my-source-index/_refresh");
assertOK(client().performRequest(refreshRequest));
Expand All @@ -73,34 +108,66 @@ private void setupGenericLifecycleTest(boolean deletePipeilne) throws Exception
// Create pipeline
Request putPipelineRequest = new Request("PUT", "/_ingest/pipeline/my_pipeline");
putPipelineRequest.setJsonEntity(
"{\"processors\":[" + "{\"enrich\":{\"policy_name\":\"my_policy\",\"field\":\"host\",\"target_field\":\"entry\"}}" + "]}"
"{\"processors\":[" + "{\"enrich\":{\"policy_name\":\"my_policy\",\"field\":\""+field+"\",\"target_field\":\"entry\"}}" + "]}"
);
assertOK(client().performRequest(putPipelineRequest));

// Index document using pipeline with enrich processor:
indexRequest = new Request("PUT", "/my-index/_doc/1");
indexRequest.addParameter("pipeline", "my_pipeline");
indexRequest.setJsonEntity("{\"host\": \"elastic.co\"}");
indexRequest.setJsonEntity("{\""+field+"\": \""+value+"\"}");
assertOK(client().performRequest(indexRequest));

// Check if document has been enriched
Request getRequest = new Request("GET", "/my-index/_doc/1");
Map<String, Object> response = toMap(client().performRequest(getRequest));
Map<?, ?> entry = (Map<?, ?>) ((Map<?, ?>) response.get("_source")).get("entry");
assertThat(entry.size(), equalTo(4));
assertThat(entry.get("host"), equalTo("elastic.co"));
assertThat(entry.get(field), notNullValue());
assertThat(entry.get("tld"), equalTo("co"));
assertThat(entry.get("globalRank"), equalTo(25));
assertThat(entry.get("tldRank"), equalTo(7));
Object originalMatchValue = ((Map<?, ?>) response.get("_source")).get(field);
assertThat(originalMatchValue, equalTo(value));

if (deletePipeilne) {
// delete the pipeline so the policies can be deleted
client().performRequest(new Request("DELETE", "/_ingest/pipeline/my_pipeline"));
}
}

public void testBasicFlow() throws Exception {
setupGenericLifecycleTest(true);
// Full enrich lifecycle (policy -> execute -> pipeline -> enrich a document) using a keyword
// field ("host") with a "match" type policy, then waits up to 3 minutes for enrich stats to
// appear in monitoring.
public void testBasicFlowKeyword() throws Exception {
    setupGenericLifecycleTest(true, "host", "match", "elastic.co");
    assertBusy(CommonEnrichRestTestCase::verifyEnrichMonitoring, 3, TimeUnit.MINUTES);
}

// Full enrich lifecycle matching a date ("2021-09-06") against a date_range field via a
// "range" type policy, then waits for enrich stats to appear in monitoring.
public void testBasicFlowDate() throws Exception {
    setupGenericLifecycleTest(true, "date", "range", "2021-09-06");
    assertBusy(CommonEnrichRestTestCase::verifyEnrichMonitoring, 3, TimeUnit.MINUTES);
}

// Full enrich lifecycle matching an integer (41, inside the indexed [40, 42) range) against an
// integer_range field via a "range" type policy.
public void testBasicFlowInteger() throws Exception {
    setupGenericLifecycleTest(true, "integer", "range", "41");
    assertBusy(CommonEnrichRestTestCase::verifyEnrichMonitoring, 3, TimeUnit.MINUTES);
}

// Full enrich lifecycle matching a long (8411017, inside the indexed [8000000, 9000000) range)
// against a long_range field via a "range" type policy.
public void testBasicFlowLong() throws Exception {
    setupGenericLifecycleTest(true, "long", "range", "8411017");
    assertBusy(CommonEnrichRestTestCase::verifyEnrichMonitoring, 3, TimeUnit.MINUTES);
}

// Full enrich lifecycle matching a double (15.15, inside the indexed [10.10, 20.20) range)
// against a double_range field via a "range" type policy.
public void testBasicFlowDouble() throws Exception {
    setupGenericLifecycleTest(true, "double", "range", "15.15");
    assertBusy(CommonEnrichRestTestCase::verifyEnrichMonitoring, 3, TimeUnit.MINUTES);
}

// Full enrich lifecycle matching a float (10000.66666, inside the indexed [10000.5, 10000.7)
// range) against a float_range field via a "range" type policy.
public void testBasicFlowFloat() throws Exception {
    setupGenericLifecycleTest(true, "float", "range", "10000.66666");
    assertBusy(CommonEnrichRestTestCase::verifyEnrichMonitoring, 3, TimeUnit.MINUTES);
}

// Full enrich lifecycle matching an IP address (100.120.140.160, inside the indexed
// 100.0.0.0/4 CIDR block) against an ip_range field via a "range" type policy.
public void testBasicFlowIp() throws Exception {
    setupGenericLifecycleTest(true, "ip", "range", "100.120.140.160");
    assertBusy(CommonEnrichRestTestCase::verifyEnrichMonitoring, 3, TimeUnit.MINUTES);
}

Expand Down Expand Up @@ -129,7 +196,7 @@ public void testDeleteIsCaseSensitive() throws Exception {

public void testDeleteExistingPipeline() throws Exception {
// lets not delete the pipeline at first, to test the failure
setupGenericLifecycleTest(false);
setupGenericLifecycleTest(false, "host", "match", "elastic.co");

Request putPipelineRequest = new Request("PUT", "/_ingest/pipeline/another_pipeline");
putPipelineRequest.setJsonEntity(
Expand All @@ -156,14 +223,18 @@ public void testDeleteExistingPipeline() throws Exception {
}

/**
 * Generates an enrich policy definition using the historical defaults: a {@code match}
 * type policy keyed on the {@code host} field. Delegates to the parameterized overload.
 *
 * @param index the source index the policy reads from
 * @return the policy definition as a JSON string
 */
public static String generatePolicySource(String index) throws IOException {
    // Removed a leftover pre-refactor line that built an unused XContentBuilder here;
    // this overload only delegates.
    return generatePolicySource(index, "host", "match");
}

public static String generatePolicySource(String index, String field, String type) throws IOException {
XContentBuilder source = jsonBuilder().startObject().startObject(type);
{
source.field("indices", index);
if (randomBoolean()) {
source.field("query", QueryBuilders.matchAllQuery());
}
source.field("match_field", "host");
source.field("enrich_fields", new String[] { "globalRank", "tldRank", "tld" });
source.field("match_field", field);
source.field("enrich_fields", new String[]{"globalRank", "tldRank", "tld"});
}
source.endObject().endObject();
return Strings.toString(source);
Expand All @@ -179,7 +250,13 @@ public static String createSourceIndexMapping() {
+ "{\"host\": {\"type\":\"keyword\"},"
+ "\"globalRank\":{\"type\":\"keyword\"},"
+ "\"tldRank\":{\"type\":\"keyword\"},"
+ "\"tld\":{\"type\":\"keyword\"}"
+ "\"tld\":{\"type\":\"keyword\"},"
+ "\"date\":{\"type\":\"date_range\"" + (randomBoolean() ? "" : ", \"format\": \"yyyy-MM-dd\"") + "},"
+ "\"integer\":{\"type\":\"integer_range\"},"
+ "\"long\":{\"type\":\"long_range\"},"
+ "\"double\":{\"type\":\"double_range\"},"
+ "\"float\":{\"type\":\"float_range\"},"
+ "\"ip\":{\"type\":\"ip_range\"}"
+ "}";
}

Expand Down Expand Up @@ -207,8 +284,8 @@ private static void verifyEnrichMonitoring() throws IOException {
List<?> hits = (List<?>) XContentMapValues.extractValue("hits.hits", response);
assertThat(hits.size(), greaterThanOrEqualTo(1));

for (int i = 0; i < hits.size(); i++) {
Map<?, ?> hit = (Map<?, ?>) hits.get(i);
for (Object o : hits) {
Map<?, ?> hit = (Map<?, ?>) o;

int foundRemoteRequestsTotal = (int) XContentMapValues.extractValue(
"_source.enrich_coordinator_stats.remote_requests_total",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ObjectPath;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
Expand All @@ -56,8 +57,11 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.elasticsearch.xpack.core.ClientHelper.ENRICH_ORIGIN;

Expand Down Expand Up @@ -123,10 +127,17 @@ public void run() {
l.onFailure(e);
return;
}
prepareAndCreateEnrichIndex();
prepareAndCreateEnrichIndex(toMappings(getIndexResponse));
}));
}

// Collects the parsed mapping source of every index in the GET-index response, discarding the
// index names — only the mapping bodies are needed to resolve the enrich index mapping.
private List<Map<String, Object>> toMappings(GetIndexResponse response) {
    return StreamSupport.stream(response.mappings().values().spliterator(), false)
        .map(cursor -> cursor.value)
        .map(MappingMetadata::getSourceAsMap)
        .collect(Collectors.toList());
}

private Map<String, Object> getMappings(final GetIndexResponse getIndexResponse, final String sourceIndexName) {
ImmutableOpenMap<String, MappingMetadata> mappings = getIndexResponse.mappings();
MappingMetadata indexMapping = mappings.get(sourceIndexName);
Expand Down Expand Up @@ -232,19 +243,84 @@ private static void validateField(Map<?, ?> properties, String fieldName, boolea
}
}

private XContentBuilder resolveEnrichMapping(final EnrichPolicy policy) {
// Currently the only supported policy type is EnrichPolicy.MATCH_TYPE, which is a keyword type
final String keyType;
final CheckedFunction<XContentBuilder, XContentBuilder, IOException> matchFieldMapping;
/**
 * Resolves the field mapping to use for the match field of the enrich index, based on the
 * policy type:
 * <ul>
 *   <li>{@code match} — a plain keyword field,</li>
 *   <li>{@code range} — a range type derived from the source indices' mappings,</li>
 *   <li>{@code geo_match} — a geo_shape field.</li>
 * </ul>
 *
 * @param policy   the enrich policy being executed
 * @param mappings parsed mapping sources of the policy's source indices
 * @return the mapping for the enrich index being created
 * @throws ElasticsearchException if the policy type is not recognized
 */
private XContentBuilder resolveEnrichMapping(final EnrichPolicy policy, final List<Map<String, Object>> mappings) {
    // NOTE: removed two leftover diff-residue lines that assigned to a no-longer-declared
    // local ("matchFieldMapping"); each branch now returns its mapping directly.
    if (EnrichPolicy.MATCH_TYPE.equals(policy.getType())) {
        // No need to also configure index_options, because keyword type defaults to 'docs'.
        return createEnrichMappingBuilder((builder) -> builder.field("type", "keyword").field("doc_values", false));
    } else if (EnrichPolicy.RANGE_TYPE.equals(policy.getType())) {
        // Range policies derive the concrete range type (and any date format) from the source mappings.
        return createRangeEnrichMappingBuilder(policy, mappings);
    } else if (EnrichPolicy.GEO_MATCH_TYPE.equals(policy.getType())) {
        return createEnrichMappingBuilder((builder) -> builder.field("type", "geo_shape"));
    } else {
        throw new ElasticsearchException("Unrecognized enrich policy type [{}]", policy.getType());
    }
}

/**
 * Builds the enrich index mapping for a range-type policy by inspecting the match field's
 * mapping in every source index. All source indices must agree on a single range type (and,
 * for date_range, on at most one explicit format); otherwise an exception is thrown.
 *
 * @param policy   the range-type enrich policy being executed
 * @param mappings parsed mapping sources of the policy's source indices
 * @return the mapping for the enrich index being created
 * @throws ElasticsearchException if the match field is unmapped, not a range type, or mapped
 *                                inconsistently across the source indices
 */
private XContentBuilder createRangeEnrichMappingBuilder(EnrichPolicy policy, List<Map<String, Object>> mappings) {
    // Translate a (possibly dotted) field name into the nested "properties" path used in
    // mapping sources, e.g. "a.b" -> "properties.a.properties.b".
    String matchFieldPath = "properties." + policy.getMatchField().replace(".", ".properties.");
    // Look up the match field's mapping in each source index; indices without it are skipped.
    List<Map<String, String>> matchFieldMappings = mappings.stream()
        .map(map -> ObjectPath.<Map<String, String>>eval(matchFieldPath, map))
        .filter(Objects::nonNull)
        .collect(Collectors.toList());

    // Distinct mapping types found across the source indices; exactly one is required.
    Set<String> types = matchFieldMappings.stream().map(map -> map.get("type")).collect(Collectors.toSet());
    if (types.size() == 1) {
        String type = types.iterator().next();
        switch (type) {
            case "integer_range":
            case "float_range":
            case "long_range":
            case "double_range":
            case "ip_range":
                // Plain range types carry over directly; doc_values disabled as enrich
                // indices are only searched, mirroring the keyword match mapping.
                return createEnrichMappingBuilder((builder) -> builder.field("type", type).field("doc_values", false));

            // date_range type mappings allow a format to be specified; it should be preserved in the created index
            case "date_range":
                // Distinct "format" values across indices; null means the default format.
                Set<String> formatEntries = matchFieldMappings.stream().map(map -> map.get("format")).collect(Collectors.toSet());
                if (formatEntries.size() == 1) {
                    return createEnrichMappingBuilder((builder) -> {
                        builder.field("type", type).field("doc_values", false);
                        String format = formatEntries.iterator().next();
                        if (format != null) {
                            // Propagate the single explicit format to the enrich index.
                            builder.field("format", format);
                        }
                        return builder;
                    });
                }
                if (formatEntries.isEmpty()) {
                    // No format specified anywhere; rely on the default date format.
                    return createEnrichMappingBuilder((builder) -> builder.field("type", type).field("doc_values", false));
                }
                // More than one distinct format (possibly including the default) is ambiguous.
                throw new ElasticsearchException(
                    "Multiple distinct date format specified for match field '{}' - indices({}) format entries({})",
                    policy.getMatchField(),
                    Strings.collectionToCommaDelimitedString(policy.getIndices()),
                    (formatEntries.contains(null) ? "(DEFAULT), " : "") + Strings.collectionToCommaDelimitedString(formatEntries)
                );

            default:
                // A single consistent type was found, but it is not one of the range types.
                throw new ElasticsearchException(
                    "Field '{}' has type [{}] which doesn't appear to be a range type",
                    policy.getMatchField(),
                    type
                );
        }
    }
    if (types.isEmpty()) {
        // The match field is not mapped in any source index.
        throw new ElasticsearchException(
            "No mapping type found for match field '{}' - indices({})",
            policy.getMatchField(),
            Strings.collectionToCommaDelimitedString(policy.getIndices())
        );
    }
    // Source indices disagree on the match field's type; refuse to guess.
    throw new ElasticsearchException(
        "Multiple distinct mapping types for match field '{}' - indices({}) types({})",
        policy.getMatchField(),
        Strings.collectionToCommaDelimitedString(policy.getIndices()),
        Strings.collectionToCommaDelimitedString(types)
    );
}

private XContentBuilder createEnrichMappingBuilder(CheckedFunction<XContentBuilder, XContentBuilder, IOException> matchFieldMapping) {
// Enable _source on enrich index. Explicitly mark key mapping type.
try {
XContentBuilder builder = JsonXContent.contentBuilder();
Expand Down Expand Up @@ -283,7 +359,7 @@ private XContentBuilder resolveEnrichMapping(final EnrichPolicy policy) {
}
}

private void prepareAndCreateEnrichIndex() {
private void prepareAndCreateEnrichIndex(List<Map<String, Object>> mappings) {
long nowTimestamp = nowSupplier.getAsLong();
String enrichIndexName = EnrichPolicy.getBaseName(policyName) + "-" + nowTimestamp;
Settings enrichIndexSettings = Settings.builder()
Expand All @@ -295,7 +371,7 @@ private void prepareAndCreateEnrichIndex() {
.put("index.warmer.enabled", false)
.build();
CreateIndexRequest createEnrichIndexRequest = new CreateIndexRequest(enrichIndexName, enrichIndexSettings);
createEnrichIndexRequest.mapping(resolveEnrichMapping(policy));
createEnrichIndexRequest.mapping(resolveEnrichMapping(policy, mappings));
logger.debug("Policy [{}]: Creating new enrich index [{}]", policyName, enrichIndexName);
enrichOriginClient().admin()
.indices()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public Processor create(Map<String, Processor.Factory> processorFactories, Strin
BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner = createSearchRunner(client, enrichCache);
switch (policyType) {
case EnrichPolicy.MATCH_TYPE:
case EnrichPolicy.RANGE_TYPE:
return new MatchProcessor(
tag,
description,
Expand Down
Loading

0 comments on commit c6d4dd8

Please sign in to comment.