From 6b01fa9dce0273a22e39f93cb72df5e52209e01a Mon Sep 17 00:00:00 2001 From: Steffen Siering Date: Fri, 28 Apr 2017 09:08:12 +0200 Subject: [PATCH] Heartbeat event format (#4091) See PR #4091 for detailed list of event format changes and sample events. - export transport.DialWith - update heartbeat look: - add look.Status - add some missing godoc - rewrite heartbeat dialchain package - simplify package - have connection layers add standardized connection data to the final event - add some more helpers - add some godocs - update event format. See PR #4091 - add more detailed duration measure to HTTP module - update job settings - introduce explicit structs for job settings - job settings can contain static monitor fields to be added to every event - helpers can add more static monitor fields to settings - update fields.yml structure - update kibana dashboard --- CHANGELOG.asciidoc | 1 + heartbeat/_meta/fields.yml | 335 +++++++++++++----- .../02014c80-29d2-11e7-a68f-bfaa2341cc52.json | 24 ++ .../c49bd160-eb17-11e6-be20-559646f8b9ba.json | 23 -- .../091c3a90-eb1e-11e6-be20-559646f8b9ba.json | 8 +- .../0f4c0560-eb20-11e6-9f11-159ff202874a.json | 8 +- .../1738dbc0-eb1d-11e6-be20-559646f8b9ba.json | 8 +- .../920e8140-eb1a-11e6-be20-559646f8b9ba.json | 8 +- .../c65ef340-eb19-11e6-be20-559646f8b9ba.json | 6 +- heartbeat/beater/manager.go | 48 ++- heartbeat/docs/fields.asciidoc | 335 +++++++++++++++--- heartbeat/look/look.go | 19 +- .../monitors/active/dialchain/dialchain.go | 178 ++-------- heartbeat/monitors/active/dialchain/net.go | 94 +++++ heartbeat/monitors/active/dialchain/socks5.go | 38 ++ heartbeat/monitors/active/dialchain/tls.go | 42 +++ heartbeat/monitors/active/dialchain/util.go | 111 ++++++ .../monitors/active/http/simple_transp.go | 15 +- heartbeat/monitors/active/http/task.go | 118 +++--- heartbeat/monitors/active/icmp/icmp.go | 31 +- heartbeat/monitors/active/icmp/loop.go | 2 +- heartbeat/monitors/active/tcp/task.go | 94 +++-- heartbeat/monitors/util.go | 314 +++++++++++----- libbeat/outputs/transport/proxy.go | 2 +- libbeat/outputs/transport/tcp.go | 2 +- libbeat/outputs/transport/tls.go | 1 + libbeat/outputs/transport/util.go | 5 +- 27 files changed, 1321 insertions(+), 549 deletions(-) create mode 100644 heartbeat/_meta/kibana/search/02014c80-29d2-11e7-a68f-bfaa2341cc52.json delete mode 100644 heartbeat/_meta/kibana/search/c49bd160-eb17-11e6-be20-559646f8b9ba.json create mode 100644 heartbeat/monitors/active/dialchain/net.go create mode 100644 heartbeat/monitors/active/dialchain/socks5.go create mode 100644 heartbeat/monitors/active/dialchain/tls.go create mode 100644 heartbeat/monitors/active/dialchain/util.go diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index a077f61861b..2dc7db9db68 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -30,6 +30,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff] - Add Icinga module. {pull}3904[3904] *Heartbeat* +- Event format and field naming changes in Heartbeat and sample Dashboard. {pull}4091[4091] *Metricbeat* - Linux cgroup metrics are now enabled by default for the system process diff --git a/heartbeat/_meta/fields.yml b/heartbeat/_meta/fields.yml index 763874c139b..640b2dcbe33 100644 --- a/heartbeat/_meta/fields.yml +++ b/heartbeat/_meta/fields.yml @@ -1,132 +1,283 @@ - key: common - title: "Common monitoring fields" + title: "Common Heartbeat Monitor" description: fields: - - name: type - type: keyword - required: true - description: > - The monitor type. - - name: monitor - type: keyword + type: group description: > - Monitor job name. + Common monitor fields. - - name: scheme - type: keyword - description: > - Address url scheme. For example `tcp`, `ssl`, `http`, and `https`. + fields: + - name: type + type: keyword + description: > + The monitor type. - - name: host - type: keyword - description: > - Hostname of service being monitored. Can be missing, if service is - monitored by IP. + - name: name + type: keyword + description: > + The monitors configured name - - name: port - type: integer - description: > - Service port number. + - name: id + type: keyword + description: > + The monitors full job ID as used by heartbeat. - - name: url - type: text - description: > - Service url used by monitor. + - name: duration + type: group + description: total monitoring test duration + fields: + - name: us + type: long + description: Duration in microseconds - - name: ip - type: ip - description: > - IP of service being monitored. If service is monitored by hostname, - the `ip` field contains the resolved ip address for the current host. + - name: scheme + type: keyword + description: > + Address url scheme. For example `tcp`, `tls`, `http`, and `https`. - - name: duration - type: group - description: total monitoring test duration - fields: - - name: us - type: long - description: Duration in microseconds + - name: host + type: keyword + description: > + Hostname of service being monitored. Can be missing, if service is + monitored by IP. - - name: resolve_rtt - type: group - description: Duration required to resolve an IP from hostname. - fields: - - name: us - type: long - description: Duration in microseconds + - name: ip + type: ip + description: > + IP of service being monitored. If service is monitored by hostname, + the `ip` field contains the resolved ip address for the current host. - - name: icmp_rtt - type: group - description: ICMP Echo Request and Reply round trip time - fields: - - name: us - type: long - description: Duration in microseconds + - name: status + required: true + type: keyword + description: > + Indicator if monitor could validate the service to be available. - - name: tcp_connect_rtt +- key: resolve + title: "Host Lookup" + description: + fields: + - name: resolve type: group description: > - Duration required to establish a TCP connection based on already - available IP address. + Host lookup fields. fields: - - name: us - type: long - description: Duration in microseconds + - name: host + type: keyword + description: > + Hostname of service being monitored. + + - name: ip + type: ip + description: > + IP address found for the given host. - - name: socks5_connect_rtt + - name: rtt + type: group + description: Duration required to resolve an IP from hostname. + fields: + - name: us + type: long + description: Duration in microseconds + +- key: icmp + title: "ICMP" + description: + fields: + - name: icmp type: group description: > - Time required to establish a connection via SOCKS5 to endpoint based on available - connection to SOCKS5 proxy. + IP ping fields. fields: - - name: us - type: long - description: Duration in microseconds + - name: requests + type: integer + description: > + Number if ICMP EchoRequests send. - - name: tls_handshake_rtt + - name: rtt + type: group + description: ICMP Echo Request and Reply round trip time + fields: + - name: us + type: long + description: Duration in microseconds + +- key: tcp + title: "TCP Layer" + description: + fields: + - name: tcp type: group description: > - Time required to finish TLS handshake based on already available network - connection. + TCP network layer related fields. fields: - - name: us - type: long - description: Duration in microseconds + - name: port + type: integer + description: > + Service port number. + + - name: rtt + type: group + description: > + TCP layer round trip times. + fields: + - name: connect + type: group + description: > + Duration required to establish a TCP connection based on already + available IP address. + fields: + - name: us + type: long + description: Duration in microseconds + + - name: validate + type: group + description: > + Duration of validation step based on existing TCP connection. + fields: + - name: us + type: long + description: Duration in microseconds - - name: http_rtt +- key: socks5 + title: "SOCKS5 Proxy" + description: + fields: + - name: socks5 type: group description: > - Time required between sending the HTTP request and first by from HTTP - response being read. Duration based on already available network connection. + SOCKS5 proxy related fields: fields: - - name: us - type: long - description: Duration in microseconds + - name: rtt + type: group + description: > + TLS layer round trip times. + fields: + - name: connect + type: group + description: > + Time required to establish a connection via SOCKS5 to endpoint + based on available connection to SOCKS5 proxy. + fields: + - name: us + type: long + description: Duration in microseconds + - - name: validate_rtt +- key: tls + title: "TLS Encryption Layer" + description: + fields: + - name: tls type: group description: > - Time required for validating the connection if connection checks are configured. + TLS layer related fields. fields: - - name: us - type: long - description: Duration in microseconds + - name: rtt + type: group + description: > + TLS layer round trip times. + fields: + - name: handshake + type: group + description: > + Time required to finish TLS handshake based on already available network + connection. + fields: + - name: us + type: long + description: Duration in microseconds - - name: response +- key: http + title: "HTTP Monitor" + description: + fields: + - name: http type: group description: > - Service response parameters. - + HTTP related fields. fields: - - name: status - type: integer + - name: url + type: text description: > - Response status code. + Service url used by monitor. - - name: up - required: true - type: boolean - description: > - Boolean indicator if monitor could validate the service to be available. + - name: response + type: group + description: > + Service response parameters. + fields: + - name: status + type: integer + description: > + Response status code. + - name: rtt + type: group + description: > + HTTP layer round trip times. + fields: + - name: validate + type: group + description: | + Duration between first byte of HTTP request being written and + response being processed by validator. Duration based on already + available network connection. + + Note: if validator is not reading body or only a prefix, this + number does not fully represent the total time needed + to read the body. + fields: + - name: us + type: long + description: Duration in microseconds + + - name: validate_body + type: group + description: | + Duration of validator required to read and validate the response + body. + + Note: if validator is not reading body or only a prefix, this + number does not fully represent the total time needed + to read the body. + fields: + - name: us + type: long + description: Duration in microseconds + + - name: write_request + type: group + description: + Duration of sending the complete HTTP request. Duration based on + already available network connection. + fields: + - name: us + type: long + description: Duration in microseconds + + - name: response_header + type: group + description: + Time required between sending the start of sending the HTTP + request and first by from HTTP response being read. Duration + based on already available network connection. + fields: + - name: us + type: long + description: Duration in microseconds + + - name: total + type: group + description: | + Duration required to process the HTTP transaction. Starts with + the initial TCP connection attempt. Ends with after validator + did check the response. + Note: if validator is not reading body or only a prefix, this + number does not fully represent the total time needed. + fields: + - name: us + type: long + description: Duration in microseconds diff --git a/heartbeat/_meta/kibana/search/02014c80-29d2-11e7-a68f-bfaa2341cc52.json b/heartbeat/_meta/kibana/search/02014c80-29d2-11e7-a68f-bfaa2341cc52.json new file mode 100644 index 00000000000..53ba45f044e --- /dev/null +++ b/heartbeat/_meta/kibana/search/02014c80-29d2-11e7-a68f-bfaa2341cc52.json @@ -0,0 +1,24 @@ +{ + "sort": [ + "@timestamp", + "desc" + ], + "hits": 0, + "description": "", + "title": "Heartbeat HTTP pings", + "version": 1, + "kibanaSavedObjectMeta": { + "searchSourceJSON": "{\n \"index\": \"heartbeat-*\",\n \"highlightAll\": true,\n \"query\": {\n \"query_string\": {\n \"analyze_wildcard\": true,\n \"query\": \"*\"\n }\n },\n \"filter\": [\n {\n \"$state\": {\n \"store\": \"appState\"\n },\n \"meta\": {\n \"alias\": null,\n \"disabled\": false,\n \"index\": \"heartbeat-*\",\n \"key\": \"monitor.name\",\n \"negate\": false,\n \"value\": \"http\"\n },\n \"query\": {\n \"match\": {\n \"monitor.name\": {\n \"query\": \"http\",\n \"type\": \"phrase\"\n }\n }\n }\n }\n ]\n}" + }, + "columns": [ + "monitor.id", + "http.url", + "monitor.status", + "http.response.status", + "monitor.duration.us", + "tcp.rtt.connect.us", + "tls.rtt.handshake.us", + "resolve.rtt.us", + "http.rtt.content.us" + ] +} \ No newline at end of file diff --git a/heartbeat/_meta/kibana/search/c49bd160-eb17-11e6-be20-559646f8b9ba.json b/heartbeat/_meta/kibana/search/c49bd160-eb17-11e6-be20-559646f8b9ba.json deleted file mode 100644 index bb7b085dc25..00000000000 --- a/heartbeat/_meta/kibana/search/c49bd160-eb17-11e6-be20-559646f8b9ba.json +++ /dev/null @@ -1,23 +0,0 @@ -{ - "sort": [ - "@timestamp", - "desc" - ], - "hits": 0, - "description": "", - "title": "Heartbeat HTTP pings", - "version": 1, - "kibanaSavedObjectMeta": { - "searchSourceJSON": "{\"index\":\"heartbeat-*\",\"query\":{\"query_string\":{\"analyze_wildcard\":true,\"query\":\"*\"}},\"filter\":[{\"meta\":{\"negate\":false,\"index\":\"heartbeat-*\",\"key\":\"type\",\"value\":\"http\",\"disabled\":false,\"alias\":null},\"query\":{\"match\":{\"type\":{\"query\":\"http\",\"type\":\"phrase\"}}},\"$state\":{\"store\":\"appState\"}}],\"highlight\":{\"pre_tags\":[\"@kibana-highlighted-field@\"],\"post_tags\":[\"@/kibana-highlighted-field@\"],\"fields\":{\"*\":{}},\"require_field_match\":false,\"fragment_size\":2147483647}}" - }, - "columns": [ - "monitor", - "up", - "response.status", - "duration.us", - "tcp_connect_rtt.us", - "tls_handshake_rtt.us", - "resolve_rtt.us", - "http_rtt.us" - ] -} \ No newline at end of file diff --git a/heartbeat/_meta/kibana/visualization/091c3a90-eb1e-11e6-be20-559646f8b9ba.json b/heartbeat/_meta/kibana/visualization/091c3a90-eb1e-11e6-be20-559646f8b9ba.json index e953f3f9a7f..dec5f466c78 100644 --- a/heartbeat/_meta/kibana/visualization/091c3a90-eb1e-11e6-be20-559646f8b9ba.json +++ b/heartbeat/_meta/kibana/visualization/091c3a90-eb1e-11e6-be20-559646f8b9ba.json @@ -1,11 +1,11 @@ { - "visState": "{\"title\":\"HTTP up status\",\"type\":\"area\",\"params\":{\"addTooltip\":true,\"addLegend\":true,\"legendPosition\":\"right\",\"scale\":\"linear\",\"interpolate\":\"linear\",\"mode\":\"percentage\",\"times\":[],\"addTimeMarker\":false,\"defaultYExtents\":false,\"setYExtents\":false},\"aggs\":[{\"id\":\"1\",\"enabled\":true,\"type\":\"count\",\"schema\":\"metric\",\"params\":{}},{\"id\":\"2\",\"enabled\":true,\"type\":\"date_histogram\",\"schema\":\"segment\",\"params\":{\"field\":\"@timestamp\",\"interval\":\"auto\",\"customInterval\":\"2h\",\"min_doc_count\":1,\"extended_bounds\":{}}},{\"id\":\"3\",\"enabled\":true,\"type\":\"filters\",\"schema\":\"group\",\"params\":{\"filters\":[{\"input\":{\"query\":{\"query_string\":{\"query\":\"up:true\",\"analyze_wildcard\":true}}},\"label\":\"\"},{\"input\":{\"query\":{\"query_string\":{\"query\":\"up:false\",\"analyze_wildcard\":true}}}}]}}],\"listeners\":{}}", + "visState": "{\n \"title\": \"HTTP up status\",\n \"type\": \"area\",\n \"params\": {\n \"addTooltip\": true,\n \"addLegend\": true,\n \"legendPosition\": \"right\",\n \"scale\": \"linear\",\n \"interpolate\": \"linear\",\n \"mode\": \"percentage\",\n \"times\": [],\n \"addTimeMarker\": false,\n \"defaultYExtents\": false,\n \"setYExtents\": true,\n \"yAxis\": {\n \"max\": 100,\n \"min\": 0\n }\n },\n \"aggs\": [\n {\n \"id\": \"1\",\n \"enabled\": true,\n \"type\": \"count\",\n \"schema\": \"metric\",\n \"params\": {}\n },\n {\n \"id\": \"2\",\n \"enabled\": true,\n \"type\": \"date_histogram\",\n \"schema\": \"segment\",\n \"params\": {\n \"field\": \"@timestamp\",\n \"interval\": \"auto\",\n \"customInterval\": \"2h\",\n \"min_doc_count\": 1,\n \"extended_bounds\": {}\n }\n },\n {\n \"id\": \"3\",\n \"enabled\": true,\n \"type\": \"filters\",\n \"schema\": \"group\",\n \"params\": {\n \"filters\": [\n {\n \"input\": {\n \"query\": {\n \"query_string\": {\n \"query\": \"monitor.status: down\",\n \"analyze_wildcard\": true\n }\n }\n },\n \"label\": \"\"\n },\n {\n \"input\": {\n \"query\": {\n \"query_string\": {\n \"query\": \"monitor.status: up\",\n \"analyze_wildcard\": true\n }\n }\n }\n }\n ]\n }\n }\n ],\n \"listeners\": {}\n}", "description": "", "title": "HTTP up status", - "uiStateJSON": "{\"vis\":{\"colors\":{\"up:false\":\"#BF1B00\",\"up:true\":\"#629E51\"}}}", + "uiStateJSON": "{\n \"vis\": {\n \"colors\": {\n \"monitor.status: up\": \"#629E51\",\n \"monitor.status: down\": \"#E24D42\"\n }\n }\n}", "version": 1, - "savedSearchId": "c49bd160-eb17-11e6-be20-559646f8b9ba", + "savedSearchId": "02014c80-29d2-11e7-a68f-bfaa2341cc52", "kibanaSavedObjectMeta": { - "searchSourceJSON": "{\"filter\":[]}" + "searchSourceJSON": "{\n \"filter\": []\n}" } } \ No newline at end of file diff --git a/heartbeat/_meta/kibana/visualization/0f4c0560-eb20-11e6-9f11-159ff202874a.json b/heartbeat/_meta/kibana/visualization/0f4c0560-eb20-11e6-9f11-159ff202874a.json index 087f16bacbe..f9942e48916 100644 --- a/heartbeat/_meta/kibana/visualization/0f4c0560-eb20-11e6-9f11-159ff202874a.json +++ b/heartbeat/_meta/kibana/visualization/0f4c0560-eb20-11e6-9f11-159ff202874a.json @@ -1,11 +1,11 @@ { - "visState": "{\"title\":\"HTTP duration heatmap\",\"type\":\"heatmap\",\"params\":{\"addTooltip\":true,\"addLegend\":true,\"enableHover\":false,\"legendPosition\":\"right\",\"times\":[],\"colorsNumber\":10,\"colorSchema\":\"Blues\",\"setColorRange\":false,\"colorsRange\":[],\"invertColors\":false,\"percentageMode\":false,\"valueAxes\":[{\"show\":false,\"id\":\"ValueAxis-1\",\"type\":\"value\",\"scale\":{\"type\":\"linear\",\"defaultYExtents\":false},\"labels\":{\"show\":false,\"rotate\":0,\"color\":\"#555\"}}]},\"aggs\":[{\"id\":\"1\",\"enabled\":true,\"type\":\"count\",\"schema\":\"metric\",\"params\":{}},{\"id\":\"2\",\"enabled\":true,\"type\":\"date_histogram\",\"schema\":\"segment\",\"params\":{\"field\":\"@timestamp\",\"interval\":\"auto\",\"customInterval\":\"2h\",\"min_doc_count\":1,\"extended_bounds\":{}}},{\"id\":\"3\",\"enabled\":true,\"type\":\"histogram\",\"schema\":\"group\",\"params\":{\"field\":\"duration.us\",\"interval\":50000,\"extended_bounds\":{}}}],\"listeners\":{}}", + "visState": "{\n \"title\": \"HTTP duration heatmap\",\n \"type\": \"heatmap\",\n \"params\": {\n \"addTooltip\": true,\n \"addLegend\": true,\n \"enableHover\": false,\n \"legendPosition\": \"right\",\n \"times\": [],\n \"colorsNumber\": 10,\n \"colorSchema\": \"Blues\",\n \"setColorRange\": false,\n \"colorsRange\": [],\n \"invertColors\": false,\n \"percentageMode\": false,\n \"valueAxes\": [\n {\n \"show\": false,\n \"id\": \"ValueAxis-1\",\n \"type\": \"value\",\n \"scale\": {\n \"type\": \"linear\",\n \"defaultYExtents\": false\n },\n \"labels\": {\n \"show\": false,\n \"rotate\": 0,\n \"color\": \"#555\"\n }\n }\n ]\n },\n \"aggs\": [\n {\n \"id\": \"1\",\n \"enabled\": true,\n \"type\": \"count\",\n \"schema\": \"metric\",\n \"params\": {}\n },\n {\n \"id\": \"2\",\n \"enabled\": true,\n \"type\": \"date_histogram\",\n \"schema\": \"segment\",\n \"params\": {\n \"field\": \"@timestamp\",\n \"interval\": \"auto\",\n \"customInterval\": \"2h\",\n \"min_doc_count\": 1,\n \"extended_bounds\": {}\n }\n },\n {\n \"id\": \"3\",\n \"enabled\": true,\n \"type\": \"histogram\",\n \"schema\": \"group\",\n \"params\": {\n \"field\": \"monitor.duration.us\",\n \"interval\": 50000,\n \"extended_bounds\": {}\n }\n }\n ],\n \"listeners\": {}\n}", "description": "", "title": "HTTP duration heatmap", - "uiStateJSON": "{\"vis\":{\"defaultColors\":{\"0 - 2\":\"rgb(247,251,255)\",\"2 - 3\":\"rgb(227,238,249)\",\"3 - 4\":\"rgb(208,225,242)\",\"4 - 5\":\"rgb(182,212,233)\",\"5 - 6\":\"rgb(148,196,223)\",\"6 - 8\":\"rgb(107,174,214)\",\"8 - 9\":\"rgb(74,152,201)\",\"9 - 10\":\"rgb(46,126,188)\",\"10 - 11\":\"rgb(23,100,171)\",\"11 - 12\":\"rgb(8,74,145)\"}}}", + "uiStateJSON": "{\n \"vis\": {\n \"defaultColors\": {\n \"0 - 2\": \"rgb(247,251,255)\",\n \"2 - 3\": \"rgb(227,238,249)\",\n \"3 - 4\": \"rgb(208,225,242)\",\n \"4 - 5\": \"rgb(182,212,233)\",\n \"5 - 6\": \"rgb(148,196,223)\",\n \"6 - 8\": \"rgb(107,174,214)\",\n \"8 - 9\": \"rgb(74,152,201)\",\n \"9 - 10\": \"rgb(46,126,188)\",\n \"10 - 11\": \"rgb(23,100,171)\",\n \"11 - 12\": \"rgb(8,74,145)\"\n }\n }\n}", "version": 1, - "savedSearchId": "c49bd160-eb17-11e6-be20-559646f8b9ba", + "savedSearchId": "02014c80-29d2-11e7-a68f-bfaa2341cc52", "kibanaSavedObjectMeta": { - "searchSourceJSON": "{\"filter\":[]}" + "searchSourceJSON": "{\n \"filter\": []\n}" } } \ No newline at end of file diff --git a/heartbeat/_meta/kibana/visualization/1738dbc0-eb1d-11e6-be20-559646f8b9ba.json b/heartbeat/_meta/kibana/visualization/1738dbc0-eb1d-11e6-be20-559646f8b9ba.json index dca9fa71bff..f6fc8d18e27 100644 --- a/heartbeat/_meta/kibana/visualization/1738dbc0-eb1d-11e6-be20-559646f8b9ba.json +++ b/heartbeat/_meta/kibana/visualization/1738dbc0-eb1d-11e6-be20-559646f8b9ba.json @@ -1,11 +1,11 @@ { - "visState": "{\"title\":\"HTTP monitors\",\"type\":\"table\",\"params\":{\"perPage\":10,\"showPartialRows\":false,\"showMeticsAtAllLevels\":false,\"sort\":{\"columnIndex\":null,\"direction\":null},\"showTotal\":false,\"totalFunc\":\"sum\"},\"aggs\":[{\"id\":\"1\",\"enabled\":true,\"type\":\"max\",\"schema\":\"metric\",\"params\":{\"field\":\"duration.us\"}},{\"id\":\"2\",\"enabled\":true,\"type\":\"terms\",\"schema\":\"bucket\",\"params\":{\"field\":\"monitor\",\"size\":5,\"order\":\"desc\",\"orderBy\":\"1\"}},{\"id\":\"5\",\"enabled\":true,\"type\":\"max\",\"schema\":\"metric\",\"params\":{\"field\":\"resolve_rtt.us\"}},{\"id\":\"6\",\"enabled\":true,\"type\":\"max\",\"schema\":\"metric\",\"params\":{\"field\":\"tcp_connect_rtt.us\"}},{\"id\":\"7\",\"enabled\":true,\"type\":\"max\",\"schema\":\"metric\",\"params\":{\"field\":\"tls_handshake_rtt.us\"}},{\"id\":\"8\",\"enabled\":true,\"type\":\"max\",\"schema\":\"metric\",\"params\":{\"field\":\"http_rtt.us\"}}],\"listeners\":{}}", + "visState": "{\n \"title\": \"HTTP monitors\",\n \"type\": \"table\",\n \"params\": {\n \"perPage\": 10,\n \"showPartialRows\": false,\n \"showMeticsAtAllLevels\": false,\n \"sort\": {\n \"columnIndex\": null,\n \"direction\": null\n },\n \"showTotal\": false,\n \"totalFunc\": \"sum\"\n },\n \"aggs\": [\n {\n \"id\": \"1\",\n \"enabled\": true,\n \"type\": \"max\",\n \"schema\": \"metric\",\n \"params\": {\n \"field\": \"monitor.duration.us\"\n }\n },\n {\n \"id\": \"2\",\n \"enabled\": true,\n \"type\": \"terms\",\n \"schema\": \"bucket\",\n \"params\": {\n \"field\": \"monitor.id\",\n \"size\": 5,\n \"order\": \"desc\",\n \"orderBy\": \"1\"\n }\n },\n {\n \"id\": \"5\",\n \"enabled\": true,\n \"type\": \"max\",\n \"schema\": \"metric\",\n \"params\": {\n \"field\": \"resolve.rtt.us\"\n }\n },\n {\n \"id\": \"6\",\n \"enabled\": true,\n \"type\": \"max\",\n \"schema\": \"metric\",\n \"params\": {\n \"field\": \"tcp.rtt.connect.us\"\n }\n },\n {\n \"id\": \"7\",\n \"enabled\": true,\n \"type\": \"max\",\n \"schema\": \"metric\",\n \"params\": {\n \"field\": \"tls.rtt.handshake.us\"\n }\n },\n {\n \"id\": \"8\",\n \"enabled\": true,\n \"type\": \"max\",\n \"schema\": \"metric\",\n \"params\": {\n \"field\": \"http.rtt.response_header.us\"\n }\n }\n ],\n \"listeners\": {}\n}", "description": "", "title": "HTTP monitors", - "uiStateJSON": "{\"vis\":{\"params\":{\"sort\":{\"columnIndex\":null,\"direction\":null}}}}", + "uiStateJSON": "{\n \"vis\": {\n \"params\": {\n \"sort\": {\n \"columnIndex\": null,\n \"direction\": null\n }\n }\n }\n}", "version": 1, - "savedSearchId": "c49bd160-eb17-11e6-be20-559646f8b9ba", + "savedSearchId": "02014c80-29d2-11e7-a68f-bfaa2341cc52", "kibanaSavedObjectMeta": { - "searchSourceJSON": "{\"filter\":[]}" + "searchSourceJSON": "{\n \"filter\": []\n}" } } \ No newline at end of file diff --git a/heartbeat/_meta/kibana/visualization/920e8140-eb1a-11e6-be20-559646f8b9ba.json b/heartbeat/_meta/kibana/visualization/920e8140-eb1a-11e6-be20-559646f8b9ba.json index a5b25108c21..4b9caec1d3e 100644 --- a/heartbeat/_meta/kibana/visualization/920e8140-eb1a-11e6-be20-559646f8b9ba.json +++ b/heartbeat/_meta/kibana/visualization/920e8140-eb1a-11e6-be20-559646f8b9ba.json @@ -1,11 +1,11 @@ { - "visState": "{\"title\":\"HTTP monitors status\",\"type\":\"pie\",\"params\":{\"addTooltip\":true,\"addLegend\":true,\"legendPosition\":\"bottom\",\"isDonut\":false},\"aggs\":[{\"id\":\"1\",\"enabled\":true,\"type\":\"cardinality\",\"schema\":\"metric\",\"params\":{\"field\":\"monitor\"}},{\"id\":\"3\",\"enabled\":true,\"type\":\"filters\",\"schema\":\"segment\",\"params\":{\"filters\":[{\"input\":{\"query\":{\"query_string\":{\"query\":\"up: true\",\"analyze_wildcard\":true}}},\"label\":\"\"},{\"input\":{\"query\":{\"query_string\":{\"query\":\"up: false\",\"analyze_wildcard\":true}}}}]}},{\"id\":\"2\",\"enabled\":true,\"type\":\"terms\",\"schema\":\"segment\",\"params\":{\"field\":\"response.status\",\"size\":5,\"order\":\"desc\",\"orderBy\":\"1\"}}],\"listeners\":{}}", + "visState": "{\n \"title\": \"HTTP monitors status\",\n \"type\": \"pie\",\n \"params\": {\n \"addTooltip\": true,\n \"addLegend\": true,\n \"legendPosition\": \"bottom\",\n \"isDonut\": false\n },\n \"aggs\": [\n {\n \"id\": \"1\",\n \"enabled\": true,\n \"type\": \"cardinality\",\n \"schema\": \"metric\",\n \"params\": {\n \"field\": \"monitor.id\"\n }\n },\n {\n \"id\": \"3\",\n \"enabled\": true,\n \"type\": \"filters\",\n \"schema\": \"segment\",\n \"params\": {\n \"filters\": [\n {\n \"input\": {\n \"query\": {\n \"query_string\": {\n \"query\": \"monitor.status: up\",\n \"analyze_wildcard\": true\n }\n }\n },\n \"label\": \"\"\n },\n {\n \"input\": {\n \"query\": {\n \"query_string\": {\n \"query\": \"monitor.status: down\",\n \"analyze_wildcard\": true\n }\n }\n }\n }\n ]\n }\n },\n {\n \"id\": \"2\",\n \"enabled\": true,\n \"type\": \"terms\",\n \"schema\": \"segment\",\n \"params\": {\n \"field\": \"http.response.status\",\n \"size\": 5,\n \"order\": \"desc\",\n \"orderBy\": \"1\"\n }\n }\n ],\n \"listeners\": {}\n}", "description": "", "title": "HTTP monitors status", - "uiStateJSON": "{\"vis\":{\"colors\":{\"200\":\"#B7DBAB\",\"up: true\":\"#629E51\",\"up: false\":\"#E24D42\"},\"legendOpen\":true}}", + "uiStateJSON": "{\n \"vis\": {\n \"colors\": {\n \"200\": \"#B7DBAB\",\n \"monitor.status: up\": \"#629E51\",\n \"monitor.status: down\": \"#E24D42\"\n },\n \"legendOpen\": true\n }\n}", "version": 1, - "savedSearchId": "c49bd160-eb17-11e6-be20-559646f8b9ba", + "savedSearchId": "02014c80-29d2-11e7-a68f-bfaa2341cc52", "kibanaSavedObjectMeta": { - "searchSourceJSON": "{\"filter\":[]}" + "searchSourceJSON": "{\n \"filter\": []\n}" } } \ No newline at end of file diff --git a/heartbeat/_meta/kibana/visualization/c65ef340-eb19-11e6-be20-559646f8b9ba.json b/heartbeat/_meta/kibana/visualization/c65ef340-eb19-11e6-be20-559646f8b9ba.json index d62474012ac..4bc9b3c19d1 100644 --- a/heartbeat/_meta/kibana/visualization/c65ef340-eb19-11e6-be20-559646f8b9ba.json +++ b/heartbeat/_meta/kibana/visualization/c65ef340-eb19-11e6-be20-559646f8b9ba.json @@ -1,11 +1,11 @@ { - "visState": "{\"title\":\"HTTP ping times\",\"type\":\"area\",\"params\":{\"addTooltip\":true,\"addLegend\":true,\"legendPosition\":\"right\",\"scale\":\"linear\",\"interpolate\":\"linear\",\"mode\":\"stacked\",\"times\":[],\"addTimeMarker\":false,\"defaultYExtents\":false,\"setYExtents\":false},\"aggs\":[{\"id\":\"1\",\"enabled\":true,\"type\":\"max\",\"schema\":\"metric\",\"params\":{\"field\":\"resolve_rtt.us\",\"customLabel\":\"\"}},{\"id\":\"3\",\"enabled\":true,\"type\":\"max\",\"schema\":\"metric\",\"params\":{\"field\":\"tcp_connect_rtt.us\"}},{\"id\":\"5\",\"enabled\":true,\"type\":\"max\",\"schema\":\"metric\",\"params\":{\"field\":\"tls_handshake_rtt.us\"}},{\"id\":\"4\",\"enabled\":true,\"type\":\"max\",\"schema\":\"metric\",\"params\":{\"field\":\"http_rtt.us\"}},{\"id\":\"2\",\"enabled\":true,\"type\":\"date_histogram\",\"schema\":\"segment\",\"params\":{\"field\":\"@timestamp\",\"interval\":\"auto\",\"customInterval\":\"2h\",\"min_doc_count\":1,\"extended_bounds\":{}}}],\"listeners\":{}}", + "visState": "{\n \"title\": \"HTTP ping times\",\n \"type\": \"area\",\n \"params\": {\n \"addTooltip\": true,\n \"addLegend\": true,\n \"legendPosition\": \"right\",\n \"scale\": \"linear\",\n \"interpolate\": \"linear\",\n \"mode\": \"stacked\",\n \"times\": [],\n \"addTimeMarker\": false,\n \"defaultYExtents\": false,\n \"setYExtents\": false\n },\n \"aggs\": [\n {\n \"id\": \"1\",\n \"enabled\": true,\n \"type\": \"max\",\n \"schema\": \"metric\",\n \"params\": {\n \"field\": \"resolve.rtt.us\",\n \"customLabel\": \"\"\n }\n },\n {\n \"id\": \"3\",\n \"enabled\": true,\n \"type\": \"max\",\n \"schema\": \"metric\",\n \"params\": {\n \"field\": \"tcp.rtt.connect.us\"\n }\n },\n {\n \"id\": \"5\",\n \"enabled\": true,\n \"type\": \"max\",\n \"schema\": \"metric\",\n \"params\": {\n \"field\": \"tls.rtt.handshake.us\"\n }\n },\n {\n \"id\": \"4\",\n \"enabled\": true,\n \"type\": \"max\",\n \"schema\": \"metric\",\n \"params\": {\n \"field\": \"http.rtt.response_header.us\"\n }\n },\n {\n \"id\": \"2\",\n \"enabled\": true,\n \"type\": \"date_histogram\",\n \"schema\": \"segment\",\n \"params\": {\n \"field\": \"@timestamp\",\n \"interval\": \"auto\",\n \"customInterval\": \"2h\",\n \"min_doc_count\": 1,\n \"extended_bounds\": {}\n }\n }\n ],\n \"listeners\": {}\n}", "description": "", "title": "HTTP ping times", "uiStateJSON": "{}", "version": 1, - "savedSearchId": "c49bd160-eb17-11e6-be20-559646f8b9ba", + "savedSearchId": "02014c80-29d2-11e7-a68f-bfaa2341cc52", "kibanaSavedObjectMeta": { - "searchSourceJSON": "{\"filter\":[]}" + "searchSourceJSON": "{\n \"filter\": []\n}" } } \ No newline at end of file diff --git a/heartbeat/beater/manager.go b/heartbeat/beater/manager.go index 489d8b1aa1f..90804087dac 100644 --- a/heartbeat/beater/manager.go +++ b/heartbeat/beater/manager.go @@ -36,6 +36,8 @@ type Monitor struct { } type MonitorTask struct { + name, typ string + job monitors.Job schedule scheduler.Schedule cancel JobCanceller @@ -49,6 +51,8 @@ type JobCanceller func() error var defaultFilePollInterval = 5 * time.Second +const defaultEventType = "monitor" + func newMonitorManager( client publisher.Client, jobControl JobControl, @@ -151,6 +155,8 @@ func (m *Monitor) Update(configs []*common.Config) error { } shared := struct { + Name string `config:"name"` + Type string `config:"type"` Schedule *schedule.Schedule `config:"schedule" validate:"required"` }{} if err := config.Unpack(&shared); err != nil { @@ -164,8 +170,15 @@ func (m *Monitor) Update(configs []*common.Config) error { return err } + name := shared.Name + if name == "" { + name = shared.Type + } + for _, job := range jobs { all[job.Name()] = MonitorTask{ + name: name, + typ: shared.Type, job: job, schedule: shared.Schedule, } @@ -179,10 +192,10 @@ func (m *Monitor) Update(configs []*common.Config) error { m.active = map[string]MonitorTask{} // start new and reconfigured tasks - for name, t := range all { - job := createJob(m.manager.client, name, t.job) - t.cancel = m.manager.jobControl.Add(t.schedule, name, job) - m.active[name] = t + for id, t := range all { + job := createJob(m.manager.client, t.job, t.name, t.typ) + t.cancel = m.manager.jobControl.Add(t.schedule, id, job) + m.active[id] = t } return nil @@ -221,19 +234,11 @@ func createWatchUpdater(monitor *Monitor) func(content []byte) { } } -func createJob( - client publisher.Client, - name string, - r monitors.Job, -) scheduler.TaskFunc { - return createJobTask(client, name, r) +func createJob(client publisher.Client, r monitors.Job, name, typ string) scheduler.TaskFunc { + return createJobTask(client, r, name, typ) } -func createJobTask( - client publisher.Client, - name string, - r monitors.TaskRunner, -) scheduler.TaskFunc { +func createJobTask(client publisher.Client, r monitors.TaskRunner, name, typ string) scheduler.TaskFunc { return func() []scheduler.TaskFunc { event, next, err := r.Run() if err != nil { @@ -241,7 +246,16 @@ func createJobTask( } if event != nil { - event["monitor"] = name + event.DeepUpdate(common.MapStr{ + "monitor": common.MapStr{ + "name": name, + "type": typ, + }, + }) + + if _, exists := event["type"]; !exists { + event["type"] = defaultEventType + } client.PublishEvent(event) } @@ -251,7 +265,7 @@ func createJobTask( cont := make([]scheduler.TaskFunc, len(next)) for i, n := range next { - cont[i] = createJobTask(client, name, n) + cont[i] = createJobTask(client, n, name, typ) } return cont } diff --git a/heartbeat/docs/fields.asciidoc b/heartbeat/docs/fields.asciidoc index b64cdbd166a..8a3ced1467f 100644 --- a/heartbeat/docs/fields.asciidoc +++ b/heartbeat/docs/fields.asciidoc @@ -15,7 +15,13 @@ grouped in the following categories: * <> * <> * <> +* <> +* <> * <> +* <> +* <> +* <> +* <> -- [[exported-fields-beat]] @@ -160,63 +166,73 @@ Region in which this host is running. [[exported-fields-common]] -== Common monitoring fields Fields +== Common Heartbeat Monitor Fields None [float] -=== type +== monitor Fields -type: keyword +Common monitor fields. -required: True + + +[float] +=== monitor.type + +type: keyword The monitor type. [float] -=== monitor +=== monitor.name type: keyword -Monitor job name. +The monitors configured name [float] -=== scheme +=== monitor.id type: keyword -Address url scheme. For example `tcp`, `ssl`, `http`, and `https`. +The monitors full job ID as used by heartbeat. [float] -=== host +== duration Fields -type: keyword +total monitoring test duration -Hostname of service being monitored. Can be missing, if service is monitored by IP. +[float] +=== monitor.duration.us + +type: long + +Duration in microseconds [float] -=== port +=== monitor.scheme -type: integer +type: keyword -Service port number. +Address url scheme. For example `tcp`, `tls`, `http`, and `https`. [float] -=== url +=== monitor.host -type: text +type: keyword -Service url used by monitor. +Hostname of service being monitored. Can be missing, if service is monitored by IP. [float] -=== ip +=== monitor.ip type: ip @@ -224,138 +240,175 @@ IP of service being monitored. If service is monitored by hostname, the `ip` fie [float] -== duration Fields +=== monitor.status -total monitoring test duration +type: keyword + +required: True + +Indicator if monitor could validate the service to be available. + + +[[exported-fields-http]] +== HTTP Monitor Fields + +None [float] -=== duration.us +== http Fields + +HTTP related fields. -type: long -Duration in microseconds [float] -== resolve_rtt Fields +=== http.url -Duration required to resolve an IP from hostname. +type: text + +Service url used by monitor. [float] -=== resolve_rtt.us +== response Fields + +Service response parameters. -type: long -Duration in microseconds [float] -== icmp_rtt Fields +=== http.response.status -ICMP Echo Request and Reply round trip time +type: integer + +Response status code. [float] -=== icmp_rtt.us +== rtt Fields + +HTTP layer round trip times. -type: long -Duration in microseconds [float] -== tcp_connect_rtt Fields +== validate Fields -Duration required to establish a TCP connection based on already available IP address. +Duration between first byte of HTTP request being written and +response being processed by validator. Duration based on already +available network connection. + +Note: if validator is not reading body or only a prefix, this + number does not fully represent the total time needed + to read the body. [float] -=== tcp_connect_rtt.us +=== http.rtt.validate.us type: long Duration in microseconds [float] -== socks5_connect_rtt Fields +== validate_body Fields -Time required to establish a connection via SOCKS5 to endpoint based on available connection to SOCKS5 proxy. +Duration of validator required to read and validate the response +body. + +Note: if validator is not reading body or only a prefix, this + number does not fully represent the total time needed + to read the body. [float] -=== socks5_connect_rtt.us +=== http.rtt.validate_body.us type: long Duration in microseconds [float] -== tls_handshake_rtt Fields - -Time required to finish TLS handshake based on already available network connection. +== write_request Fields +Duration of sending the complete HTTP request. Duration based on already available network connection. [float] -=== tls_handshake_rtt.us +=== http.rtt.write_request.us type: long Duration in microseconds [float] -== http_rtt Fields - -Time required between sending the HTTP request and first by from HTTP response being read. Duration based on already available network connection. +== response_header Fields +Time required between sending the start of sending the HTTP request and first by from HTTP response being read. Duration based on already available network connection. [float] -=== http_rtt.us +=== http.rtt.response_header.us type: long Duration in microseconds [float] -== validate_rtt Fields +== total Fields + +Duration required to process the HTTP transaction. Starts with +the initial TCP connection attempt. Ends with after validator +did check the response. -Time required for validating the connection if connection checks are configured. +Note: if validator is not reading body or only a prefix, this + number does not fully represent the total time needed. [float] -=== validate_rtt.us +=== http.rtt.total.us type: long Duration in microseconds +[[exported-fields-icmp]] +== ICMP Fields + +None + + [float] -== response Fields +== icmp Fields -Service response parameters. +IP ping fields. [float] -=== response.status +=== icmp.requests type: integer -Response status code. +Number if ICMP EchoRequests send. [float] -=== up +== rtt Fields -type: boolean +ICMP Echo Request and Reply round trip time -required: True -Boolean indicator if monitor could validate the service to be available. +[float] +=== icmp.rtt.us + +type: long +Duration in microseconds [[exported-fields-kubernetes]] == Kubernetes info Fields @@ -404,3 +457,169 @@ type: keyword Kubernetes container name +[[exported-fields-resolve]] +== Host Lookup Fields + +None + + +[float] +== resolve Fields + +Host lookup fields. + + + +[float] +=== resolve.host + +type: keyword + +Hostname of service being monitored. + + +[float] +=== resolve.ip + +type: ip + +IP address found for the given host. + + +[float] +== rtt Fields + +Duration required to resolve an IP from hostname. + + +[float] +=== resolve.rtt.us + +type: long + +Duration in microseconds + +[[exported-fields-socks5]] +== SOCKS5 Proxy Fields + +None + + +[float] +== socks5 Fields + +SOCKS5 proxy related fields: + + + +[float] +== rtt Fields + +TLS layer round trip times. + + + +[float] +== connect Fields + +Time required to establish a connection via SOCKS5 to endpoint based on available connection to SOCKS5 proxy. + + + +[float] +=== socks5.rtt.connect.us + +type: long + +Duration in microseconds + +[[exported-fields-tcp]] +== TCP Layer Fields + +None + + +[float] +== tcp Fields + +TCP network layer related fields. + + + +[float] +=== tcp.port + +type: integer + +Service port number. + + +[float] +== rtt Fields + +TCP layer round trip times. + + + +[float] +== connect Fields + +Duration required to establish a TCP connection based on already available IP address. + + + +[float] +=== tcp.rtt.connect.us + +type: long + +Duration in microseconds + +[float] +== validate Fields + +Duration of validation step based on existing TCP connection. + + + +[float] +=== tcp.rtt.validate.us + +type: long + +Duration in microseconds + +[[exported-fields-tls]] +== TLS Encryption Layer Fields + +None + + +[float] +== tls Fields + +TLS layer related fields. + + + +[float] +== rtt Fields + +TLS layer round trip times. + + + +[float] +== handshake Fields + +Time required to finish TLS handshake based on already available network connection. + + + +[float] +=== tls.rtt.handshake.us + +type: long + +Duration in microseconds + diff --git a/heartbeat/look/look.go b/heartbeat/look/look.go index c9e64d23f04..0fd0e4a3c3d 100644 --- a/heartbeat/look/look.go +++ b/heartbeat/look/look.go @@ -1,5 +1,5 @@ // Package look defines common formatters for fields/types to be used when -// generating custom events. +// generating heartbeat events. package look import ( @@ -10,12 +10,19 @@ import ( "github.com/elastic/beats/heartbeat/reason" ) +// RTT formats a round-trip-time given as time.Duration into an +// event field. The duration is stored in `{"us": rtt}`. func RTT(rtt time.Duration) common.MapStr { + if rtt < 0 { + rtt = 0 + } + return common.MapStr{ "us": rtt / (time.Microsecond / time.Nanosecond), } } +// Reason formats an error into an error event field. func Reason(err error) common.MapStr { if r, ok := err.(reason.Reason); ok { return reason.Fail(r) @@ -23,6 +30,16 @@ func Reason(err error) common.MapStr { return reason.FailIO(err) } +// Timestamp converts an event timestamp into an compatible event timestamp for +// reporting. func Timestamp(t time.Time) common.Time { return common.Time(t) } + +// Status creates a service status message from an error value. +func Status(err error) string { + if err == nil { + return "up" + } + return "down" +} diff --git a/heartbeat/monitors/active/dialchain/dialchain.go b/heartbeat/monitors/active/dialchain/dialchain.go index 096ed6922f7..ceeab0a4a60 100644 --- a/heartbeat/monitors/active/dialchain/dialchain.go +++ b/heartbeat/monitors/active/dialchain/dialchain.go @@ -1,43 +1,31 @@ package dialchain import ( - "fmt" - "net" - "time" - "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/outputs/transport" - - "github.com/elastic/beats/heartbeat/look" ) -type DialLayerCallback interface { - Start() - Done(err error) -} - -type MeasureLayerRTTCB struct { - Callback MeasureCallback - start time.Time -} - -type MeasureCallback func(start, end time.Time) - +// DialerChain composes builders for multiple network layers, used to build +// the final transport.Dialer object based on the network layers. +// Each layer can hold individual configurations. Use 'Clone' to copy and replace/wrap +// layers at will. +// Once all Layers have been prepared, use Build to build a transport.Dialer that can +// used with any go library network packages relying on standard library based dialers. +// +// For Additional Layering capabilities, DialerChain implements the NetDialer interface. type DialerChain struct { Net NetDialer Layers []Layer } -type NetDialer struct { - Name string - Dialer transport.Dialer -} +// NetDialer provides the most low-level network layer for setting up a network +// connection. NetDialer objects do not support wrapping any lower network layers. +type NetDialer func(common.MapStr) (transport.Dialer, error) -type Layer struct { - Name string - Builder func(transport.Dialer) (transport.Dialer, error) -} +// Layer is a configured network layer, wrapping any lower-level network layers. +type Layer func(common.MapStr, transport.Dialer) (transport.Dialer, error) +// Clone create a shallow copy of c. func (c *DialerChain) Clone() *DialerChain { d := &DialerChain{ Net: c.Net, @@ -47,141 +35,39 @@ func (c *DialerChain) Clone() *DialerChain { return d } -func (c *DialerChain) BuildWith(makeCB func(string) DialLayerCallback) (d transport.Dialer, err error) { - d = LayerCBDialer(makeCB(c.Net.Name), c.Net.Dialer) - for _, layer := range c.Layers { - if d, err = LayerDeltaCBDialer(makeCB(layer.Name), d, layer.Builder); err != nil { - return nil, err - } +// Build create a new transport.Dialer for use with other networking libraries. +func (c *DialerChain) Build(event common.MapStr) (d transport.Dialer, err error) { + d, err = c.Net.build(event) + if err != nil { + return } - return -} - -func (c *DialerChain) BuildWithMeasures(event common.MapStr) (transport.Dialer, error) { - return c.BuildWith(func(name string) DialLayerCallback { - return measureEventRTT(event, name) - }) -} -func (c *DialerChain) Build() (d transport.Dialer, err error) { - d = c.Net.Dialer for _, layer := range c.Layers { - if d, err = layer.Builder(d); err != nil { + if d, err = layer.build(event, d); err != nil { return nil, err } } return } -func (c *DialerChain) TestBuild() error { - _, err := c.Build() - return err -} - -func (c *DialerChain) DialWithMeasurements(network, host string) (fields common.MapStr, conn net.Conn, err error) { - var dialer transport.Dialer - fields = common.MapStr{} - if dialer, err = c.BuildWithMeasures(fields); err == nil { - conn, err = dialer.Dial(network, host) - } - return -} - -func (c *DialerChain) Dial(network, host string) (conn net.Conn, err error) { - var dialer transport.Dialer - if dialer, err = c.Build(); err == nil { - return dialer.Dial(network, host) - } - return -} - +// AddLayer adds another layer to the dialer chain. +// The layer being added is the new topmost network layer using the other +// already present layers on dial. func (c *DialerChain) AddLayer(l Layer) { c.Layers = append(c.Layers, l) } -func measureEventRTT(event common.MapStr, name string) DialLayerCallback { - return &MeasureLayerRTTCB{Callback: func(start, end time.Time) { - event[name] = look.RTT(end.Sub(start)) - }} -} - -func LayerCBDialer(cb DialLayerCallback, d transport.Dialer) transport.Dialer { - return transport.DialerFunc(func(network, address string) (net.Conn, error) { - cb.Start() - c, err := d.Dial(network, address) - cb.Done(err) - return c, err - }) -} - -func LayerDeltaCBDialer( - cb DialLayerCallback, - dialer transport.Dialer, - layer func(transport.Dialer) (transport.Dialer, error), -) (transport.DialerFunc, error) { - starter := transport.DialerFunc(func(network, address string) (net.Conn, error) { - c, err := dialer.Dial(network, address) - cb.Start() - return c, err - }) - - layerInstance, err := layer(starter) - if err != nil { - return nil, err - } - - return func(network, address string) (net.Conn, error) { - c, err := layerInstance.Dial(network, address) - cb.Done(err) - return c, err - }, nil -} - -func ConstAddrDialer(name, addr string, to time.Duration) NetDialer { - return NetDialer{name, transport.DialerFunc(func(network, _ string) (net.Conn, error) { - switch network { - case "tcp", "tcp4", "tcp6", "udp", "udp4", "udp6": - default: - return nil, fmt.Errorf("unsupported network type %v", network) - } - - dialer := &net.Dialer{Timeout: to} - return dialer.Dial(network, addr) - })} -} - -func ConstAddrLayer(addr string, l Layer) Layer { - return Layer{l.Name, func(d transport.Dialer) (transport.Dialer, error) { - forward, err := l.Builder(d) - if err != nil { - return nil, err - } - - return transport.DialerFunc(func(network, _ string) (net.Conn, error) { - return forward.Dial(network, addr) - }), nil - }} -} - -func TCPDialer(name string, to time.Duration) NetDialer { - return NetDialer{name, transport.NetDialer(to)} -} - -func UDPDialer(name string, to time.Duration) NetDialer { - return NetDialer{name, transport.NetDialer(to)} +// TestBuild tries to build the DialerChain and reports any error reported by +// one of the layers. +func (c *DialerChain) TestBuild() error { + _, err := c.Build(common.MapStr{}) + return err } -func SOCKS5Layer(name string, config *transport.ProxyConfig) Layer { - return Layer{name, func(d transport.Dialer) (transport.Dialer, error) { - return transport.ProxyDialer(config, d) - }} +func (d NetDialer) build(event common.MapStr) (transport.Dialer, error) { + return d(event) } -func TLSLayer(name string, config *transport.TLSConfig, timeout time.Duration) Layer { - return Layer{name, func(d transport.Dialer) (transport.Dialer, error) { - return transport.TLSDialer(d, config, timeout) - }} +func (l Layer) build(event common.MapStr, next transport.Dialer) (transport.Dialer, error) { + return l(event, next) } - -func (cb *MeasureLayerRTTCB) Start() { cb.start = time.Now() } -func (cb *MeasureLayerRTTCB) Done(_ error) { cb.Callback(cb.start, time.Now()) } diff --git a/heartbeat/monitors/active/dialchain/net.go b/heartbeat/monitors/active/dialchain/net.go new file mode 100644 index 00000000000..295ccf68f9c --- /dev/null +++ b/heartbeat/monitors/active/dialchain/net.go @@ -0,0 +1,94 @@ +package dialchain + +import ( + "fmt" + "net" + "time" + + "github.com/elastic/beats/heartbeat/look" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/outputs/transport" +) + +// TCPDialer creates a new NetDialer with constant event fields and default +// connection timeout. +// The fields parameter holds additonal constants to be added to the final event +// structure. +// +// The dialer will update the active events with: +// +// { +// "tcp": { +// "port": ..., +// "rtt": { "connect": { "us": ... }} +// } +// } +func TCPDialer(to time.Duration) NetDialer { + return netDialer(to) +} + +// UDPDialer creates a new NetDialer with constant event fields and default +// connection timeout. +// The fields parameter holds additonal constants to be added to the final event +// structure. +// +// The dialer will update the active events with: +// +// { +// "udp": { +// "port": ..., +// "rtt": { "connect": { "us": ... }} +// } +// } +func UDPDialer(to time.Duration) NetDialer { + return netDialer(to) +} + +func netDialer(timeout time.Duration) NetDialer { + return func(event common.MapStr) (transport.Dialer, error) { + return makeDialer(func(network, address string) (net.Conn, error) { + namespace := "" + + switch network { + case "tcp", "tcp4", "tcp6": + namespace = "tcp" + case "udp", "udp4", "udp6": + namespace = "udp" + default: + return nil, fmt.Errorf("unsupported network type %v", network) + } + + host, port, err := net.SplitHostPort(address) + if err != nil { + return nil, err + } + + addresses, err := net.LookupHost(host) + if err != nil { + logp.Warn(`DNS lookup failure "%s": %v`, host, err) + return nil, err + } + + // dial via host IP by randomized iteration of known IPs + dialer := &net.Dialer{Timeout: timeout} + + start := time.Now() + conn, err := transport.DialWith(dialer, network, host, addresses, port) + if err != nil { + return nil, err + } + + end := time.Now() + event.DeepUpdate(common.MapStr{ + namespace: common.MapStr{ + "rtt": common.MapStr{ + "connect": look.RTT(end.Sub(start)), + }, + }, + }) + + return conn, nil + }), nil + } +} diff --git a/heartbeat/monitors/active/dialchain/socks5.go b/heartbeat/monitors/active/dialchain/socks5.go new file mode 100644 index 00000000000..59023a855f9 --- /dev/null +++ b/heartbeat/monitors/active/dialchain/socks5.go @@ -0,0 +1,38 @@ +package dialchain + +import ( + "net" + + "github.com/elastic/beats/heartbeat/look" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/outputs/transport" +) + +// SOCKS5Layer configures a SOCKS5 proxy layer in a DialerChain. +// +// The layer will update the active event with: +// +// { +// "socks5": { +// "rtt": { "connect": { "us": ... }} +// } +// } +func SOCKS5Layer(config *transport.ProxyConfig) Layer { + return func(event common.MapStr, next transport.Dialer) (transport.Dialer, error) { + var timer timer + + dialer, err := transport.ProxyDialer(config, startTimerAfterDial(&timer, next)) + if err != nil { + return nil, err + } + + return afterDial(dialer, func(conn net.Conn) (net.Conn, error) { + // TODO: extract connection parameter from connection object? + // TODO: add proxy url to event? + + timer.stop() + event.Put("socks5.rtt.connect", look.RTT(timer.duration())) + return conn, nil + }), nil + } +} diff --git a/heartbeat/monitors/active/dialchain/tls.go b/heartbeat/monitors/active/dialchain/tls.go new file mode 100644 index 00000000000..e3354b5449c --- /dev/null +++ b/heartbeat/monitors/active/dialchain/tls.go @@ -0,0 +1,42 @@ +package dialchain + +import ( + "net" + "time" + + "github.com/elastic/beats/heartbeat/look" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/outputs/transport" +) + +// TLSLayer configures the TLS layer in a DialerChain. +// +// The layer will update the active event with: +// +// { +// "tls": { +// "rtt": { "handshake": { "us": ... }} +// } +// } +func TLSLayer(cfg *transport.TLSConfig, to time.Duration) Layer { + return func(event common.MapStr, next transport.Dialer) (transport.Dialer, error) { + var timer timer + + // Wrap next dialer so to start the timer when 'next' returns. + // This gets us the timestamp for when the TLS layer will start the handshake. + next = startTimerAfterDial(&timer, next) + + dialer, err := transport.TLSDialer(next, cfg, to) + if err != nil { + return nil, err + } + + return afterDial(dialer, func(conn net.Conn) (net.Conn, error) { + // TODO: extract TLS connection parameters from connection object. + + timer.stop() + event.Put("tls.rtt.handshake", look.RTT(timer.duration())) + return conn, nil + }), nil + } +} diff --git a/heartbeat/monitors/active/dialchain/util.go b/heartbeat/monitors/active/dialchain/util.go new file mode 100644 index 00000000000..b5ea4885dd8 --- /dev/null +++ b/heartbeat/monitors/active/dialchain/util.go @@ -0,0 +1,111 @@ +package dialchain + +import ( + "net" + "time" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/outputs/transport" +) + +type timer struct { + s, e time.Time +} + +// IDLayer creates an empty placeholder layer. +func IDLayer() Layer { + return _idLayer +} + +var _idLayer = Layer(func(event common.MapStr, next transport.Dialer) (transport.Dialer, error) { + return next, nil +}) + +// ConstAddrLayer introduces a network layer always passing a constant address +// to the underlying layer. +func ConstAddrLayer(address string) Layer { + build := constAddr(address) + + return func(event common.MapStr, next transport.Dialer) (transport.Dialer, error) { + return build(next), nil + } +} + +// MakeConstAddrLayer always passes the same address to the original Layer. +// This is usefull if a lookup did return multiple IPs for the same hostname, +// but the IP use to connect shall be fixed. +func MakeConstAddrLayer(addr string, origLayer Layer) Layer { + return withLayerDialer(origLayer, constAddr(addr)) +} + +// MakeConstAddrDialer always passes the same address to the original NetDialer. +// This is usefull if a lookup did return multiple IPs for the same hostname, +// but the IP use to connect shall be fixed. +func MakeConstAddrDialer(addr string, origNet NetDialer) NetDialer { + return withNetDialer(origNet, constAddr(addr)) +} + +func (t *timer) start() { t.s = time.Now() } +func (t *timer) stop() { t.e = time.Now() } +func (t *timer) duration() time.Duration { return t.e.Sub(t.s) } + +// makeDialer aliases transport.DialerFunc +func makeDialer(fn func(network, address string) (net.Conn, error)) transport.Dialer { + return transport.DialerFunc(fn) +} + +// beforeDial will always call fn before executing the underlying dialer. +// The callback must return the original or a new address to be used with +// the dialer. +func beforeDial(dialer transport.Dialer, fn func(string) string) transport.Dialer { + return makeDialer(func(network, address string) (net.Conn, error) { + address = fn(address) + return dialer.Dial(network, address) + }) +} + +// afterDial will run fn after the dialer did successfully return a connection. +func afterDial(dialer transport.Dialer, fn func(net.Conn) (net.Conn, error)) transport.Dialer { + return makeDialer(func(network, address string) (net.Conn, error) { + conn, err := dialer.Dial(network, address) + if err == nil { + conn, err = fn(conn) + } + return conn, err + }) +} + +func startTimerAfterDial(t *timer, dialer transport.Dialer) transport.Dialer { + return afterDial(dialer, func(c net.Conn) (net.Conn, error) { + t.start() + return c, nil + }) +} + +func constAddr(addr string) func(transport.Dialer) transport.Dialer { + return func(dialer transport.Dialer) transport.Dialer { + return beforeDial(dialer, func(_ string) string { + return addr + }) + } +} + +func withNetDialer(layer NetDialer, fn func(transport.Dialer) transport.Dialer) NetDialer { + return func(event common.MapStr) (transport.Dialer, error) { + origDialer, err := layer.build(event) + if err != nil { + return nil, err + } + return fn(origDialer), nil + } +} + +func withLayerDialer(layer Layer, fn func(transport.Dialer) transport.Dialer) Layer { + return func(event common.MapStr, next transport.Dialer) (transport.Dialer, error) { + origDialer, err := layer.build(event, next) + if err != nil { + return nil, err + } + return fn(origDialer), nil + } +} diff --git a/heartbeat/monitors/active/http/simple_transp.go b/heartbeat/monitors/active/http/simple_transp.go index ca0a7d0fe34..4a97f997fd6 100644 --- a/heartbeat/monitors/active/http/simple_transp.go +++ b/heartbeat/monitors/active/http/simple_transp.go @@ -26,6 +26,7 @@ type SimpleTransport struct { DisableCompression bool OnStartWrite func() + OnEndWrite func() OnStartRead func() } @@ -125,6 +126,7 @@ func (t *SimpleTransport) writeRequest(conn net.Conn, req *http.Request) error { if err == nil { err = writer.Flush() } + t.sigEndWrite() return err } @@ -138,6 +140,7 @@ func (t *SimpleTransport) readResponse( if err != nil { return nil, err } + t.sigStartRead() if requestedGzip && resp.Header.Get("Content-Encoding") == gzipEncoding { @@ -160,14 +163,12 @@ func (t *SimpleTransport) readResponse( return resp, nil } -func (t *SimpleTransport) sigStartRead() { - if f := t.OnStartRead; f != nil { - f() - } -} +func (t *SimpleTransport) sigStartRead() { call(t.OnStartRead) } +func (t *SimpleTransport) sigStartWrite() { call(t.OnStartWrite) } +func (t *SimpleTransport) sigEndWrite() { call(t.OnEndWrite) } -func (t *SimpleTransport) sigStartWrite() { - if f := t.OnStartWrite; f != nil { +func call(f func()) { + if f != nil { f() } } diff --git a/heartbeat/monitors/active/http/task.go b/heartbeat/monitors/active/http/task.go index a22d1d8b6bd..f48ae831a70 100644 --- a/heartbeat/monitors/active/http/task.go +++ b/heartbeat/monitors/active/http/task.go @@ -47,19 +47,22 @@ func newHTTPMonitorHostJob( } timeout := config.Timeout - fields := common.MapStr{ - "scheme": request.URL.Scheme, - "host": hostname, - "port": port, - "url": request.URL.String(), - } - return monitors.MakeSimpleJob(jobName, typ, func() (common.MapStr, error) { - event, err := execPing(client, request, body, timeout, validator) - if event == nil { - event = common.MapStr{} - } - event.Update(fields) + settings := monitors.MakeJobSetting(jobName).WithFields(common.MapStr{ + "monitor": common.MapStr{ + "scheme": request.URL.Scheme, + "host": hostname, + }, + "http": common.MapStr{ + "url": request.URL.String(), + }, + "tcp": common.MapStr{ + "port": port, + }, + }) + + return monitors.MakeSimpleJob(settings, func() (common.MapStr, error) { + _, _, event, err := execPing(client, request, body, timeout, validator) return event, err }), nil } @@ -85,11 +88,21 @@ func newHTTPMonitorIPsJob( return nil, err } + settings := monitors.MakeHostJobSettings(jobName, hostname, config.Mode) + settings = settings.WithFields(common.MapStr{ + "monitor": common.MapStr{ + "scheme": req.URL.Scheme, + }, + "http": common.MapStr{ + "url": req.URL.String(), + }, + "tcp": common.MapStr{ + "port": port, + }, + }) + pingFactory := createPingFactory(config, hostname, port, tls, req, body, validator) - if ip := net.ParseIP(hostname); ip != nil { - return monitors.MakeByIPJob(jobName, typ, ip, pingFactory) - } - return monitors.MakeByHostJob(jobName, typ, hostname, config.Mode, pingFactory) + return monitors.MakeByHostJob(settings, pingFactory) } func createPingFactory( @@ -101,53 +114,61 @@ func createPingFactory( body []byte, validator RespCheck, ) func(*net.IPAddr) monitors.TaskRunner { - fields := common.MapStr{ - "scheme": request.URL.Scheme, - "port": port, - "url": request.URL.String(), - } - timeout := config.Timeout isTLS := request.URL.Scheme == "https" checkRedirect := makeCheckRedirect(config.MaxRedirects) - return monitors.MakePingIPFactory(fields, func(ip *net.IPAddr) (common.MapStr, error) { + return monitors.MakePingIPFactory(func(ip *net.IPAddr) (common.MapStr, error) { + event := common.MapStr{} addr := net.JoinHostPort(ip.String(), strconv.Itoa(int(port))) d := &dialchain.DialerChain{ - Net: dialchain.ConstAddrDialer("tcp_connect_rtt", addr, timeout), + Net: dialchain.MakeConstAddrDialer(addr, dialchain.TCPDialer(timeout)), } + + // TODO: add socks5 proxy? + if isTLS { - d.AddLayer(dialchain.TLSLayer("tls_handshake_rtt", tls, timeout)) + d.AddLayer(dialchain.TLSLayer(tls, timeout)) } - measures := common.MapStr{} - dialer, err := d.BuildWithMeasures(measures) + dialer, err := d.Build(event) if err != nil { return nil, err } - var httpStart, httpEnd time.Time + var ( + writeStart, readStart, writeEnd time.Time + ) client := &http.Client{ CheckRedirect: checkRedirect, Timeout: timeout, Transport: &SimpleTransport{ Dialer: dialer, - OnStartWrite: func() { httpStart = time.Now() }, - OnStartRead: func() { httpEnd = time.Now() }, + OnStartWrite: func() { writeStart = time.Now() }, + OnEndWrite: func() { writeEnd = time.Now() }, + OnStartRead: func() { readStart = time.Now() }, }, } - event, err := execPing(client, request, body, timeout, validator) - if event == nil { - event = measures - } else { - event.Update(measures) + _, end, result, err := execPing(client, request, body, timeout, validator) + event.DeepUpdate(result) + + if !readStart.IsZero() { + event.DeepUpdate(common.MapStr{ + "http": common.MapStr{ + "rtt": common.MapStr{ + "write_request": look.RTT(writeEnd.Sub(writeStart)), + "response_header": look.RTT(readStart.Sub(writeStart)), + }, + }, + }) } - - if !httpEnd.IsZero() { - event["http_rtt"] = look.RTT(httpEnd.Sub(httpStart)) + if !writeStart.IsZero() { + event.Put("http.rtt.validate", look.RTT(end.Sub(writeStart))) + event.Put("http.rtt.content", look.RTT(end.Sub(readStart))) } + return event, err }) } @@ -180,7 +201,7 @@ func execPing( body []byte, timeout time.Duration, validator func(*http.Response) error, -) (common.MapStr, reason.Reason) { +) (time.Time, time.Time, common.MapStr, reason.Reason) { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() @@ -194,22 +215,27 @@ func execPing( resp, err := client.Do(req) end := time.Now() if err != nil { - return nil, reason.IOFailed(err) + return start, end, nil, reason.IOFailed(err) } defer resp.Body.Close() - if err := validator(resp); err != nil { - return nil, reason.ValidateFailed(err) - } + err = validator(resp) + end = time.Now() rtt := end.Sub(start) - event := common.MapStr{ + event := common.MapStr{"http": common.MapStr{ "response": common.MapStr{ "status": resp.StatusCode, }, - "rtt": look.RTT(rtt), + "rtt": common.MapStr{ + "total": look.RTT(rtt), + }, + }} + + if err != nil { + return start, end, event, reason.ValidateFailed(err) } - return event, nil + return start, end, event, nil } func splitHostnamePort(requ *http.Request) (string, uint16, error) { diff --git a/heartbeat/monitors/active/icmp/icmp.go b/heartbeat/monitors/active/icmp/icmp.go index b06d3c2ba42..2b85f6805f6 100644 --- a/heartbeat/monitors/active/icmp/icmp.go +++ b/heartbeat/monitors/active/icmp/icmp.go @@ -61,23 +61,17 @@ func create( return nil, err } - typ := config.Name network := config.Mode.Network() - pingFactory := monitors.MakePingIPFactory(nil, createPingIPFactory(&config)) + pingFactory := monitors.MakePingIPFactory(createPingIPFactory(&config)) for _, host := range config.Hosts { - ip := net.ParseIP(host) - if ip != nil { - name := fmt.Sprintf("icmp-ip@%v", ip.String()) - err := addJob(monitors.MakeByIPJob(name, typ, ip, pingFactory)) - if err != nil { - return nil, err - } - continue + jobName := fmt.Sprintf("icmp-%v-host-%v@%v", config.Name, network, host) + if ip := net.ParseIP(host); ip != nil { + jobName = fmt.Sprintf("icmp-%v-ip@%v", config.Name, ip.String()) } - name := fmt.Sprintf("%v-host-%v@%v", config.Name, network, host) - err := addJob(monitors.MakeByHostJob(name, typ, host, config.Mode, pingFactory)) + settings := monitors.MakeHostJobSettings(jobName, host, config.Mode) + err := addJob(monitors.MakeByHostJob(settings, pingFactory)) if err != nil { return nil, err } @@ -88,13 +82,14 @@ func create( func createPingIPFactory(config *Config) func(*net.IPAddr) (common.MapStr, error) { return func(ip *net.IPAddr) (common.MapStr, error) { - rtt, _, err := loop.ping(ip, config.Timeout, config.Wait) - if err != nil { - return nil, err + rtt, n, err := loop.ping(ip, config.Timeout, config.Wait) + + fields := common.MapStr{"requests": n} + if err == nil { + fields["rtt"] = look.RTT(rtt) } - return common.MapStr{ - "icmp_rtt": look.RTT(rtt), - }, nil + event := common.MapStr{"icmp": fields} + return event, err } } diff --git a/heartbeat/monitors/active/icmp/loop.go b/heartbeat/monitors/active/icmp/loop.go index e1ef007c961..8254260c7fe 100644 --- a/heartbeat/monitors/active/icmp/loop.go +++ b/heartbeat/monitors/active/icmp/loop.go @@ -253,7 +253,7 @@ func (l *icmpLoop) ping( } if !success { - return 0, 0, timeoutError{} + return 0, requests, timeoutError{} } return rtt, requests, nil } diff --git a/heartbeat/monitors/active/tcp/task.go b/heartbeat/monitors/active/tcp/task.go index f8f6ca41b3e..e600061af3b 100644 --- a/heartbeat/monitors/active/tcp/task.go +++ b/heartbeat/monitors/active/tcp/task.go @@ -31,19 +31,25 @@ func newTCPMonitorHostJob( return nil, err } - return monitors.MakeSimpleJob(jobName, typ, func() (common.MapStr, error) { - event := common.MapStr{ - "scheme": scheme, - "port": port, + settings := monitors.MakeJobSetting(jobName).WithFields(common.MapStr{ + "monitor": common.MapStr{ "host": host, - } - dialer, err := taskDialer.BuildWithMeasures(event) + "scheme": scheme, + }, + "tcp": common.MapStr{ + "port": port, + }, + }) + + return monitors.MakeSimpleJob(settings, func() (common.MapStr, error) { + event := common.MapStr{} + dialer, err := taskDialer.Build(event) if err != nil { return event, err } results, err := pingHost(dialer, pingAddr, timeout, validator) - event.Update(results) + event.DeepUpdate(results) return event, err }), nil } @@ -64,14 +70,16 @@ func newTCPMonitorIPsJob( return nil, err } - pingFactory := createPingFactory(dialerFactory, addr, timeout, validator) - if ip := net.ParseIP(addr.Host); ip != nil { - debugf("Make TCP by IP job: %v:%v", ip, addr.Ports) - return monitors.MakeByIPJob(jobName, typ, ip, pingFactory) - } + settings := monitors.MakeHostJobSettings(jobName, addr.Host, config.Mode) + settings = settings.WithFields(common.MapStr{ + "monitor": common.MapStr{ + "scheme": addr.Scheme, + }, + }) - debugf("Make TCP by Host job: %v:%v (mode=%#v)", addr.Host, addr.Ports, config.Mode) - return monitors.MakeByHostJob(jobName, typ, addr.Host, config.Mode, pingFactory) + debugf("Make TCP job: %v:%v", addr.Host, addr.Ports) + pingFactory := createPingFactory(dialerFactory, addr, timeout, validator) + return monitors.MakeByHostJob(settings, pingFactory) } func createPingFactory( @@ -80,21 +88,25 @@ func createPingFactory( timeout time.Duration, validator ConnCheck, ) func(*net.IPAddr) monitors.TaskRunner { - fields := common.MapStr{"scheme": addr.Scheme} - - return monitors.MakePingAllIPPortFactory(fields, addr.Ports, + return monitors.MakePingAllIPPortFactory(addr.Ports, func(ip *net.IPAddr, port uint16) (common.MapStr, error) { - host := net.JoinHostPort(ip.String(), strconv.Itoa(int(port))) + ipStr := ip.String() + host := net.JoinHostPort(ipStr, strconv.Itoa(int(port))) pingAddr := net.JoinHostPort(addr.Host, strconv.Itoa(int(port))) - event := common.MapStr{} - dialer, err := makeDialerChain(host).BuildWithMeasures(event) + event := common.MapStr{ + "tcp": common.MapStr{ + "port": port, + }, + } + + dialer, err := makeDialerChain(host).Build(event) if err != nil { return event, err } results, err := pingHost(dialer, pingAddr, timeout, validator) - event.Update(results) + event.DeepUpdate(results) return event, err }) } @@ -133,7 +145,11 @@ func pingHost( end := time.Now() event := common.MapStr{ - "validate_rtt": look.RTT(end.Sub(validateStart)), + "tcp": common.MapStr{ + "rtt": common.MapStr{ + "validate": look.RTT(end.Sub(validateStart)), + }, + }, } if err != nil { event["error"] = reason.FailValidate(err) @@ -169,13 +185,20 @@ func buildDialerChain( config *Config, ) (*dialchain.DialerChain, error) { d := &dialchain.DialerChain{ - Net: dialchain.TCPDialer("tcp_connect_rtt", config.Timeout), + Net: dialchain.TCPDialer(config.Timeout), } - if config.Socks5.URL != "" { - d.AddLayer(dialchain.SOCKS5Layer("socks5_connect_rtt", &config.Socks5)) + + withProxy := config.Socks5.URL != "" + if withProxy { + d.AddLayer(dialchain.SOCKS5Layer(&config.Socks5)) } + + // insert empty placeholder, so address can be replaced in dialer chain + // by replacing this placeholder dialer + d.AddLayer(dialchain.IDLayer()) + if isTLSAddr(scheme) { - d.AddLayer(dialchain.TLSLayer("tls_handshake_rtt", tls, config.Timeout)) + d.AddLayer(dialchain.TLSLayer(tls, config.Timeout)) } if err := d.TestBuild(); err != nil { @@ -195,16 +218,15 @@ func buildHostDialerChainFactory( } withProxy := config.Socks5.URL != "" - return func(addr string) *dialchain.DialerChain { - if withProxy { - d := template.Clone() - d.Layers[0] = dialchain.ConstAddrLayer(addr, d.Layers[0]) - return d - } + addrIndex := 0 + if withProxy { + addrIndex = 1 + } - return &dialchain.DialerChain{ - Net: dialchain.ConstAddrDialer("tcp_connect_rtt", addr, config.Timeout), - Layers: template.Layers, - } + return func(addr string) *dialchain.DialerChain { + // replace IDLayer placeholder in template with ConstAddrLayer + d := template.Clone() + d.Layers[addrIndex] = dialchain.ConstAddrLayer(addr) + return d }, nil } diff --git a/heartbeat/monitors/util.go b/heartbeat/monitors/util.go index 9045d6a6b71..0b4184da3ae 100644 --- a/heartbeat/monitors/util.go +++ b/heartbeat/monitors/util.go @@ -4,7 +4,6 @@ import ( "errors" "fmt" "net" - "strconv" "time" "github.com/elastic/beats/libbeat/common" @@ -13,7 +12,7 @@ import ( ) type funcJob struct { - name, typ string + settings JobSettings funcTask } @@ -21,12 +20,31 @@ type funcTask struct { run func() (common.MapStr, []TaskRunner, error) } +// IPSettings provides common configuration settings for IP resolution and ping +// mode. type IPSettings struct { IPv4 bool `config:"ipv4"` IPv6 bool `config:"ipv6"` Mode PingMode `config:"mode"` } +// JobSettings configures a Job name and global fields to be added to every +// event. +type JobSettings struct { + Name string + Fields common.MapStr +} + +// HostJobSettings configures a Job including Host lookups and global fields to be added +// to every event. +type HostJobSettings struct { + Name string + Host string + IP IPSettings + Fields common.MapStr +} + +// PingMode enumeration for configuring `any` or `all` IPs pinging. type PingMode uint8 const ( @@ -35,12 +53,16 @@ const ( PingAll ) +// DefaultIPSettings provides an instance of default IPSettings to be copied +// when unpacking settings from a common.Config object. var DefaultIPSettings = IPSettings{ IPv4: true, IPv6: true, Mode: PingAny, } +// Network determines the Network type used for IP name resolution, based on the +// provided settings. func (s IPSettings) Network() string { switch { case s.IPv4 && !s.IPv6: @@ -53,23 +75,40 @@ func (s IPSettings) Network() string { return "" } -func MakeSimpleJob(name, typ string, f func() (common.MapStr, error)) Job { - return MakeJob(name, typ, func() (common.MapStr, []TaskRunner, error) { +// MakeSimpleJob creates a new Job from a callback function. The callback should +// return an valid event and can not create any sub-tasks to be executed after +// completion. +func MakeSimpleJob(settings JobSettings, f func() (common.MapStr, error)) Job { + return MakeJob(settings, func() (common.MapStr, []TaskRunner, error) { event, err := f() return event, nil, err }) } -func MakeJob(name, typ string, f func() (common.MapStr, []TaskRunner, error)) Job { - return &funcJob{name, typ, funcTask{func() (common.MapStr, []TaskRunner, error) { - return annotated(time.Now(), typ, f).Run() +// MakeJob create a new Job from a callback function. The callback can +// optionally return an event to be published and a set of derived sub-tasks to be +// scheduled. The sub-tasks will be run only once and removed from the scheduler +// after completion. +func MakeJob(settings JobSettings, f func() (common.MapStr, []TaskRunner, error)) Job { + settings.AddFields(common.MapStr{ + "monitor": common.MapStr{ + "id": settings.Name, + }, + }) + + return &funcJob{settings, funcTask{func() (common.MapStr, []TaskRunner, error) { + return annotated(settings, time.Now(), f).Run() }}} } +// MakeCont wraps a function into an executable TaskRunner. The task being generated +// can optionally return an event and/or sub-tasks. func MakeCont(f func() (common.MapStr, []TaskRunner, error)) TaskRunner { return funcTask{f} } +// MakeSimpleCont wraps a function into an executable TaskRunner. The task bein generated +// should return an event to be reported. func MakeSimpleCont(f func() (common.MapStr, error)) TaskRunner { return MakeCont(func() (common.MapStr, []TaskRunner, error) { event, err := f() @@ -77,44 +116,33 @@ func MakeSimpleCont(f func() (common.MapStr, error)) TaskRunner { }) } +// MakePingIPFactory creates a factory for building a Task from a new IP address. func MakePingIPFactory( - fields common.MapStr, f func(*net.IPAddr) (common.MapStr, error), ) func(*net.IPAddr) TaskRunner { return func(ip *net.IPAddr) TaskRunner { - r := MakeSimpleCont(func() (common.MapStr, error) { return f(ip) }) - if len(fields) > 0 { - r = WithFields(fields, r) - } - return r + return MakeSimpleCont(func() (common.MapStr, error) { return f(ip) }) } } var emptyTask = MakeSimpleCont(func() (common.MapStr, error) { return nil, nil }) +// MakePingAllIPFactory wraps a function for building a recursive Task Runner from function callbacks. func MakePingAllIPFactory( - fields common.MapStr, f func(*net.IPAddr) []func() (common.MapStr, error), ) func(*net.IPAddr) TaskRunner { - makeTask := func(f func() (common.MapStr, error)) TaskRunner { - if len(fields) > 0 { - return WithFields(fields, MakeSimpleCont(f)) - } - return MakeSimpleCont(f) - } - return func(ip *net.IPAddr) TaskRunner { cont := f(ip) switch len(cont) { case 0: return emptyTask case 1: - return makeTask(cont[0]) + return MakeSimpleCont(cont[0]) } tasks := make([]TaskRunner, len(cont)) for i, c := range cont { - tasks[i] = makeTask(c) + tasks[i] = MakeSimpleCont(c) } return MakeCont(func() (common.MapStr, []TaskRunner, error) { return nil, tasks, nil @@ -122,39 +150,38 @@ func MakePingAllIPFactory( } } +// MakePingAllIPPortFactory builds a set of TaskRunner supporting a set of +// IP/port-pairs. func MakePingAllIPPortFactory( - fields common.MapStr, ports []uint16, f func(*net.IPAddr, uint16) (common.MapStr, error), ) func(*net.IPAddr) TaskRunner { if len(ports) == 1 { port := ports[0] - fields := fields.Clone() - fields["port"] = strconv.Itoa(int(port)) - return MakePingIPFactory(fields, func(ip *net.IPAddr) (common.MapStr, error) { + return MakePingIPFactory(func(ip *net.IPAddr) (common.MapStr, error) { return f(ip, port) }) } - return MakePingAllIPFactory(fields, func(ip *net.IPAddr) []func() (common.MapStr, error) { + return MakePingAllIPFactory(func(ip *net.IPAddr) []func() (common.MapStr, error) { funcs := make([]func() (common.MapStr, error), len(ports)) for i := range ports { port := ports[i] funcs[i] = func() (common.MapStr, error) { - event, err := f(ip, port) - if event == nil { - event = common.MapStr{} - } - event["port"] = strconv.Itoa(int(port)) - return event, err + return f(ip, port) } } return funcs }) } +// MakeByIPJob builds a new Job based on already known IP. Similar to +// MakeByHostJob, the pingFactory will be used to build the tasks run by the job. +// +// A pingFactory instance is normally build with MakePingIPFactory, +// MakePingAllIPFactory or MakePingAllIPPortFactory. func MakeByIPJob( - name, typ string, + settings JobSettings, ip net.IP, pingFactory func(ip *net.IPAddr) TaskRunner, ) (Job, error) { @@ -165,86 +192,144 @@ func MakeByIPJob( return nil, err } - fields := common.MapStr{"ip": addr.String()} - return MakeJob(name, typ, WithFields(fields, pingFactory(addr)).Run), nil + fields := common.MapStr{ + "monitor": common.MapStr{"ip": addr.String()}, + } + return MakeJob(settings, WithFields(fields, pingFactory(addr)).Run), nil } +// MakeByHostJob creates a new Job including host lookup. The pingFactory will be used to +// build one or multiple Tasks after name lookup according to settings. +// +// A pingFactory instance is normally build with MakePingIPFactory, +// MakePingAllIPFactory or MakePingAllIPPortFactory. func MakeByHostJob( - name, typ string, - host string, - settings IPSettings, + settings HostJobSettings, pingFactory func(ip *net.IPAddr) TaskRunner, ) (Job, error) { - network := settings.Network() + host := settings.Host + + if ip := net.ParseIP(host); ip != nil { + return MakeByIPJob(settings.jobSettings(), ip, pingFactory) + } + + network := settings.IP.Network() if network == "" { return nil, errors.New("pinging hosts requires ipv4 or ipv6 mode enabled") } - mode := settings.Mode + mode := settings.IP.Mode + + settings.AddFields(common.MapStr{ + "monitor": common.MapStr{ + "host": host, + }, + }) + if mode == PingAny { - return MakeJob(name, typ, func() (common.MapStr, []TaskRunner, error) { - event := common.MapStr{"host": host} + return makeByHostAnyIPJob(settings, host, pingFactory), nil + } + return makeByHostAllIPJob(settings, host, pingFactory), nil +} - dnsStart := time.Now() - ip, err := net.ResolveIPAddr(network, host) - if err != nil { - return event, nil, err - } +func makeByHostAnyIPJob( + settings HostJobSettings, + host string, + pingFactory func(ip *net.IPAddr) TaskRunner, +) Job { + network := settings.IP.Network() - dnsEnd := time.Now() - dnsRTT := dnsEnd.Sub(dnsStart) - event["resolve_rtt"] = look.RTT(dnsRTT) - event["ip"] = ip.String() + return MakeJob(settings.jobSettings(), func() (common.MapStr, []TaskRunner, error) { + resolveStart := time.Now() + ip, err := net.ResolveIPAddr(network, host) + if err != nil { + return resolveErr(host, err) + } - return WithFields(event, pingFactory(ip)).Run() - }), nil - } + resolveEnd := time.Now() + resolveRTT := resolveEnd.Sub(resolveStart) + + event := resolveIPEvent(host, ip.String(), resolveRTT) + return WithFields(event, pingFactory(ip)).Run() + }) +} +func makeByHostAllIPJob( + settings HostJobSettings, + host string, + pingFactory func(ip *net.IPAddr) TaskRunner, +) Job { + network := settings.IP.Network() filter := makeIPFilter(network) - return MakeJob(name, typ, func() (common.MapStr, []TaskRunner, error) { - event := common.MapStr{"host": host} + return MakeJob(settings.jobSettings(), func() (common.MapStr, []TaskRunner, error) { // TODO: check for better DNS IP lookup support: // - The net.LookupIP drops ipv6 zone index // - dnsStart := time.Now() + resolveStart := time.Now() ips, err := net.LookupIP(host) if err != nil { - return event, nil, err + return resolveErr(host, err) } - dnsEnd := time.Now() - dnsRTT := dnsEnd.Sub(dnsStart) + resolveEnd := time.Now() + resolveRTT := resolveEnd.Sub(resolveStart) - event["resolve_rtt"] = look.RTT(dnsRTT) if filter != nil { ips = filterIPs(ips, filter) } if len(ips) == 0 { err := fmt.Errorf("no %v address resolvable for host %v", network, host) - return event, nil, err + return resolveErr(host, err) } // create ip ping tasks cont := make([]TaskRunner, len(ips)) for i, ip := range ips { addr := &net.IPAddr{IP: ip} - fields := event.Clone() - fields["ip"] = ip.String() - cont[i] = WithFields(fields, pingFactory(addr)) + event := resolveIPEvent(host, ip.String(), resolveRTT) + cont[i] = WithFields(event, pingFactory(addr)) } return nil, cont, nil - }), nil + }) +} + +func resolveIPEvent(host, ip string, rtt time.Duration) common.MapStr { + return common.MapStr{ + "monitor": common.MapStr{ + "host": host, + "ip": ip, + }, + "resolve": common.MapStr{ + "host": host, + "ip": ip, + "rtt": look.RTT(rtt), + }, + } } +func resolveErr(host string, err error) (common.MapStr, []TaskRunner, error) { + event := common.MapStr{ + "monitor": common.MapStr{ + "host": host, + }, + "resolve": common.MapStr{ + "host": host, + }, + } + return event, nil, err +} + +// WithFields wraps a TaskRunner, updating all events returned with the set of +// fields configured. func WithFields(fields common.MapStr, r TaskRunner) TaskRunner { return MakeCont(func() (common.MapStr, []TaskRunner, error) { event, cont, err := r.Run() if event == nil { event = common.MapStr{} } - event.Update(fields) + event.DeepUpdate(fields) for i := range cont { cont[i] = WithFields(fields, cont[i]) @@ -253,9 +338,11 @@ func WithFields(fields common.MapStr, r TaskRunner) TaskRunner { }) } -func WithDuration(name string, r TaskRunner) TaskRunner { +// WithDuration wraps a TaskRunner, measuring the duration between creation and +// finish of the actual task and sub-tasks. +func WithDuration(field string, r TaskRunner) TaskRunner { return MakeCont(func() (common.MapStr, []TaskRunner, error) { - return withStart(name, time.Now(), r).Run() + return withStart(field, time.Now(), r).Run() }) } @@ -263,7 +350,7 @@ func withStart(field string, start time.Time, r TaskRunner) TaskRunner { return MakeCont(func() (common.MapStr, []TaskRunner, error) { event, cont, err := r.Run() if event != nil { - event[field] = look.RTT(time.Now().Sub(start)) + event.Put(field, look.RTT(time.Since(start))) } for i := range cont { @@ -273,14 +360,16 @@ func withStart(field string, start time.Time, r TaskRunner) TaskRunner { }) } -func (f *funcJob) Name() string { return f.name } +func (f *funcJob) Name() string { return f.settings.Name } func (f funcTask) Run() (common.MapStr, []TaskRunner, error) { return f.run() } -func (f funcTask) annotated(start time.Time, typ string) TaskRunner { - return annotated(start, typ, f.run) +func (f funcTask) annotated(settings JobSettings, start time.Time) TaskRunner { + return annotated(settings, start, f.run) } +// Unpack sets PingMode from a constant string. Unpack will be called by common.Unpack when +// unpacking into an IPSettings type. func (p *PingMode) Unpack(s string) error { switch s { case "all": @@ -293,7 +382,11 @@ func (p *PingMode) Unpack(s string) error { return nil } -func annotated(start time.Time, typ string, fn func() (common.MapStr, []TaskRunner, error)) TaskRunner { +func annotated( + settings JobSettings, + start time.Time, + fn func() (common.MapStr, []TaskRunner, error), +) TaskRunner { return MakeCont(func() (common.MapStr, []TaskRunner, error) { event, cont, err := fn() if err != nil { @@ -304,19 +397,24 @@ func annotated(start time.Time, typ string, fn func() (common.MapStr, []TaskRunn } if event != nil { - event.Update(common.MapStr{ + status := look.Status(err) + event.DeepUpdate(common.MapStr{ "@timestamp": look.Timestamp(start), - "duration": look.RTT(time.Now().Sub(start)), - "type": typ, - "up": err == nil, + "monitor": common.MapStr{ + "duration": look.RTT(time.Since(start)), + "status": status, + }, }) + if fields := settings.Fields; fields != nil { + event.DeepUpdate(fields) + } } for i := range cont { if fcont, ok := cont[i].(funcTask); ok { - cont[i] = fcont.annotated(start, typ) + cont[i] = fcont.annotated(settings, start) } else { - cont[i] = annotated(start, typ, cont[i].Run) + cont[i] = annotated(settings, start, cont[i].Run) } } return event, cont, nil @@ -342,3 +440,55 @@ func filterIPs(ips []net.IP, filt func(net.IP) bool) []net.IP { } return out } + +// MakeJobSetting creates a new JobSettings structure without any global event fields. +func MakeJobSetting(name string) JobSettings { + return JobSettings{Name: name} +} + +// WithFields adds new event fields to a Job. Existing fields will be +// overwritten. +// The fields map will be updated (no copy). +func (s JobSettings) WithFields(m common.MapStr) JobSettings { + s.AddFields(m) + return s +} + +// AddFields adds new event fields to a Job. Existing fields will be +// overwritten. +func (s *JobSettings) AddFields(m common.MapStr) { addFields(&s.Fields, m) } + +// MakeHostJobSettings creates a new HostJobSettings structure without any global +// event fields. +func MakeHostJobSettings(name, host string, ip IPSettings) HostJobSettings { + return HostJobSettings{Name: name, Host: host, IP: ip} +} + +// WithFields adds new event fields to a Job. Existing fields will be +// overwritten. +// The fields map will be updated (no copy). +func (s HostJobSettings) WithFields(m common.MapStr) HostJobSettings { + s.AddFields(m) + return s +} + +// AddFields adds new event fields to a Job. Existing fields will be +// overwritten. +func (s *HostJobSettings) AddFields(m common.MapStr) { addFields(&s.Fields, m) } + +func addFields(to *common.MapStr, m common.MapStr) { + if m == nil { + return + } + + fields := *to + if fields == nil { + fields = common.MapStr{} + *to = fields + } + fields.DeepUpdate(m) +} + +func (s *HostJobSettings) jobSettings() JobSettings { + return JobSettings{Name: s.Name, Fields: s.Fields} +} diff --git a/libbeat/outputs/transport/proxy.go b/libbeat/outputs/transport/proxy.go index 4fe4a8847b9..6f7b1fdd95a 100644 --- a/libbeat/outputs/transport/proxy.go +++ b/libbeat/outputs/transport/proxy.go @@ -76,6 +76,6 @@ func ProxyDialer(config *ProxyConfig, forward Dialer) (Dialer, error) { if err != nil { return nil, err } - return dialWith(dialer, network, host, addresses, port) + return DialWith(dialer, network, host, addresses, port) }), nil } diff --git a/libbeat/outputs/transport/tcp.go b/libbeat/outputs/transport/tcp.go index 857e2d00225..78b76634428 100644 --- a/libbeat/outputs/transport/tcp.go +++ b/libbeat/outputs/transport/tcp.go @@ -29,6 +29,6 @@ func NetDialer(timeout time.Duration) Dialer { // dial via host IP by randomized iteration of known IPs dialer := &net.Dialer{Timeout: timeout} - return dialWith(dialer, network, host, addresses, port) + return DialWith(dialer, network, host, addresses, port) }) } diff --git a/libbeat/outputs/transport/tls.go b/libbeat/outputs/transport/tls.go index fff921ccb74..ebb0f88985e 100644 --- a/libbeat/outputs/transport/tls.go +++ b/libbeat/outputs/transport/tls.go @@ -13,6 +13,7 @@ import ( ) type TLSConfig struct { + // List of allowed SSL/TLS protocol versions. Connections might be dropped // after handshake succeeded, if TLS version in use is not listed. Versions []TLSVersion diff --git a/libbeat/outputs/transport/util.go b/libbeat/outputs/transport/util.go index 59e746511b2..41e4be6d8ef 100644 --- a/libbeat/outputs/transport/util.go +++ b/libbeat/outputs/transport/util.go @@ -20,7 +20,10 @@ func fullAddress(host string, defaultPort int) string { return fmt.Sprintf("%v:%v", host, defaultPort) } -func dialWith( +// DialWith randomly dials one of a number of addresses with a given dialer. +// +// Use this to select and dial one IP being known for one host name. +func DialWith( dialer Dialer, network, host string, addresses []string,