Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose Community ID processor in Painless #73963

Merged
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Supplier;

import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty;
Expand All @@ -33,6 +34,15 @@ public final class CommunityIdProcessor extends AbstractProcessor {

public static final String TYPE = "community_id";

private static final ThreadLocal<MessageDigest> MESSAGE_DIGEST = ThreadLocal.withInitial(() -> {
try {
return MessageDigest.getInstance("SHA-1");
} catch (NoSuchAlgorithmException e) {
// should never happen, SHA-1 must be available in all JDKs
throw new IllegalStateException(e);
}
});

private final String sourceIpField;
private final String sourcePortField;
private final String destinationIpField;
Expand All @@ -42,7 +52,6 @@ public final class CommunityIdProcessor extends AbstractProcessor {
private final String icmpTypeField;
private final String icmpCodeField;
private final String targetField;
private final ThreadLocal<MessageDigest> messageDigest;
private final byte[] seed;
private final boolean ignoreMissing;

Expand All @@ -58,7 +67,6 @@ public final class CommunityIdProcessor extends AbstractProcessor {
String icmpTypeField,
String icmpCodeField,
String targetField,
ThreadLocal<MessageDigest> messageDigest,
byte[] seed,
boolean ignoreMissing
) {
Expand All @@ -72,7 +80,6 @@ public final class CommunityIdProcessor extends AbstractProcessor {
this.icmpTypeField = icmpTypeField;
this.icmpCodeField = icmpCodeField;
this.targetField = targetField;
this.messageDigest = messageDigest;
this.seed = seed;
this.ignoreMissing = ignoreMissing;
}
Expand Down Expand Up @@ -113,10 +120,6 @@ public String getTargetField() {
return targetField;
}

public MessageDigest getMessageDigest() {
return messageDigest.get();
}

public byte[] getSeed() {
return seed;
}
Expand All @@ -127,28 +130,65 @@ public boolean getIgnoreMissing() {

@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
Flow flow = buildFlow(ingestDocument);
String sourceIp = ingestDocument.getFieldValue(sourceIpField, String.class, ignoreMissing);
String destinationIp = ingestDocument.getFieldValue(destinationIpField, String.class, ignoreMissing);
Object ianaNumber = ingestDocument.getFieldValue(ianaNumberField, Object.class, true);
Supplier<Object> transport = () -> ingestDocument.getFieldValue(transportField, Object.class, ignoreMissing);
Supplier<Object> sourcePort = () -> ingestDocument.getFieldValue(sourcePortField, Object.class, ignoreMissing);
Supplier<Object> destinationPort = () -> ingestDocument.getFieldValue(destinationPortField, Object.class, ignoreMissing);
Object icmpType = ingestDocument.getFieldValue(icmpTypeField, Object.class, true);
Object icmpCode = ingestDocument.getFieldValue(icmpCodeField, Object.class, true);
Flow flow = buildFlow(sourceIp, destinationIp, ianaNumber, transport, sourcePort, destinationPort, icmpType, icmpCode);
if (flow == null) {
if (ignoreMissing) {
return ingestDocument;
} else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should still be here?

throw new IllegalArgumentException("unable to construct flow from document");
}
}

MessageDigest md = messageDigest.get();
md.reset();
ingestDocument.setFieldValue(targetField, flow.toCommunityId(md, seed));
ingestDocument.setFieldValue(targetField, flow.toCommunityId(seed));
return ingestDocument;
}

private Flow buildFlow(IngestDocument d) {
String sourceIpAddrString = d.getFieldValue(sourceIpField, String.class, ignoreMissing);
public static String apply(
String sourceIpAddrString,
String destIpAddrString,
Object ianaNumber,
Object transport,
Object sourcePort,
Object destinationPort,
Object icmpType,
Object icmpCode,
int seed) {

Flow flow = buildFlow(sourceIpAddrString, destIpAddrString, ianaNumber, () -> transport, () -> sourcePort, () -> destinationPort,
icmpType, icmpCode);

if (flow == null) {
throw new IllegalArgumentException("unable to construct flow from document");
} else {
return flow.toCommunityId(toUint16(seed));
}
}

public static String apply(
String sourceIpAddrString,
String destIpAddrString,
Object ianaNumber,
Object transport,
Object sourcePort,
Object destinationPort,
Object icmpType,
Object icmpCode) {
return apply(sourceIpAddrString, destIpAddrString, ianaNumber, transport, sourcePort, destinationPort, icmpType, icmpCode, 0);
}

private static Flow buildFlow(String sourceIpAddrString, String destIpAddrString, Object ianaNumber,
Supplier<Object> transport, Supplier<Object> sourcePort, Supplier<Object> destinationPort,
Object icmpType, Object icmpCode) {
if (sourceIpAddrString == null) {
return null;
}

String destIpAddrString = d.getFieldValue(destinationIpField, String.class, ignoreMissing);
if (destIpAddrString == null) {
return null;
}
Expand All @@ -157,9 +197,9 @@ private Flow buildFlow(IngestDocument d) {
flow.source = InetAddresses.forString(sourceIpAddrString);
flow.destination = InetAddresses.forString(destIpAddrString);

Object protocol = d.getFieldValue(ianaNumberField, Object.class, true);
Object protocol = ianaNumber;
if (protocol == null) {
protocol = d.getFieldValue(transportField, Object.class, ignoreMissing);
protocol = transport.get();
if (protocol == null) {
return null;
}
Expand All @@ -170,23 +210,21 @@ private Flow buildFlow(IngestDocument d) {
case Tcp:
case Udp:
case Sctp:
Object sourcePortValue = d.getFieldValue(sourcePortField, Object.class, ignoreMissing);
flow.sourcePort = parseIntFromObjectOrString(sourcePortValue, "source port");
flow.sourcePort = parseIntFromObjectOrString(sourcePort.get(), "source port");
if (flow.sourcePort < 1 || flow.sourcePort > 65535) {
throw new IllegalArgumentException("invalid source port [" + sourcePortValue + "]");
throw new IllegalArgumentException("invalid source port [" + sourcePort.get() + "]");
}

Object destinationPortValue = d.getFieldValue(destinationPortField, Object.class, ignoreMissing);
flow.destinationPort = parseIntFromObjectOrString(destinationPortValue, "destination port");
flow.destinationPort = parseIntFromObjectOrString(destinationPort.get(), "destination port");
if (flow.destinationPort < 1 || flow.destinationPort > 65535) {
throw new IllegalArgumentException("invalid destination port [" + destinationPortValue + "]");
throw new IllegalArgumentException("invalid destination port [" + destinationPort.get() + "]");
}
break;
case Icmp:
case IcmpIpV6:
// tolerate missing or invalid ICMP types and codes
flow.icmpType = parseIntFromObjectOrString(d.getFieldValue(icmpTypeField, Object.class, true), "icmp type");
flow.icmpCode = parseIntFromObjectOrString(d.getFieldValue(icmpCodeField, Object.class, true), "icmp code");
flow.icmpType = parseIntFromObjectOrString(icmpType, "icmp type");
flow.icmpCode = parseIntFromObjectOrString(icmpCode, "icmp code");
break;
}

Expand Down Expand Up @@ -258,13 +296,6 @@ public CommunityIdProcessor create(
if (seedInt < 0 || seedInt > 65535) {
throw newConfigurationException(TYPE, processorTag, "seed", "must be a value between 0 and 65535");
}
ThreadLocal<MessageDigest> messageDigest = ThreadLocal.withInitial(() -> {
try {
return MessageDigest.getInstance("SHA-1");
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException("unable to obtain SHA-1 hasher", e);
}
});

boolean ignoreMissing = readBooleanProperty(TYPE, processorTag, config, "ignore_missing", true);
return new CommunityIdProcessor(
Expand All @@ -279,7 +310,6 @@ public CommunityIdProcessor create(
icmpTypeField,
icmpCodeField,
targetField,
messageDigest,
toUint16(seedInt),
ignoreMissing
);
Expand Down Expand Up @@ -343,7 +373,9 @@ byte[] toBytes() {
return bb.array();
}

String toCommunityId(MessageDigest md, byte[] seed) {
String toCommunityId(byte[] seed) {
MessageDigest md = MESSAGE_DIGEST.get();
md.reset();
md.update(seed);
byte[] encodedBytes = Base64.getEncoder().encode(md.digest(toBytes()));
return "1:" + new String(encodedBytes, StandardCharsets.UTF_8);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,75 @@ public static String urlDecode(String value) {
}

/**
* Uses {@link CommunityIdProcessor} to compute community ID for network flow data.
*
* @param sourceIpAddrString source IP address
* @param destIpAddrString destination IP address
* @param ianaNumber IANA number
* @param transport transport protocol
* @param sourcePort source port
* @param destinationPort destination port
* @param icmpType ICMP type
* @param icmpCode ICMP code
* @param seed hash seed (must be between 0 and 65535)
* @return Community ID
*/
public static String communityId(
String sourceIpAddrString,
String destIpAddrString,
Object ianaNumber,
Object transport,
Object sourcePort,
Object destinationPort,
Object icmpType,
Object icmpCode,
int seed) {
return CommunityIdProcessor.apply(
sourceIpAddrString,
destIpAddrString,
ianaNumber,
transport,
sourcePort,
destinationPort,
icmpType,
icmpCode,
seed
);
}

/**
* Uses {@link CommunityIdProcessor} to compute community ID for network flow data.
*
* @param sourceIpAddrString source IP address
* @param destIpAddrString destination IP address
* @param ianaNumber IANA number
* @param transport transport protocol
* @param sourcePort source port
* @param destinationPort destination port
* @param icmpType ICMP type
* @param icmpCode ICMP code
* @return Community ID
*/
public static String communityId(
String sourceIpAddrString,
String destIpAddrString,
Object ianaNumber,
Object transport,
Object sourcePort,
Object destinationPort,
Object icmpType,
Object icmpCode) {
return CommunityIdProcessor.apply(sourceIpAddrString,
destIpAddrString,
ianaNumber,
transport,
sourcePort,
destinationPort,
icmpType,
icmpCode);
}

/*
* Uses {@link UriPartsProcessor} to decompose an URI into its constituent parts.
*
* @param uri string to decode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,7 @@ class org.elasticsearch.ingest.common.Processors {
Object json(Object)
void json(Map, String)
String urlDecode(String)
String communityId(String, String, Object, Object, Object, Object, Object, Object, int)
String communityId(String, String, Object, Object, Object, Object, Object, Object)
Map uriParts(String)
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -38,17 +36,9 @@ public class CommunityIdProcessorTests extends ESTestCase {
// https://github.com/elastic/beats/blob/master/libbeat/processors/communityid/communityid_test.go

private Map<String, Object> event;
private ThreadLocal<MessageDigest> messageDigest;

@Before
public void setup() throws Exception {
messageDigest = ThreadLocal.withInitial(() -> {
try {
return MessageDigest.getInstance("SHA-1");
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException("unable to obtain SHA-1 hasher", e);
}
});
event = buildEvent();
}

Expand Down Expand Up @@ -323,6 +313,19 @@ public void testIgnoreMissing() throws Exception {
testCommunityIdProcessor(event, 0, null, true);
}

public void testIgnoreMissingIsFalse() throws Exception {
@SuppressWarnings("unchecked")
var source = (Map<String, Object>) event.get("source");
source.remove("ip");

IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> testCommunityIdProcessor(event, 0, null, false)
);

assertThat(e.getMessage(), containsString("field [ip] not present as part of path [source.ip]"));
}

private void testCommunityIdProcessor(Map<String, Object> source, String expectedHash) throws Exception {
testCommunityIdProcessor(source, 0, expectedHash);
}
Expand All @@ -346,7 +349,6 @@ private void testCommunityIdProcessor(Map<String, Object> source, int seed, Stri
DEFAULT_ICMP_TYPE,
DEFAULT_ICMP_CODE,
DEFAULT_TARGET,
messageDigest,
CommunityIdProcessor.toUint16(seed),
ignoreMissing
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,46 @@ teardown:
- match: { _source.source_field: "foo%20bar" }
- match: { _source.target_field: "foo bar" }

---
"Test invoke community_id processor":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"script" : {
"lang": "painless",
"source" : "ctx.target_field1 = Processors.communityId('128.232.110.120', '66.35.250.204', null, 'TCP', 34855, 80, null, null, 123);"
}
},
{
"script" : {
"lang": "painless",
"source" : "ctx.target_field2 = Processors.communityId('128.232.110.120', '66.35.250.204', null, 'TCP', 34855, 80, null, null);"
}
}
]
}
- match: { acknowledged: true }

- do:
index:
index: test
id: 1
pipeline: "my_pipeline"
body: {source_field: "foo"}

- do:
get:
index: test
id: 1
- match: { _source.source_field: "foo" }
- match: { _source.target_field1: "1:hTSGlFQnR58UCk+NfKRZzA32dPg=" }
- match: { _source.target_field2: "1:LQU9qZlK+B5F3KDmev6m5PMibrg=" }

---
"Test invoke uri_parts processor":
- do:
Expand Down