diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/CommunityIdProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/CommunityIdProcessor.java index 3d7e43fc8b028..629135ebfe925 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/CommunityIdProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/CommunityIdProcessor.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.function.Supplier; import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException; import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty; @@ -33,6 +34,15 @@ public final class CommunityIdProcessor extends AbstractProcessor { public static final String TYPE = "community_id"; + private static final ThreadLocal MESSAGE_DIGEST = ThreadLocal.withInitial(() -> { + try { + return MessageDigest.getInstance("SHA-1"); + } catch (NoSuchAlgorithmException e) { + // should never happen, SHA-1 must be available in all JDKs + throw new IllegalStateException(e); + } + }); + private final String sourceIpField; private final String sourcePortField; private final String destinationIpField; @@ -42,7 +52,6 @@ public final class CommunityIdProcessor extends AbstractProcessor { private final String icmpTypeField; private final String icmpCodeField; private final String targetField; - private final ThreadLocal messageDigest; private final byte[] seed; private final boolean ignoreMissing; @@ -58,7 +67,6 @@ public final class CommunityIdProcessor extends AbstractProcessor { String icmpTypeField, String icmpCodeField, String targetField, - ThreadLocal messageDigest, byte[] seed, boolean ignoreMissing ) { @@ -72,7 +80,6 @@ public final class CommunityIdProcessor extends AbstractProcessor { this.icmpTypeField = icmpTypeField; this.icmpCodeField = icmpCodeField; this.targetField = targetField; - this.messageDigest = messageDigest; this.seed = seed; this.ignoreMissing = ignoreMissing; } @@ -113,10 +120,6 @@ public String getTargetField() { return targetField; } - public MessageDigest getMessageDigest() { - return messageDigest.get(); - } - public byte[] getSeed() { return seed; } @@ -127,7 +130,15 @@ public boolean getIgnoreMissing() { @Override public IngestDocument execute(IngestDocument ingestDocument) throws Exception { - Flow flow = buildFlow(ingestDocument); + String sourceIp = ingestDocument.getFieldValue(sourceIpField, String.class, ignoreMissing); + String destinationIp = ingestDocument.getFieldValue(destinationIpField, String.class, ignoreMissing); + Object ianaNumber = ingestDocument.getFieldValue(ianaNumberField, Object.class, true); + Supplier transport = () -> ingestDocument.getFieldValue(transportField, Object.class, ignoreMissing); + Supplier sourcePort = () -> ingestDocument.getFieldValue(sourcePortField, Object.class, ignoreMissing); + Supplier destinationPort = () -> ingestDocument.getFieldValue(destinationPortField, Object.class, ignoreMissing); + Object icmpType = ingestDocument.getFieldValue(icmpTypeField, Object.class, true); + Object icmpCode = ingestDocument.getFieldValue(icmpCodeField, Object.class, true); + Flow flow = buildFlow(sourceIp, destinationIp, ianaNumber, transport, sourcePort, destinationPort, icmpType, icmpCode); if (flow == null) { if (ignoreMissing) { return ingestDocument; @@ -136,19 +147,50 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception { } } - MessageDigest md = messageDigest.get(); - md.reset(); - ingestDocument.setFieldValue(targetField, flow.toCommunityId(md, seed)); + ingestDocument.setFieldValue(targetField, flow.toCommunityId(seed)); return ingestDocument; } - private Flow buildFlow(IngestDocument d) { - String sourceIpAddrString = d.getFieldValue(sourceIpField, String.class, ignoreMissing); + public static String apply( + String sourceIpAddrString, + String destIpAddrString, + Object ianaNumber, + Object transport, + Object sourcePort, + Object destinationPort, + Object icmpType, + Object icmpCode, + int seed) { + + Flow flow = buildFlow(sourceIpAddrString, destIpAddrString, ianaNumber, () -> transport, () -> sourcePort, () -> destinationPort, + icmpType, icmpCode); + + if (flow == null) { + throw new IllegalArgumentException("unable to construct flow from document"); + } else { + return flow.toCommunityId(toUint16(seed)); + } + } + + public static String apply( + String sourceIpAddrString, + String destIpAddrString, + Object ianaNumber, + Object transport, + Object sourcePort, + Object destinationPort, + Object icmpType, + Object icmpCode) { + return apply(sourceIpAddrString, destIpAddrString, ianaNumber, transport, sourcePort, destinationPort, icmpType, icmpCode, 0); + } + + private static Flow buildFlow(String sourceIpAddrString, String destIpAddrString, Object ianaNumber, + Supplier transport, Supplier sourcePort, Supplier destinationPort, + Object icmpType, Object icmpCode) { if (sourceIpAddrString == null) { return null; } - String destIpAddrString = d.getFieldValue(destinationIpField, String.class, ignoreMissing); if (destIpAddrString == null) { return null; } @@ -157,9 +199,9 @@ private Flow buildFlow(IngestDocument d) { flow.source = InetAddresses.forString(sourceIpAddrString); flow.destination = InetAddresses.forString(destIpAddrString); - Object protocol = d.getFieldValue(ianaNumberField, Object.class, true); + Object protocol = ianaNumber; if (protocol == null) { - protocol = d.getFieldValue(transportField, Object.class, ignoreMissing); + protocol = transport.get(); if (protocol == null) { return null; } @@ -170,23 +212,21 @@ private Flow buildFlow(IngestDocument d) { case Tcp: case Udp: case Sctp: - Object sourcePortValue = d.getFieldValue(sourcePortField, Object.class, ignoreMissing); - flow.sourcePort = parseIntFromObjectOrString(sourcePortValue, "source port"); + flow.sourcePort = parseIntFromObjectOrString(sourcePort.get(), "source port"); if (flow.sourcePort < 1 || flow.sourcePort > 65535) { - throw new IllegalArgumentException("invalid source port [" + sourcePortValue + "]"); + throw new IllegalArgumentException("invalid source port [" + sourcePort.get() + "]"); } - Object destinationPortValue = d.getFieldValue(destinationPortField, Object.class, ignoreMissing); - flow.destinationPort = parseIntFromObjectOrString(destinationPortValue, "destination port"); + flow.destinationPort = parseIntFromObjectOrString(destinationPort.get(), "destination port"); if (flow.destinationPort < 1 || flow.destinationPort > 65535) { - throw new IllegalArgumentException("invalid destination port [" + destinationPortValue + "]"); + throw new IllegalArgumentException("invalid destination port [" + destinationPort.get() + "]"); } break; case Icmp: case IcmpIpV6: // tolerate missing or invalid ICMP types and codes - flow.icmpType = parseIntFromObjectOrString(d.getFieldValue(icmpTypeField, Object.class, true), "icmp type"); - flow.icmpCode = parseIntFromObjectOrString(d.getFieldValue(icmpCodeField, Object.class, true), "icmp code"); + flow.icmpType = parseIntFromObjectOrString(icmpType, "icmp type"); + flow.icmpCode = parseIntFromObjectOrString(icmpCode, "icmp code"); break; } @@ -258,13 +298,6 @@ public CommunityIdProcessor create( if (seedInt < 0 || seedInt > 65535) { throw newConfigurationException(TYPE, processorTag, "seed", "must be a value between 0 and 65535"); } - ThreadLocal messageDigest = ThreadLocal.withInitial(() -> { - try { - return MessageDigest.getInstance("SHA-1"); - } catch (NoSuchAlgorithmException e) { - throw new IllegalStateException("unable to obtain SHA-1 hasher", e); - } - }); boolean ignoreMissing = readBooleanProperty(TYPE, processorTag, config, "ignore_missing", true); return new CommunityIdProcessor( @@ -279,7 +312,6 @@ public CommunityIdProcessor create( icmpTypeField, icmpCodeField, targetField, - messageDigest, toUint16(seedInt), ignoreMissing ); @@ -343,7 +375,9 @@ byte[] toBytes() { return bb.array(); } - String toCommunityId(MessageDigest md, byte[] seed) { + String toCommunityId(byte[] seed) { + MessageDigest md = MESSAGE_DIGEST.get(); + md.reset(); md.update(seed); byte[] encodedBytes = Base64.getEncoder().encode(md.digest(toBytes())); return "1:" + new String(encodedBytes, StandardCharsets.UTF_8); diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/Processors.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/Processors.java index 90276ce1dba69..c1d56b6c333df 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/Processors.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/Processors.java @@ -86,6 +86,75 @@ public static String urlDecode(String value) { } /** + * Uses {@link CommunityIdProcessor} to compute community ID for network flow data. + * + * @param sourceIpAddrString source IP address + * @param destIpAddrString destination IP address + * @param ianaNumber IANA number + * @param transport transport protocol + * @param sourcePort source port + * @param destinationPort destination port + * @param icmpType ICMP type + * @param icmpCode ICMP code + * @param seed hash seed (must be between 0 and 65535) + * @return Community ID + */ + public static String communityId( + String sourceIpAddrString, + String destIpAddrString, + Object ianaNumber, + Object transport, + Object sourcePort, + Object destinationPort, + Object icmpType, + Object icmpCode, + int seed) { + return CommunityIdProcessor.apply( + sourceIpAddrString, + destIpAddrString, + ianaNumber, + transport, + sourcePort, + destinationPort, + icmpType, + icmpCode, + seed + ); + } + + /** + * Uses {@link CommunityIdProcessor} to compute community ID for network flow data. + * + * @param sourceIpAddrString source IP address + * @param destIpAddrString destination IP address + * @param ianaNumber IANA number + * @param transport transport protocol + * @param sourcePort source port + * @param destinationPort destination port + * @param icmpType ICMP type + * @param icmpCode ICMP code + * @return Community ID + */ + public static String communityId( + String sourceIpAddrString, + String destIpAddrString, + Object ianaNumber, + Object transport, + Object sourcePort, + Object destinationPort, + Object icmpType, + Object icmpCode) { + return CommunityIdProcessor.apply(sourceIpAddrString, + destIpAddrString, + ianaNumber, + transport, + sourcePort, + destinationPort, + icmpType, + icmpCode); + } + + /* * Uses {@link UriPartsProcessor} to decompose an URI into its constituent parts. * * @param uri string to decode diff --git a/modules/ingest-common/src/main/resources/org/elasticsearch/ingest/common/processors_whitelist.txt b/modules/ingest-common/src/main/resources/org/elasticsearch/ingest/common/processors_whitelist.txt index 66e490f9ea429..4d05bdfc283bb 100644 --- a/modules/ingest-common/src/main/resources/org/elasticsearch/ingest/common/processors_whitelist.txt +++ b/modules/ingest-common/src/main/resources/org/elasticsearch/ingest/common/processors_whitelist.txt @@ -15,5 +15,7 @@ class org.elasticsearch.ingest.common.Processors { Object json(Object) void json(Map, String) String urlDecode(String) + String communityId(String, String, Object, Object, Object, Object, Object, Object, int) + String communityId(String, String, Object, Object, Object, Object, Object, Object) Map uriParts(String) } diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/CommunityIdProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/CommunityIdProcessorTests.java index 77a1b0fb205ec..ba777771b8c2e 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/CommunityIdProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/CommunityIdProcessorTests.java @@ -12,8 +12,6 @@ import org.elasticsearch.test.ESTestCase; import org.junit.Before; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; import java.util.HashMap; import java.util.Map; @@ -38,17 +36,9 @@ public class CommunityIdProcessorTests extends ESTestCase { // https://github.com/elastic/beats/blob/master/libbeat/processors/communityid/communityid_test.go private Map event; - private ThreadLocal messageDigest; @Before public void setup() throws Exception { - messageDigest = ThreadLocal.withInitial(() -> { - try { - return MessageDigest.getInstance("SHA-1"); - } catch (NoSuchAlgorithmException e) { - throw new IllegalStateException("unable to obtain SHA-1 hasher", e); - } - }); event = buildEvent(); } @@ -323,6 +313,19 @@ public void testIgnoreMissing() throws Exception { testCommunityIdProcessor(event, 0, null, true); } + public void testIgnoreMissingIsFalse() throws Exception { + @SuppressWarnings("unchecked") + var source = (Map) event.get("source"); + source.remove("ip"); + + IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> testCommunityIdProcessor(event, 0, null, false) + ); + + assertThat(e.getMessage(), containsString("field [ip] not present as part of path [source.ip]")); + } + private void testCommunityIdProcessor(Map source, String expectedHash) throws Exception { testCommunityIdProcessor(source, 0, expectedHash); } @@ -346,7 +349,6 @@ private void testCommunityIdProcessor(Map source, int seed, Stri DEFAULT_ICMP_TYPE, DEFAULT_ICMP_CODE, DEFAULT_TARGET, - messageDigest, CommunityIdProcessor.toUint16(seed), ignoreMissing ); diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/190_script_processor.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/190_script_processor.yml index a4dcc218cb598..85ef086ace51e 100644 --- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/190_script_processor.yml +++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/190_script_processor.yml @@ -203,6 +203,46 @@ teardown: - match: { _source.source_field: "foo%20bar" } - match: { _source.target_field: "foo bar" } +--- +"Test invoke community_id processor": + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "script" : { + "lang": "painless", + "source" : "ctx.target_field1 = Processors.communityId('128.232.110.120', '66.35.250.204', null, 'TCP', 34855, 80, null, null, 123);" + } + }, + { + "script" : { + "lang": "painless", + "source" : "ctx.target_field2 = Processors.communityId('128.232.110.120', '66.35.250.204', null, 'TCP', 34855, 80, null, null);" + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + id: 1 + pipeline: "my_pipeline" + body: {source_field: "foo"} + + - do: + get: + index: test + id: 1 + - match: { _source.source_field: "foo" } + - match: { _source.target_field1: "1:hTSGlFQnR58UCk+NfKRZzA32dPg=" } + - match: { _source.target_field2: "1:LQU9qZlK+B5F3KDmev6m5PMibrg=" } + --- "Test invoke uri_parts processor": - do: