From 301ff5683c124926eec7e7a5cdbbf48330a6cfa6 Mon Sep 17 00:00:00 2001 From: Seth Grover Date: Fri, 5 Jan 2024 12:07:19 -0700 Subject: [PATCH] more consistently differentiate between uploaded and live-captured traffic idaholab/Malcolm#321 WIP --- arkime/scripts/live_capture.sh | 2 +- arkime/scripts/viewer_service.sh | 4 ++-- filebeat/filebeat.yml | 6 +++--- .../pipelines/enrichment/97_arkimize.conf | 11 +++++++++- .../pipelines/enrichment/98_finalize.conf | 16 ++++++++++++--- .../pipelines/suricata/01_input_suricata.conf | 19 ++++-------------- logstash/pipelines/zeek/01_input_zeek.conf | 20 +++++-------------- shared/bin/pcap_processor.py | 8 ++++---- 8 files changed, 42 insertions(+), 44 deletions(-) diff --git a/arkime/scripts/live_capture.sh b/arkime/scripts/live_capture.sh index 19072706a..04069f2cf 100755 --- a/arkime/scripts/live_capture.sh +++ b/arkime/scripts/live_capture.sh @@ -8,7 +8,7 @@ KEY_FILE="${ARKIME_DIR}"/etc/viewer.key ARKIME_PACKET_THREADS=${ARKIME_PACKET_THREADS:-1} PUSER=${PUSER:-"arkime"} PGROUP=${PGROUP:-"arkime"} -NODE_NAME=${PCAP_NODE_NAME:-"malcolm"}-live +NODE_NAME=${PCAP_NODE_NAME:-"malcolm"} NODE_HOST=${ARKIME_LIVE_NODE_HOST:-""} OPENSEARCH_PRIMARY=${OPENSEARCH_PRIMARY:-"opensearch-local"} diff --git a/arkime/scripts/viewer_service.sh b/arkime/scripts/viewer_service.sh index 00314c460..04184390a 100755 --- a/arkime/scripts/viewer_service.sh +++ b/arkime/scripts/viewer_service.sh @@ -2,14 +2,14 @@ # Copyright (c) 2024 Battelle Energy Alliance, LLC. All rights reserved. -[[ "${ARKIME_LIVE_CAPTURE:-false}" == "true" ]] && LIVE_NODE_FLAG=-live || LIVE_NODE_FLAG= +[[ "${ARKIME_LIVE_CAPTURE:-false}" == "true" ]] && NODE_NAME_FLAG= || NODE_NAME_FLAG=-upload while true; do if [[ -f /var/run/arkime/initialized && "$VIEWER" == "on" ]]; then echo "Launch viewer..." 
rm -f $ARKIME_DIR/logs/viewer* pushd $ARKIME_DIR/viewer >/dev/null 2>&1 - $ARKIME_DIR/bin/node viewer.js --insecure -n "${PCAP_NODE_NAME:-malcolm}${LIVE_NODE_FLAG}" -c $ARKIME_DIR/etc/config.ini + $ARKIME_DIR/bin/node viewer.js --insecure -n "${PCAP_NODE_NAME:-malcolm}${NODE_NAME_FLAG}" -c $ARKIME_DIR/etc/config.ini popd >/dev/null 2>&1 fi sleep 5 diff --git a/filebeat/filebeat.yml b/filebeat/filebeat.yml index ba5590419..454a443e4 100644 --- a/filebeat/filebeat.yml +++ b/filebeat/filebeat.yml @@ -15,7 +15,7 @@ filebeat.inputs: exclude_files: ['signatures\(_carved.*\)\.log$'] symlinks: true fields_under_root: true - tags: ["_filebeat_zeek"] + tags: ["_filebeat_zeek_malcolm_upload"] compression_level: 0 exclude_lines: ['^\s*#'] scan_frequency: ${FILEBEAT_SCAN_FREQUENCY:10s} @@ -57,7 +57,7 @@ filebeat.inputs: - ${FILEBEAT_ZEEK_LOG_PATH:/zeek/current}/signatures(_carved*).log symlinks: true fields_under_root: true - tags: ["_filebeat_zeek"] + tags: ["_filebeat_zeek_malcolm_live"] compression_level: 0 exclude_lines: ['^\s*#'] scan_frequency: ${FILEBEAT_SCAN_FREQUENCY:10s} @@ -75,7 +75,7 @@ filebeat.inputs: - ${FILEBEAT_SURICATA_LOG_PATH:/suricata}/eve-*.json symlinks: true fields_under_root: true - tags: ["_filebeat_suricata"] + tags: ["_filebeat_suricata_malcolm_upload"] compression_level: 0 scan_frequency: ${FILEBEAT_SCAN_FREQUENCY:10s} clean_inactive: ${FILEBEAT_CLEAN_INACTIVE:180m} diff --git a/logstash/pipelines/enrichment/97_arkimize.conf b/logstash/pipelines/enrichment/97_arkimize.conf index 384f9a455..a94c58696 100644 --- a/logstash/pipelines/enrichment/97_arkimize.conf +++ b/logstash/pipelines/enrichment/97_arkimize.conf @@ -39,7 +39,7 @@ filter { # note that if so, the arkime "user" field may conflict with ECS here } - # this identifies which node the log came from + # this identifies which node the log came from in Arkime if ([beat][name]) { mutate { id => "mutate_add_field_beat_name_node" add_field => { "[node]" => "%{[beat][name]}" } } @@ -51,4 +51,13 @@ 
filter { add_field => { "[node]" => "malcolm" } } } + # for Arkime's node name, add -upload for uploaded PCAP data to be consistent with Arkime sessions + if ("_filebeat_zeek_malcolm_upload" in [tags]) or + ("_filebeat_suricata_malcolm_upload" in [tags]) or + ("_filebeat_zeek_upload" in [tags]) or + ("_filebeat_suricata_upload" in [tags]) { + mutate { id => "mutate_replace_node_name_uploaded" + replace => {"[node]" => "%{[node]}-upload" } } + } + } \ No newline at end of file diff --git a/logstash/pipelines/enrichment/98_finalize.conf b/logstash/pipelines/enrichment/98_finalize.conf index f292b9202..5655f6908 100644 --- a/logstash/pipelines/enrichment/98_finalize.conf +++ b/logstash/pipelines/enrichment/98_finalize.conf @@ -61,7 +61,17 @@ filter { "_jsonparsefailure", "_dissectfailure", "_ouilookupfailure", - "_geoip_lookup_failure" ] } - - + "_geoip_lookup_failure", + "_filebeat_suricata", + "_filebeat_suricata_hedgehog_live", + "_filebeat_suricata_live", + "_filebeat_suricata_malcolm_live", + "_filebeat_suricata_malcolm_upload", + "_filebeat_suricata_upload", + "_filebeat_zeek", + "_filebeat_zeek_hedgehog_live", + "_filebeat_zeek_live", + "_filebeat_zeek_malcolm_live", + "_filebeat_zeek_malcolm_upload", + "_filebeat_zeek_upload" ] } } \ No newline at end of file diff --git a/logstash/pipelines/suricata/01_input_suricata.conf b/logstash/pipelines/suricata/01_input_suricata.conf index b04ccc79d..80e796ce3 100644 --- a/logstash/pipelines/suricata/01_input_suricata.conf +++ b/logstash/pipelines/suricata/01_input_suricata.conf @@ -9,23 +9,12 @@ filter { # this pipeline only needs to see suricata logs forwarded from filebeat if ("_filebeat_suricata" in [tags]) or ("_filebeat_suricata_live" in [tags]) or + ("_filebeat_suricata_upload" in [tags]) or ("_filebeat_suricata_hedgehog_live" in [tags]) or - ("_filebeat_suricata_malcolm_live" in [tags]) { + ("_filebeat_suricata_malcolm_live" in [tags]) or + ("_filebeat_suricata_malcolm_upload" in [tags]) { - if 
("_filebeat_suricata_malcolm_live" in [tags]) { - # the shipper name comes from PCAP_NODE_NAME in filebeat.yml, but for consistency - # with Arkime live capture append -live to the name for traffic captured live - if ([beat][name]) { mutate { id => "mutate_replace_beat_name_suricata_live" - replace => {"[beat][name]" => "%{[beat][name]}-live" } } } - if ([host][name]) { mutate { id => "mutate_replace_host_name_suricata_live" - replace => {"[host][name]" => "%{[host][name]}-live" } } } - } - - mutate { id => "mutate_filebeat_suricata_forward_tag_remove" - remove_tag => [ "_filebeat_suricata", - "_filebeat_suricata_live", - "_filebeat_suricata_hedgehog_live", - "_filebeat_suricata_malcolm_live" ] } + mutate { id => "mutate_filebeat_suricata_forward_noop" } } else { drop { id => "drop_not_filebeat_suricata" } diff --git a/logstash/pipelines/zeek/01_input_zeek.conf b/logstash/pipelines/zeek/01_input_zeek.conf index ddf4c13dd..6731613bc 100644 --- a/logstash/pipelines/zeek/01_input_zeek.conf +++ b/logstash/pipelines/zeek/01_input_zeek.conf @@ -9,25 +9,15 @@ filter { # this pipeline only needs to see zeek logs forwarded from filebeat if ("_filebeat_zeek" in [tags]) or ("_filebeat_zeek_live" in [tags]) or + ("_filebeat_zeek_upload" in [tags]) or ("_filebeat_zeek_hedgehog_live" in [tags]) or - ("_filebeat_zeek_malcolm_live" in [tags]) { + ("_filebeat_zeek_malcolm_live" in [tags]) or + ("_filebeat_zeek_malcolm_upload" in [tags]) { - if ("_filebeat_zeek_malcolm_live" in [tags]) { - # the shipper name comes from PCAP_NODE_NAME in filebeat.yml, but for consistency - # with Arkime live capture append -live to the name for traffic captured live - if ([beat][name]) { mutate { id => "mutate_replace_beat_name_zeek_live" - replace => {"[beat][name]" => "%{[beat][name]}-live" } } } - if ([host][name]) { mutate { id => "mutate_replace_host_name_zeek_live" - replace => {"[host][name]" => "%{[host][name]}-live" } } } - } - - mutate { id => "mutate_filebeat_zeek_forward_tag_remove" - 
remove_tag => [ "_filebeat_zeek", - "_filebeat_zeek_live", - "_filebeat_zeek_hedgehog_live", - "_filebeat_zeek_malcolm_live" ] } + mutate { id => "mutate_filebeat_zeek_forward_noop" } } else { drop { id => "drop_not_filebeat_zeek" } } } + diff --git a/shared/bin/pcap_processor.py b/shared/bin/pcap_processor.py index adf07fcb3..40e62d1c0 100755 --- a/shared/bin/pcap_processor.py +++ b/shared/bin/pcap_processor.py @@ -166,11 +166,11 @@ def arkimeCaptureFileWorker(arkimeWorkerArgs): ) logger.info(f"{scriptName}[{scanWorkerId}]:\tšŸ”Ž\t{fileInfo}") - # if this is a "live" rotated PCAP captured by netsniff-ng or tcpdump, - # append -live to the node name used (which originates from PCAP_NODE_NAME) + # if this is an uploaded PCAP (not captured "live"), + # append -upload to the node name used (which originates from PCAP_NODE_NAME) tmpNodeName = fileInfo[FILE_INFO_DICT_NODE] if (FILE_INFO_DICT_NODE in fileInfo) else nodeName - if (FILE_INFO_DICT_LIVE in fileInfo) and fileInfo[FILE_INFO_DICT_LIVE]: - tmpNodeName = tmpNodeName + '-live' + if (not (FILE_INFO_DICT_LIVE in fileInfo)) or (not fileInfo[FILE_INFO_DICT_LIVE]): + tmpNodeName = tmpNodeName + '-upload' # put together arkime execution command cmd = [