Skip to content

Commit

Permalink
ingest: Introduce the dissect processor (elastic#32884)
Browse files Browse the repository at this point in the history
The ingest node dissect processor is an alternative to Grok
for splitting a string based on a pattern. Dissect differs from
Grok in that regular expressions are not used to split the
string.

Dissect can be used to parse a source text field with a
simpler pattern, and is often faster than Grok for basic string
parsing. This processor uses the dissect library, which
does most of the work.
  • Loading branch information
jakelandis committed Sep 5, 2018
1 parent 1722bc1 commit d1a9a14
Show file tree
Hide file tree
Showing 8 changed files with 688 additions and 121 deletions.
431 changes: 312 additions & 119 deletions docs/reference/ingest/ingest-node.asciidoc

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion libs/dissect/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ dependencies {
}

forbiddenApisMain {
signaturesURLs = [PrecommitTasks.getResource('/forbidden/jdk-signatures.txt')]
replaceSignatureFiles 'jdk-signatures'
}

if (isEclipse) {
Expand Down
1 change: 1 addition & 0 deletions modules/ingest-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ esplugin {
dependencies {
compileOnly project(':modules:lang-painless')
compile project(':libs:grok')
compile project(':libs:dissect')
}

compileJava.options.compilerArgs << "-Xlint:-unchecked,-rawtypes"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.ingest.common;

import org.elasticsearch.dissect.DissectParser;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;

import java.util.Map;

/**
 * Ingest processor that reads a string from one field of the document, runs it through a
 * {@link DissectParser} and stores every resulting key/value pair back on the document.
 * The parser is built once, in the constructor, from the configured pattern and append separator.
 */
public final class DissectProcessor extends AbstractProcessor {

    public static final String TYPE = "dissect";

    // The fields below are package private so that tests can inspect them.
    final String field;
    final boolean ignoreMissing;
    final String pattern;
    final String appendSeparator;
    final DissectParser dissectParser;

    /**
     * @param tag             processor tag, handed to {@link AbstractProcessor}
     * @param field           name of the document field whose value is dissected
     * @param pattern         dissect pattern passed to the {@link DissectParser}
     * @param appendSeparator append separator passed to the {@link DissectParser}
     * @param ignoreMissing   when {@code true}, a {@code null} source value makes {@link #execute} a no-op
     */
    DissectProcessor(String tag, String field, String pattern, String appendSeparator, boolean ignoreMissing) {
        super(tag);
        this.field = field;
        this.pattern = pattern;
        this.appendSeparator = appendSeparator;
        this.ignoreMissing = ignoreMissing;
        this.dissectParser = new DissectParser(pattern, appendSeparator);
    }

    /**
     * Dissects the configured field and writes each parsed entry onto the document.
     *
     * @param ingestDocument the document to read from and write to
     * @throws IllegalArgumentException if the value read is {@code null} and {@code ignoreMissing} is {@code false}
     */
    @Override
    public void execute(IngestDocument ingestDocument) {
        final String value = ingestDocument.getFieldValue(field, String.class, ignoreMissing);
        if (value != null) {
            dissectParser.parse(value).forEach((key, parsed) -> ingestDocument.setFieldValue(key, parsed));
            return;
        }
        // Nothing to dissect: either skip quietly or report the null value.
        if (ignoreMissing == false) {
            throw new IllegalArgumentException("field [" + field + "] is null, cannot process it.");
        }
    }

    @Override
    public String getType() {
        return TYPE;
    }

    /**
     * Builds {@link DissectProcessor} instances from a processor configuration map.
     */
    public static final class Factory implements Processor.Factory {

        /**
         * Reads {@code field} and {@code pattern} (no default supplied) plus {@code append_separator}
         * (default {@code ""}) and {@code ignore_missing} (default {@code false}) from {@code config}.
         */
        @Override
        public DissectProcessor create(Map<String, Processor.Factory> registry, String processorTag, Map<String, Object> config) {
            // Read order is kept as field, pattern, append_separator, ignore_missing.
            final String sourceField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
            final String dissectPattern = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "pattern");
            final String separator = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "append_separator", "");
            final boolean skipMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
            return new DissectProcessor(processorTag, sourceField, dissectPattern, separator, skipMissing);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
processors.put(KeyValueProcessor.TYPE, new KeyValueProcessor.Factory());
processors.put(URLDecodeProcessor.TYPE, new URLDecodeProcessor.Factory());
processors.put(BytesProcessor.TYPE, new BytesProcessor.Factory());
processors.put(PipelineProcessor.TYPE, new PipelineProcessor.Factory(parameters.ingestService));
processors.put(PipelineProcessor.TYPE, new PipelineProcessor.Factory(parameters.ingestService));
processors.put(DissectProcessor.TYPE, new DissectProcessor.Factory());
return Collections.unmodifiableMap(processors);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.ingest.common;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.dissect.DissectException;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;

import java.util.HashMap;
import java.util.Map;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.Matchers.is;

/**
 * Tests for {@link DissectProcessor.Factory}: configuration parsing, defaults for the optional
 * settings, and the errors raised for missing or invalid settings.
 */
public class DissectProcessorFactoryTests extends ESTestCase {

    /** Every setting supplied explicitly ends up on the created processor. */
    public void testCreate() {
        DissectProcessor.Factory factory = new DissectProcessor.Factory();
        final String sourceField = RandomDocumentPicks.randomFieldName(random());
        final String tag = randomAlphaOfLength(10);
        final String dissectPattern = "%{a},%{b},%{c}";
        final String separator = ":";

        Map<String, Object> settings = new HashMap<>();
        settings.put("ignore_missing", true);
        settings.put("append_separator", separator);
        settings.put("pattern", dissectPattern);
        settings.put("field", sourceField);

        DissectProcessor created = factory.create(null, tag, settings);
        assertThat(created.getTag(), equalTo(tag));
        assertThat(created.field, equalTo(sourceField));
        assertThat(created.pattern, equalTo(dissectPattern));
        assertThat(created.appendSeparator, equalTo(separator));
        assertThat(created.dissectParser, is(notNullValue()));
        assertThat(created.ignoreMissing, is(true));
    }

    /** Leaving out {@code field} is reported as a parse error. */
    public void testCreateMissingField() {
        DissectProcessor.Factory factory = new DissectProcessor.Factory();
        Map<String, Object> settings = new HashMap<>();
        settings.put("pattern", "%{a},%{b},%{c}");
        ElasticsearchParseException failure =
            expectThrows(ElasticsearchParseException.class, () -> factory.create(null, "_tag", settings));
        assertThat(failure.getMessage(), Matchers.equalTo("[field] required property is missing"));
    }

    /** Leaving out {@code pattern} is reported as a parse error. */
    public void testCreateMissingPattern() {
        DissectProcessor.Factory factory = new DissectProcessor.Factory();
        Map<String, Object> settings = new HashMap<>();
        settings.put("field", randomAlphaOfLength(10));
        ElasticsearchParseException failure =
            expectThrows(ElasticsearchParseException.class, () -> factory.create(null, "_tag", settings));
        assertThat(failure.getMessage(), Matchers.equalTo("[pattern] required property is missing"));
    }

    /** Without the optional settings the separator is empty and ignore_missing is off. */
    public void testCreateMissingOptionals() {
        DissectProcessor.Factory factory = new DissectProcessor.Factory();
        Map<String, Object> settings = new HashMap<>();
        settings.put("pattern", "%{a},%{b},%{c}");
        settings.put("field", randomAlphaOfLength(10));
        DissectProcessor created = factory.create(null, "_tag", settings);
        assertThat(created.ignoreMissing, is(false));
        assertThat(created.appendSeparator, equalTo(""));
    }

    /** A pattern that defines no keys is rejected when the processor is created. */
    public void testCreateBadPattern() {
        DissectProcessor.Factory factory = new DissectProcessor.Factory();
        Map<String, Object> settings = new HashMap<>();
        settings.put("pattern", "no keys defined");
        settings.put("field", randomAlphaOfLength(10));
        expectThrows(DissectException.class, () -> factory.create(null, "_tag", settings));
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.ingest.common;

import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.dissect.DissectException;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.CoreMatchers;

import java.util.Collections;
import java.util.HashMap;

import static org.elasticsearch.ingest.IngestDocumentMatcher.assertIngestDocument;
import static org.hamcrest.Matchers.equalTo;

/**
 * Basic tests for the {@link DissectProcessor}. See the {@link org.elasticsearch.dissect.DissectParser} test suite for a comprehensive
 * set of dissect tests.
 */
public class DissectProcessorTests extends ESTestCase {

    /** A three-key comma pattern puts each segment of the message into its own field. */
    public void testMatch() {
        IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", null, null, null, null,
            Collections.singletonMap("message", "foo,bar,baz"));
        DissectProcessor dissectProcessor = new DissectProcessor("", "message", "%{a},%{b},%{c}", "", true);
        dissectProcessor.execute(ingestDocument);
        assertThat(ingestDocument.getFieldValue("a", String.class), equalTo("foo"));
        assertThat(ingestDocument.getFieldValue("b", String.class), equalTo("bar"));
        assertThat(ingestDocument.getFieldValue("c", String.class), equalTo("baz"));
    }

    /** A field that already exists on the document is replaced by the dissected value. */
    public void testMatchOverwrite() {
        IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", null, null, null, null,
            MapBuilder.<String, Object>newMapBuilder()
                .put("message", "foo,bar,baz")
                .put("a", "willgetstompped")
                .map());
        // Sanity check: the pre-existing value is visible before the processor runs.
        assertThat(ingestDocument.getFieldValue("a", String.class), equalTo("willgetstompped"));
        DissectProcessor dissectProcessor = new DissectProcessor("", "message", "%{a},%{b},%{c}", "", true);
        dissectProcessor.execute(ingestDocument);
        assertThat(ingestDocument.getFieldValue("a", String.class), equalTo("foo"));
        assertThat(ingestDocument.getFieldValue("b", String.class), equalTo("bar"));
        assertThat(ingestDocument.getFieldValue("c", String.class), equalTo("baz"));
    }

    /**
     * Exercises several pattern modifiers at once, with multi-byte (emoji) characters both as
     * values and as a literal delimiter inside the pattern.
     */
    public void testAdvancedMatch() {
        // The emoji literals must be real Unicode characters, written the same way in the
        // message, the pattern delimiter and the expected value.
        IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", null, null, null, null,
            Collections.singletonMap("message", "foo bar,,,,,,,baz nope:notagain 😊 🐇 🙃"));
        DissectProcessor dissectProcessor =
            new DissectProcessor("", "message", "%{a->} %{*b->},%{&b} %{}:%{?skipme} %{+smile/2} 🐇 %{+smile/1}", "::::", true);
        dissectProcessor.execute(ingestDocument);
        assertThat(ingestDocument.getFieldValue("a", String.class), equalTo("foo"));
        // %{*b} / %{&b}: the text matched by *b ("bar") becomes the field name, &b supplies its value.
        assertThat(ingestDocument.getFieldValue("bar", String.class), equalTo("baz"));
        // Text matched by %{} and %{?skipme} must not show up as fields.
        expectThrows(IllegalArgumentException.class, () -> ingestDocument.getFieldValue("nope", String.class));
        expectThrows(IllegalArgumentException.class, () -> ingestDocument.getFieldValue("notagain", String.class));
        // The two +smile parts are joined with the append separator, /1 before /2.
        assertThat(ingestDocument.getFieldValue("smile", String.class), equalTo("🙃::::😊"));
    }

    /** A message that does not fit the pattern makes the processor fail with a {@link DissectException}. */
    public void testMiss() {
        IngestDocument ingestDocument = new IngestDocument("_index", "_type", "_id", null, null, null, null,
            Collections.singletonMap("message", "foo:bar,baz"));
        DissectProcessor dissectProcessor = new DissectProcessor("", "message", "%{a},%{b},%{c}", "", true);
        DissectException e = expectThrows(DissectException.class, () -> dissectProcessor.execute(ingestDocument));
        assertThat(e.getMessage(), CoreMatchers.containsString("Unable to find match for dissect pattern"));
    }

    /** {@code ignoreMissing} does not hide a source field that holds a non-string value. */
    public void testNonStringValueWithIgnoreMissing() {
        String fieldName = RandomDocumentPicks.randomFieldName(random());
        Processor processor = new DissectProcessor("", fieldName, "%{a},%{b},%{c}", "", true);
        IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
        ingestDocument.setFieldValue(fieldName, randomInt());
        Exception e = expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument));
        assertThat(e.getMessage(), equalTo("field [" + fieldName + "] of type [java.lang.Integer] cannot be cast to [java.lang.String]"));
    }

    /** With {@code ignoreMissing} set, a null source value leaves the document untouched. */
    public void testNullValueWithIgnoreMissing() throws Exception {
        String fieldName = RandomDocumentPicks.randomFieldName(random());
        Processor processor = new DissectProcessor("", fieldName, "%{a},%{b},%{c}", "", true);
        IngestDocument originalIngestDocument = RandomDocumentPicks
            .randomIngestDocument(random(), Collections.singletonMap(fieldName, null));
        // Run the processor on a copy so the original can serve as the expected state.
        IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
        processor.execute(ingestDocument);
        assertIngestDocument(originalIngestDocument, ingestDocument);
    }

    /** Without {@code ignoreMissing}, a null source value is rejected. */
    public void testNullValueWithOutIgnoreMissing() {
        String fieldName = RandomDocumentPicks.randomFieldName(random());
        Processor processor = new DissectProcessor("", fieldName, "%{a},%{b},%{c}", "", false);
        IngestDocument originalIngestDocument = RandomDocumentPicks
            .randomIngestDocument(random(), Collections.singletonMap(fieldName, null));
        IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
        expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument));
    }
}
Loading

0 comments on commit d1a9a14

Please sign in to comment.