Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow range types to be used for enrich matching #76110

Merged
merged 26 commits into from
Sep 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
78f5d54
Allowing enrich to MATCH range types
mjmbischoff Aug 4, 2021
1efd225
Allowing enrich to MATCH range types - fixing unchecked cast
mjmbischoff Aug 4, 2021
bc7ad72
Allowing enrich to MATCH range types - fixing checkstyle rule violations
mjmbischoff Aug 4, 2021
4b1ad44
Allowing enrich to MATCH range types - running spotless
mjmbischoff Aug 4, 2021
d744f03
Splitting MATCH_POLICY into MATCH_POLICY / RANGE_POLICY
mjmbischoff Aug 22, 2021
b439d95
spotless fix
mjmbischoff Aug 22, 2021
1f6e090
fixing test
mjmbischoff Aug 22, 2021
e36daa9
spotless again
mjmbischoff Aug 22, 2021
f32d5d5
Merge branch 'elastic:master' into range_enrich
mjmbischoff Aug 22, 2021
46b213a
Added tests and fix for date_range format
mjmbischoff Aug 28, 2021
4da607d
spotless
mjmbischoff Aug 28, 2021
0dd200c
Merge branch 'elastic:master' into range_enrich
mjmbischoff Aug 28, 2021
910db97
Fixing review comments
mjmbischoff Sep 3, 2021
946e8e2
Merge branch 'elastic:master' into range_enrich
mjmbischoff Sep 6, 2021
8dc44eb
pulling in https://github.com/elastic/elasticsearch/pull/65781/files#…
mjmbischoff Sep 6, 2021
fc81fdf
pulling in https://github.com/elastic/elasticsearch/pull/65781/files#…
mjmbischoff Sep 6, 2021
e7bb2d8
fixing strict checking
mjmbischoff Sep 6, 2021
8acc5ee
Fixing test
mjmbischoff Sep 6, 2021
f4835fd
Dammit, use refactor.
mjmbischoff Sep 6, 2021
4e50367
Fixing tests
mjmbischoff Sep 6, 2021
44cab8e
Cleaning up IndexResponse handling
mjmbischoff Sep 7, 2021
b6e71e8
Eval returns null, filter null values
mjmbischoff Sep 7, 2021
bd3c620
Merge branch 'master' into range_enrich
elasticmachine Sep 7, 2021
114a452
switching clauses around to get happy flows on top.
mjmbischoff Sep 7, 2021
2924d2b
removing callback
mjmbischoff Sep 7, 2021
0866a8c
prettier exception description
mjmbischoff Sep 7, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@ public final class EnrichPolicy implements Writeable, ToXContentFragment {

public static final String MATCH_TYPE = "match";
public static final String GEO_MATCH_TYPE = "geo_match";
public static final String RANGE_TYPE = "range";
public static final String[] SUPPORTED_POLICY_TYPES = new String[]{
MATCH_TYPE,
GEO_MATCH_TYPE
GEO_MATCH_TYPE,
RANGE_TYPE
};

private static final ParseField QUERY = new ParseField("query");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,22 @@
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;

public abstract class CommonEnrichRestTestCase extends ESRestTestCase {

@After
public void deletePolicies() throws Exception {
Map<String, Object> responseMap = toMap(adminClient().performRequest(new Request("GET", "/_enrich/policy")));
@SuppressWarnings("unchecked")
List<Map<?, ?>> policies = (List<Map<?, ?>>) responseMap.get("policies");
List<Map<?, ?>> policies = unsafeGetProperty(responseMap, "policies");

for (Map<?, ?> entry : policies) {
client().performRequest(new Request("DELETE", "/_enrich/policy/" + XContentMapValues.extractValue("config.match.name", entry)));
Map<?, Map<String, ?>> config = unsafeGetProperty(entry, "config");
Map<String, ?> policy = config.values().iterator().next();
String endpoint = "/_enrich/policy/" + policy.get("name");
assertOK(client().performRequest(new Request("DELETE", endpoint)));

List<?> sourceIndices = (List<?>) XContentMapValues.extractValue("config.match.indices", entry);
List<?> sourceIndices = (List<?>) policy.get("indices");
for (Object sourceIndex : sourceIndices) {
try {
client().performRequest(new Request("DELETE", "/" + sourceIndex));
Expand All @@ -51,17 +54,49 @@ public void deletePolicies() throws Exception {
}
}

private void setupGenericLifecycleTest(boolean deletePipeilne) throws Exception {
// Fetches map.get(key) and casts the value to whatever type the call site expects.
// The cast is unchecked by design; a type mismatch surfaces as a ClassCastException
// at the caller, which is acceptable in this test-only helper.
@SuppressWarnings("unchecked")
private <P> P unsafeGetProperty(Map<?, ?> map, String key) {
    final Object raw = map.get(key);
    return (P) raw;
}

private void setupGenericLifecycleTest(boolean deletePipeilne, String field, String type, String value) throws Exception {
// Create source index:
createSourceIndex("my-source-index");

// Create the policy:
Request putPolicyRequest = new Request("PUT", "/_enrich/policy/my_policy");
putPolicyRequest.setJsonEntity(generatePolicySource("my-source-index"));
putPolicyRequest.setJsonEntity(generatePolicySource("my-source-index", field, type));
assertOK(client().performRequest(putPolicyRequest));

// Add entry to source index and then refresh:
Request indexRequest = new Request("PUT", "/my-source-index/_doc/elastic.co");
indexRequest.setJsonEntity("{\"host\": \"elastic.co\",\"globalRank\": 25,\"tldRank\": 7,\"tld\": \"co\"}");
indexRequest.setJsonEntity("{" +
"\"host\": \"elastic.co\"," +
"\"globalRank\": 25," +
"\"tldRank\": 7," +
"\"tld\": \"co\", " +
"\"date\": {" +
"\"gte\" : \"2021-09-05\"," +
"\"lt\" : \"2021-09-07\"" +
"}, " +
"\"integer\": {" +
"\"gte\" : 40," +
"\"lt\" : 42" +
"}, " +
"\"long\": {" +
"\"gte\" : 8000000," +
"\"lt\" : 9000000" +
"}, " +
"\"double\": {" +
"\"gte\" : 10.10," +
"\"lt\" : 20.20" +
"}, " +
"\"float\": {" +
"\"gte\" : 10000.5," +
"\"lt\" : 10000.7" +
"}, " +
"\"ip\": \"100.0.0.0/4\"" +
"}");
assertOK(client().performRequest(indexRequest));
Request refreshRequest = new Request("POST", "/my-source-index/_refresh");
assertOK(client().performRequest(refreshRequest));
Expand All @@ -73,34 +108,66 @@ private void setupGenericLifecycleTest(boolean deletePipeilne) throws Exception
// Create pipeline
Request putPipelineRequest = new Request("PUT", "/_ingest/pipeline/my_pipeline");
putPipelineRequest.setJsonEntity(
"{\"processors\":[" + "{\"enrich\":{\"policy_name\":\"my_policy\",\"field\":\"host\",\"target_field\":\"entry\"}}" + "]}"
"{\"processors\":[" + "{\"enrich\":{\"policy_name\":\"my_policy\",\"field\":\""+field+"\",\"target_field\":\"entry\"}}" + "]}"
);
assertOK(client().performRequest(putPipelineRequest));

// Index document using pipeline with enrich processor:
indexRequest = new Request("PUT", "/my-index/_doc/1");
indexRequest.addParameter("pipeline", "my_pipeline");
indexRequest.setJsonEntity("{\"host\": \"elastic.co\"}");
indexRequest.setJsonEntity("{\""+field+"\": \""+value+"\"}");
assertOK(client().performRequest(indexRequest));

// Check if document has been enriched
Request getRequest = new Request("GET", "/my-index/_doc/1");
Map<String, Object> response = toMap(client().performRequest(getRequest));
Map<?, ?> entry = (Map<?, ?>) ((Map<?, ?>) response.get("_source")).get("entry");
assertThat(entry.size(), equalTo(4));
assertThat(entry.get("host"), equalTo("elastic.co"));
assertThat(entry.get(field), notNullValue());
assertThat(entry.get("tld"), equalTo("co"));
assertThat(entry.get("globalRank"), equalTo(25));
assertThat(entry.get("tldRank"), equalTo(7));
Object originalMatchValue = ((Map<?, ?>) response.get("_source")).get(field);
assertThat(originalMatchValue, equalTo(value));

if (deletePipeilne) {
// delete the pipeline so the policies can be deleted
client().performRequest(new Request("DELETE", "/_ingest/pipeline/my_pipeline"));
}
}

public void testBasicFlow() throws Exception {
setupGenericLifecycleTest(true);
// End-to-end lifecycle of a "match" policy keyed on a keyword field, then wait
// for the enrich coordinator stats to show up in monitoring.
public void testBasicFlowKeyword() throws Exception {
    setupGenericLifecycleTest(true, "host", "match", "elastic.co");
    assertBusy(() -> verifyEnrichMonitoring(), 3, TimeUnit.MINUTES);
}

// Same lifecycle as the keyword test, but matching a date_range field against
// a single date value.
public void testBasicFlowDate() throws Exception {
    setupGenericLifecycleTest(true, "date", "range", "2021-09-06");
    assertBusy(() -> verifyEnrichMonitoring(), 3, TimeUnit.MINUTES);
}

// Range-policy lifecycle against an integer_range field; 41 falls inside the
// [40, 42) range indexed by setupGenericLifecycleTest.
public void testBasicFlowInteger() throws Exception {
    setupGenericLifecycleTest(true, "integer", "range", "41");
    assertBusy(() -> verifyEnrichMonitoring(), 3, TimeUnit.MINUTES);
}

// Range-policy lifecycle against a long_range field; the value lies inside the
// [8000000, 9000000) range indexed by setupGenericLifecycleTest.
public void testBasicFlowLong() throws Exception {
    setupGenericLifecycleTest(true, "long", "range", "8411017");
    assertBusy(() -> verifyEnrichMonitoring(), 3, TimeUnit.MINUTES);
}

// Range-policy lifecycle against a double_range field; 15.15 lies inside the
// [10.10, 20.20) range indexed by setupGenericLifecycleTest.
public void testBasicFlowDouble() throws Exception {
    setupGenericLifecycleTest(true, "double", "range", "15.15");
    assertBusy(() -> verifyEnrichMonitoring(), 3, TimeUnit.MINUTES);
}

// Range-policy lifecycle against a float_range field; the value lies inside the
// [10000.5, 10000.7) range indexed by setupGenericLifecycleTest.
public void testBasicFlowFloat() throws Exception {
    setupGenericLifecycleTest(true, "float", "range", "10000.66666");
    assertBusy(() -> verifyEnrichMonitoring(), 3, TimeUnit.MINUTES);
}

// Range-policy lifecycle against an ip_range field; the address falls inside
// the 100.0.0.0/4 CIDR block indexed by setupGenericLifecycleTest.
public void testBasicFlowIp() throws Exception {
    setupGenericLifecycleTest(true, "ip", "range", "100.120.140.160");
    assertBusy(() -> verifyEnrichMonitoring(), 3, TimeUnit.MINUTES);
}

Expand Down Expand Up @@ -129,7 +196,7 @@ public void testDeleteIsCaseSensitive() throws Exception {

public void testDeleteExistingPipeline() throws Exception {
// lets not delete the pipeline at first, to test the failure
setupGenericLifecycleTest(false);
setupGenericLifecycleTest(false, "host", "match", "elastic.co");

Request putPipelineRequest = new Request("PUT", "/_ingest/pipeline/another_pipeline");
putPipelineRequest.setJsonEntity(
Expand All @@ -156,14 +223,18 @@ public void testDeleteExistingPipeline() throws Exception {
}

public static String generatePolicySource(String index) throws IOException {
XContentBuilder source = jsonBuilder().startObject().startObject("match");
return generatePolicySource(index, "host", "match");
}

public static String generatePolicySource(String index, String field, String type) throws IOException {
XContentBuilder source = jsonBuilder().startObject().startObject(type);
{
source.field("indices", index);
if (randomBoolean()) {
source.field("query", QueryBuilders.matchAllQuery());
}
source.field("match_field", "host");
source.field("enrich_fields", new String[] { "globalRank", "tldRank", "tld" });
source.field("match_field", field);
source.field("enrich_fields", new String[]{"globalRank", "tldRank", "tld"});
}
source.endObject().endObject();
return Strings.toString(source);
Expand All @@ -179,7 +250,13 @@ public static String createSourceIndexMapping() {
+ "{\"host\": {\"type\":\"keyword\"},"
+ "\"globalRank\":{\"type\":\"keyword\"},"
+ "\"tldRank\":{\"type\":\"keyword\"},"
+ "\"tld\":{\"type\":\"keyword\"}"
+ "\"tld\":{\"type\":\"keyword\"},"
+ "\"date\":{\"type\":\"date_range\"" + (randomBoolean() ? "" : ", \"format\": \"yyyy-MM-dd\"") + "},"
+ "\"integer\":{\"type\":\"integer_range\"},"
+ "\"long\":{\"type\":\"long_range\"},"
+ "\"double\":{\"type\":\"double_range\"},"
+ "\"float\":{\"type\":\"float_range\"},"
+ "\"ip\":{\"type\":\"ip_range\"}"
+ "}";
}

Expand Down Expand Up @@ -207,8 +284,8 @@ private static void verifyEnrichMonitoring() throws IOException {
List<?> hits = (List<?>) XContentMapValues.extractValue("hits.hits", response);
assertThat(hits.size(), greaterThanOrEqualTo(1));

for (int i = 0; i < hits.size(); i++) {
Map<?, ?> hit = (Map<?, ?>) hits.get(i);
for (Object o : hits) {
Map<?, ?> hit = (Map<?, ?>) o;

int foundRemoteRequestsTotal = (int) XContentMapValues.extractValue(
"_source.enrich_coordinator_stats.remote_requests_total",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ObjectPath;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
Expand All @@ -56,8 +57,11 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.elasticsearch.xpack.core.ClientHelper.ENRICH_ORIGIN;

Expand Down Expand Up @@ -123,10 +127,17 @@ public void run() {
l.onFailure(e);
return;
}
prepareAndCreateEnrichIndex();
prepareAndCreateEnrichIndex(toMappings(getIndexResponse));
}));
}

/**
 * Flattens the per-index mapping metadata of a get-index response into a list of
 * parsed mapping source maps, one entry per index in the response.
 */
private List<Map<String, Object>> toMappings(GetIndexResponse response) {
    return StreamSupport.stream(response.mappings().values().spliterator(), false)
        .map(cursor -> cursor.value.getSourceAsMap())
        .collect(Collectors.toList());
}

private Map<String, Object> getMappings(final GetIndexResponse getIndexResponse, final String sourceIndexName) {
ImmutableOpenMap<String, MappingMetadata> mappings = getIndexResponse.mappings();
MappingMetadata indexMapping = mappings.get(sourceIndexName);
Expand Down Expand Up @@ -232,19 +243,84 @@ private static void validateField(Map<?, ?> properties, String fieldName, boolea
}
}

private XContentBuilder resolveEnrichMapping(final EnrichPolicy policy) {
// Currently the only supported policy type is EnrichPolicy.MATCH_TYPE, which is a keyword type
final String keyType;
final CheckedFunction<XContentBuilder, XContentBuilder, IOException> matchFieldMapping;
private XContentBuilder resolveEnrichMapping(final EnrichPolicy policy, final List<Map<String, Object>> mappings) {
if (EnrichPolicy.MATCH_TYPE.equals(policy.getType())) {
matchFieldMapping = (builder) -> builder.field("type", "keyword").field("doc_values", false);
// No need to also configure index_options, because keyword type defaults to 'docs'.
return createEnrichMappingBuilder((builder) -> builder.field("type", "keyword").field("doc_values", false));
} else if (EnrichPolicy.RANGE_TYPE.equals(policy.getType())) {
return createRangeEnrichMappingBuilder(policy, mappings);
} else if (EnrichPolicy.GEO_MATCH_TYPE.equals(policy.getType())) {
matchFieldMapping = (builder) -> builder.field("type", "geo_shape");
return createEnrichMappingBuilder((builder) -> builder.field("type", "geo_shape"));
} else {
throw new ElasticsearchException("Unrecognized enrich policy type [{}]", policy.getType());
}
}

private XContentBuilder createRangeEnrichMappingBuilder(EnrichPolicy policy, List<Map<String, Object>> mappings) {
String matchFieldPath = "properties." + policy.getMatchField().replace(".", ".properties.");
List<Map<String, String>> matchFieldMappings = mappings.stream()
.map(map -> ObjectPath.<Map<String, String>>eval(matchFieldPath, map))
.filter(Objects::nonNull)
.collect(Collectors.toList());

Set<String> types = matchFieldMappings.stream().map(map -> map.get("type")).collect(Collectors.toSet());
if (types.size() == 1) {
String type = types.iterator().next();
switch (type) {
case "integer_range":
case "float_range":
case "long_range":
case "double_range":
case "ip_range":
return createEnrichMappingBuilder((builder) -> builder.field("type", type).field("doc_values", false));

// date_range types mappings allow for the format to be specified, should be preserved in the created index
case "date_range":
Set<String> formatEntries = matchFieldMappings.stream().map(map -> map.get("format")).collect(Collectors.toSet());
if (formatEntries.size() == 1) {
return createEnrichMappingBuilder((builder) -> {
builder.field("type", type).field("doc_values", false);
String format = formatEntries.iterator().next();
if (format != null) {
builder.field("format", format);
}
return builder;
});
}
if (formatEntries.isEmpty()) {
mjmbischoff marked this conversation as resolved.
Show resolved Hide resolved
// no format specify rely on default
return createEnrichMappingBuilder((builder) -> builder.field("type", type).field("doc_values", false));
}
throw new ElasticsearchException(
"Multiple distinct date format specified for match field '{}' - indices({}) format entries({})",
policy.getMatchField(),
Strings.collectionToCommaDelimitedString(policy.getIndices()),
(formatEntries.contains(null) ? "(DEFAULT), " : "") + Strings.collectionToCommaDelimitedString(formatEntries)
);

default:
throw new ElasticsearchException(
"Field '{}' has type [{}] which doesn't appear to be a range type",
policy.getMatchField(),
type
);
}
}
if (types.isEmpty()) {
throw new ElasticsearchException(
"No mapping type found for match field '{}' - indices({})",
policy.getMatchField(),
Strings.collectionToCommaDelimitedString(policy.getIndices())
);
}
throw new ElasticsearchException(
"Multiple distinct mapping types for match field '{}' - indices({}) types({})",
policy.getMatchField(),
Strings.collectionToCommaDelimitedString(policy.getIndices()),
Strings.collectionToCommaDelimitedString(types)
);
}

private XContentBuilder createEnrichMappingBuilder(CheckedFunction<XContentBuilder, XContentBuilder, IOException> matchFieldMapping) {
// Enable _source on enrich index. Explicitly mark key mapping type.
try {
XContentBuilder builder = JsonXContent.contentBuilder();
Expand Down Expand Up @@ -283,7 +359,7 @@ private XContentBuilder resolveEnrichMapping(final EnrichPolicy policy) {
}
}

private void prepareAndCreateEnrichIndex() {
private void prepareAndCreateEnrichIndex(List<Map<String, Object>> mappings) {
long nowTimestamp = nowSupplier.getAsLong();
String enrichIndexName = EnrichPolicy.getBaseName(policyName) + "-" + nowTimestamp;
Settings enrichIndexSettings = Settings.builder()
Expand All @@ -295,7 +371,7 @@ private void prepareAndCreateEnrichIndex() {
.put("index.warmer.enabled", false)
.build();
CreateIndexRequest createEnrichIndexRequest = new CreateIndexRequest(enrichIndexName, enrichIndexSettings);
mjmbischoff marked this conversation as resolved.
Show resolved Hide resolved
createEnrichIndexRequest.mapping(resolveEnrichMapping(policy));
createEnrichIndexRequest.mapping(resolveEnrichMapping(policy, mappings));
logger.debug("Policy [{}]: Creating new enrich index [{}]", policyName, enrichIndexName);
enrichOriginClient().admin()
.indices()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public Processor create(Map<String, Processor.Factory> processorFactories, Strin
BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner = createSearchRunner(client, enrichCache);
switch (policyType) {
case EnrichPolicy.MATCH_TYPE:
case EnrichPolicy.RANGE_TYPE:
return new MatchProcessor(
tag,
description,
Expand Down
Loading